Source
```yaml
id: data-engineering-pipeline
namespace: tutorial
description: Data Engineering Pipelines
inputs:
  - id: columns_to_keep
    type: ARRAY
    itemType: STRING
    defaults:
      - brand
      - price
tasks:
  - id: extract
    type: io.kestra.plugin.core.http.Download
    uri: https://dummyjson.com/products
  - id: transform
    type: io.kestra.plugin.scripts.python.Script
    containerImage: python:3.11-alpine
    inputFiles:
      data.json: "{{ outputs.extract.uri }}"
    outputFiles:
      - "*.json"
    script: |
      import json
      with open("data.json", "r") as file:
          data = json.load(file)
      filtered_data = [
          {column: product.get(column, "N/A") for column in {{ inputs.columns_to_keep }}}
          for product in data["products"]
      ]
      with open("products.json", "w") as file:
          json.dump(filtered_data, file, indent=4)
  - id: query
    type: io.kestra.plugin.jdbc.duckdb.Query
    inputFiles:
      products.json: "{{ outputs.transform.outputFiles['products.json'] }}"
    sql: |
      INSTALL json;
      LOAD json;
      SELECT brand, round(avg(price), 2) as avg_price
      FROM read_json_auto('{{ workingDir }}/products.json')
      GROUP BY brand
      ORDER BY avg_price DESC;
    fetchType: STORE
```
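Note that `{{ inputs.columns_to_keep }}` inside the Python script is a Kestra expression rendered before the script runs. As a sketch, assuming Kestra serializes the ARRAY input as a JSON array (which also happens to be a valid Python list literal), the filtering line executes with the defaults as:

```python
# Rendered filtering line with the default inputs (a sketch; the JSON
# array produced by Kestra doubles as a Python list literal):
filtered_data = [
    {column: product.get(column, "N/A") for column in ["brand", "price"]}
    for product in data["products"]
]
```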
About this blueprint
Tags: Getting Started, Python, SQL, API
This flow is a simple example of a data engineering pipeline in Kestra. It downloads a JSON file, filters the data, and calculates the average price per brand.
The flow has three tasks:
- The first task downloads a JSON file.
- The second task filters the data and writes it to a new JSON file.
- The third task reads the filtered data, calculates the average price per brand using DuckDB, and stores the result as a Kestra output that can be previewed and downloaded from the UI.
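To experiment with the same logic outside Kestra, the sketch below reproduces all three steps in plain Python. It assumes the `duckdb` Python package is installed (`pip install duckdb`) and that your DuckDB build autoloads the json extension for `read_json_auto`; the URL, column list, and SQL mirror the flow above.

```python
# A minimal local sketch of the pipeline, outside Kestra.
import json
import urllib.request

import duckdb

COLUMNS_TO_KEEP = ["brand", "price"]  # mirrors the flow's input default

# Extract: download the JSON file.
with urllib.request.urlopen("https://dummyjson.com/products") as response:
    data = json.load(response)

# Transform: keep only the selected columns.
filtered_data = [
    {column: product.get(column, "N/A") for column in COLUMNS_TO_KEEP}
    for product in data["products"]
]
with open("products.json", "w") as file:
    json.dump(filtered_data, file, indent=4)

# Query: average price per brand, as in the DuckDB task.
print(
    duckdb.sql(
        """
        SELECT brand, round(avg(price), 2) AS avg_price
        FROM read_json_auto('products.json')
        GROUP BY brand
        ORDER BY avg_price DESC
        """
    )
)
```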