Automated ETL with Airflow and Python: A Practical Guide

In the world of data, consistency is king. Manually running scripts to fetch and process data is not just tedious; it’s prone to errors, delays, and gaps in your analytics. To build a reliable data-driven culture, you need automation. This is where building an automated ETL pipeline with Airflow and Python becomes one of a data engineer’s most valuable skills.

Apache Airflow is the industry-standard open-source platform for orchestrating complex data workflows. When combined with the power and flexibility of Python for data manipulation, you can create robust, scheduled, and maintainable pipelines that feed your analytics platforms with fresh data, day in and day out.

This guide will walk you through a practical example: building an Airflow DAG that automatically fetches cryptocurrency data from a public API, processes it with Python, and prepares it for analysis.

The Architecture: A Simple, Powerful Workflow

Our automated pipeline will consist of a few key components, orchestrated entirely by Airflow. The goal is to create a DAG (Directed Acyclic Graph) that defines the sequence of tasks required to get data from our source to its destination.

Here’s the high-level architecture of our ETL pipeline:

Public API: Our data source. We’ll use the free CoinGecko API to fetch the latest cryptocurrency prices.

Python Script: The core of our transformation logic. We’ll use the requests library to call the API and pandas to process the JSON response into a clean, tabular format.

Apache Airflow: The orchestrator. We will define a DAG that runs on a schedule (e.g., daily), executes our Python script, and handles logging, retries, and alerting.

Data Warehouse/Lake: The destination. The processed data will be saved as a CSV, which in a real-world scenario would be loaded into a data warehouse like Snowflake, BigQuery, or a data lake like Amazon S3.

Let’s get into the code.

Step 1: The Python ETL Script

First, we need a Python script that handles the logic of fetching and processing the data. This script will be called by our Airflow DAG. We’ll use a PythonVirtualenvOperator in Airflow, which means our script can have its own dependencies.

Create a file named get_crypto_prices.py in your Airflow project’s /include directory.

/include/get_crypto_prices.py

Python

import requests
import pandas as pd
from datetime import datetime

def fetch_and_process_crypto_data():
    """
    Fetches cryptocurrency data from the CoinGecko API and processes it.
    """
    print("Fetching data from CoinGecko API...")
    url = "https://api.coingecko.com/api/v3/simple/price"
    params = {
        'ids': 'bitcoin,ethereum,ripple,cardano,solana',
        'vs_currencies': 'usd',
        'include_market_cap': 'true',
        'include_24hr_vol': 'true',
        'include_24hr_change': 'true'
    }
    
    try:
        response = requests.get(url, params=params, timeout=30)  # Time out rather than hang a scheduled run
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
        print("Data fetched successfully.")

        # Process the JSON data into a list of dictionaries
        processed_data = []
        for coin, details in data.items():
            processed_data.append({
                'coin': coin,
                'price_usd': details.get('usd'),
                'market_cap_usd': details.get('usd_market_cap'),
                'volume_24h_usd': details.get('usd_24h_vol'),
                'change_24h_percent': details.get('usd_24h_change'),
                'timestamp': datetime.now().isoformat()
            })

        # Create a pandas DataFrame
        df = pd.DataFrame(processed_data)
        
        # In a real pipeline, you'd load this to a database.
        # For this example, we'll save it to a CSV in the local filesystem.
        output_path = '/tmp/crypto_prices.csv'
        df.to_csv(output_path, index=False)
        print(f"Data processed and saved to {output_path}")
        
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from API: {e}")
        raise

if __name__ == "__main__":
    fetch_and_process_crypto_data()
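
Before wiring this into Airflow, it’s worth sanity-checking the script on its own. The snippet below is a quick, hedged inspection sketch: it assumes you’ve already run python include/get_crypto_prices.py locally and that the CSV landed at /tmp/crypto_prices.csv.

Python

import pandas as pd

# Load the CSV the script just wrote and spot-check its contents.
df = pd.read_csv("/tmp/crypto_prices.csv")
print(df.head())    # One row per coin: price, market cap, volume, 24h change, timestamp
print(df.dtypes)    # Confirm the numeric columns were parsed as floats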

Step 2: Creating the Airflow DAG

Now, let’s create the Airflow DAG that will schedule and run this script. This file will live in your Airflow dags/ folder.

We’ll use the PythonVirtualenvOperator so the task runs in a clean, isolated environment with its own pinned dependencies.

dags/crypto_etl_dag.py

Python

from __future__ import annotations

import pendulum

from airflow.models.dag import DAG
from airflow.operators.python import PythonVirtualenvOperator

with DAG(
    dag_id="crypto_price_etl_pipeline",
    start_date=pendulum.datetime(2025, 9, 27, tz="UTC"),
    schedule="0 8 * * *",  # Run daily at 8:00 AM UTC
    catchup=False,
    tags=["api", "python", "etl"],
    doc_md="""
    ## Cryptocurrency Price ETL Pipeline
    This DAG fetches the latest crypto prices from the CoinGecko API,
    processes the data with Python, and saves it as a CSV.
    """,
) as dag:
    
    def _run_crypto_etl():
        # Import inside the callable: PythonVirtualenvOperator serializes this
        # function's source and runs it in a separate virtual environment, so the
        # import must resolve there (the project root needs to be on PYTHONPATH).
        from include.get_crypto_prices import fetch_and_process_crypto_data

        fetch_and_process_crypto_data()

    run_etl_task = PythonVirtualenvOperator(
        task_id="run_python_etl_script",
        python_callable=_run_crypto_etl,
        requirements=["pandas==2.1.0", "requests==2.31.0"],
        system_site_packages=False,
    )

This DAG is simple but powerful. Airflow will now:

  • Run this pipeline automatically every day at 8:00 AM UTC.
  • Create a temporary virtual environment and install pandas and requests for the task.
  • Execute our Python function to fetch and process the data.
  • Log the entire process and, once retries and notifications are configured, retry and alert you if anything fails (a sketch of that configuration follows).
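
Retry and alerting behaviour is opt-in, so it has to be configured on the DAG. Here is a minimal, hedged sketch of sensible defaults; the retry count, delay, and email address are illustrative assumptions, and email alerts additionally require an SMTP connection in your Airflow deployment.

Python

from datetime import timedelta

# Illustrative defaults (values are assumptions). Pass this dict to the DAG via
# default_args=default_args so every task in the pipeline inherits it.
default_args = {
    "retries": 2,                         # Re-run a failed task up to two times
    "retry_delay": timedelta(minutes=5),  # Wait five minutes between attempts
    "email": ["data-team@example.com"],   # Hypothetical address; needs SMTP configured
    "email_on_failure": True,
}

To try the pipeline without waiting for the daily schedule, you can also trigger a one-off run from the command line with airflow dags test crypto_price_etl_pipeline.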

Step 3: The Analytics Payoff

With our pipeline running automatically, we now have a consistently updated CSV file (/tmp/crypto_prices.csv on the Airflow worker). In a real-world scenario where this data is loaded into a SQL data warehouse, an analyst can now run queries to derive insights, knowing the data is always fresh.
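
As a rough illustration of that loading step, here is a minimal sketch using pandas and SQLAlchemy to append the CSV to a table. The connection string and table name are assumptions (a Postgres stand-in); a production pipeline would typically use the warehouse’s native loader (Snowflake’s COPY INTO, a BigQuery load job, or an upload to S3 for a data lake) instead.

Python

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection string -- swap in your warehouse's credentials.
engine = create_engine("postgresql+psycopg2://analytics:secret@warehouse-host:5432/analytics")

# Append today's snapshot to a raw table; downstream models can deduplicate and transform.
df = pd.read_csv("/tmp/crypto_prices.csv")
df.to_sql("raw_crypto_prices", engine, if_exists="append", index=False)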

An analyst could now answer questions like these (one of them is sketched in pandas after the list):

  • What is the daily trend of Bitcoin’s market cap?
  • Which coin had the highest percentage change in the last 24 hours?
  • How does trading volume correlate with price changes across different coins?
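
For instance, the second question can be answered directly from the CSV with pandas. A minimal sketch, assuming the file written by the DAG above:

Python

import pandas as pd

df = pd.read_csv("/tmp/crypto_prices.csv")

# Coin with the largest 24-hour percentage move (up or down).
top_mover = df.loc[df["change_24h_percent"].abs().idxmax()]
print(f"Biggest 24h mover: {top_mover['coin']} ({top_mover['change_24h_percent']:.2f}%)")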

Conclusion: Build Once, Benefit Forever

By investing a little time to build an automated ETL pipeline with Airflow and Python, you create a resilient and reliable data asset. This approach eliminates manual, error-prone work and provides your analytics team with the fresh, trustworthy data they need to make critical business decisions. This is the core of modern data engineering: building automated systems that deliver consistent value.
