Back to Blog

Building Data APIs on Top of Your Lakehouse: Serving Layer Design

2024-04-2913 min read

Building Data APIs on Top of Your Lakehouse: Serving Layer Design

A lakehouse stores your data beautifully. But raw Delta tables and Parquet files are not how your downstream consumers — applications, dashboards, data science teams, external partners — should be touching your data.

The serving layer is what sits between your lakehouse and the world. Getting it wrong means slow queries, runaway costs, brittle integrations, and 3am pages. Getting it right means your lakehouse becomes a product.

Why the Serving Layer Matters

Most teams treat the serving layer as an afterthought. They expose Databricks SQL endpoints directly, give every team JDBC credentials, and call it done.

Then six months later:

  • A dashboard query does a full table scan on a 500GB Delta table
  • A data science team runs a heavy join during peak business hours
  • An external partner hammers the API with 10,000 requests/minute
  • Nobody knows who is querying what, or why costs spiked 300% this month

The serving layer is where you enforce contracts, control costs, and make your data accessible without making it dangerous.

Serving Layer Architecture Overview

Lakehouse (Delta / Iceberg)
        │
        ▼
  Materialized Views / Feature Store
        │
        ▼
  Query Engine (Databricks SQL / Trino / DuckDB)
        │
        ▼
  ┌─────────────────────────────────────┐
  │  REST API  │  GraphQL  │  gRPC      │
  │  Cache     │  Rate Limiting         │
  │  Auth      │  Schema Registry       │
  └─────────────────────────────────────┘
        │
        ▼
  ┌──────────────────────────────────────┐
  │  Apps │ Dashboards │ ML │ Partners   │
  └──────────────────────────────────────┘

The key insight: your lakehouse is not your API. The serving layer translates between the storage-optimised world of your lakehouse and the latency-optimised world of your consumers.

Layer 1: Materialization Strategy

Before writing a single API endpoint, decide what data you are actually serving. Hitting raw Delta tables on every API request is almost always the wrong answer.

Materialized Views

Pre-compute the results your consumers will ask for most often.

In Databricks:

-- Refresh hourly via Databricks Workflows
CREATE OR REPLACE TABLE serving.user_metrics_daily AS
SELECT
    user_id,
    DATE(event_time)            AS date,
    COUNT(*)                    AS event_count,
    COUNT(DISTINCT session_id)  AS session_count,
    SUM(revenue)                AS total_revenue,
    MAX(event_time)             AS last_seen
FROM lakehouse.events.raw
WHERE event_time >= CURRENT_DATE - INTERVAL 90 DAYS
GROUP BY user_id, DATE(event_time);

In dbt (recommended for most teams):

-- models/serving/user_metrics_daily.sql
{{ config(
    materialized='incremental',
    unique_key='user_date_key',
    partition_by={'field': 'date', 'data_type': 'date'},
    cluster_by=['user_id'],
    on_schema_change='fail'
) }}
 
SELECT
    {{ dbt_utils.generate_surrogate_key(['user_id', 'date']) }} AS user_date_key,
    user_id,
    DATE(event_time)    AS date,
    COUNT(*)            AS event_count,
    SUM(revenue)        AS total_revenue,
    MAX(event_time)     AS last_seen
FROM {{ ref('stg_events') }}
 
{% if is_incremental() %}
    WHERE event_time >= (SELECT MAX(last_seen) FROM {{ this }})
{% endif %}
 
GROUP BY user_id, DATE(event_time)

Aggregation Tiers

Not all consumers need the same granularity. Build multiple tiers:

Raw Events      (lakehouse)     → full fidelity, high cost
    │
    ▼
Hourly Aggregates (serving)     → medium granularity, low cost
    │
    ▼
Daily Aggregates  (serving)     → coarse granularity, near-zero cost
    │
    ▼
Summary Metrics   (Redis)       → single-row lookups, sub-millisecond

Rule of thumb: If a query runs more than 10 times per hour — materialize it. If it runs more than 1,000 times per hour — cache it.

Layer 2: Query Engine Selection

Choosing the right query engine for your serving layer is as important as your API design.

Option A: Databricks SQL Warehouse

Best when you're already on Databricks and need full SQL flexibility.

from databricks import sql
 
