Building Data APIs on Top of Your Lakehouse: Serving Layer Design
A lakehouse stores your data beautifully. But raw Delta tables and Parquet files are not how your downstream consumers — applications, dashboards, data science teams, external partners — should be touching your data.
The serving layer is what sits between your lakehouse and the world. Getting it wrong means slow queries, runaway costs, brittle integrations, and 3am pages. Getting it right means your lakehouse becomes a product.
Why the Serving Layer Matters
Most teams treat the serving layer as an afterthought. They expose Databricks SQL endpoints directly, give every team JDBC credentials, and call it done.
Then six months later:
- A dashboard query does a full table scan on a 500GB Delta table
- A data science team runs a heavy join during peak business hours
- An external partner hammers the API with 10,000 requests/minute
- Nobody knows who is querying what, or why costs spiked 300% this month
The serving layer is where you enforce contracts, control costs, and make your data accessible without making it dangerous.
Serving Layer Architecture Overview
Lakehouse (Delta / Iceberg)
│
▼
Materialized Views / Feature Store
│
▼
Query Engine (Databricks SQL / Trino / DuckDB)
│
▼
┌─────────────────────────────────────┐
│ REST API │ GraphQL │ gRPC │
│ Cache │ Rate Limiting │
│ Auth │ Schema Registry │
└─────────────────────────────────────┘
│
▼
┌──────────────────────────────────────┐
│ Apps │ Dashboards │ ML │ Partners │
└──────────────────────────────────────┘
The key insight: your lakehouse is not your API. The serving layer translates between the storage-optimised world of your lakehouse and the latency-optimised world of your consumers.
Layer 1: Materialization Strategy
Before writing a single API endpoint, decide what data you are actually serving. Hitting raw Delta tables on every API request is almost always the wrong answer.
Materialized Views
Pre-compute the results your consumers will ask for most often.
In Databricks:
-- Refresh hourly via Databricks Workflows
CREATE OR REPLACE TABLE serving.user_metrics_daily AS
SELECT
user_id,
DATE(event_time) AS date,
COUNT(*) AS event_count,
COUNT(DISTINCT session_id) AS session_count,
SUM(revenue) AS total_revenue,
MAX(event_time) AS last_seen
FROM lakehouse.events.raw
WHERE event_time >= CURRENT_DATE - INTERVAL 90 DAYS
GROUP BY user_id, DATE(event_time);
In dbt (recommended for most teams):
-- models/serving/user_metrics_daily.sql
-- Note: partition_by / cluster_by syntax is adapter-specific; the dict form below is the dbt-bigquery style.
{{ config(
    materialized='incremental',
    unique_key='user_date_key',
    partition_by={'field': 'date', 'data_type': 'date'},
    cluster_by=['user_id'],
    on_schema_change='fail'
) }}
SELECT
    {{ dbt_utils.generate_surrogate_key(['user_id', 'date']) }} AS user_date_key,
    user_id,
    DATE(event_time) AS date,
    COUNT(*) AS event_count,
    SUM(revenue) AS total_revenue,
    MAX(event_time) AS last_seen
FROM {{ ref('stg_events') }}
{% if is_incremental() %}
-- Reprocess whole days from the latest loaded date onward. Filtering on last_seen
-- would re-aggregate only part of the latest day and overwrite its row with partial counts.
WHERE DATE(event_time) >= (SELECT MAX(date) FROM {{ this }})
{% endif %}
GROUP BY user_id, DATE(event_time)
Aggregation Tiers
Not all consumers need the same granularity. Build multiple tiers:
Raw Events (lakehouse) → full fidelity, high cost
│
▼
Hourly Aggregates (serving) → medium granularity, low cost
│
▼
Daily Aggregates (serving) → coarse granularity, near-zero cost
│
▼
Summary Metrics (Redis) → single-row lookups, sub-millisecond
Rule of thumb: If a query runs more than 10 times per hour — materialize it. If it runs more than 1,000 times per hour — cache it.
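The rule of thumb above can be written down as a small helper, for example to label queries in a usage report. This is a minimal sketch: the function name and the `query-on-demand` label are illustrative assumptions, and the thresholds are simply the ones stated in the rule.

```python
def serving_strategy(runs_per_hour: float) -> str:
    """Map a query's hourly frequency to the tier suggested by the rule of thumb."""
    if runs_per_hour > 1000:
        return "cache"
    if runs_per_hour > 10:
        return "materialize"
    return "query-on-demand"


# Hypothetical queries and their observed hourly frequencies.
observed = {"ad_hoc_export": 2, "team_dashboard": 40, "user_summary": 5000}
for name, rate in observed.items():
    print(f"{name}: {serving_strategy(rate)}")
```

In practice a cached query is usually backed by a materialized table as well, so the two tiers stack rather than compete.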
Layer 2: Query Engine Selection
Choosing the right query engine for your serving layer is as important as your API design.
Option A: Databricks SQL Warehouse
Best when you're already on Databricks and need full SQL flexibility.
from typing import Optional
from databricks import sql

# DATABRICKS_HOST, SQL_WAREHOUSE_PATH and DATABRICKS_TOKEN are assumed to come from configuration.
def query_databricks(sql_query: str, params: Optional[dict] = None):
    with sql.connect(
        server_hostname=DATABRICKS_HOST,
        http_path=SQL_WAREHOUSE_PATH,
        access_token=DATABRICKS_TOKEN,
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(sql_query, parameters=params)
            return cursor.fetchall()

# Usage
results = query_databricks(
    "SELECT * FROM serving.user_metrics_daily WHERE user_id = :user_id",
    params={"user_id": "u123"},
)
Pros: Full Delta Lake support, Unity Catalog integration, auto-scaling
Cons: Cold-start latency when the warehouse is stopped (typically seconds for serverless, minutes for classic and pro warehouses), cost per query
Option B: Trino / Presto
Best for multi-source queries and when you need engine independence.
from trino.dbapi import connect

def query_trino(sql_query: str):
    conn = connect(
        host="trino.internal",
        port=8080,
        user="api-service",
        catalog="delta",
        schema="serving",
    )
    cursor = conn.cursor()
    cursor.execute(sql_query)
    return cursor.fetchall()
Pros: Engine-agnostic, no vendor lock-in, fast for pre-optimised tables
Cons: Requires cluster management, no auto-scaling out of the box
Option C: DuckDB
Best for serving layers with data under ~100GB. Surprisingly powerful for its weight class.
import duckdb

conn = duckdb.connect()
conn.execute("INSTALL delta; LOAD delta;")

def query_delta_local(table_path: str, query: str):
    # OR REPLACE so that a call with a different path does not silently reuse the first view.
    # table_path and query are interpolated as SQL: pass trusted, server-side values only.
    conn.execute(
        f"CREATE OR REPLACE VIEW tbl AS SELECT * FROM delta_scan('{table_path}')"
    )
    return conn.execute(query).fetchall()

# Query Parquet files on S3 directly, no cluster needed
# (requires the httpfs extension and S3 credentials to be configured)
def query_s3_parquet(s3_path: str, filter_clause: str):
    # filter_clause is raw SQL: never build it from untrusted request input.
    return conn.execute(f"""
        SELECT *
        FROM read_parquet('{s3_path}/**/*.parquet', hive_partitioning=true)
        WHERE {filter_clause}
    """).fetchall()
Pros: Near-zero infrastructure, embeds in your API process, blazing fast for medium data
Cons: Single-node only, not suitable for multi-tenant high-concurrency
Layer 3: API Design Patterns
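Whichever engine sits behind the API, caller-supplied filters should reach it as bound parameters rather than interpolated SQL (the `filter_clause` argument in the DuckDB example is only safe for trusted callers). The sketch below shows one way to do that with an allowlist of filterable columns. `ALLOWED_FILTER_COLUMNS` and `build_filter` are hypothetical names, and the column set is an assumption for illustration.

```python
from typing import Any

# Hypothetical allowlist: the only columns callers may filter on.
ALLOWED_FILTER_COLUMNS = {"user_id", "date", "region"}


def build_filter(filters: dict[str, Any]) -> tuple[str, list[Any]]:
    """Return a WHERE-clause body with ? placeholders plus the values to bind."""
    unknown = set(filters) - ALLOWED_FILTER_COLUMNS
    if unknown:
        raise ValueError(f"Unsupported filter columns: {sorted(unknown)}")
    if not filters:
        return "TRUE", []
    # Column names come from the allowlist; values are never placed in the SQL text.
    clause = " AND ".join(f"{column} = ?" for column in filters)
    return clause, list(filters.values())


clause, values = build_filter({"user_id": "u123", "date": "2024-04-01"})
print(clause)  # user_id = ? AND date = ?
print(values)  # ['u123', '2024-04-01']
```

The resulting pair can be passed to any driver that accepts `?` placeholders, for example `conn.execute(f"SELECT ... WHERE {clause}", values)`.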
REST API with FastAPI
The standard choice for most data APIs. Clean, well-understood, easy to version.
import datetime as dt
import time
from typing import Optional

from fastapi import FastAPI, Depends, HTTPException, Query
from pydantic import BaseModel

app = FastAPI(title="Lakehouse Data API", version="1.0.0")

# Column order must match the SELECT list in the endpoint below.
METRIC_COLUMNS = ("user_id", "date", "event_count", "total_revenue", "last_seen")

class UserMetrics(BaseModel):
    user_id: str
    date: dt.date            # date/datetime types match what the query engine returns
    event_count: int
    total_revenue: float
    last_seen: dt.datetime

class MetricsResponse(BaseModel):
    data: list[UserMetrics]
    total: int
    page: int
    page_size: int
    query_time_ms: float

# get_db_connection and validate_date_range are assumed to be defined elsewhere in the service.
@app.get("/v1/users/{user_id}/metrics", response_model=MetricsResponse)
async def get_user_metrics(
    user_id: str,
    start_date: str = Query(..., description="YYYY-MM-DD"),
    end_date: str = Query(..., description="YYYY-MM-DD"),
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=1000),
    db = Depends(get_db_connection)
):
    start = time.perf_counter()
    validate_date_range(start_date, end_date, max_days=90)
    offset = (page - 1) * page_size
    rows = db.execute("""
        SELECT user_id, date, event_count, total_revenue, last_seen
        FROM serving.user_metrics_daily
        WHERE user_id = ?
          AND date BETWEEN ? AND ?
        ORDER BY date DESC
        LIMIT ? OFFSET ?
    """, [user_id, start_date, end_date, page_size, offset]).fetchall()
    total = db.execute("""
        SELECT COUNT(*) FROM serving.user_metrics_daily
        WHERE user_id = ? AND date BETWEEN ? AND ?
    """, [user_id, start_date, end_date]).fetchone()[0]
    return MetricsResponse(
        data=[UserMetrics(**dict(zip(METRIC_COLUMNS, r))) for r in rows],
        total=total,
        page=page,
        page_size=page_size,
        query_time_ms=round((time.perf_counter() - start) * 1000, 2)
    )
Pagination Patterns
Never return unbounded result sets. Always paginate.
Offset pagination (simple, good for small datasets):
# GET /v1/events?page=3&page_size=100
# Problem: expensive at large offsets (the engine must read and discard all preceding rows)
@app.get("/v1/events")
async def list_events(
    page: int = Query(1, ge=1),
    page_size: int = Query(100, ge=1, le=1000)
):
    offset = (page - 1) * page_size
    return db.execute(
        "SELECT * FROM events ORDER BY event_time DESC LIMIT ? OFFSET ?",
        [page_size, offset]
    ).fetchall()
Cursor pagination (recommended for large datasets):
# GET /v1/events?cursor=2024-04-01T12:00:00&limit=100
# Cost does not grow with page depth: the filter seeks past earlier pages instead of skipping rows.
# Assumes rows are dict-like and event_time is unique; if timestamps can tie,
# add a tie-breaker column to both the ORDER BY and the cursor.
@app.get("/v1/events")
async def list_events(
    cursor: Optional[str] = None,
    limit: int = Query(100, ge=1, le=1000)
):
    if cursor:
        rows = db.execute("""
            SELECT * FROM events
            WHERE event_time < ?
            ORDER BY event_time DESC
            LIMIT ?
        """, [cursor, limit + 1]).fetchall()
    else:
        rows = db.execute(
            "SELECT * FROM events ORDER BY event_time DESC LIMIT ?",
            [limit + 1]
        ).fetchall()
    # Fetching limit + 1 rows tells us whether another page exists.
    has_more = len(rows) > limit
    data = rows[:limit]
    next_cursor = data[-1]["event_time"] if has_more else None
    return {"data": data, "next_cursor": next_cursor, "has_more": has_more}
GraphQL with Strawberry
Best when consumers need flexible field selection — dashboards, apps, notebooks.
import datetime as dt
import strawberry
from strawberry.fastapi import GraphQLRouter

# Column order must match the SELECT lists below.
METRIC_FIELDS = ("user_id", "date", "event_count", "total_revenue")

@strawberry.type
class UserMetrics:
    user_id: str
    date: dt.date
    event_count: int
    total_revenue: float

def to_metrics(rows) -> list[UserMetrics]:
    # Rows come back as tuples, so pair them with field names before constructing the type.
    return [UserMetrics(**dict(zip(METRIC_FIELDS, r))) for r in rows]

@strawberry.type
class Query:
    @strawberry.field
    def user_metrics(
        self,
        user_id: str,
        start_date: str,
        end_date: str,
        limit: int = 100
    ) -> list[UserMetrics]:
        rows = db.execute("""
            SELECT user_id, date, event_count, total_revenue
            FROM serving.user_metrics_daily
            WHERE user_id = ? AND date BETWEEN ? AND ?
            ORDER BY date DESC
            LIMIT ?
        """, [user_id, start_date, end_date, limit]).fetchall()
        return to_metrics(rows)

    @strawberry.field
    def top_users(self, date: str, limit: int = 10) -> list[UserMetrics]:
        rows = db.execute("""
            SELECT user_id, date, event_count, total_revenue
            FROM serving.user_metrics_daily
            WHERE date = ?
            ORDER BY total_revenue DESC
            LIMIT ?
        """, [date, limit]).fetchall()
        return to_metrics(rows)

schema = strawberry.Schema(query=Query)
app.include_router(GraphQLRouter(schema), prefix="/graphql")
GraphQL shines when:
- Consumers need different subsets of fields
- You want to avoid versioning for field additions
- Frontend teams want to own their own query shapes
Layer 4: Caching Strategy
Caching is the difference between a serving layer that scales and one that doesn't.
Two-Level Cache Architecture
Request
│
▼
L1: In-Memory Cache (per-instance, TTL: 60s)
│ miss
▼
L2: Redis Cache (shared across instances, TTL: 5min–1hr)
│ miss
▼
Query Engine (Databricks SQL / Trino / DuckDB)
│
▼
Lakehouse (Delta / Iceberg)
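The lookup order in the diagram can be sketched in a few lines. This is an illustration rather than a production cache: `TwoLevelCache` is a hypothetical class, the L2 store is injected as a pair of callables (Redis in production, a plain dict here so the example runs on its own), and `None` results are not cached.

```python
import time
from typing import Any, Callable, Optional


class TwoLevelCache:
    """L1: per-process dict with a short TTL. L2: shared store supplied by the caller."""

    def __init__(
        self,
        l2_get: Callable[[str], Optional[Any]],
        l2_set: Callable[[str, Any, int], None],
        l1_ttl: float = 60.0,
        l2_ttl: int = 300,
    ) -> None:
        self._l1: dict[str, tuple[float, Any]] = {}
        self._l2_get = l2_get
        self._l2_set = l2_set
        self._l1_ttl = l1_ttl
        self._l2_ttl = l2_ttl

    def get(self, key: str, fetch: Callable[[], Any]) -> Any:
        now = time.monotonic()
        entry = self._l1.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]                      # L1 hit
        value = self._l2_get(key)
        if value is None:                        # L2 miss: run the query, fill L2
            value = fetch()
            self._l2_set(key, value, self._l2_ttl)
        self._l1[key] = (now + self._l1_ttl, value)
        return value


# Stand-in for Redis: a dict shared by every cache instance (the TTL is ignored here).
shared: dict[str, Any] = {}
cache = TwoLevelCache(shared.get, lambda k, v, ttl: shared.__setitem__(k, v))

calls = []

def expensive_query() -> dict:
    calls.append(1)
    return {"total_revenue": 56.78}

cache.get("user_summary:u123", expensive_query)
cache.get("user_summary:u123", expensive_query)
print(len(calls))  # 1
```

The short L1 TTL bounds how long one instance can serve a value that has already been invalidated in the shared layer.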
Redis Caching Implementation
import redis
import json
import hashlib
from functools import wraps

redis_client = redis.Redis(host="redis.internal", port=6379, decode_responses=True)

def cache_key(prefix: str, **kwargs) -> str:
    params = json.dumps(kwargs, sort_keys=True, default=str)
    # MD5 is used only as a compact fingerprint here, not as a security boundary.
    hash_val = hashlib.md5(params.encode()).hexdigest()
    return f"{prefix}:{hash_val}"

def cached(prefix: str, ttl: int = 300):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # FastAPI passes endpoint parameters as keyword arguments, so kwargs identify the request.
            key = cache_key(prefix, **kwargs)
            hit = redis_client.get(key)
            if hit is not None:
                return json.loads(hit)
            result = await func(*args, **kwargs)
            redis_client.setex(key, ttl, json.dumps(result, default=str))
            return result
        return wrapper
    return decorator

# Usage: cache user summaries for 5 minutes
@app.get("/v1/users/{user_id}/summary")
@cached(prefix="user_summary", ttl=300)
async def get_user_summary(user_id: str):
    # query_lakehouse is assumed to wrap one of the query engines from Layer 2.
    return query_lakehouse(
        "SELECT * FROM serving.user_summary WHERE user_id = ?",
        [user_id]
    )
Cache Invalidation on Data Refresh
When your materialized views refresh, invalidate the relevant cache keys:
import logging

log = logging.getLogger(__name__)

# Triggered after dbt run or Databricks Workflow completes
def invalidate_cache_on_refresh(tables_refreshed: list[str]):
    table_to_pattern = {
        "user_metrics_daily": "user_summary:*",
        "revenue_summary": "revenue:*",
        "product_metrics": "product:*",
    }
    for table in tables_refreshed:
        if table in table_to_pattern:
            pattern = table_to_pattern[table]
            # SCAN iterates incrementally; KEYS would block Redis while it walks the whole keyspace.
            keys = list(redis_client.scan_iter(match=pattern, count=1000))
            if keys:
                redis_client.delete(*keys)
                log.info("Invalidated %d cache keys for %s", len(keys), table)
Cache TTL by Data Freshness
| Data Type | Update Frequency | Recommended TTL |
|---|---|---|
| Real-time metrics | Streaming (seconds) | 10–30 seconds |
| Operational dashboards | Hourly refresh | 5–15 minutes |
| Business metrics | Daily refresh | 1–6 hours |
| Historical aggregates | Weekly / static | 24 hours |
| Reference / lookup data | Rarely changes | 7 days |
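
The table translates directly into configuration. A minimal sketch, assuming hypothetical data-type keys and taking the lower bound of each recommended range as the default:

```python
# TTLs in seconds, using the lower bound of each range in the table above.
TTL_SECONDS = {
    "realtime_metrics": 10,
    "operational_dashboards": 5 * 60,
    "business_metrics": 60 * 60,
    "historical_aggregates": 24 * 60 * 60,
    "reference_data": 7 * 24 * 60 * 60,
}


def ttl_for(data_type: str) -> int:
    """Return the cache TTL for a data type; unknown types get the shortest TTL."""
    return TTL_SECONDS.get(data_type, min(TTL_SECONDS.values()))


print(ttl_for("business_metrics"))  # 3600
```

Falling back to the shortest TTL for unknown types trades some cache efficiency for a lower risk of serving stale data.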
Layer 5: Rate Limiting and Access Control
Rate Limiting with slowapi
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

def consumer_key(request: Request) -> str:
    # Limit per consumer (API key) where one is sent; fall back to the client IP otherwise.
    return request.headers.get("X-API-Key") or get_remote_address(request)

limiter = Limiter(key_func=consumer_key)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Standard endpoint
@app.get("/v1/users/{user_id}/metrics")
@limiter.limit("100/minute")
async def get_user_metrics(request: Request, user_id: str):
    ...

# Expensive endpoint: much stricter
@app.get("/v1/reports/full-export")
@limiter.limit("5/hour")
async def full_export(request: Request):
    ...
API Key Authentication with Scopes
from fastapi.security import APIKeyHeader

# Illustrative only: in production, load keys from a secret store, not from source code.
API_KEYS = {
    "key_dashboard_prod": {
        "name": "Production Dashboard",
        "scopes": ["read:metrics", "read:reports"]
    },
    "key_ds_team": {
        "name": "Data Science Team",
        "scopes": ["read:metrics", "read:reports", "write:annotations"]
    },
    "key_external_partner": {
        "name": "Acme Corp Integration",
        "scopes": ["read:metrics"]  # restricted: no reports
    },
}

api_key_header = APIKeyHeader(name="X-API-Key")

def require_scope(scope: str):
    def dependency(api_key: str = Depends(api_key_header)):
        if api_key not in API_KEYS:
            raise HTTPException(status_code=401, detail="Invalid API key")
        if scope not in API_KEYS[api_key]["scopes"]:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        return API_KEYS[api_key]
    return dependency

@app.get("/v1/reports/revenue")
async def revenue_report(key_info=Depends(require_scope("read:reports"))):
    ...
Row-Level Security
When different consumers should see different subsets of your data:
def get_tenant_filter(api_key: str) -> dict:
    key_info = API_KEYS.get(api_key)
    if key_info is None:
        raise HTTPException(status_code=401, detail="Invalid API key")
    if key_info.get("tenant_id") is None:
        raise HTTPException(status_code=403, detail="API key is not bound to a tenant")
    return {
        "tenant_id": key_info["tenant_id"],
        "allowed_regions": key_info.get("allowed_regions", []),
        # Numeric rank (0 = public). Comparing labels as strings would order them alphabetically.
        "max_classification": key_info.get("max_classification", 0),
    }

@app.get("/v1/metrics")
async def get_metrics(api_key: str = Depends(api_key_header), db=Depends(get_db_connection)):
    f = get_tenant_filter(api_key)
    # Inject the tenant filter into every query: never trust the caller.
    # The array comparison syntax (= ANY) varies by engine.
    rows = db.execute("""
        SELECT * FROM serving.metrics
        WHERE tenant_id = ?
          AND region = ANY(?)
          AND data_classification <= ?
    """, [f["tenant_id"], f["allowed_regions"], f["max_classification"]]).fetchall()
    return rows
Layer 6: Schema Management and Versioning
API Versioning Strategy
from fastapi import APIRouter

# URL versioning: most explicit, easiest to maintain
v1_router = APIRouter()
v2_router = APIRouter()

# v1: flat response shape (legacy)
@v1_router.get("/users/{user_id}/metrics")
async def get_metrics_v1(user_id: str):
    return {
        "user_id": user_id,
        "event_count": 1234,
        "revenue": 56.78
    }

# v2: nested shape with metadata envelope
@v2_router.get("/users/{user_id}/metrics")
async def get_metrics_v2(user_id: str):
    return {
        "data": {
            "user_id": user_id,
            "metrics": {"event_count": 1234, "revenue": 56.78}
        },
        "meta": {
            "refreshed_at": "2024-04-29T10:00:00Z",
            "source": "user_metrics_daily"
        }
    }

# Register routers after their routes are defined: include_router copies the routes present at call time.
app.include_router(v1_router, prefix="/v1")
app.include_router(v2_router, prefix="/v2")
Schema Evolution Policy
SAFE — no version bump required:
✅ Adding new optional fields to response
✅ Adding new endpoints
✅ Relaxing validation rules
BREAKING — always requires a new version:
❌ Removing or renaming existing fields
❌ Changing field types (string → int)
❌ Making optional fields required
❌ Changing pagination shape or semantics
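Part of this policy can be checked automatically, for instance in CI before a deploy. The sketch below compares two response schemas and reports the first three breaking rules. The schema representation (`{field: {"type": ..., "required": ...}}`) and the function name are assumptions for illustration, and pagination changes are not covered.

```python
def breaking_changes(old: dict[str, dict], new: dict[str, dict]) -> list[str]:
    """List breaking differences between two schemas of the form
    {field_name: {"type": str, "required": bool}}."""
    problems = []
    for name, spec in old.items():
        if name not in new:
            problems.append(f"removed or renamed field: {name}")
            continue
        if new[name]["type"] != spec["type"]:
            problems.append(
                f"type changed for {name}: {spec['type']} -> {new[name]['type']}"
            )
        if new[name]["required"] and not spec["required"]:
            problems.append(f"optional field made required: {name}")
    # Fields that exist only in `new` are additions, which the policy treats as safe.
    return problems


v1 = {
    "user_id": {"type": "string", "required": True},
    "revenue": {"type": "number", "required": False},
}
v2 = {
    "user_id": {"type": "integer", "required": True},
    "revenue": {"type": "number", "required": True},
    "event_count": {"type": "integer", "required": False},
}
for problem in breaking_changes(v1, v2):
    print(problem)
```

An empty result means the change can ship under the existing version; anything else calls for a new version prefix.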
Observability for the Serving Layer
Structured Request Logging
import time
import structlog
from fastapi import Request

log = structlog.get_logger()

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start = time.perf_counter()
    api_key = request.headers.get("X-API-Key", "anonymous")
    response = await call_next(request)
    duration = round((time.perf_counter() - start) * 1000, 2)
    log.info(
        "api_request",
        method=request.method,
        path=request.url.path,
        status_code=response.status_code,
        duration_ms=duration,
        consumer=API_KEYS.get(api_key, {}).get("name", "unknown"),
        cache_hit=response.headers.get("X-Cache-Hit", "false"),
    )
    return response
The 4 Metrics That Matter
1. Consumer-facing latency (p50 / p95 / p99 percentiles):
# `metrics` stands for your metrics client (StatsD, Datadog, a Prometheus wrapper, ...).
metrics.histogram(
    "api_request_duration_ms",
    value=duration_ms,
    tags={"endpoint": request.url.path, "status": response.status_code}
)
2. Cache hit rate (target above 80%):
metrics.increment(
    "cache_lookup_total",
    tags={"result": "hit" if cache_hit else "miss", "endpoint": path}
)
3. Data freshness (expose in every response):
from datetime import datetime, timezone

@app.get("/v1/metrics/revenue")
async def revenue_metrics():
    # Assumes query_db returns a list of dict-like rows and _dbt_updated_at is timezone-aware (UTC).
    rows = query_db("SELECT *, _dbt_updated_at FROM serving.revenue_summary")
    refreshed_at = max(row["_dbt_updated_at"] for row in rows)
    freshness_hours = (datetime.now(timezone.utc) - refreshed_at).total_seconds() / 3600
    if freshness_hours > 2:  # internal alerting threshold
        log.warning("Stale serving data", hours_stale=freshness_hours)
    return {
        "data": rows,
        "meta": {
            "refreshed_at": refreshed_at.isoformat(),
            "freshness_warning": freshness_hours > 1,  # consumers are warned earlier
        }
    }
4. Query cost tracking (bytes scanned per request):
GB = 1024 ** 3

def log_query_cost(query_id: str, endpoint: str):
    # get_query_stats and alert are assumed helpers around your engine's query history and paging tool.
    stats = get_query_stats(query_id)
    metrics.histogram("query_bytes_scanned", value=stats["bytes_scanned"],
                      tags={"endpoint": endpoint})
    if stats["bytes_scanned"] > 10 * GB:
        alert(f"Expensive query on {endpoint}: {stats['bytes_scanned'] / GB:.1f} GB scanned")
Real-World Failures and Fixes
Failure 1: The Silent Stale Data Problem
What happened: A revenue dashboard showed figures 6 hours stale. The dbt refresh job had silently failed due to an upstream schema change. No alert fired. Consumers had no idea.
The fix: Always expose refreshed_at in your API response and alert when freshness_hours > threshold. Consumers can show a staleness warning. Your on-call team gets paged before anyone notices.
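A minimal sketch of that fix, assuming the refresh timestamp is stored as a timezone-aware UTC value. `freshness_meta` and its field names are illustrative, and the `now` parameter exists only to make the example deterministic.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def freshness_meta(
    refreshed_at: datetime,
    max_age: timedelta,
    now: Optional[datetime] = None,
) -> dict:
    """Build the `meta` block of a response from a timezone-aware refresh timestamp."""
    now = now or datetime.now(timezone.utc)
    age = now - refreshed_at
    return {
        "refreshed_at": refreshed_at.isoformat(),
        "age_hours": round(age.total_seconds() / 3600, 2),
        "stale": age > max_age,
    }


meta = freshness_meta(
    refreshed_at=datetime(2024, 4, 29, 4, 0, tzinfo=timezone.utc),
    max_age=timedelta(hours=2),
    now=datetime(2024, 4, 29, 10, 0, tzinfo=timezone.utc),
)
print(meta["stale"], meta["age_hours"])  # True 6.0
```

The same `stale` flag can drive both the consumer-facing warning and the on-call alert, so the two never disagree about what counts as stale.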
Failure 2: The Unbounded Query
What happened: An analyst hit GET /v1/events with no date filter. The query scanned 2 years of history, returned 50M rows, OOM-killed the API pod, and generated a $400 Databricks bill in a single request.
The fix:
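from datetime import date

def validate_date_range(
    start_date: Optional[str],
    end_date: Optional[str],
    max_days: int = 90
):
    if not start_date or not end_date:
        raise HTTPException(
            status_code=400,
            detail="start_date and end_date are required for this endpoint"
        )
    try:
        delta = (date.fromisoformat(end_date) - date.fromisoformat(start_date)).days
    except ValueError:
        raise HTTPException(status_code=400, detail="Dates must use the YYYY-MM-DD format")
    if delta < 0:
        raise HTTPException(status_code=400, detail="end_date must not be before start_date")
    if delta > max_days:
        raise HTTPException(
            status_code=400,
            detail=f"Date range cannot exceed {max_days} days. Requested: {delta} days."
        )
Failure 3: The Cache Stampede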
What happened: A popular endpoint had a 5-minute TTL. At the exact expiry boundary, 200 concurrent requests all got cache misses simultaneously and all fired expensive Databricks queries at once, overloading the warehouse.
The fix — distributed lock to prevent thundering herd:
import asyncio

async def get_with_lock(key: str, fetch_func, ttl: int):
    lock_key = f"lock:{key}"
    # Fast path: cache hit
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)
    # Slow path: acquire lock, only one process computes
    acquired = redis_client.set(lock_key, "1", nx=True, ex=30)
    if acquired:
        try:
            value = await fetch_func()
            redis_client.setex(key, ttl, json.dumps(value, default=str))
            return value
        finally:
            redis_client.delete(lock_key)
    else:
        # Wait for lock holder to populate cache
        for _ in range(10):
            await asyncio.sleep(0.5)
            cached = redis_client.get(key)
            if cached:
                return json.loads(cached)
        # Fallback: query directly
        return await fetch_func()
Decision Framework
Choose REST if:
✅ Standard data access with well-defined query patterns
✅ External partners or public-facing consumers
✅ Your team knows REST conventions
✅ Simple, stable request/response contract
Choose GraphQL if:
✅ Multiple consumers needing different field subsets
✅ Frontend teams want to own their query shape
✅ Rapidly evolving data model
✅ Internal developer experience is a priority
Choose gRPC if:
✅ High-throughput internal service-to-service calls
✅ Strongly typed contracts are critical
✅ Streaming responses required (live dashboards)
✅ Sub-100ms latency is a hard requirement
Key Takeaways
- The lakehouse is not the API — always put a serving layer in between
- Materialize aggressively — if a query runs more than 10x/hour, pre-compute it
- Cache at two levels — in-memory for hot paths, Redis for shared state
- Never return unbounded results — enforce date ranges and page sizes at the API layer
- Expose data freshness — consumers deserve to know how old the data is
- Rate limit by consumer — not all consumers get the same quota
- Version your contracts — breaking changes without versioning destroy trust
- Monitor cache hit rate — below 80% means your materialization or TTL strategy is wrong
The serving layer is where your lakehouse investment becomes a product. It deserves the same engineering rigour you give to your storage and transformation layers.