In the world of data, consistency is king. Manually running scripts to fetch and process data is not just tedious; it’s prone to errors, delays, and gaps in your analytics. To build a reliable data-driven culture, you need automation. This is where building an automated ETL with Airflow and Python becomes a data engineer’s most valuable skill.
Apache Airflow is the industry-standard open-source platform for orchestrating complex data workflows. When combined with the power and flexibility of Python for data manipulation, you can create robust, scheduled, and maintainable pipelines that feed your analytics platforms with fresh data, day in and day out.
This guide will walk you through a practical example: building an Airflow DAG that automatically fetches cryptocurrency data from a public API, processes it with Python, and prepares it for analysis.
The Architecture: A Simple, Powerful Workflow
Our automated pipeline will consist of a few key components, orchestrated entirely by Airflow. The goal is to create a DAG (Directed Acyclic Graph) that defines the sequence of tasks required to get data from our source to its destination.
Here’s the high-level architecture of our ETL pipeline:
Public API: Our data source. We’ll use the free CoinGecko API to fetch the latest cryptocurrency prices.
Python Script: The core of our transformation logic. We’ll use the requests library to call the API and pandas to process the JSON response into a clean, tabular format.
Apache Airflow: The orchestrator. We will define a DAG that runs on a schedule (e.g., daily), executes our Python script, and handles logging, retries, and alerting.
Data Warehouse/Lake: The destination. The processed data will be saved as a CSV, which in a real-world scenario would be loaded into a data warehouse like Snowflake, BigQuery, or a data lake like Amazon S3.
Let’s get into the code.
Step 1: The Python ETL Script
First, we need a Python script that handles the logic of fetching and processing the data. This script will be called by our Airflow DAG. We’ll use a PythonVirtualenvOperator in Airflow, which means our script can have its own dependencies.
Create a file named get_crypto_prices.py in your Airflow project’s /include directory.
/include/get_crypto_prices.py
Python
import requests
import pandas as pd
from datetime import datetime
def fetch_and_process_crypto_data():
"""
Fetches cryptocurrency data from the CoinGecko API and processes it.
"""
print("Fetching data from CoinGecko API...")
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
'ids': 'bitcoin,ethereum,ripple,cardano,solana',
'vs_currencies': 'usd',
'include_market_cap': 'true',
'include_24hr_vol': 'true',
'include_24hr_change': 'true'
}
try:
response = requests.get(url, params=params)
response.raise_for_status() # Raise an exception for bad status codes
data = response.json()
print("Data fetched successfully.")
# Process the JSON data into a list of dictionaries
processed_data = []
for coin, details in data.items():
processed_data.append({
'coin': coin,
'price_usd': details.get('usd'),
'market_cap_usd': details.get('usd_market_cap'),
'volume_24h_usd': details.get('usd_24h_vol'),
'change_24h_percent': details.get('usd_24h_change'),
'timestamp': datetime.now().isoformat()
})
# Create a pandas DataFrame
df = pd.DataFrame(processed_data)
# In a real pipeline, you'd load this to a database.
# For this example, we'll save it to a CSV in the local filesystem.
output_path = '/tmp/crypto_prices.csv'
df.to_csv(output_path, index=False)
print(f"Data processed and saved to {output_path}")
except requests.exceptions.RequestException as e:
print(f"Error fetching data from API: {e}")
raise
if __name__ == "__main__":
fetch_and_process_crypto_data()
Step 2: Creating the Airflow DAG
Now, let’s create the Airflow DAG that will schedule and run this script. This file will live in your Airflow dags/ folder.
We’ll use the @task decorator and the PythonVirtualenvOperator to create a clean, isolated task.
dags/crypto_etl_dag.py
Python
from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonVirtualenvOperator
with DAG(
dag_id="crypto_price_etl_pipeline",
start_date=pendulum.datetime(2025, 9, 27, tz="UTC"),
schedule="0 8 * * *", # Run daily at 8:00 AM UTC
catchup=False,
tags=["api", "python", "etl"],
doc_md="""
## Cryptocurrency Price ETL Pipeline
This DAG fetches the latest crypto prices from the CoinGecko API,
processes the data with Python, and saves it as a CSV.
""",
) as dag:
run_etl_task = PythonVirtualenvOperator(
task_id="run_python_etl_script",
python_callable_source="""
from include.get_crypto_prices import fetch_and_process_crypto_data
fetch_and_process_crypto_data()
""",
requirements=["pandas==2.1.0", "requests==2.31.0"],
system_site_packages=False,
)
This DAG is simple but powerful. Airflow will now:
- Run this pipeline automatically every day at 8:00 AM UTC.
- Create a temporary virtual environment and install
pandasandrequestsfor the task. - Execute our Python function to fetch and process the data.
- Log the entire process, and alert you if anything fails.
Step 3: The Analytics Payoff
With our pipeline running automatically, we now have a consistently updated CSV file (/tmp/crypto_prices.csv on the Airflow worker). In a real-world scenario where this data is loaded into a SQL data warehouse, an analyst can now run queries to derive insights, knowing the data is always fresh.
An analyst could now answer questions like:
- What is the daily trend of Bitcoin’s market cap?
- Which coin had the highest percentage change in the last 24 hours?
- How does trading volume correlate with price changes across different coins?
Conclusion: Build Once, Benefit Forever
By investing a little time to build an automated ETL with Airflow and Python, you create a resilient and reliable data asset. This approach eliminates manual, error-prone work and provides your analytics team with the fresh, trustworthy data they need to make critical business decisions. This is the core of modern data engineering: building automated systems that deliver consistent value.

Leave a Reply