def query_databricks(sql_query: str, params: dict = None):
    with sql.connect(
        server_hostname=DATABRICKS_HOST,
        http_path=SQL_WAREHOUSE_PATH,
        access_token=DATABRICKS_TOKEN
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(sql_query, parameters=params)
            return cursor.fetchall()
 
# Usage
results = query_databricks(
    "SELECT * FROM serving.user_metrics_daily WHERE user_id = :user_id",
    params={"user_id": "u123"}
)

Pros: Full Delta Lake support, Unity Catalog integration, auto-scaling Cons: Cold start latency (30–60s for serverless), cost per query

Option B: Trino / Presto

Best for multi-source queries and when you need engine independence.

from trino.dbapi import connect
 
def query_trino(sql_query: str):
    conn = connect(
        host="trino.internal",
        port=8080,
        user="api-service",
        catalog="delta",
        schema="serving"
    )
    cursor = conn.cursor()
    cursor.execute(sql_query)
    return cursor.fetchall()

Pros: Engine-agnostic, no vendor lock-in, fast for pre-optimised tables Cons: Requires cluster management, no auto-scaling out of the box

Option C: DuckDB

Best for serving layers with data under ~100GB. Surprisingly powerful for the weight class.

import duckdb
 
conn = duckdb.connect()
conn.execute("INSTALL delta; LOAD delta;")
 
def query_delta_local(table_path: str, query: str):
    conn.execute(
        f"CREATE VIEW IF NOT EXISTS tbl AS SELECT * FROM delta_scan('{table_path}')"
    )
    return conn.execute(query).fetchall()
 
# Query Parquet files on S3 directly — no cluster needed
def query_s3_parquet(s3_path: str, filter_clause: str):
    return conn.execute(f"""
        SELECT *
        FROM read_parquet('{s3_path}/**/*.parquet', hive_partitioning=true)
        WHERE {filter_clause}
    """).fetchall()

Pros: Near-zero infrastructure, embeds in your API process, blazing fast for medium data Cons: Single-node only, not suitable for multi-tenant high-concurrency

Layer 3: API Design Patterns

REST API with FastAPI

The standard choice for most data APIs. Clean, well-understood, easy to version.

from fastapi import FastAPI, Depends, HTTPException, Query
from pydantic import BaseModel
from typing import Optional
import time
 
app = FastAPI(title="Lakehouse Data API", version="1.0.0")
 
class UserMetrics(BaseModel):
    user_id: str
    date: str
    event_count: int
    total_revenue: float
    last_seen: str
 
class MetricsResponse(BaseModel):
    data: list[UserMetrics]
    total: int
    page: int
    page_size: int
    query_time_ms: float
 
@app.get("/v1/users/{user_id}/metrics", response_model=MetricsResponse)
async def get_user_metrics(
    user_id: str,
    start_date: str = Query(..., description="YYYY-MM-DD"),
    end_date:   str = Query(..., description="YYYY-MM-DD"),
    page:       int = Query(1,   ge=1),
    page_size:  int = Query(100, ge=1, le=1000),
    db = Depends(get_db_connection)
):
    start = time.time()
    validate_date_range(start_date, end_date, max_days=90)
    offset = (page - 1) * page_size
 
    rows = db.execute("""
        SELECT user_id, date, event_count, total_revenue, last_seen
        FROM serving.user_metrics_daily
        WHERE user_id = ?
          AND date BETWEEN ? AND ?
        ORDER BY date DESC
        LIMIT ? OFFSET ?
    """, [user_id, start_date, end_date, page_size, offset]).fetchall()
 
    total = db.execute("""
        SELECT COUNT(*) FROM serving.user_metrics_daily
        WHERE user_id = ? AND date BETWEEN ? AND ?
    """, [user_id, start_date, end_date]).fetchone()[0]
 
    return MetricsResponse(
        data=[UserMetrics(**dict(zip(UserMetrics.__fields__, r))) for r in rows],
        total=total,
        page=page,
        page_size=page_size,
        query_time_ms=round((time.time() - start) * 1000, 2)
    )

Pagination Patterns

Never return unbounded result sets. Always paginate.

Offset pagination — simple, good for small datasets:

# GET /v1/events?page=3&page_size=100
# Problem: expensive at large offsets (DB must skip all preceding rows)
 
@app.get("/v1/events")
async def list_events(page: int = 1, page_size: int = 100):
    offset = (page - 1) * page_size
    return db.execute(
        "SELECT * FROM events ORDER BY event_time DESC LIMIT ? OFFSET ?",
        [page_size, offset]
    ).fetchall()

Cursor pagination — recommended for large datasets:

# GET /v1/events?cursor=2024-04-01T12:00:00&limit=100
# O(1) regardless of page depth — no skipping rows
 
@app.get("/v1/events")
async def list_events(
    cursor: Optional[str] = None,
    limit:  int = Query(100, le=1000)
):
    if cursor:
        rows = db.execute("""
            SELECT * FROM events
            WHERE event_time < ?
            ORDER BY event_time DESC
            LIMIT ?
        """, [cursor, limit + 1]).fetchall()
    else:
        rows = db.execute(
            "SELECT * FROM events ORDER BY event_time DESC LIMIT ?",
            [limit + 1]
        ).fetchall()
 
    has_more   = len(rows) > limit
    data       = rows[:limit]
    next_cursor = data[-1]["event_time"] if has_more else None
 
    return {"data": data, "next_cursor": next_cursor, "has_more": has_more}

GraphQL with Strawberry

Best when consumers need flexible field selection — dashboards, apps, notebooks.

import strawberry
from strawberry.fastapi import GraphQLRouter
 
@strawberry.type
class UserMetrics:
    user_id:       str
    date:          str
    event_count:   int
    total_revenue: float
 
@strawberry.type
class Query:
    @strawberry.field
    def user_metrics(
        self,
        user_id:    str,
        start_date: str,
        end_date:   str,
        limit:      int = 100
    ) -> list[UserMetrics]:
        rows = db.execute("""
            SELECT user_id, date, event_count, total_revenue
            FROM serving.user_metrics_daily
            WHERE user_id = ? AND date BETWEEN ? AND ?
            LIMIT ?
        """, [user_id, start_date, end_date, limit]).fetchall()
        return [UserMetrics(**dict(r)) for r in rows]
 
    @strawberry.field
    def top_users(self, date: str, limit: int = 10) -> list[UserMetrics]:
        rows = db.execute("""
            SELECT user_id, date, event_count, total_revenue
            FROM serving.user_metrics_daily
            WHERE date = ?
            ORDER BY total_revenue DESC
            LIMIT ?
        """, [date, limit]).fetchall()
        return [UserMetrics(**dict(r)) for r in rows]
 
schema = strawberry.Schema(query=Query)
app.include_router(GraphQLRouter(schema), prefix="/graphql")

GraphQL shines when:

  • Consumers need different subsets of fields
  • You want to avoid versioning for field additions
  • Frontend teams want to own their own query shapes

Layer 4: Caching Strategy

Caching is the difference between a serving layer that scales and one that doesn't.

Two-Level Cache Architecture

Request
    │
    ▼
L1: In-Memory Cache  (per-instance, TTL: 60s)
    │ miss
    ▼
L2: Redis Cache      (shared across instances, TTL: 5min–1hr)
    │ miss
    ▼
Query Engine         (Databricks SQL / Trino / DuckDB)
    │
    ▼
Lakehouse            (Delta / Iceberg)

Redis Caching Implementation

import redis
import json
import hashlib
from functools import wraps
 
redis_client = redis.Redis(host="redis.internal", port=6379, decode_responses=True)
 
def cache_key(prefix: str, **kwargs) -> str:
    params   = json.dumps(kwargs, sort_keys=True)
    hash_val = hashlib.md5(params.encode()).hexdigest()
    return f"{prefix}:{hash_val}"
 
def cached(prefix: str, ttl: int = 300):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key    = cache_key(prefix, **kwargs)
            cached = redis_client.get(key)
 
            if cached:
                return json.loads(cached)
 
            result = await func(*args, **kwargs)
            redis_client.setex(key, ttl, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator
 
# Usage — cache user summaries for 5 minutes
@app.get("/v1/users/{user_id}/summary")
@cached(prefix="user_summary", ttl=300)
async def get_user_summary(user_id: str):
    return query_lakehouse(
        "SELECT * FROM serving.user_summary WHERE user_id = ?",
        [user_id]
    )

Cache Invalidation on Data Refresh

When your materialized views refresh, invalidate the relevant cache keys:

# Triggered after dbt run or Databricks Workflow completes
def invalidate_cache_on_refresh(tables_refreshed: list[str]):
    table_to_pattern = {
        "user_metrics_daily": "user_summary:*",
        "revenue_summary":    "revenue:*",
        "product_metrics":    "product:*",
    }
    for table in tables_refreshed:
        if table in table_to_pattern:
            pattern = table_to_pattern[table]
            keys    = redis_client.keys(pattern)
            if keys:
                redis_client.delete(*keys)
                log.info("Invalidated %d cache keys for %s", len(keys), table)

Cache TTL by Data Freshness

Data TypeUpdate FrequencyRecommended TTL
Real-time metricsStreaming (seconds)10–30 seconds
Operational dashboardsHourly refresh5–15 minutes
Business metricsDaily refresh1–6 hours
Historical aggregatesWeekly / static24 hours
Reference / lookup dataRarely changes7 days

Layer 5: Rate Limiting and Access Control

Rate Limiting with slowapi

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
 
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
 
# Standard endpoint
@app.get("/v1/users/{user_id}/metrics")
@limiter.limit("100/minute")
async def get_user_metrics(request: Request, user_id: str):
    ...
 
# Expensive endpoint — much stricter
@app.get("/v1/reports/full-export")
@limiter.limit("5/hour")
async def full_export(request: Request):
    ...

API Key Authentication with Scopes

from fastapi.security import APIKeyHeader
 
API_KEYS = {
    "key_dashboard_prod": {
        "name":   "Production Dashboard",
        "scopes": ["read:metrics", "read:reports"]
    },
    "key_ds_team": {
        "name":   "Data Science Team",
        "scopes": ["read:metrics", "read:reports", "write:annotations"]
    },
    "key_external_partner": {
        "name":   "Acme Corp Integration",
        "scopes": ["read:metrics"]          # restricted — no reports
    },
}
 
api_key_header = APIKeyHeader(name="X-API-Key")
 
def require_scope(scope: str):
    def dependency(api_key: str = Depends(api_key_header)):
        if api_key not in API_KEYS:
            raise HTTPException(status_code=401, detail="Invalid API key")
        if scope not in API_KEYS[api_key]["scopes"]:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return API_KEYS[api_key]
    return dependency
 
@app.get("/v1/reports/revenue")
async def revenue_report(key_info=Depends(require_scope("read:reports"))):
    ...

Row-Level Security

When different consumers should see different subsets of your data:

def get_tenant_filter(api_key: str) -> dict:
    key_info = API_KEYS.get(api_key, {})
    return {
        "tenant_id":           key_info.get("tenant_id"),
        "allowed_regions":     key_info.get("allowed_regions", []),
        "max_classification":  key_info.get("max_classification", "public"),
    }
 
@app.get("/v1/metrics")
async def get_metrics(api_key: str = Depends(api_key_header), db=Depends(get_db)):
    f = get_tenant_filter(api_key)
 
    # Inject tenant filter into every query — never trust the caller
    rows = db.execute("""
        SELECT * FROM serving.metrics
        WHERE tenant_id            = ?
          AND region               = ANY(?)
          AND data_classification <= ?
    """, [f["tenant_id"], f["allowed_regions"], f["max_classification"]]).fetchall()
 
    return rows

Layer 6: Schema Management and Versioning

API Versioning Strategy

# URL versioning — most explicit, easiest to maintain
app.include_router(v1_router, prefix="/v1")
app.include_router(v2_router, prefix="/v2")
 
# v1: flat response shape (legacy)
@v1_router.get("/users/{user_id}/metrics")
async def get_metrics_v1(user_id: str):
    return {
        "user_id":     user_id,
        "event_count": 1234,
        "revenue":     56.78
    }
 
# v2: nested shape with metadata envelope
@v2_router.get("/users/{user_id}/metrics")
async def get_metrics_v2(user_id: str):
    return {
        "data": {
            "user_id": user_id,
            "metrics": {"event_count": 1234, "revenue": 56.78}
        },
        "meta": {
            "refreshed_at": "2024-04-29T10:00:00Z",
            "source":       "user_metrics_daily"
        }
    }

Schema Evolution Policy

SAFE — no version bump required:
  ✅ Adding new optional fields to response
  ✅ Adding new endpoints
  ✅ Relaxing validation rules

BREAKING — always requires a new version:
  ❌ Removing or renaming existing fields
  ❌ Changing field types  (string → int)
  ❌ Making optional fields required
  ❌ Changing pagination shape or semantics

Observability for the Serving Layer

Structured Request Logging

import structlog, time
 
log = structlog.get_logger()
 
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start    = time.time()
    api_key  = request.headers.get("X-API-Key", "anonymous")
    response = await call_next(request)
    duration = round((time.time() - start) * 1000, 2)
 
    log.info(
        "api_request",
        method       = request.method,
        path         = request.url.path,
        status_code  = response.status_code,
        duration_ms  = duration,
        consumer     = API_KEYS.get(api_key, {}).get("name", "unknown"),
        cache_hit    = response.headers.get("X-Cache-Hit", "false"),
    )
    return response

The 4 Metrics That Matter

1. Consumer lag — latency percentiles (p50 / p95 / p99):

metrics.histogram(
    "api_request_duration_ms",
    value=duration_ms,
    tags={"endpoint": request.url.path, "status": response.status_code}
)

2. Cache hit rate — target above 80%:

metrics.increment(
    "cache_lookup_total",
    tags={"result": "hit" if cache_hit else "miss", "endpoint": path}
)

3. Data freshness — expose in every response:

@app.get("/v1/metrics/revenue")
async def revenue_metrics():
    result = query_db("SELECT *, _dbt_updated_at FROM serving.revenue_summary")
    freshness_hours = (datetime.now() - result["_dbt_updated_at"]).total_seconds() / 3600
 
    if freshness_hours > 2:
        log.warning("Stale serving data", hours_stale=freshness_hours)
 
    return {
        "data": result,
        "meta": {
            "refreshed_at":    result["_dbt_updated_at"].isoformat(),
            "freshness_warning": freshness_hours > 1,
        }
    }

4. Query cost tracking — bytes scanned per request:

def log_query_cost(query_id: str, endpoint: str):
    stats = get_query_stats(query_id)
    metrics.histogram("query_bytes_scanned", value=stats["bytes_scanned"],
                      tags={"endpoint": endpoint})
 
    if stats["bytes_scanned"] > 10 * GB:
        alert(f"Expensive query on {endpoint}: {stats['bytes_scanned'] / GB:.1f} GB scanned")

Real-World Failures and Fixes

Failure 1: The Silent Stale Data Problem

What happened: A revenue dashboard showed figures 6 hours stale. The dbt refresh job had silently failed due to an upstream schema change. No alert fired. Consumers had no idea.

The fix: Always expose refreshed_at in your API response and alert when freshness_hours > threshold. Consumers can show a staleness warning. Your on-call team gets paged before anyone notices.

Failure 2: The Unbounded Query

What happened: An analyst hit GET /v1/events with no date filter. The query scanned 2 years of history, returned 50M rows, OOM-killed the API pod, and generated a $400 Databricks bill in a single request.

The fix:

def validate_query_params(
    start_date: Optional[str],
    end_date:   Optional[str],
    max_days:   int = 90
):
    if not start_date or not end_date:
        raise HTTPException(
            status_code=400,
            detail="start_date and end_date are required for this endpoint"
        )
    delta = (parse_date(end_date) - parse_date(start_date)).days
    if delta > max_days:
        raise HTTPException(
            status_code=400,
            detail=f"Date range cannot exceed {max_days} days. Requested: {delta} days."
        )

Failure 3: The Cache Stampede

What happened: A popular endpoint had a 5-minute TTL. At the exact expiry boundary, 200 concurrent requests all got cache misses simultaneously and all fired expensive Databricks queries at once, overloading the warehouse.

The fix — distributed lock to prevent thundering herd:

import asyncio
 
async def get_with_lock(key: str, fetch_func, ttl: int):
    lock_key = f"lock:{key}"
 
    # Fast path — cache hit
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)
 
    # Slow path — acquire lock, only one process computes
    acquired = redis_client.set(lock_key, "1", nx=True, ex=30)
    if acquired:
        try:
            value = await fetch_func()
            redis_client.setex(key, ttl, json.dumps(value, default=str))
            return value
        finally:
            redis_client.delete(lock_key)
    else:
        # Wait for lock holder to populate cache
        for _ in range(10):
            await asyncio.sleep(0.5)
            cached = redis_client.get(key)
            if cached:
                return json.loads(cached)
        # Fallback: query directly
        return await fetch_func()

Decision Framework

Choose REST if:

✅ Standard data access with well-defined query patterns ✅ External partners or public-facing consumers ✅ Your team knows REST conventions ✅ Simple, stable request/response contract

Choose GraphQL if:

✅ Multiple consumers needing different field subsets ✅ Frontend teams want to own their query shape ✅ Rapidly evolving data model ✅ Internal developer experience is a priority

Choose gRPC if:

✅ High-throughput internal service-to-service calls ✅ Strongly typed contracts are critical ✅ Streaming responses required (live dashboards) ✅ Sub-100ms latency is a hard requirement

Key Takeaways

  1. The lakehouse is not the API — always put a serving layer in between
  2. Materialize aggressively — if a query runs more than 10x/hour, pre-compute it
  3. Cache at two levels — in-memory for hot paths, Redis for shared state
  4. Never return unbounded results — enforce date ranges and page sizes at the API layer
  5. Expose data freshness — consumers deserve to know how old the data is
  6. Rate limit by consumer — not all consumers get the same quota
  7. Version your contracts — breaking changes without versioning destroy trust
  8. Monitor cache hit rate — below 80% means your materialization or TTL strategy is wrong

The serving layer is where your lakehouse investment becomes a product. It deserves the same engineering rigour you give to your storage and transformation layers.


Related: Delta Lake vs Iceberg | Fault-Tolerant Streaming Systems | dbt Best Practices