Beginner · 45 min · Python

Python for Data Engineers

Core Python concepts every data engineer needs: generators, decorators, context managers, and async I/O, with a focus on clean, production-ready scripts.

Introduction

Python is used throughout modern data engineering, from ingestion scripts to orchestration code. This tutorial covers four patterns that come up constantly in production pipelines, with the focus on practical usage rather than syntax alone.


1. Generators

Generators let you process data lazily — one record at a time — without loading everything into memory. Essential for large files and streaming pipelines.

code
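from collections.abc import Iterator

def read_large_file(path: str) -> Iterator[str]:
    """Yield stripped lines from a text file, one at a time."""
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            yield line.strip()

# The file is read lazily, so only the current line is held by your code
for record in read_large_file('events.jsonl'):
    process(record)  # process() stands in for your own per-record logic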

When to use: Reading large CSVs, JSONL files, or any unbounded stream.
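
Generators also compose: each stage pulls one record from the stage before it, so a multi-step pipeline still avoids materializing the full dataset. The following is a minimal sketch, assuming JSONL input with a hypothetical `event_type` field. The helper names (`parse_json_lines`, `only_type`, `batched`) and the sample data are illustrative and do not come from any particular library.

```python
import json
from collections.abc import Iterable, Iterator


def parse_json_lines(lines: Iterable[str]) -> Iterator[dict]:
    """Parse each non-empty line as JSON, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def only_type(events: Iterable[dict], event_type: str) -> Iterator[dict]:
    """Keep only events whose (hypothetical) 'event_type' field matches."""
    for event in events:
        if event.get("event_type") == event_type:
            yield event


def batched(items: Iterable[dict], size: int) -> Iterator[list[dict]]:
    """Group records into lists of at most `size`, e.g. for bulk inserts."""
    batch: list[dict] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # emit the final, possibly smaller, batch
        yield batch


# Illustrative in-memory input; in practice this could be an open file handle.
sample_lines = [
    '{"event_type": "click", "id": 1}',
    '',
    '{"event_type": "view", "id": 2}',
    '{"event_type": "click", "id": 3}',
    '{"event_type": "click", "id": 4}',
]

# Nothing is parsed until list() starts pulling records through the stages.
pipeline = batched(only_type(parse_json_lines(sample_lines), "click"), size=2)
click_batches = list(pipeline)
```

Because every stage accepts any iterable, the same functions work unchanged on a file object, a network stream, or the output of another generator.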


2. Decorators

Decorators wrap functions with cross-cutting concerns — logging, retries, timing — without polluting your business logic.

code
import functools
import time

def retry(max_attempts: int = 3, delay: float = 1.0):
    """Retry the wrapped function, sleeping longer after each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Out of attempts: re-raise the last error to the caller
                    if attempt == max_attempts - 1:
                        raise
                    # Linear backoff: delay, 2 * delay, 3 * delay, ...
                    time.sleep(delay * (attempt + 1))
        return wrapper
    return decorator

@retry(max_attempts=3, delay=2.0)
def fetch_from_api(url: str) -> dict:
    ...

When to use: API calls, database connections, any flaky I/O operation.
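
Timing and logging follow the same wrapping pattern as retries. Here is a minimal sketch of a timing decorator built on the standard `logging` and `time` modules. The names `timed` and `transform` are hypothetical and exist only for illustration.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def timed(func):
    """Log how long each call to `func` takes, even if it raises."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # The finally block runs on both success and failure
            elapsed = time.perf_counter() - start
            logger.info("%s took %.3fs", func.__name__, elapsed)
    return wrapper


@timed
def transform(rows: list[int]) -> list[int]:
    """Hypothetical pipeline step used only for illustration."""
    return [row * 2 for row in rows]


doubled = transform([1, 2, 3])
```

`functools.wraps` keeps the wrapped function's name and docstring intact, which matters when the function name appears in logs, as it does here.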


3. Context Managers

Use context managers for any resource that needs guaranteed cleanup — DB connections, file handles, locks.

code
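from contextlib import contextmanager
import psycopg2

@contextmanager
def get_db_connection(dsn: str):
    """Open a connection, commit on success, roll back on error, always close."""
    conn = psycopg2.connect(dsn)
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()

# DSN is a connection string defined elsewhere, e.g. loaded from configuration
with get_db_connection(DSN) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM events")
        event_count = cur.fetchone()[0]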

When to use: DB connections, file handles, any resource with open/close lifecycle.
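
The same pattern applies to file handles. One possible sketch is a context manager that writes to a temporary file and only moves it into place if the block finishes without an error, so downstream readers should not see a half-written output. The name `atomic_write` and the file names are hypothetical. The sketch assumes the temporary file and the target live on the same filesystem.

```python
import os
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager
from typing import IO


@contextmanager
def atomic_write(path: str) -> Iterator[IO[str]]:
    """Write to a temp file, then rename it over `path` only on success."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            yield f
        # Reached only if the caller's block raised nothing
        os.replace(tmp_path, path)
    except BaseException:
        # Something failed mid-write: discard the partial file
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


# Illustrative usage with a throwaway directory
out_dir = tempfile.mkdtemp()
target = os.path.join(out_dir, "report.csv")

with atomic_write(target) as f:
    f.write("id,count\n1,42\n")

with open(target, encoding="utf-8") as f:
    written = f.read()
```

If the body of the `with` block raises, the existing file at `path` is left untouched and the temporary file is removed.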


4. Async IO

When your pipeline makes many I/O calls (API requests, DB queries), asyncio lets you run them concurrently without threads.

code
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return await response.json()

async def fetch_all(urls: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# urls is assumed to be a list of endpoint URLs defined elsewhere
results = asyncio.run(fetch_all(urls))

When to use: Fetching from REST APIs, reading from multiple S3 paths, any I/O-bound fan-out.
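
A plain `asyncio.gather` starts every call at once, which can overwhelm a rate-limited API when the fan-out is large. One common way to cap concurrency is an `asyncio.Semaphore`. The sketch below is self-contained: `asyncio.sleep` stands in for a real network request, and `fetch_one`, `fetch_many`, and the simulated failure for item 3 are hypothetical.

```python
import asyncio


async def fetch_one(item_id: int, limiter: asyncio.Semaphore) -> dict:
    """Simulated I/O call; asyncio.sleep stands in for a real request."""
    async with limiter:  # at most `max_concurrency` calls hold the semaphore
        await asyncio.sleep(0.01)
        if item_id == 3:
            raise ValueError(f"item {item_id} failed")  # simulated failure
        return {"id": item_id}


async def fetch_many(item_ids: list[int], max_concurrency: int = 2) -> list:
    limiter = asyncio.Semaphore(max_concurrency)
    tasks = [fetch_one(item_id, limiter) for item_id in item_ids]
    # return_exceptions=True returns errors in place instead of raising the first
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(fetch_many([1, 2, 3, 4]))
succeeded = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
```

`gather` returns results in the same order as its inputs, so each entry in `results` can be matched back to its item even when some calls fail.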


Summary

| Pattern | Use case |
|---|---|
| Generators | Large files, streaming data |
| Decorators | Retry, logging, timing |
| Context managers | DB connections, file handles |
| Async IO | Concurrent API/I/O calls |
