Data Platform Cost Optimization: A FinOps Approach for Data Engineers
Our data platform costs were growing 40% year-over-year with no clear visibility into spend drivers. Six months later, we had reduced spend by 40% (about $400K annually) while improving performance. Here's the complete playbook.
The Cost Crisis
Initial State:
- Monthly cloud spend: $83K/month (~$1M annually)
- Spend growing 40% YoY while data volume grew only 20%
- No cost attribution or accountability
- No automated anomaly detection
- Teams had unlimited budgets
The Wake-Up Call: A single misconfigured Databricks cluster cost $12K in one weekend.
The FinOps Framework for Data Engineering
Unlike traditional FinOps focused on compute, data platforms have unique cost drivers:
Cost Drivers Hierarchy
```
┌─────────────────────────────────────┐
│ 1. Compute (50%)                    │
│    - Databricks DBUs                │
│    - Snowflake credits              │
│    - EMR clusters                   │
├─────────────────────────────────────┤
│ 2. Storage (30%)                    │
│    - S3/ADLS data lake              │
│    - Warehouse storage              │
│    - Delta Lake versions            │
├─────────────────────────────────────┤
│ 3. Data Transfer (15%)              │
│    - Cross-region transfers         │
│    - Egress charges                 │
│    - API calls                      │
├─────────────────────────────────────┤
│ 4. Licensing (5%)                   │
│    - DBU markup                     │
│    - Warehouse compute markup       │
└─────────────────────────────────────┘
```
Phase 1: Visibility & Attribution
Problem: No one knew what was costing money.
Solution 1: Unified Cost Dashboard
Built a centralized dashboard aggregating costs across platforms:
```python
# cost_aggregation.py
import os
from dataclasses import dataclass, field
from datetime import date

import boto3
import snowflake.connector  # snowflake-connector-python

# Assumed USD price per Snowflake credit; replace with your contract rate.
SNOWFLAKE_PRICE_PER_CREDIT = 2.5

# warehouse_metering_history has no team column, so ownership is mapped
# per warehouse (hypothetical mapping; maintain your own).
WAREHOUSE_TEAMS = {'ANALYTICS_WH': 'analytics', 'ETL_WH': 'data-engineering'}


@dataclass
class CostRecord:
    platform: str
    service: str
    team: str
    cost_usd: float
    date: date
    resource_id: str
    tags: dict = field(default_factory=dict)


class CostAggregator:
    def __init__(self):
        self.aws_client = boto3.client('ce')
        self.databricks_token = os.getenv('DATABRICKS_TOKEN')
        self.snowflake_conn = snowflake.connector.connect(...)  # connection params elided

    def get_aws_costs(self, start_date: str, end_date: str) -> list[CostRecord]:
        """Fetch AWS Cost Explorer data (End date is exclusive)."""
        costs = []
        kwargs = dict(
            TimePeriod={'Start': start_date, 'End': end_date},
            Granularity='DAILY',
            Metrics=['UnblendedCost'],
            GroupBy=[
                {'Type': 'DIMENSION', 'Key': 'SERVICE'},
                {'Type': 'TAG', 'Key': 'Team'},
            ],
        )
        while True:
            response = self.aws_client.get_cost_and_usage(**kwargs)
            for result in response['ResultsByTime']:
                day = date.fromisoformat(result['TimePeriod']['Start'])
                for group in result['Groups']:
                    service, tag_key = group['Keys']
                    # Tag keys come back as "Team$<value>"; a bare "Team$" means untagged
                    team = tag_key.partition('$')[2] or 'untagged'
                    costs.append(CostRecord(
                        platform='AWS',
                        service=service,
                        team=team,
                        cost_usd=float(group['Metrics']['UnblendedCost']['Amount']),
                        date=day,
                        resource_id='',
                    ))
            # Cost Explorer paginates large result sets
            token = response.get('NextPageToken')
            if not token:
                return costs
            kwargs['NextPageToken'] = token

    def get_databricks_costs(self, start_date: str, end_date: str) -> list[CostRecord]:
        """Fetch Databricks usage from system tables, priced at list price."""
        # system.billing.usage has no price column; join list_prices for the
        # price that was in effect when the usage occurred.
        query = f"""
            SELECT
                u.usage_date,
                u.usage_metadata.cluster_id AS cluster_id,
                u.usage_metadata.job_id AS job_id,
                u.custom_tags['Team'] AS team,
                u.sku_name,
                u.usage_quantity * lp.pricing.default AS cost_usd
            FROM system.billing.usage u
            JOIN system.billing.list_prices lp
              ON u.sku_name = lp.sku_name
             AND lp.currency_code = 'USD'
             AND u.usage_end_time >= lp.price_start_time
             AND (lp.price_end_time IS NULL OR u.usage_end_time < lp.price_end_time)
            WHERE u.usage_date >= '{start_date}' AND u.usage_date < '{end_date}'
        """
        results = self.execute_databricks_sql(query)  # SQL-execution helper not shown
        costs = []
        for row in results:
            costs.append(CostRecord(
                platform='Databricks',
                service=row['sku_name'],
                team=row['team'] or 'untagged',
                cost_usd=float(row['cost_usd']),
                date=row['usage_date'],
                resource_id=row['cluster_id'] or '',
                tags={'job_id': row['job_id']},
            ))
        return costs

    def get_snowflake_costs(self, start_date: str, end_date: str) -> list[CostRecord]:
        """Fetch Snowflake warehouse metering history."""
        # Metering is per warehouse: there is no database or query_tag column here.
        query = f"""
            SELECT
                DATE(start_time) AS usage_date,
                warehouse_name,
                SUM(credits_used_compute) AS credits_used
            FROM snowflake.account_usage.warehouse_metering_history
            WHERE start_time >= '{start_date}'
              AND start_time < '{end_date}'
            GROUP BY 1, 2
        """
        cursor = self.snowflake_conn.cursor()
        costs = []
        for usage_date, warehouse, credits in cursor.execute(query).fetchall():
            costs.append(CostRecord(
                platform='Snowflake',
                service=warehouse,
                team=WAREHOUSE_TEAMS.get(warehouse, 'untagged'),
                cost_usd=float(credits) * SNOWFLAKE_PRICE_PER_CREDIT,
                date=usage_date,
                resource_id=warehouse,
                tags={'credits': float(credits)},
            ))
        return costs

    def aggregate_all_costs(self, start_date: str, end_date: str) -> list[CostRecord]:
        """Aggregate costs from all platforms."""
        all_costs = []
        all_costs.extend(self.get_aws_costs(start_date, end_date))
        all_costs.extend(self.get_databricks_costs(start_date, end_date))
        all_costs.extend(self.get_snowflake_costs(start_date, end_date))
        return all_costs
```
Usage:
```python
# Aggregate March 2024 (end dates are exclusive on every platform above)
aggregator = CostAggregator()
costs = aggregator.aggregate_all_costs('2024-03-01', '2024-04-01')

# Store in PostgreSQL for dashboarding (helper not shown)
store_costs_in_db(costs)
```
Impact:
- Discovered 30% of spend had no team attribution
- Identified top 10 cost drivers accounting for 70% of spend
- Found $8K/month in idle resources
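Figures like the untagged share and the top-10 concentration can be computed straight from the aggregated records. A minimal stdlib sketch, assuming each record has been reduced to a `(team, resource_key, cost_usd)` tuple; `attribution_summary` and that tuple shape are illustrative, not part of the pipeline above:

```python
from collections import defaultdict


def attribution_summary(records, top_n=10):
    """Summarize cost attribution.

    records: iterable of (team, resource_key, cost_usd) tuples.
    Returns (untagged_share, top_n_share, top_n_keys).
    """
    total = 0.0
    untagged = 0.0
    by_resource = defaultdict(float)
    for team, resource_key, cost in records:
        total += cost
        if team == 'untagged':
            untagged += cost
        by_resource[resource_key] += cost
    if total == 0:
        return 0.0, 0.0, []
    # Rank cost drivers by total spend, largest first
    ranked = sorted(by_resource.items(), key=lambda kv: kv[1], reverse=True)
    top = ranked[:top_n]
    top_share = sum(cost for _, cost in top) / total
    return untagged / total, top_share, [key for key, _ in top]


records = [
    ('analytics', 'ANALYTICS_WH', 600.0),
    ('untagged', 'Amazon EC2', 300.0),
    ('data-engineering', 'job-42', 100.0),
]
untagged_share, top_share, top_keys = attribution_summary(records, top_n=2)
print(f"untagged: {untagged_share:.0%}, top-2 share: {top_share:.0%}")
# untagged: 30%, top-2 share: 90%
```

Ranking by resource rather than by team is deliberate: a single runaway warehouse or job is easier to act on than a team total.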
Solution 2: Tagging Strategy
Implemented mandatory tagging across all resources:
```python
# tagging_policy.py
import boto3

REQUIRED_TAGS = {
    'Team': ['data-engineering', 'analytics', 'ml-platform', 'data-science'],
    'Environment': ['dev', 'staging', 'prod'],
    'CostCenter': ['eng', 'product', 'finance'],
    'Owner': 'email',     # free-form, must look like an email address
    'Project': 'string',  # free-form, must be non-empty
}


def validate_tags(tags: dict[str, str]) -> tuple[bool, list[str]]:
    """Validate resource tags against policy."""
    errors = []
    for tag_key, rule in REQUIRED_TAGS.items():
        value = tags.get(tag_key)
        if not value:
            errors.append(f"Missing required tag: {tag_key}")
        elif isinstance(rule, list) and value not in rule:
            errors.append(f"Invalid value for {tag_key}: {value}")
        elif rule == 'email' and '@' not in value:
            errors.append(f"Invalid email for {tag_key}: {value}")
    return len(errors) == 0, errors


def enforce_tagging_policy():
    """Stop running non-prod EC2 instances that lack a Team tag."""
    ec2 = boto3.client('ec2')
    # EC2 filters cannot express "tag key does not exist", so list running
    # instances and check their tags client-side.
    paginator = ec2.get_paginator('describe_instances')
    pages = paginator.paginate(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
    )
    for page in pages:
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
                # Skip tagged instances and never stop prod
                if 'Team' in tags or tags.get('Environment') == 'prod':
                    continue
                instance_id = instance['InstanceId']
                ec2.stop_instances(InstanceIds=[instance_id])
                send_alert(f"Stopped untagged instance: {instance_id}")  # alerting helper not shown
```
Databricks Cluster Policy:
```json
{
  "custom_tags.Team": {
    "type": "allowlist",
    "values": ["data-engineering", "analytics", "ml-platform", "data-science"]
  },
  "custom_tags.Environment": {
    "type": "allowlist",
    "values": ["dev", "staging", "prod"]
  },
  "autotermination_minutes": {
    "type": "range",
    "minValue": 10,
    "maxValue": 60,
    "defaultValue": 15
  }
}
```
Impact:
- Increased tagged resources from 40% → 95%
- Enabled accurate chargeback per team
- Automated cost allocation
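Separating the "which instances would we stop?" decision from the API call makes the enforcement job easy to dry-run and unit-test. A sketch assuming the input is the `Reservations` list from an EC2 `describe_instances` response; `instances_to_stop` is a hypothetical helper name:

```python
def instances_to_stop(reservations):
    """Return IDs of running, non-prod instances that lack a Team tag.

    reservations: the 'Reservations' list from an EC2 describe_instances page.
    """
    ids = []
    for reservation in reservations:
        for instance in reservation['Instances']:
            tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
            running = instance['State']['Name'] == 'running'
            if running and 'Team' not in tags and tags.get('Environment') != 'prod':
                ids.append(instance['InstanceId'])
    return ids


sample = [{'Instances': [
    {'InstanceId': 'i-a', 'State': {'Name': 'running'},
     'Tags': [{'Key': 'Team', 'Value': 'analytics'}]},
    {'InstanceId': 'i-b', 'State': {'Name': 'running'}},
    {'InstanceId': 'i-c', 'State': {'Name': 'running'},
     'Tags': [{'Key': 'Environment', 'Value': 'prod'}]},
    {'InstanceId': 'i-d', 'State': {'Name': 'stopped'}},
]}]
print(instances_to_stop(sample))  # ['i-b']
```

Running this in report-only mode for a week before enabling `stop_instances` is a low-risk way to catch tagging gaps before they turn into outages.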
Phase 2: Right-Sizing & Optimization
Databricks Cluster Optimization
Problem: Over-provisioned clusters running 24/7
Analysis Query:
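```sql
-- Analyze Databricks cluster utilization over the last 30 days.
-- finops.cluster_utilization_hourly is our own hourly rollup (hypothetical
-- name; one row per cluster-hour), built from system.compute.node_timeline
-- and system.billing.usage. Databricks does not ship a table of this shape.
SELECT
    cluster_id,
    cluster_name,
    team,
    COUNT(*) AS total_hours,
    SUM(CASE WHEN avg_cpu_percent < 20 THEN 1 ELSE 0 END) AS underutilized_hours,
    SUM(CASE WHEN avg_cpu_percent > 80 THEN 1 ELSE 0 END) AS overutilized_hours,
    AVG(avg_cpu_percent) AS avg_cpu,
    AVG(avg_memory_percent) AS avg_memory,
    SUM(dbu_usage) AS total_dbus,
    SUM(cost_usd) AS total_cost
FROM finops.cluster_utilization_hourly
WHERE usage_date >= date_sub(current_date(), 30)
GROUP BY cluster_id, cluster_name, team
-- Mostly idle, or running more than ~70% of the 720 hours in 30 days
HAVING AVG(avg_cpu_percent) < 30 OR COUNT(*) > 500
ORDER BY total_cost DESC;
```
Right-Sizing Algorithm: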
```python
# cluster_rightsizing.py
import math
from dataclasses import dataclass


@dataclass
class ClusterRecommendation:
    cluster_id: str
    current_config: dict
    recommended_config: dict
    estimated_savings_monthly: float
    reason: str


def analyze_cluster_utilization(cluster_id: str, days: int = 30) -> ClusterRecommendation:
    """Analyze cluster metrics and recommend right-sizing."""
    # get_cluster_metrics / get_cluster_config / calculate_cluster_cost are
    # internal helpers (not shown) over system tables and the Clusters API.
    metrics = get_cluster_metrics(cluster_id, days)
    avg_cpu = metrics['avg_cpu_percent']
    avg_memory = metrics['avg_memory_percent']
    p95_cpu = metrics['p95_cpu_percent']
    p95_memory = metrics['p95_memory_percent']

    current_config = get_cluster_config(cluster_id)
    current_cost = calculate_cluster_cost(current_config)
    # Fixed-size clusters use num_workers; autoscaling ones use autoscale.max_workers
    current_workers = (current_config.get('num_workers')
                       or current_config['autoscale']['max_workers'])

    # Right-sizing logic
    if avg_cpu < 30 and avg_memory < 30:
        # Downsize: halve the workers, keeping at least 2
        recommended_workers = max(current_workers // 2, 2)
        reason = "Consistently underutilized (CPU<30%, Memory<30%)"
    elif p95_cpu > 85 or p95_memory > 85:
        # Upsize: add 50% more workers, rounded up
        recommended_workers = math.ceil(current_workers * 1.5)
        reason = "Frequently hitting resource limits (p95 > 85%)"
    else:
        # Keep current size
        recommended_workers = current_workers
        reason = "Well-sized for current workload"

    # A Databricks cluster takes either num_workers or autoscale, not both
    recommended_config = {k: v for k, v in current_config.items() if k != 'num_workers'}
    recommended_config['autoscale'] = {
        'min_workers': max(recommended_workers // 2, 2),
        'max_workers': recommended_workers * 2,
    }

    new_cost = calculate_cluster_cost(recommended_config)
    return ClusterRecommendation(
        cluster_id=cluster_id,
        current_config=current_config,
        recommended_config=recommended_config,
        estimated_savings_monthly=current_cost - new_cost,
        reason=reason,
    )
```
Usage:
```python
# Analyze all production clusters (helper not shown)
clusters = get_all_production_clusters()
recommendations = [analyze_cluster_utilization(c['id']) for c in clusters]

# Generate report; upsizing recommendations count as negative savings
total_potential_savings = sum(r.estimated_savings_monthly for r in recommendations)
print(f"Total potential monthly savings: ${total_potential_savings:,.2f}")
```
Results:
- Right-sized 45 clusters
- Reduced cluster costs by $18K/month (35%)
- Improved performance through auto-scaling
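The right-sizing code relies on a `calculate_cluster_cost` helper that isn't shown. One plausible shape, sketched under explicit assumptions: a fixed-size cluster that runs `hours_per_month` hours, one driver plus workers of the same node type, and caller-supplied rates. The $0.55/DBU reuses the on-demand price from the reserved-capacity section below; 2 DBU per node-hour and $0.50 per node-hour of VM cost are placeholders, not quoted prices.

```python
def estimate_monthly_cluster_cost(workers: int, dbu_per_node_hour: float,
                                  price_per_dbu: float, vm_price_per_node_hour: float,
                                  hours_per_month: float) -> float:
    """Rough monthly cost of a fixed-size cluster: Databricks DBUs plus cloud VMs."""
    nodes = workers + 1  # one driver node alongside the workers
    dbu_cost = nodes * dbu_per_node_hour * price_per_dbu * hours_per_month
    vm_cost = nodes * vm_price_per_node_hour * hours_per_month
    return dbu_cost + vm_cost


# Halving an always-on 8-worker cluster to 4 workers (placeholder rates)
current = estimate_monthly_cluster_cost(8, 2, 0.55, 0.50, 730)
right_sized = estimate_monthly_cluster_cost(4, 2, 0.55, 0.50, 730)
print(f"Monthly savings: ${current - right_sized:,.0f}")  # Monthly savings: $4,672
```

For autoscaling clusters, feeding in the observed average worker count rather than `max_workers` gives a more realistic estimate.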
Snowflake Query Optimization
Problem: Expensive queries consuming 60% of warehouse credits
Find Expensive Queries:
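```sql
-- Identify most expensive queries (approximate cost).
-- query_history reports warehouse_size as text, so map it to credits/hour
-- for standard warehouses; 2.5 is the assumed USD price per credit. Charging
-- the whole warehouse to each query double-counts concurrent queries.
WITH query_costs AS (
    SELECT
        query_id,
        query_text,
        user_name,
        warehouse_name,
        database_name,
        execution_time,
        bytes_scanned,
        partitions_scanned,
        credits_used_cloud_services,
        (execution_time / 1000.0 / 3600.0)
            * CASE warehouse_size
                  WHEN 'X-Small' THEN 1   WHEN 'Small' THEN 2
                  WHEN 'Medium' THEN 4    WHEN 'Large' THEN 8
                  WHEN 'X-Large' THEN 16  WHEN '2X-Large' THEN 32
                  WHEN '3X-Large' THEN 64 WHEN '4X-Large' THEN 128
              END
            * 2.5 AS estimated_cost
    FROM snowflake.account_usage.query_history
    WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
      AND warehouse_name IS NOT NULL
      AND execution_status = 'SUCCESS'
)
SELECT
    LEFT(query_text, 100) AS query_preview,
    user_name,
    warehouse_name,
    COUNT(*) AS execution_count,
    AVG(execution_time) / 1000.0 AS avg_duration_sec,
    SUM(bytes_scanned) / 1e12 AS total_tb_scanned,
    SUM(estimated_cost) AS total_cost_usd,
    MAX(partitions_scanned) AS max_partitions
FROM query_costs
GROUP BY 1, 2, 3
HAVING SUM(estimated_cost) > 100
ORDER BY total_cost_usd DESC
LIMIT 50;
```
Optimization Example: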
```sql
-- ❌ BEFORE: Full table scan
SELECT
    user_id,
    SUM(transaction_amount) AS total_spent
FROM raw.transactions  -- 5TB table
WHERE transaction_date >= '2024-01-01'
GROUP BY user_id;
-- Cost: $45 per run, 8 min execution time

-- ✅ AFTER: Clustered + materialized
-- Step 1: Create a table clustered by date. Snowflake manages micro-partitions
-- itself; CLUSTER BY sets the clustering key that enables partition pruning.
CREATE TABLE optimized.transactions_partitioned (
    transaction_id STRING,
    user_id STRING,
    transaction_amount DECIMAL(10,2),
    transaction_date DATE
)
CLUSTER BY (transaction_date);

INSERT INTO optimized.transactions_partitioned
SELECT transaction_id, user_id, transaction_amount, transaction_date
FROM raw.transactions;

-- Step 2: Create materialized view (Enterprise Edition; Snowflake keeps it
-- current in the background, and that maintenance also consumes credits)
CREATE MATERIALIZED VIEW analytics.user_spending_daily AS
SELECT
    user_id,
    transaction_date,
    SUM(transaction_amount) AS daily_spent,
    COUNT(*) AS transaction_count
FROM optimized.transactions_partitioned
GROUP BY user_id, transaction_date;

-- Step 3: Query materialized view
SELECT
    user_id,
    SUM(daily_spent) AS total_spent
FROM analytics.user_spending_daily
WHERE transaction_date >= '2024-01-01'
GROUP BY user_id;
-- Cost: $2 per run, 15 sec execution time
-- ✅ Savings: $43 per run (95% reduction)
```
Results:
- Optimized top 20 queries
- Reduced warehouse spend by $12K/month (25%)
- Improved query performance by 85%
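The cost estimate in the expensive-query SQL is simple arithmetic, which is handy to have in Python when sanity-checking a single query profile. A sketch assuming standard warehouses (credits per hour double with each size step) and the same assumed $2.50 per credit; `estimate_query_cost` is a hypothetical helper:

```python
# Credits per hour for standard Snowflake warehouses, keyed by the
# warehouse_size strings that query_history reports
CREDITS_PER_HOUR = {
    'X-Small': 1, 'Small': 2, 'Medium': 4, 'Large': 8,
    'X-Large': 16, '2X-Large': 32, '3X-Large': 64, '4X-Large': 128,
}


def estimate_query_cost(execution_time_ms: float, warehouse_size: str,
                        price_per_credit: float = 2.5) -> float:
    """Approximate USD cost of one query from query_history fields.

    Charges the whole warehouse to the query for its runtime, so it
    over-counts when queries run concurrently on the same warehouse.
    """
    hours = execution_time_ms / 1000 / 3600
    return hours * CREDITS_PER_HOUR[warehouse_size] * price_per_credit


# An 8-minute query on a Large warehouse
print(f"${estimate_query_cost(480_000, 'Large'):.2f}")  # $2.67
```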
Phase 3: Storage Optimization
S3 Lifecycle Policies
Problem: 80TB of data in S3 Standard, only 20% accessed monthly
```python
# s3_lifecycle_optimization.py
from datetime import datetime, timezone

import boto3


def analyze_s3_access_patterns(bucket: str, prefix: str = '') -> dict:
    """Total object size (GB) per age bucket.

    ListObjectsV2 does not expose last-access time, so age since
    LastModified is used as a proxy for access frequency.
    """
    s3 = boto3.client('s3')
    totals = {'hot': 0.0, 'warm': 0.0, 'cold': 0.0, 'archive': 0.0}
    now = datetime.now(timezone.utc)  # LastModified is timezone-aware UTC

    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            age_days = (now - obj['LastModified']).days
            size_gb = obj['Size'] / (1024 ** 3)
            if age_days < 30:
                totals['hot'] += size_gb       # < 30 days
            elif age_days < 90:
                totals['warm'] += size_gb      # 30-90 days
            elif age_days < 365:
                totals['cold'] += size_gb      # 90-365 days
            else:
                totals['archive'] += size_gb   # > 365 days
    return totals


def create_intelligent_tiering_policy(bucket: str):
    """Apply a tiered S3 lifecycle policy to the data/ prefix."""
    s3 = boto3.client('s3')
    lifecycle_policy = {
        'Rules': [
            {
                'ID': 'IntelligentTieringPolicy',  # boto3 expects 'ID', not 'Id'
                'Status': 'Enabled',
                'Filter': {'Prefix': 'data/'},
                'Transitions': [
                    {'Days': 30, 'StorageClass': 'INTELLIGENT_TIERING'},
                    {'Days': 90, 'StorageClass': 'GLACIER_IR'},
                    {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'},
                ],
                # Noncurrent rules apply to previous versions in versioned buckets
                'NoncurrentVersionTransitions': [
                    {'NoncurrentDays': 30, 'StorageClass': 'GLACIER_IR'},
                ],
                'NoncurrentVersionExpiration': {'NoncurrentDays': 90},
            }
        ]
    }
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration=lifecycle_policy,
    )
```
Cost Analysis:
```python
# Calculate savings
analysis = analyze_s3_access_patterns('my-data-lake')

# S3 list prices per GB-month (us-east-1, first 50 TB tier); excludes
# request, retrieval, and Intelligent-Tiering monitoring fees
standard_cost = analysis['hot'] * 0.023            # S3 Standard
ia_cost = analysis['warm'] * 0.0125                # Infrequent Access tier
glacier_ir_cost = analysis['cold'] * 0.004         # Glacier Instant Retrieval
deep_archive_cost = analysis['archive'] * 0.00099  # Glacier Deep Archive

current_total = sum(analysis.values()) * 0.023  # everything in Standard today
optimized_total = standard_cost + ia_cost + glacier_ir_cost + deep_archive_cost

print(f"Current monthly cost: ${current_total:,.2f}")
print(f"Optimized monthly cost: ${optimized_total:,.2f}")
print(f"Monthly savings: ${current_total - optimized_total:,.2f}")
```
Results:
- Migrated 60TB from Standard to Intelligent Tiering/Glacier
- Reduced storage costs by $9K/month (55%)
- Maintained access performance for hot data
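Lifecycle rules only move objects one way down S3's storage-class "waterfall", and a rule that tries to go back up is rejected when it is applied. A small pre-flight check can catch that in CI. The ordering below is our reading of the waterfall in the S3 lifecycle documentation; `check_transitions` and `WATERFALL` are illustrative names, and the check ignores per-class minimum-age rules:

```python
# Transition targets in S3's lifecycle waterfall, warmest first (STANDARD is the
# starting class, so it is never a transition target).
WATERFALL = ['STANDARD_IA', 'INTELLIGENT_TIERING', 'ONEZONE_IA',
             'GLACIER_IR', 'GLACIER', 'DEEP_ARCHIVE']


def check_transitions(transitions: list[dict]) -> list[str]:
    """Return problems found in a lifecycle rule's 'Transitions' list (empty = OK)."""
    problems = []
    ordered = sorted(transitions, key=lambda t: t['Days'])
    for prev, cur in zip(ordered, ordered[1:]):
        if cur['Days'] == prev['Days']:
            problems.append(f"two transitions on day {cur['Days']}")
        # list.index raises ValueError for a storage class not in WATERFALL
        if WATERFALL.index(cur['StorageClass']) <= WATERFALL.index(prev['StorageClass']):
            problems.append(
                f"{prev['StorageClass']} -> {cur['StorageClass']} moves back up the waterfall")
    return problems


policy_transitions = [
    {'Days': 30, 'StorageClass': 'INTELLIGENT_TIERING'},
    {'Days': 90, 'StorageClass': 'GLACIER_IR'},
    {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'},
]
print(check_transitions(policy_transitions))  # []
print(check_transitions([
    {'Days': 30, 'StorageClass': 'GLACIER_IR'},
    {'Days': 60, 'StorageClass': 'STANDARD_IA'},
]))  # ['GLACIER_IR -> STANDARD_IA moves back up the waterfall']
```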
Delta Lake Retention Optimization
Problem: Keeping 60+ days of Delta Lake versions
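```sql
-- Analyze Delta Lake version sizes.
-- delta_lake_metadata.table_versions is an internal inventory table (one row
-- per table version, e.g. collected from DESCRIBE HISTORY); commit_timestamp
-- is an assumed column of that table.
SELECT
    table_name,
    COUNT(DISTINCT version) AS total_versions,
    SUM(size_in_bytes) / 1e9 AS total_size_gb,
    -- Version numbers count commits, not days, so measure the span in time
    DATEDIFF(MAX(commit_timestamp), MIN(commit_timestamp)) AS version_span_days,
    SUM(size_in_bytes) / COUNT(DISTINCT version) / 1e9 AS avg_version_size_gb
FROM delta_lake_metadata.table_versions
GROUP BY table_name
HAVING COUNT(DISTINCT version) > 60
ORDER BY total_size_gb DESC;

-- Optimize retention policy. VACUUM refuses a RETAIN shorter than the table's
-- configured delta.deletedFileRetentionDuration unless this check is disabled.
SET spark.databricks.delta.retentionDurationCheck.enabled = false;

-- Vacuum tables (reduce retention from 60 to 7 days)
VACUUM delta.bronze.events RETAIN 168 HOURS;      -- 7 days
VACUUM delta.silver.transactions RETAIN 168 HOURS;
VACUUM delta.gold.aggregates RETAIN 336 HOURS;    -- 14 days

-- Re-enable safety check
SET spark.databricks.delta.retentionDurationCheck.enabled = true;
```
Results: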
- Reduced Delta Lake storage by 40%
- Saved $5K/month on S3 costs
- Maintained adequate time-travel window
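Toggling the safety check is a one-off cleanup; future VACUUM runs without `RETAIN` still follow each table's configured retention. An alternative is to persist the shorter window as table properties, after which a VACUUM at that same retention should pass the check without disabling it. A sketch that generates those statements; `retention_sql` is a hypothetical helper and the 7-day floor is a policy choice, not a Delta requirement:

```python
def retention_sql(table: str, days: int) -> list[str]:
    """Statements that shorten a Delta table's retention, then vacuum to match."""
    if days < 7:
        # Policy choice: never go below Delta's 7-day default, since shorter
        # windows can remove files that long-running readers still need.
        raise ValueError(f"refusing retention of {days} days (< 7)")
    interval = f"interval {days} days"
    return [
        # Persist retention so later VACUUMs and log cleanup use it; keeping
        # log and file retention equal keeps the time-travel window consistent
        f"ALTER TABLE {table} SET TBLPROPERTIES ("
        f"'delta.deletedFileRetentionDuration' = '{interval}', "
        f"'delta.logRetentionDuration' = '{interval}')",
        # VACUUM RETAIN is expressed in hours
        f"VACUUM {table} RETAIN {days * 24} HOURS",
    ]


for stmt in retention_sql('delta.gold.aggregates', 14):
    print(stmt)
```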
Phase 4: Spot Instances & Reserved Capacity
AWS Spot Instances for EMR
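```python
# emr_spot_config.py
emr_spot_config = {
    'Name': 'Cost-Optimized-EMR-Cluster',
    'ReleaseLabel': 'emr-6.10.0',
    'Instances': {
        'InstanceFleets': [
            {
                'Name': 'Master',
                'InstanceFleetType': 'MASTER',
                'TargetOnDemandCapacity': 1,  # primary node stays on-demand
                'InstanceTypeConfigs': [
                    {'InstanceType': 'm5.xlarge', 'WeightedCapacity': 1},
                ],
            },
            {
                'Name': 'Core',
                'InstanceFleetType': 'CORE',
                # Capacity is in weighted units: 2 of 10 on-demand (20%), 8 Spot (80%).
                # Core nodes host HDFS, so Spot interruptions here can lose HDFS
                # blocks; many teams keep Spot on TASK fleets for that reason.
                'TargetOnDemandCapacity': 2,
                'TargetSpotCapacity': 8,
                'InstanceTypeConfigs': [
                    {
                        'InstanceType': 'r5.2xlarge',
                        'WeightedCapacity': 2,
                        'BidPriceAsPercentageOfOnDemandPrice': 100,  # max Spot price = on-demand
                    },
                    {
                        'InstanceType': 'r5.4xlarge',
                        'WeightedCapacity': 4,
                        'BidPriceAsPercentageOfOnDemandPrice': 100,
                    },
                ],
                'LaunchSpecifications': {
                    'SpotSpecification': {
                        # Fall back to on-demand if Spot isn't provisioned within 10 minutes
                        'TimeoutDurationMinutes': 10,
                        'TimeoutAction': 'SWITCH_TO_ON_DEMAND',
                        'AllocationStrategy': 'capacity-optimized',
                    }
                },
            },
        ]
    },
}

# Launch with boto3 (IAM roles elided):
# boto3.client('emr').run_job_flow(**emr_spot_config, JobFlowRole=..., ServiceRole=...)
```
Results: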
- Saved 60% on core nodes ($8K/month)
- Zero job failures due to spot interruptions
- Used capacity-optimized allocation strategy
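How much a mixed fleet saves depends on both the on-demand/Spot split and the average Spot discount actually obtained. A sketch of that blend in weighted capacity units; `blended_cost_ratio` is a hypothetical helper and the discounts are example inputs, not measured Spot prices:

```python
def blended_cost_ratio(on_demand_units: float, spot_units: float,
                       spot_discount: float) -> float:
    """Fleet cost as a fraction of running every unit on-demand.

    spot_discount is the average fraction off the on-demand price
    (0.75 means Spot costs 25% of on-demand).
    """
    total_units = on_demand_units + spot_units
    return (on_demand_units + spot_units * (1 - spot_discount)) / total_units


# The Core fleet above: 2 on-demand units + 8 Spot units
for discount in (0.6, 0.75):
    ratio = blended_cost_ratio(2, 8, discount)
    print(f"Spot discount {discount:.0%}: fleet saves {1 - ratio:.0%}")
# Spot discount 60%: fleet saves 48%
# Spot discount 75%: fleet saves 60%
```

With a 20/80 split, a 60% saving on the whole Core fleet implies an average Spot discount of roughly 75%.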
Databricks Reserved Capacity
```python
# Calculate annual savings from reserved (pre-purchased) DBUs
monthly_dbu_usage = 50_000
on_demand_price_per_dbu = 0.55
reserved_price_per_dbu = 0.35  # ~36% discount

monthly_on_demand_cost = monthly_dbu_usage * on_demand_price_per_dbu
monthly_reserved_cost = monthly_dbu_usage * reserved_price_per_dbu
annual_savings = (monthly_on_demand_cost - monthly_reserved_cost) * 12

print(f"Annual savings with reserved: ${annual_savings:,.2f}")
# Output: Annual savings with reserved: $120,000.00
```
Results:
- Committed to 1-year reserved DBUs
- Saved $11K/month (36% discount)
- ROI achieved in 3 months
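The calculation above assumes every committed DBU gets used. The more useful question before signing is how much of the commitment must actually be consumed to beat on-demand. A sketch under a deliberately simple model (assumption: the full commitment is billed monthly whether used or not, and overage is billed at on-demand rates); real Databricks commit contracts differ, and both function names are hypothetical:

```python
def commitment_savings(committed_dbus: float, used_dbus: float,
                       on_demand_price: float, committed_price: float) -> float:
    """Monthly savings vs. pure on-demand for a fixed monthly DBU commitment."""
    on_demand_only = used_dbus * on_demand_price
    overage = max(used_dbus - committed_dbus, 0) * on_demand_price
    with_commitment = committed_dbus * committed_price + overage
    return on_demand_only - with_commitment


def break_even_utilization(on_demand_price: float, committed_price: float) -> float:
    """Share of the commitment you must use for it to beat on-demand."""
    return committed_price / on_demand_price


print(f"{break_even_utilization(0.55, 0.35):.0%}")               # 64%
print(f"{commitment_savings(50_000, 50_000, 0.55, 0.35):,.0f}")  # 10,000
print(f"{commitment_savings(50_000, 30_000, 0.55, 0.35):,.0f}")  # -1,000
```

At these prices, committing to 50K DBUs only pays off if monthly usage stays above about 32K DBUs.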
Phase 5: Automated Cost Anomaly Detection
```python
# cost_anomaly_detection.py
from dataclasses import dataclass

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest


@dataclass
class CostAnomaly:
    date: str
    platform: str
    service: str
    team: str
    expected_cost: float
    actual_cost: float
    anomaly_score: float
    severity: str


class CostAnomalyDetector:
    def __init__(self):
        # contamination=0.1 assumes ~10% of training rows are anomalous
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.history = None

    def train(self, historical_costs: pd.DataFrame):
        """Train on 90 days of daily costs (date, platform, service, team, cost_usd)."""
        self.history = historical_costs
        self.model.fit(self.extract_features(historical_costs))

    def extract_features(self, costs_df: pd.DataFrame) -> np.ndarray:
        """Calendar features plus the daily cost, as a float matrix."""
        dates = pd.to_datetime(costs_df['date'])
        features = pd.DataFrame({
            'day_of_week': dates.dt.dayofweek,
            'day_of_month': dates.dt.day,
            'week_of_year': dates.dt.isocalendar().week.astype(int),
            'cost_usd': costs_df['cost_usd'],
        })
        return features.to_numpy(dtype=float)

    def get_historical_baseline(self, platform: str, service: str, team: str) -> dict:
        """Mean and std of training-window cost for the same platform/service/team."""
        h = self.history
        same = h[(h['platform'] == platform) & (h['service'] == service) & (h['team'] == team)]
        return {'mean': same['cost_usd'].mean(), 'std': same['cost_usd'].std()}

    def detect_anomalies(self, current_costs: pd.DataFrame) -> list[CostAnomaly]:
        """Detect anomalies in the current day's costs."""
        features = self.extract_features(current_costs)
        predictions = self.model.predict(features)  # -1 = anomaly, 1 = normal
        scores = self.model.score_samples(features)

        anomalies = []
        for idx, label in enumerate(predictions):
            if label != -1:
                continue
            row = current_costs.iloc[idx]
            baseline = self.get_historical_baseline(row['platform'], row['service'], row['team'])
            anomalies.append(CostAnomaly(
                date=str(row['date']),
                platform=row['platform'],
                service=row['service'],
                team=row['team'],
                expected_cost=baseline['mean'],
                actual_cost=row['cost_usd'],
                anomaly_score=abs(scores[idx]),
                severity=self.calculate_severity(
                    row['cost_usd'], baseline['mean'], baseline['std']),
            ))
        return anomalies

    def calculate_severity(self, actual: float, mean: float, std: float) -> str:
        """Calculate severity from standard deviations off the baseline."""
        if pd.isna(mean) or pd.isna(std) or std == 0:
            return 'medium'  # no usable baseline (new or constant cost series)
        z_score = abs((actual - mean) / std)
        if z_score > 4:
            return 'critical'
        elif z_score > 3:
            return 'high'
        elif z_score > 2:
            return 'medium'
        return 'low'
```
Usage:
```python
# Daily anomaly detection job
detector = CostAnomalyDetector()

# Train on historical data (helpers not shown)
historical = get_costs_last_90_days()
detector.train(historical)

# Detect today's anomalies
today_costs = get_today_costs()
anomalies = detector.detect_anomalies(today_costs)

# Alert on critical/high
for anomaly in anomalies:
    if anomaly.severity in ('critical', 'high'):
        send_slack_alert(f"""
🚨 Cost Anomaly - {anomaly.severity.upper()}
Platform: {anomaly.platform}
Service: {anomaly.service}
Team: {anomaly.team}
Expected: ${anomaly.expected_cost:,.2f}
Actual: ${anomaly.actual_cost:,.2f}
Difference: ${anomaly.actual_cost - anomaly.expected_cost:,.2f}
""")
```
Results:
- Detected anomalies within 2 hours (vs 2 weeks)
- Prevented $15K+ in runaway costs
- Average response time: 30 minutes
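A single IsolationForest over all services mixes cost series of very different scales. A simpler alternative worth comparing against (a different technique from the model above, not what this post ran) is a per-series z-score baseline, sketched here with the standard library only; `zscore_alerts` and the sample data are illustrative:

```python
from statistics import mean, stdev


def zscore_alerts(history, today, threshold=3.0):
    """Flag today's costs more than `threshold` std devs above their own baseline.

    history: {key: [daily costs over the training window]}
    today:   {key: today's cost}
    key is any hashable, e.g. (platform, service, team).
    """
    alerts = {}
    for key, cost in today.items():
        past = history.get(key, [])
        if len(past) < 2:
            continue  # not enough history for a baseline
        mu, sigma = mean(past), stdev(past)
        if sigma == 0:
            z = float('inf') if cost > mu else 0.0
        else:
            z = (cost - mu) / sigma
        if z > threshold:
            alerts[key] = round(z, 2)
    return alerts


history = {
    ('Databricks', 'Jobs', 'analytics'): [100, 110, 90, 105, 95],
    ('AWS', 'S3', 'data-engineering'): [50, 52, 48, 50],
}
today = {
    ('Databricks', 'Jobs', 'analytics'): 200,
    ('AWS', 'S3', 'data-engineering'): 51,
}
print(zscore_alerts(history, today))
# {('Databricks', 'Jobs', 'analytics'): 12.65}
```

Because each series is compared only with its own history, a $100 jump on a small service and a $10K jump on a large one are judged on the same relative scale.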
Phase 6: Chargeback & Accountability
```python
# team_chargeback.py
from datetime import date, timedelta

import pandas as pd


def generate_monthly_chargeback_report(month: str) -> pd.DataFrame:
    """Generate detailed chargeback per team for a 'YYYY-MM' month."""
    costs = get_monthly_costs(month)  # helper not shown: one row per cost record

    # Group by team
    team_costs = costs.groupby('team').agg(
        cost_usd=('cost_usd', 'sum'),
        resource_count=('resource_id', 'nunique'),  # distinct resources, not rows
    )

    # Add budget comparison (teams without a budget get NaN)
    budgets = get_team_budgets()  # helper not shown: {team: monthly budget in USD}
    team_costs['budget'] = team_costs.index.map(budgets)
    team_costs['budget_used_pct'] = (
        team_costs['cost_usd'] / team_costs['budget'] * 100
    ).round(1)
    team_costs['over_budget'] = team_costs['cost_usd'] > team_costs['budget']
    return team_costs


def send_team_cost_reports():
    """Send last month's report to each team (run early in the month)."""
    # Report the last complete month, not the current month-to-date
    last_month = (date.today().replace(day=1) - timedelta(days=1)).strftime('%Y-%m')
    team_costs = generate_monthly_chargeback_report(last_month)
    for team in team_costs.index:
        report = generate_team_report(team, team_costs.loc[team])
        send_email(get_team_email(team), 'Monthly Cost Report', report)
```
Results:
- Teams now accountable for spend
- 90% of teams staying within budget
- Drove team-level optimization initiatives
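Chargeback changes behavior faster when teams see a warning before they go over, not just a monthly over/under flag. A sketch of a budget classifier that could feed the team reports; the 80% warning threshold and the `budget_status` name are assumptions for illustration:

```python
from typing import Optional


def budget_status(cost: float, budget: Optional[float], warn_at: float = 0.8) -> str:
    """Classify a team's spend against its monthly budget."""
    if not budget:
        return 'no-budget'  # missing or zero budget: flag for finance follow-up
    used = cost / budget
    if used > 1:
        return 'over'
    if used >= warn_at:
        return 'warning'
    return 'ok'


for team, cost, budget in [('analytics', 9_000, 10_000),
                           ('ml-platform', 12_000, 10_000),
                           ('data-science', 4_000, None)]:
    print(team, budget_status(cost, budget))
# analytics warning
# ml-platform over
# data-science no-budget
```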
Final Results
Cost Reduction Summary
| Category | Before ($/month) | After ($/month) | Savings ($/month) | % Reduction |
|---|---|---|---|---|
| Databricks Clusters | $28K | $18K | $10K | 36% |
| Snowflake Warehouses | $22K | $14K | $8K | 36% |
| AWS EMR | $15K | $9K | $6K | 40% |
| S3 Storage | $12K | $6K | $6K | 50% |
| Data Transfer | $6K | $3K | $3K | 50% |
| Total | $83K | $50K | $33K | 40% |
Annual Savings: $396K ($33K/month × 12, roughly $400K)
Key Success Metrics
✅ 40% cost reduction while improving performance
✅ 95% resource tagging (from 40%)
✅ 100% budget visibility across teams
✅ 2-hour anomaly detection (from 2 weeks)
✅ Zero cost overruns in Q4 2024
✅ ROI: 8x (annual savings relative to FinOps team cost)
Lessons Learned
What Worked
- Visibility First - Can't optimize what you can't measure
- Start with Low-Hanging Fruit - Storage & idle resources = quick wins
- Automated Enforcement - Policies > manual reviews
- Team Accountability - Chargeback drives behavior change
- Continuous Monitoring - One-time optimization ≠ lasting results
What Didn't Work
- Manual Reviews - Didn't scale, teams ignored recommendations
- Generic Policies - One-size-fits-all failed
- Over-Aggressive Cuts - Broke critical workflows
- Ignoring Performance - Saved money but hurt SLAs
Mistakes to Avoid
❌ DON'T: Optimize without understanding criticality
✅ DO: Classify workloads (critical/important/best-effort)
❌ DON'T: Set unrealistic budgets
✅ DO: Base on historical + growth projections
❌ DON'T: Optimize everything at once
✅ DO: Prioritize by $ impact and ease
❌ DON'T: Forget performance impact
✅ DO: Monitor SLAs during optimization
Implementation Roadmap
Month 1: Foundation
- Set up cost aggregation pipeline
- Implement tagging policy
- Create unified cost dashboard
- Baseline current spend
Month 2: Quick Wins
- Implement S3 lifecycle policies
- Enable auto-termination on dev clusters
- Right-size over-provisioned resources
- Clean up unused resources
Month 3: Advanced Optimization
- Deploy anomaly detection
- Implement reserved capacity
- Optimize expensive queries
- Set up chargeback reporting
Month 4+: Continuous Improvement
- Monthly cost reviews per team
- Quarterly optimization sprints
- Annual reserved capacity planning
- Expand automation
Tools & Resources
Cost Monitoring
- AWS Cost Explorer - Native AWS costs
- Databricks System Tables - DBU usage
- Snowflake Account Usage - Warehouse metering
- Grafana - Unified dashboarding
Optimization
- AWS Trusted Advisor - Right-sizing
- Databricks Cluster Policies - Governance
- Snowflake Query Profile - Performance
Automation
- Terraform - Infrastructure as Code
- Apache Airflow - Cost pipeline orchestration
- Python + Boto3 - Custom automation
Conclusion
Data platform cost optimization isn't a one-time project—it's an ongoing practice requiring:
- Visibility into where money goes
- Attribution to drive accountability
- Automation to enforce policies at scale
- Culture that values cost-consciousness
Our 40% reduction came from systematic application of these principles.
The FinOps mindset for data engineers:
Every query has a cost. Every table has a cost. Every idle cluster has a cost. Make them count.
Next Steps
Want to implement FinOps for your data platform?
- Audit - Run cost analysis
- Prioritize - Identify top 10 cost drivers
- Execute - Start with storage (easiest wins)
- Iterate - Monthly reviews
Questions? Reach out: contact@vasudevarao.com
This post is based on real cost optimization work reducing annual spend from $1M to $600K. Code examples are simplified for clarity.