Tag: data-engineering

  • Automated ETL with Airflow and Python: A Practical Guide

    Automated ETL with Airflow and Python: A Practical Guide

    In the world of data, consistency is king. Manually running scripts to fetch and process data is not just tedious; it’s prone to errors, delays, and gaps in your analytics. To build a reliable data-driven culture, you need automation. This is where building an automated ETL pipeline with Airflow and Python becomes one of a data engineer’s most valuable skills.

    Apache Airflow is the industry-standard open-source platform for orchestrating complex data workflows. When combined with the power and flexibility of Python for data manipulation, you can create robust, scheduled, and maintainable pipelines that feed your analytics platforms with fresh data, day in and day out.

    This guide will walk you through a practical example: building an Airflow DAG that automatically fetches cryptocurrency data from a public API, processes it with Python, and prepares it for analysis.

    The Architecture: A Simple, Powerful Workflow

    Our automated pipeline will consist of a few key components, orchestrated entirely by Airflow. The goal is to create a DAG (Directed Acyclic Graph) that defines the sequence of tasks required to get data from our source to its destination.

    Here’s the high-level architecture of our ETL pipeline:

    Public API: Our data source. We’ll use the free CoinGecko API to fetch the latest cryptocurrency prices.

    Python Script: The core of our transformation logic. We’ll use the requests library to call the API and pandas to process the JSON response into a clean, tabular format.

    Apache Airflow: The orchestrator. We will define a DAG that runs on a schedule (e.g., daily), executes our Python script, and handles logging, retries, and alerting.

    Data Warehouse/Lake: The destination. The processed data will be saved as a CSV, which in a real-world scenario would be loaded into a data warehouse like Snowflake, BigQuery, or a data lake like Amazon S3.

    Let’s get into the code.

    Step 1: The Python ETL Script

    First, we need a Python script that handles the logic of fetching and processing the data. This script will be called by our Airflow DAG. We’ll use a PythonVirtualenvOperator in Airflow, which means our script can have its own dependencies.

    Create a file named get_crypto_prices.py in your Airflow project’s /include directory.

    /include/get_crypto_prices.py

    Python

    import requests
    import pandas as pd
    from datetime import datetime
    
    def fetch_and_process_crypto_data():
        """
        Fetches cryptocurrency data from the CoinGecko API and processes it.
        """
        print("Fetching data from CoinGecko API...")
        url = "https://api.coingecko.com/api/v3/simple/price"
        params = {
            'ids': 'bitcoin,ethereum,ripple,cardano,solana',
            'vs_currencies': 'usd',
            'include_market_cap': 'true',
            'include_24hr_vol': 'true',
            'include_24hr_change': 'true'
        }
        
        try:
            response = requests.get(url, params=params)
            response.raise_for_status()  # Raise an exception for bad status codes
            data = response.json()
            print("Data fetched successfully.")
    
            # Process the JSON data into a list of dictionaries
            processed_data = []
            for coin, details in data.items():
                processed_data.append({
                    'coin': coin,
                    'price_usd': details.get('usd'),
                    'market_cap_usd': details.get('usd_market_cap'),
                    'volume_24h_usd': details.get('usd_24h_vol'),
                    'change_24h_percent': details.get('usd_24h_change'),
                    'timestamp': datetime.now().isoformat()
                })
    
            # Create a pandas DataFrame
            df = pd.DataFrame(processed_data)
            
            # In a real pipeline, you'd load this to a database.
            # For this example, we'll save it to a CSV in the local filesystem.
            output_path = '/tmp/crypto_prices.csv'
            df.to_csv(output_path, index=False)
            print(f"Data processed and saved to {output_path}")
            
        except requests.exceptions.RequestException as e:
            print(f"Error fetching data from API: {e}")
            raise
    
    if __name__ == "__main__":
        fetch_and_process_crypto_data()
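
    The script stops at a local CSV, but this same function is the natural place to push the result into your lake or warehouse. Here is a minimal, hypothetical sketch of the S3 variant using boto3; the bucket name and key are placeholders, and you would add boto3 to the task's requirements.

    Python

    import boto3

    def load_csv_to_s3(local_path: str = "/tmp/crypto_prices.csv") -> None:
        """Upload the processed CSV to a data lake bucket (names are illustrative)."""
        s3 = boto3.client("s3")
        s3.upload_file(
            Filename=local_path,
            Bucket="your-data-lake-bucket",   # placeholder bucket
            Key="crypto/crypto_prices.csv",   # placeholder key/prefix
        )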
    

    Step 2: Creating the Airflow DAG

    Now, let’s create the Airflow DAG that will schedule and run this script. This file will live in your Airflow dags/ folder.

    We’ll use the PythonVirtualenvOperator so the task runs in its own isolated environment with its own dependencies.

    dags/crypto_etl_dag.py

    Python

    from __future__ import annotations
    
    import pendulum
    
    from airflow.models.dag import DAG
    from airflow.operators.python import PythonVirtualenvOperator
    
    with DAG(
        dag_id="crypto_price_etl_pipeline",
        start_date=pendulum.datetime(2025, 9, 27, tz="UTC"),
        schedule="0 8 * * *",  # Run daily at 8:00 AM UTC
        catchup=False,
        tags=["api", "python", "etl"],
        doc_md="""
        ## Cryptocurrency Price ETL Pipeline
        This DAG fetches the latest crypto prices from the CoinGecko API,
        processes the data with Python, and saves it as a CSV.
        """,
    ) as dag:
        
        def _run_etl():
            # Imports happen inside the callable because it runs in an isolated
            # virtualenv; the include/ directory must be importable on the worker.
            from include.get_crypto_prices import fetch_and_process_crypto_data

            fetch_and_process_crypto_data()

        run_etl_task = PythonVirtualenvOperator(
            task_id="run_python_etl_script",
            python_callable=_run_etl,
            requirements=["pandas==2.1.0", "requests==2.31.0"],
            system_site_packages=False,
        )
    
    

    This DAG is simple but powerful. Airflow will now:

    • Run this pipeline automatically every day at 8:00 AM UTC.
    • Create a temporary virtual environment and install pandas and requests for the task.
    • Execute our Python function to fetch and process the data.
    • Log every run, and retry or alert you on failure once those behaviors are configured (see the sketch below).
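
    Retries and failure alerts are not automatic; they come from configuration. A minimal sketch of how you might wire them up through default_args is shown below (the email address is a placeholder, and email alerts also require SMTP settings in your Airflow deployment):

    Python

    from datetime import timedelta

    import pendulum
    from airflow.models.dag import DAG

    default_args = {
        "retries": 2,                         # retry a failed task twice
        "retry_delay": timedelta(minutes=5),  # wait five minutes between attempts
        "email": ["data-team@example.com"],   # placeholder recipient
        "email_on_failure": True,             # needs SMTP configured in Airflow
    }

    with DAG(
        dag_id="crypto_price_etl_pipeline",
        start_date=pendulum.datetime(2025, 9, 27, tz="UTC"),
        schedule="0 8 * * *",
        catchup=False,
        default_args=default_args,
    ) as dag:
        ...  # same PythonVirtualenvOperator task as above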

    Step 3: The Analytics Payoff

    With our pipeline running automatically, we now have a consistently updated CSV file (/tmp/crypto_prices.csv on the Airflow worker). In a real-world scenario where this data is loaded into a SQL data warehouse, an analyst can now run queries to derive insights, knowing the data is always fresh.

    An analyst could now answer questions like:

    • What is the daily trend of Bitcoin’s market cap?
    • Which coin had the highest percentage change in the last 24 hours?
    • How does trading volume correlate with price changes across different coins?

    Conclusion: Build Once, Benefit Forever

    By investing a little time in building an automated ETL pipeline with Airflow and Python, you create a resilient and reliable data asset. This approach eliminates manual, error-prone work and provides your analytics team with the fresh, trustworthy data they need to make critical business decisions. This is the core of modern data engineering: building automated systems that deliver consistent value.

  • SQL Window Functions: The Ultimate Guide for Data Analysts

    SQL Window Functions: The Ultimate Guide for Data Analysts

     Every data professional knows the power of GROUP BY. It’s the trusty tool we all learn first, allowing us to aggregate data and calculate metrics like total sales per category or the number of users per city. But what happens when the questions get more complex?

    • What are the top 3 best-selling products within each category?
    • How does this month’s revenue compare to last month’s for each department?
    • What is the running total of sales day-by-day?

    Trying to answer these questions with GROUP BY alone can lead to complex, inefficient, and often unreadable queries. This is where SQL window functions come in. They are the superpower you need to perform complex analysis while keeping your queries clean and performant.

    What Are Window Functions, Really?

    A window function performs a calculation across a set of table rows that are somehow related to the current row. Unlike GROUP BY, which collapses rows into a single output row, a window function returns a value for every single row.

    Think of it like this: a GROUP BY looks at the whole room and gives you one summary. A window function gives each person in the room a piece of information based on looking at a specific “window” of people around them (e.g., “the 3 tallest people in your group”).

    The magic happens with the OVER() clause, which defines the “window” of rows the function should consider.

    The Core Syntax

    The basic syntax for a window function looks like this:

    SQL

    SELECT
      column_a,
      column_b,
      AGGREGATE_FUNCTION() OVER (PARTITION BY ... ORDER BY ...) AS new_column
    FROM your_table;
    
    • AGGREGATE_FUNCTION(): Can be an aggregate function like SUM(), AVG(), or COUNT(), or a specialized window function like RANK().
    • OVER(): This is the mandatory clause that tells SQL you’re using a window function.
    • PARTITION BY column_name: This is like a GROUP BY within the window. It divides the rows into partitions (groups), and the function is calculated independently for each partition.
    • ORDER BY column_name: This sorts the rows within each partition. This is essential for functions that depend on order, like RANK() or running totals.

    Practical Examples: From Theory to Insight

    Let’s use a sample sales table to see window functions in action.

    order_id | sale_date  | category    | product   | amount
    101      | 2025-09-01 | Electronics | Laptop    | 1200
    102      | 2025-09-01 | Books       | SQL Guide | 45
    103      | 2025-09-02 | Electronics | Mouse     | 25
    104      | 2025-09-02 | Electronics | Keyboard  | 75
    105      | 2025-09-03 | Books       | Data Viz  | 55

    1. Calculating a Running Total

    Goal: Find the cumulative sales total for each day.

    SQL

    SELECT
      sale_date,
      amount,
      SUM(amount) OVER (ORDER BY sale_date) AS running_total_sales
    FROM sales;
    

    Result:

    sale_date  | amount | running_total_sales
    2025-09-01 | 1200   | 1200
    2025-09-01 | 45     | 1245
    2025-09-02 | 25     | 1270
    2025-09-02 | 75     | 1345
    2025-09-03 | 55     | 1400

    2. Ranking Rows within a Group (RANK, DENSE_RANK, ROW_NUMBER)

    Goal: Rank products by sales amount within each category.

    This is where PARTITION BY becomes essential.

    SQL

    SELECT
      category,
      product,
      amount,
      RANK() OVER (PARTITION BY category ORDER BY amount DESC) AS rank_num,
      DENSE_RANK() OVER (PARTITION BY category ORDER BY amount DESC) AS dense_rank_num,
      ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) AS row_num
    FROM sales;
    
    • RANK(): Gives the same rank for ties, but skips the next rank. (1, 2, 2, 4)
    • DENSE_RANK(): Gives the same rank for ties, but does not skip. (1, 2, 2, 3)
    • ROW_NUMBER(): Assigns a unique number to every row, regardless of ties. (1, 2, 3, 4)
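
    One practical note: window functions cannot appear in a WHERE clause, so answering the "top 3 best-selling products within each category" question from the introduction takes one extra step. Compute the row number in a subquery (or CTE), then filter on it; here is a minimal sketch against the same sales table:

    SQL

    SELECT category, product, amount
    FROM (
      SELECT
        category,
        product,
        amount,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) AS row_num
      FROM sales
    ) ranked
    WHERE row_num <= 3;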

    3. Comparing to Previous/Next Rows (LAG and LEAD)

    Goal: For each category, find the amount from the previous sale, ordered by date.

    LAG() looks “behind” in the partition, while LEAD() looks “ahead”.

    SQL

    SELECT
      sale_date,
      category,
      amount,
      LAG(amount, 1, 0) OVER (PARTITION BY category ORDER BY sale_date) AS previous_day_sales
    FROM sales;
    

    The 1 means look back one row, and the 0 is the default value if no previous row exists.

    Result:

    sale_date  | category    | amount | previous_day_sales
    2025-09-01 | Books       | 45     | 0
    2025-09-03 | Books       | 55     | 45
    2025-09-01 | Electronics | 1200   | 0
    2025-09-02 | Electronics | 25     | 1200
    2025-09-02 | Electronics | 75     | 25

    Conclusion: Go Beyond GROUP BY

    While GROUP BY is essential for aggregation, SQL window functions are the key to unlocking a deeper level of analytical insights. They allow you to perform calculations on a specific subset of rows without losing the detail of the individual rows.

    By mastering functions like RANK(), SUM() OVER (...), LAG(), and LEAD(), you can write cleaner, more efficient queries and solve complex business problems that would be a nightmare to tackle with traditional aggregation alone.

  • Building a Serverless Data Pipeline on AWS: A Step-by-Step Guide

    Building a Serverless Data Pipeline on AWS: A Step-by-Step Guide

     For data engineers, the dream is to build pipelines that are robust, scalable, and cost-effective. For years, this meant managing complex clusters and servers. But with the power of the cloud, a new paradigm has emerged: the serverless data pipeline on AWS. This approach allows you to process massive amounts of data without managing a single server, paying only for the compute you actually consume.

    Going serverless means you can say goodbye to idle clusters, patching servers, and capacity planning. Instead, you use a suite of powerful AWS services that automatically scale to meet demand. This isn’t just a technical shift; it’s a strategic advantage that allows your team to focus on delivering value from data, not managing infrastructure.

    In this guide, we’ll walk you through the essential components and steps to build a modern, event-driven serverless data pipeline on AWS using S3, Lambda, AWS Glue, and Athena.

    The Architecture: A Four-Part Harmony

    A successful serverless pipeline relies on a few core AWS services working together seamlessly. Each service has a specific role, creating an efficient and automated workflow from raw data ingestion to analytics-ready insights.

    Here’s a high-level look at our architecture:

    1. Amazon S3 (Simple Storage Service): The foundation of our pipeline. S3 acts as a highly durable and scalable data lake where we will store our raw, processed, and curated data in different stages.
    2. AWS Lambda: The trigger and orchestrator. Lambda functions are small, serverless pieces of code that can run in response to events, such as a new file being uploaded to S3.
    3. AWS Glue: The serverless ETL engine. Glue can automatically discover the schema of our data and run powerful Spark jobs to clean, transform, and enrich it, converting it into an optimized format like Parquet.
    4. Amazon Athena: The interactive query service. Athena allows us to run standard SQL queries directly on our processed data stored in S3, making it instantly available for analysis without needing a traditional data warehouse.

    Now, let’s build it step-by-step.

    Step 1: Setting Up the S3 Data Lake Buckets

    First, we need a place to store our data. A best practice is to use separate prefixes or even separate buckets to represent the different stages of your data pipeline, creating a clear and organized data lake.

    For this guide, we’ll use a single bucket with three prefixes:

    • s3://your-data-lake-bucket/raw/: This is where raw, unaltered data lands from your sources.
    • s3://your-data-lake-bucket/processed/: After cleaning and transformation by our Glue job, the data is stored here in an optimized format (e.g., Parquet).
    • s3://your-data-lake-bucket/curated/: (Optional) A final layer for business-level aggregations or specific data marts.
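
    If you prefer to script this setup, here is a minimal boto3 sketch; the bucket name and region are placeholders, and the marker objects are optional since S3 prefixes are simply part of the object key:

    Python

    import boto3

    BUCKET = "your-data-lake-bucket"  # placeholder; bucket names must be globally unique

    s3 = boto3.client("s3", region_name="eu-west-1")

    # Regions other than us-east-1 require an explicit LocationConstraint.
    s3.create_bucket(
        Bucket=BUCKET,
        CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
    )

    # Optional: zero-byte marker objects so the stage "folders" show up in the console.
    for prefix in ("raw/", "processed/", "curated/"):
        s3.put_object(Bucket=BUCKET, Key=prefix)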

    Step 2: Creating the Lambda Trigger

    Next, we need a mechanism to automatically start our pipeline when new data arrives. AWS Lambda is perfect for this. We will create a Lambda function that “listens” for a file upload event in our raw/ S3 prefix and then starts our AWS Glue ETL job.

    Here is a sample Python code for the Lambda function:

    lambda_function.py

    Python

    import boto3
    import os
    from urllib.parse import unquote_plus
    
    def lambda_handler(event, context):
        """
        This Lambda function is triggered by an S3 event and starts an AWS Glue ETL job.
        """
        # Get the Glue job name from environment variables
        glue_job_name = os.environ['GLUE_JOB_NAME']
        
        # Extract the bucket and key from the S3 event (object keys arrive URL-encoded)
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = unquote_plus(event['Records'][0]['s3']['object']['key'])
        
        print(f"File uploaded: s3://{bucket}/{key}")
        
        # Initialize the Glue client
        glue_client = boto3.client('glue')
        
        try:
            print(f"Starting Glue job: {glue_job_name}")
            response = glue_client.start_job_run(
                JobName=glue_job_name,
                Arguments={
                    '--S3_SOURCE_PATH': f"s3://{bucket}/{key}"
                }
            )
            print(f"Successfully started Glue job run. Run ID: {response['JobRunId']}")
            return {
                'statusCode': 200,
                'body': f"Started Glue job {glue_job_name} for file s3://{bucket}/{key}"
            }
        except Exception as e:
            print(f"Error starting Glue job: {e}")
            raise e
    
    

    To make this work, you need to:

    1. Create this Lambda function in the AWS console.
    2. Set an environment variable named GLUE_JOB_NAME with the name of the Glue job you’ll create in the next step.
    3. Configure an S3 trigger on the function, pointing it to your s3://your-data-lake-bucket/raw/ prefix for “All object create events.”
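
    The trigger can also be wired up programmatically. Here is a minimal boto3 sketch; the bucket name and function ARN are placeholders, and the function must already grant S3 permission to invoke it (the Lambda AddPermission API handles that):

    Python

    import boto3

    s3 = boto3.client("s3")

    # Invoke the Lambda for every object created under the raw/ prefix.
    s3.put_bucket_notification_configuration(
        Bucket="your-data-lake-bucket",
        NotificationConfiguration={
            "LambdaFunctionConfigurations": [
                {
                    # Placeholder ARN for the trigger Lambda from Step 2.
                    "LambdaFunctionArn": "arn:aws:lambda:eu-west-1:123456789012:function:start-glue-etl",
                    "Events": ["s3:ObjectCreated:*"],
                    "Filter": {
                        "Key": {"FilterRules": [{"Name": "prefix", "Value": "raw/"}]}
                    },
                }
            ]
        },
    )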

    Step 3: Transforming Data with AWS Glue

    AWS Glue is the heavy lifter in our pipeline. It’s a fully managed ETL service that makes it easy to prepare and load your data for analytics. For this step, you would create a Glue ETL job.

    Inside the Glue Studio, you can visually build a job or write a PySpark script. The job will:

    1. Read the raw data (e.g., CSV) from the source path passed by the Lambda function.
    2. Perform transformations, such as changing data types, dropping columns, or joining with other datasets.
    3. Write the transformed data to the processed/ S3 prefix in Apache Parquet format. Parquet is a columnar storage format that is highly optimized for analytical queries.

    Your Glue job will have a simple script that looks something like this:

    Python

    import sys
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    # Get job arguments
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_SOURCE_PATH'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    # Read the raw CSV data from S3
    source_dyf = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [args['S3_SOURCE_PATH']]},
        format="csv",
        format_options={"withHeader": True},
    )
    
    # Convert to Parquet and write to the processed location
    glueContext.write_dynamic_frame.from_options(
        frame=source_dyf,
        connection_type="s3",
        connection_options={"path": "s3://your-data-lake-bucket/processed/"},
        format="parquet",
    )
    
    job.commit()
    
    

    Step 4: Querying Processed Data with Amazon Athena

    Once your data is processed and stored as Parquet in S3, it’s ready for analysis. With Amazon Athena, you don’t need to load it into another database. You can query it right where it is.

    1. Create a Database: In the Athena query editor, create a database for your data lake: CREATE DATABASE my_data_lake;
    2. Run a Glue Crawler (or Create a Table): The easiest way to make your data queryable is to run an AWS Glue Crawler on your processed/ S3 prefix. The crawler will automatically detect the schema of your Parquet files and create an Athena table for you.
    3. Query Your Data: Once the table is created, you can run standard SQL queries on it.

    SQL

    SELECT
        customer_id,
        order_status,
        COUNT(order_id) as number_of_orders
    FROM
        my_data_lake.processed_data
    WHERE
        order_date >= '2025-01-01'
    GROUP BY
        1, 2
    ORDER BY
        3 DESC;
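
    You can also submit queries like this programmatically, which is handy if the last mile of your pipeline should be automated too. A minimal boto3 sketch, assuming the database and table from the steps above and an S3 location for Athena's result files:

    Python

    import boto3

    athena = boto3.client("athena")

    response = athena.start_query_execution(
        QueryString="SELECT COUNT(*) AS row_count FROM my_data_lake.processed_data;",
        QueryExecutionContext={"Database": "my_data_lake"},
        ResultConfiguration={
            # Placeholder location where Athena writes result files.
            "OutputLocation": "s3://your-data-lake-bucket/athena-results/"
        },
    )
    print(response["QueryExecutionId"])  # poll get_query_execution for completion status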
    

    Conclusion: The Power of Serverless

    You have now built a fully automated, event-driven, and serverless data pipeline on AWS. When a new file lands in your raw S3 bucket, a Lambda function triggers a Glue job that processes the data and writes it back to S3 in an optimized format, ready to be queried instantly by Athena.

    This architecture is not only powerful but also incredibly efficient. It scales automatically to handle terabytes of data and ensures you only pay for the resources you use, making it the perfect foundation for a modern data engineering stack.

  • Structuring dbt Projects in Snowflake: The Definitive Guide

    Structuring dbt Projects in Snowflake: The Definitive Guide

    If you’ve ever inherited a dbt project, you know there are two kinds: the clean, logical, and easy-to-navigate project, and the other kind—a tangled mess of models that makes you question every life choice that led you to that moment. The difference between the two isn’t talent; it’s structure. For high-performing data teams, a well-defined structure for dbt projects in Snowflake isn’t just a nice-to-have, it’s the very foundation of a scalable, maintainable, and trustworthy analytics workflow.

    While dbt and Snowflake are a technical match made in heaven, simply putting them together doesn’t guarantee success. Without a clear and consistent project structure, even the most powerful tools can lead to chaos. Dependencies become circular, model names become ambiguous, and new team members spend weeks just trying to understand the data flow.

    This guide provides a battle-tested blueprint for structuring dbt projects in Snowflake. We’ll move beyond the basics and dive into a scalable, multi-layered framework that will save you and your team countless hours of rework and debugging.

    Why dbt and Snowflake Are a Perfect Match

    Before we dive into project structure, it’s crucial to understand why this combination has become the gold standard for the modern data stack. Their synergy comes from a shared philosophy of decoupling, scalability, and performance.

    • Snowflake’s Decoupled Architecture: Its separation of storage and compute is revolutionary. This means you can run massive dbt transformations using a dedicated, powerful virtual warehouse without slowing down your BI tools.
    • dbt’s Transformation Power: dbt focuses on the “T” in ELT—transformation. It allows you to build, test, and document your data models using simple SQL, which it then compiles and runs directly inside Snowflake’s powerful engine.
    • Cost and Performance Synergy: Running dbt models in Snowflake is incredibly efficient. You can spin up a warehouse for a dbt run and spin it down the second it’s finished, meaning you only pay for the exact compute you use.
    • Zero-Copy Cloning for Development: Instantly create a zero-copy clone of your entire production database for development. This allows you to test your dbt project against production-scale data without incurring storage costs or impacting the production environment (see the SQL sketch below).

    In short, Snowflake provides the powerful, elastic engine, while dbt provides the organized, version-controlled, and testable framework to harness that engine.
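
    The zero-copy cloning point deserves a quick illustration. A minimal sketch of the Snowflake SQL involved, assuming a production database named analytics and a role with the required privileges:

    SQL

    -- Clones copy metadata instantly; storage is only consumed as the clone diverges.
    CREATE DATABASE analytics_dev CLONE analytics;

    -- Point your dbt development target (profiles.yml) at analytics_dev to develop
    -- against production-scale data without touching production.
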

    The Layered Approach: From Raw Data to Actionable Insights

    A scalable dbt project is like a well-organized factory. Raw materials come in one end, go through a series of refined production stages, and emerge as a finished product. We achieve this by structuring our models into distinct layers, each with a specific job.

    Our structure will follow this flow: Sources -> Staging -> Intermediate -> Marts.

    Layer 1: Declaring Your Sources (The Contract with Raw Data)

    Before you write a single line of transformation SQL, you must tell dbt where your raw data lives in Snowflake. This is done in a .yml file. Think of this file as a formal contract that declares your raw tables, allows you to add data quality tests, and serves as a foundation for your data lineage graph.

    Example: models/staging/sources.yml

    Let’s assume we have a RAW_DATA database in Snowflake with schemas from a jaffle_shop and stripe.

    YAML

    version: 2
    
    sources:
      - name: jaffle_shop
        database: raw_data 
        schema: jaffle_shop
        description: "Raw data from the primary application database."
        tables:
          - name: customers
            columns:
              - name: id
                tests:
                  - unique
                  - not_null
          - name: orders
            loaded_at_field: _etl_loaded_at
            freshness:
              warn_after: {count: 12, period: hour}
    
      - name: stripe
        database: raw_data
        schema: stripe
        tables:
          - name: payment
            columns:
              - name: orderid
                tests:
                  - relationships:
                      to: source('jaffle_shop', 'orders')
                      field: id
    

    Layer 2: Staging Models (Clean and Standardize)

    Staging models are the first line of transformation. They should have a 1:1 relationship with your source tables. The goal here is strict and simple:

    • DO: Rename columns, cast data types, and perform very light cleaning.
    • DO NOT: Join to other tables.

    This creates a clean, standardized version of each source table, forming a reliable foundation for the rest of your project.

    Example: models/staging/stg_customers.sql

    SQL

    -- models/staging/stg_customers.sql
    with source as (
        select * from {{ source('jaffle_shop', 'customers') }}
    ),
    
    renamed as (
        select
            id as customer_id,
            first_name,
            last_name
        from source
    )
    
    select * from renamed
    

    Layer 3: Intermediate Models (Build, Join, and Aggregate)

    This is where the real business logic begins. Intermediate models are the “workhorses” of your dbt project. They take the clean data from your staging models and start combining them.

    • DO: Join different staging models together.
    • DO: Perform complex calculations, aggregations, and business-specific logic.
    • DO: Materialize them as tables if they are slow to run or used by many downstream models.

    These models are not typically exposed to business users. They are building blocks for your final data marts.

    Example: models/intermediate/int_orders_with_payments.sql

    SQL

    -- models/intermediate/int_orders_with_payments.sql
    with orders as (
        select * from {{ ref('stg_orders') }}
    ),
    
    payments as (
        select * from {{ ref('stg_payments') }}
    ),
    
    order_payments as (
        select
            order_id,
            sum(case when payment_status = 'success' then amount else 0 end) as total_amount
        from payments
        group by 1
    ),
    
    final as (
        select
            orders.order_id,
            orders.customer_id,
            orders.order_date,
            coalesce(order_payments.total_amount, 0) as amount
        from orders
        left join order_payments 
          on orders.order_id = order_payments.order_id
    )
    
    select * from final
    

    Layer 4: Data Marts (Ready for Analysis)

    Finally, we arrive at the data marts. These are the polished, final models that power your dashboards, reports, and analytics. They should be clean, easy to understand, and built for a specific business purpose (e.g., finance, marketing, product).

    • DO: Join intermediate models.
    • DO: Have clear, business-friendly column names.
    • DO NOT: Contain complex, nested logic. All the heavy lifting should have been done in the intermediate layer.

    These models are the “products” of your data factory, ready for consumption by BI tools like Tableau, Looker, or Power BI.

    Example: models/marts/fct_customer_orders.sql

    SQL

    -- models/marts/fct_customer_orders.sql
    with customers as (
        select * from {{ ref('stg_customers') }}
    ),
    
    orders as (
        select * from {{ ref('int_orders_with_payments') }}
    ),
    
    customer_orders as (
        select
            customers.customer_id,
            min(orders.order_date) as first_order_date,
            max(orders.order_date) as most_recent_order_date,
            count(orders.order_id) as number_of_orders,
            sum(orders.amount) as lifetime_value
        from customers
        left join orders 
          on customers.customer_id = orders.customer_id
        group by 1
    )
    
    select * from customer_orders
    

    Conclusion: Structure is Freedom

    By adopting a layered approach to your dbt projects in Snowflake, you move from a chaotic, hard-to-maintain process to a scalable, modular, and efficient analytics factory. This structure gives you:

    • Maintainability: When logic needs to change, you know exactly which model to edit.
    • Scalability: Onboarding new data sources or team members becomes a clear, repeatable process.
    • Trust: With testing at every layer, you build confidence in your data and empower the entire organization to make better, faster decisions.

    This framework isn’t just about writing cleaner code—it’s about building a foundation for a mature and reliable data culture.