Introduction
Python is one of the most widely used languages in data engineering. This tutorial covers core patterns you'll use regularly in production pipelines, focusing on real-world usage rather than syntax alone.
1. Generators
Generators let you process data lazily — one record at a time — without loading everything into memory. Essential for large files and streaming pipelines.

    def read_large_file(path: str):
        with open(path, 'r') as f:
            for line in f:
                yield line.strip()

    # Only one line in memory at a time
    for record in read_large_file('events.jsonl'):
        process(record)

When to use: Reading large CSVs, JSONL files, or any unbounded stream.
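
Generators also compose: each stage pulls one record from the previous stage, so a whole chain stays lazy. The following is a minimal sketch of that idea, not a definitive implementation. The function names `parse_json_lines` and `only_event_type`, the `type` and `user` fields, and the in-memory `sample_lines` list (standing in for a large JSONL file) are all assumptions made for illustration.

```python
import json
from typing import Iterable, Iterator


def parse_json_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Lazily decode one JSON object per non-empty line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def only_event_type(records: Iterable[dict], event_type: str) -> Iterator[dict]:
    """Lazily keep records whose (hypothetical) 'type' field matches."""
    for record in records:
        if record.get("type") == event_type:
            yield record


# In-memory stand-in for a large JSONL file; any iterable of strings works,
# including an open file handle or another generator.
sample_lines = [
    '{"type": "click", "user": 1}',
    '',
    '{"type": "view", "user": 2}',
    '{"type": "click", "user": 3}',
]

# Nothing is parsed until the list comprehension starts pulling records.
clicks = only_event_type(parse_json_lines(sample_lines), "click")
click_users = [record["user"] for record in clicks]
```

Because each stage accepts any iterable, the same functions could be fed from a file-reading generator without changes.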
2. Decorators
Decorators wrap functions with cross-cutting concerns — logging, retries, timing — without polluting your business logic.

    import functools
    import time

    def retry(max_attempts: int = 3, delay: float = 1.0):
        def decorator(func):
            @functools.wraps(func)  # preserve the wrapped function's name and docstring
            def wrapper(*args, **kwargs):
                for attempt in range(max_attempts):
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        # Out of attempts: re-raise the last error
                        if attempt == max_attempts - 1:
                            raise
                        # Linear backoff: delay, 2 * delay, 3 * delay, ...
                        time.sleep(delay * (attempt + 1))
            return wrapper
        return decorator

    @retry(max_attempts=3, delay=2.0)
    def fetch_from_api(url: str) -> dict:
        ...

When to use: API calls, database connections, any flaky I/O operation.
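
Timing is another cross-cutting concern that fits the same shape. The sketch below is one possible way to write a timing decorator using only the standard library. The names `timed` and `transform` are hypothetical, and the log format is an arbitrary choice.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(func):
    """Log how long each call to func takes, including calls that raise."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Runs whether func returned or raised.
            elapsed = time.perf_counter() - start
            logger.info("%s took %.3f s", func.__name__, elapsed)
    return wrapper


@timed
def transform(rows: list[int]) -> list[int]:
    """Hypothetical pipeline step used only for illustration."""
    return [row * 2 for row in rows]


doubled = transform([1, 2, 3])
```

Unlike the retry example, this decorator takes no arguments, so it needs only two levels of nesting instead of three.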
3. Context Managers
Use context managers for any resource that needs guaranteed cleanup — DB connections, file handles, locks.

    from contextlib import contextmanager
    import psycopg2

    @contextmanager
    def get_db_connection(dsn: str):
        conn = psycopg2.connect(dsn)
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    with get_db_connection(DSN) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM events")

When to use: DB connections, file handles, any resource with open/close lifecycle.
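
The same commit-or-rollback shape can apply to file handles. As a sketch under stated assumptions, the hypothetical `atomic_write` helper below writes to a temporary file and moves it into place only if the `with` block finishes without an error, so readers should never see a half-written output. It assumes the temporary file and the target live on the same filesystem, which is why the temporary file is created in the target's directory.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def atomic_write(path: str):
    """Yield a temp file handle; move it to `path` only if the block succeeds."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            yield f
        # File is closed here; swap it into place in a single step.
        os.replace(tmp_path, path)
    except BaseException:
        # "Rollback": discard the partial file, then re-raise.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Usage, inside a throwaway directory so the example cleans up after itself.
with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "report.csv")
    with atomic_write(target) as f:
        f.write("id,count\n1,42\n")
    with open(target) as f:
        written = f.read()
```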
4. Async IO
When your pipeline makes many I/O calls (API requests, DB queries), asyncio lets you run them concurrently without threads.

    import asyncio
    import aiohttp

    async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
        async with session.get(url) as response:
            return await response.json()

    async def fetch_all(urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    results = asyncio.run(fetch_all(urls))

When to use: Fetching from REST APIs, reading from multiple S3 paths, any I/O-bound fan-out.
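
A fan-out that starts every call at once can overwhelm the service on the other end. One common way to cap the number of in-flight calls is `asyncio.Semaphore`. The sketch below illustrates that under stated assumptions: `fake_fetch` is a hypothetical stand-in that sleeps instead of making a network request, and `fetch_all_bounded` and `max_concurrency` are illustrative names.

```python
import asyncio


async def fake_fetch(item: int) -> int:
    """Stand-in for an I/O call; sleeps briefly instead of hitting a network."""
    await asyncio.sleep(0.01)
    return item * 10


async def fetch_all_bounded(items: list[int], max_concurrency: int = 2) -> list[int]:
    """Run fake_fetch for every item, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item: int) -> int:
        # Each task waits here until one of the slots is free.
        async with semaphore:
            return await fake_fetch(item)

    # gather returns results in the same order as its inputs.
    return await asyncio.gather(*(bounded(item) for item in items))


results = asyncio.run(fetch_all_bounded([1, 2, 3, 4]))
```

In a real pipeline, `fake_fetch` would be replaced by an actual request coroutine, and a sensible `max_concurrency` depends on the rate limits of the target service.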
Summary
| Pattern | Use case |
|---|---|
| Generators | Large files, streaming data |
| Decorators | Retry, logging, timing |
| Context managers | DB connections, file handles |
| Async IO | Concurrent API/I/O calls |