Comprehensive monitoring and observability setup for the data ingestion architecture using Prometheus, Grafana, ELK Stack, and Jaeger.
┌─────────────────────────────────────────────────────────────────┐
│                   Monitoring & Observability                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐         │
│  │   Metrics    │   │     Logs     │   │    Traces    │         │
│  │ (Prometheus) │   │ (ELK Stack)  │   │   (Jaeger)   │         │
│  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘         │
│         │                  │                  │                 │
│         └──────────────────┴──────────────────┘                 │
│                            │                                    │
│                    ┌───────▼────────┐                           │
│                    │    Grafana     │                           │
│                    │  (Dashboards)  │                           │
│                    └────────────────┘                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
# Prometheus-wide defaults; individual scrape jobs and rule groups may override the intervals.
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  # Labels added to series and alerts when they leave this server
  # (remote write, federation, Alertmanager).
  external_labels:
    cluster: 'data-ingestion-prod'
    environment: 'production'
scrape_configs:
# Collection Layer Services
- job_name: 'shipping-collector'
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names:
      - data-ingestion
  relabel_configs:
  # Keep only pods labelled app=shipping-collector.
  - source_labels: [__meta_kubernetes_pod_label_app]
    regex: shipping-collector
    action: keep
  # Keep only pods annotated prometheus.io/scrape: "true".
  - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
    regex: 'true'
    action: keep
  # Use the prometheus.io/path annotation as the metrics path when it is set.
  - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
    regex: (.+)
    action: replace
    target_label: __metrics_path__
  # Rewrite the scrape address to <pod host>:<prometheus.io/port>.
  # source_labels are joined with ';', so the regex captures the host from
  # __address__ (dropping any discovered port) and the annotated port.
  # If the annotation is missing the regex does not match and __address__
  # is left untouched.
  - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
    regex: '([^:]+)(?::\d+)?;(\d+)'
    action: replace
    target_label: __address__
    replacement: '$1:$2'
- job_name: 'inventory-collector'
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names:
      - data-ingestion
  relabel_configs:
  # Keep only pods labelled app=inventory-collector. There is no address or
  # path relabeling here, so the discovered pod address and the default
  # metrics path are used.
  - source_labels: [__meta_kubernetes_pod_label_app]
    regex: inventory-collector
    action: keep
- job_name: 'supplier-collector'
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names:
      - data-ingestion
  relabel_configs:
  # Keep only pods labelled app=supplier-collector. There is no address or
  # path relabeling here, so the discovered pod address and the default
  # metrics path are used.
  - source_labels: [__meta_kubernetes_pod_label_app]
    regex: supplier-collector
    action: keep
# Processing Layer Services
# Flink JobManager and TaskManagers are listed statically, so adding or
# removing a TaskManager requires editing this target list.
- job_name: 'flink-metrics'
  static_configs:
  - targets:
    - 'flink-jobmanager:9249'
    - 'flink-taskmanager-0:9249'
    - 'flink-taskmanager-1:9249'
    - 'flink-taskmanager-2:9249'
# Storage Layer Services
- job_name: 'storage-router'
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names:
      - data-ingestion
  relabel_configs:
  # Keep only pods labelled app=storage-router.
  - source_labels: [__meta_kubernetes_pod_label_app]
    regex: storage-router
    action: keep
- job_name: 'query-api'
  kubernetes_sd_configs:
  - role: pod
    namespaces:
      names:
      - data-ingestion
  relabel_configs:
  # Keep only pods labelled app=query-api.
  - source_labels: [__meta_kubernetes_pod_label_app]
    regex: query-api
    action: keep
# Infrastructure Services
# One static target per Kafka broker.
- job_name: 'kafka'
  static_configs:
  - targets:
    - 'kafka-0.kafka:7071'
    - 'kafka-1.kafka:7071'
    - 'kafka-2.kafka:7071'
- job_name: 'postgres'
  static_configs:
  - targets:
    - 'postgres-exporter:9187'
- job_name: 'timescaledb'
  static_configs:
  - targets:
    - 'timescale-exporter:9187'
- job_name: 'redis'
  static_configs:
  - targets:
    - 'redis-exporter:9121'
# Alertmanager instance(s) that receive alerts fired by the rule files.
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      - 'alertmanager:9093'
# Every *.yml file in this directory is loaded as an alerting/recording rule file.
rule_files:
- '/etc/prometheus/alerts/*.yml'

# Messages received
collector_messages_received_total{service="shipping-collector", status="success"}
# Messages published to Kafka (selected per collector via the `service` label)
collector_messages_published_total{service="shipping-collector"}
# Processing duration (per collector)
collector_processing_duration_seconds{service="shipping-collector"}
# Errors, broken down by the `error_type` label
collector_errors_total{service="shipping-collector", error_type="api_timeout"}
# API rate limiting (times the upstream rate limit was exceeded)
collector_rate_limit_exceeded_total{service="shipping-collector"}
# Messages processed, selected by pipeline `stage` and outcome `status`
pipeline_messages_processed_total{stage="validation", status="success"}
# Processing latency per pipeline stage
pipeline_processing_duration_seconds{stage="transformation"}
# Validation failures, broken down by the `error_type` label
pipeline_validation_failures_total{error_type="schema_violation"}
# Enrichment cache performance (hit ratio = hits / (hits + misses))
pipeline_enrichment_cache_hits_total
pipeline_enrichment_cache_misses_total
# Backpressure indicators (metrics exported by Flink itself)
flink_taskmanager_job_task_buffers_inPoolUsage
flink_taskmanager_job_task_checkpointAlignmentTime
# Write operations, selected by `backend` plus `operation` or `error_type`
storage_write_latency_seconds{backend="timescaledb", operation="insert"}
storage_write_errors_total{backend="postgres", error_type="connection_refused"}
# Query performance, selected by API `endpoint`
storage_query_duration_seconds{endpoint="/api/v1/shipments"}
storage_query_errors_total{endpoint="/api/v1/inventory"}
# Capacity metrics
# NOTE(review): confirm whether storage_capacity_bytes reports used or free
# bytes, and whether storage_capacity_utilization is a 0-1 ratio or a percentage.
storage_capacity_bytes{backend="s3"}
storage_capacity_utilization{backend="postgres"}
# Connection pool metrics (active and idle connections per backend)
storage_connection_pool_active{backend="postgres"}
storage_connection_pool_idle{backend="postgres"}
# alerts/collection-layer.yml
# Alerting rules for the collection layer (collectors and their Kafka producers).
groups:
- name: collection-layer
  interval: 30s
  rules:
  # Sustained collector errors; evaluated per series, so each
  # service/error_type combination is compared to the threshold separately.
  - alert: HighCollectorErrorRate
    expr: rate(collector_errors_total[5m]) > 10
    for: 5m
    labels:
      severity: warning
      layer: collection
    annotations:
      summary: "High error rate in {{ $labels.service }}"
      description: "Error rate is {{ $value }} errors/s"
  # A scrape target of any job whose name ends in "-collector" is failing.
  # NOTE(review): `up` only exists for discovered targets; if every pod of a
  # collector disappears from service discovery this expression returns
  # nothing. Consider an additional absent() rule.
  - alert: CollectorDown
    expr: up{job=~".*-collector"} == 0
    for: 2m
    labels:
      severity: critical
      layer: collection
    annotations:
      summary: "Collector {{ $labels.job }} is down"
      # Added so notification templates that render only the description
      # annotation are not empty for this alert.
      description: "Target {{ $labels.instance }} of job {{ $labels.job }} has been failing its scrape for more than 2 minutes"
  # More than 1000 records sent but not yet acknowledged by the brokers.
  - alert: KafkaProducerLag
    expr: kafka_producer_record_send_total - kafka_producer_record_ack_total > 1000
    for: 5m
    labels:
      severity: warning
      layer: collection
    annotations:
      summary: "Kafka producer lag detected"
      description: "{{ $value }} records sent but not yet acknowledged"
# alerts/processing-layer.yml
# Alerting rules for the processing layer (validation/transformation pipeline and Flink).
groups:
- name: processing-layer
  interval: 30s
  rules:
  - alert: HighValidationFailureRate
    expr: rate(pipeline_validation_failures_total[5m]) > 50
    for: 5m
    labels:
      severity: warning
      layer: processing
    annotations:
      summary: "High validation failure rate"
      description: "{{ $value }} validation failures per second"
  # p95 of the processing-duration histogram, computed per series.
  - alert: ProcessingLatencyHigh
    expr: histogram_quantile(0.95, rate(pipeline_processing_duration_seconds_bucket[5m])) > 5
    for: 10m
    labels:
      severity: warning
      layer: processing
    annotations:
      summary: "High processing latency (p95 > 5s)"
      description: "p95 processing latency is {{ $value }}s"
  # NOTE(review): this only fires while the uptime series exists with value 0.
  # Confirm what Flink reports for a failed or missing job and consider an
  # absent() rule as well.
  - alert: FlinkJobDown
    expr: flink_jobmanager_job_uptime == 0
    for: 2m
    labels:
      severity: critical
      layer: processing
    annotations:
      summary: "Flink job is down"
      description: "Flink job uptime has been 0 for more than 2 minutes"
  # lastCheckpointSize is a gauge holding the size of the last checkpoint;
  # its rate() is zero whenever consecutive checkpoints have the same size
  # and says nothing about failures. Alert on the failed-checkpoint count
  # growing instead.
  - alert: FlinkCheckpointFailure
    expr: increase(flink_jobmanager_job_numberOfFailedCheckpoints[10m]) > 0
    for: 10m
    labels:
      severity: critical
      layer: processing
    annotations:
      summary: "Flink checkpoints are failing"
      description: "{{ $value }} checkpoints failed in the last 10 minutes"
# alerts/storage-layer.yml
# Alerting rules for the storage layer (write path, capacity, connection pools, Query API).
groups:
- name: storage-layer
  interval: 30s
  rules:
  # p95 write latency above 1s, computed per series so the backend label is
  # available to the annotation.
  - alert: HighStorageWriteLatency
    expr: histogram_quantile(0.95, rate(storage_write_latency_seconds_bucket[5m])) > 1
    for: 5m
    labels:
      severity: warning
      layer: storage
    annotations:
      summary: "High storage write latency in {{ $labels.backend }}"
      description: "p95 write latency is {{ $value }}s"
  # NOTE(review): this is only correct if storage_capacity_bytes reports FREE
  # bytes. If it reports used bytes the comparison is inverted. Also confirm
  # that storage_capacity_total_bytes is actually exported.
  - alert: StorageCapacityLow
    expr: (storage_capacity_bytes / storage_capacity_total_bytes) < 0.1
    for: 10m
    labels:
      severity: critical
      layer: storage
    annotations:
      summary: "Low storage capacity in {{ $labels.backend }}"
      description: "Capacity ratio is {{ $value }} (threshold 0.1)"
  # More than 90% of the pool is in use.
  # NOTE(review): confirm that storage_connection_pool_max is exported;
  # otherwise this expression returns no data and never fires.
  - alert: DatabaseConnectionPoolExhausted
    expr: storage_connection_pool_active / storage_connection_pool_max > 0.9
    for: 5m
    labels:
      severity: warning
      layer: storage
    annotations:
      summary: "Database connection pool near exhaustion"
      description: "Pool utilization is {{ $value }} (threshold 0.9)"
  - alert: QueryAPIHighErrorRate
    expr: rate(storage_query_errors_total[5m]) > 10
    for: 5m
    labels:
      severity: warning
      layer: storage
    annotations:
      summary: "High error rate in Query API"
      description: "{{ $value }} query errors per second"

# elasticsearch.yml
cluster.name: data-ingestion-logs
# Resolved from the HOSTNAME environment variable when the node starts.
node.name: ${HOSTNAME}
# Binds to all interfaces.
network.host: 0.0.0.0
discovery.seed_hosts: ["es-node-1", "es-node-2", "es-node-3"]
# Used only when bootstrapping a brand-new cluster.
cluster.initial_master_nodes: ["es-node-1", "es-node-2", "es-node-3"]
# Index lifecycle management
# NOTE(review): newer Elasticsearch releases always enable ILM and no longer
# accept this setting; confirm against the deployed version.
xpack.ilm.enabled: true

# logstash.conf
# Consume JSON-encoded application logs from the "application-logs" Kafka topic.
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    group_id          => "logstash-consumer"
    topics            => ["application-logs"]
    codec             => json
  }
}
filter {
  # Log level: structured events already carry a "level" field, so copy it.
  # Only fall back to grok on the free-text message when it is missing;
  # otherwise every structured event whose message has no level token would
  # be tagged _grokparsefailure.
  if [level] {
    mutate {
      copy => { "level" => "log_level" }
    }
  } else {
    grok {
      match => { "message" => "%{LOGLEVEL:log_level}" }
    }
  }

  # Extract timestamp
  date {
    match  => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }

  # Add layer information, derived from the service name.
  # add_field appends to an existing field and turns it into an array, so
  # only set "layer" when the event does not already provide one.
  if ![layer] {
    if [service] =~ /collector/ {
      mutate {
        add_field => { "layer" => "collection" }
      }
    } else if [service] =~ /flink|processing/ {
      mutate {
        add_field => { "layer" => "processing" }
      }
    } else if [service] =~ /storage|query/ {
      mutate {
        add_field => { "layer" => "storage" }
      }
    } else {
      # Fallback so a "%{layer}" reference in the output index name always
      # resolves instead of being written literally.
      mutate {
        add_field => { "layer" => "unknown" }
      }
    }
  }

  # Enrich with Kubernetes metadata
  if [kubernetes] {
    mutate {
      add_field => {
        "pod_name"  => "%{[kubernetes][pod][name]}"
        "namespace" => "%{[kubernetes][namespace]}"
      }
    }
  }
}
# Write events to one Elasticsearch index per layer per day.
output {
  elasticsearch {
    hosts         => ["elasticsearch:9200"]
    index         => "logs-%{layer}-%{+YYYY.MM.dd}"
    template_name => "logs"
  }
}

{
  "timestamp": "2024-01-15T10:30:00.123Z",
  "level": "INFO",
  "service": "shipping-collector",
  "layer": "collection",
  "trace_id": "550e8400-e29b-41d4-a716-446655440000",
  "span_id": "7f2a1b3c",
  "message": "Event processed successfully",
  "event_id": "evt-12345",
  "processing_time_ms": 125,
  "metadata": {
    "tracking_id": "ABC123456789",
    "carrier": "fedex"
  }
}

# jaeger-collector-config.yml
# Jaeger receiver: one listening endpoint per protocol, bound to all interfaces.
receivers:
  jaeger:
    protocols:
      grpc:
        endpoint: '0.0.0.0:14250'
      thrift_http:
        endpoint: '0.0.0.0:14268'
      thrift_compact:
        endpoint: '0.0.0.0:6831'
      thrift_binary:
        endpoint: '0.0.0.0:6832'
# Batch spans before export: flush every 1s or once 100 spans are queued.
processors:
  batch:
    timeout: 1s
    send_batch_size: 100
# Spans are stored in Elasticsearch.
# NOTE(review): confirm the exporter's option name for the span index against
# the collector version in use.
exporters:
  elasticsearch:
    endpoints: ["http://elasticsearch:9200"]
    index: jaeger-span
# Wire the components defined above into a single traces pipeline.
service:
  pipelines:
    traces:
      receivers: [jaeger]
      processors: [batch]
      exporters: [elasticsearch]

# Example Python instrumentation
from opentelemetry import trace
from opentelemetry.exporter.jaeger import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Setup tracer
# Register a global tracer provider, then attach a batching span processor
# that ships finished spans to the Jaeger agent.
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(agent_host_name="jaeger-agent", agent_port=6831)
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(jaeger_exporter))
# Module-level tracer used by the instrumented functions below.
tracer = trace.get_tracer(__name__)
def process_event(event):
with tracer.start_as_current_span("process_event") as span:
span.set_attribute("event.id", event["event_id"])
span.set_attribute("event.type", event["type"])
# Validate
with tracer.start_as_current_span("validate"):
validate(event)
# Transform
with tracer.start_as_current_span("transform"):
transformed = transform(event)
# Enrich
with tracer.start_as_current_span("enrich"):
enriched = enrich(transformed)
return enrichedPanels:
- System health status (all layers)
- Total throughput (events/second)
- End-to-end latency (p50, p95, p99)
- Error rate by layer
- Active alerts
Panels:
- Messages received per collector
- Kafka publish rate
- API response times
- Error rates by type
- Rate limiting incidents
Panels:
- Flink job uptime
- Processing throughput
- Validation pass/fail rates
- Transformation latency
- Enrichment cache hit rate
- Checkpoint success rate
- Backpressure indicators
Panels:
- Write throughput by backend
- Write latency by backend
- Query API response times
- Database connection pool usage
- Storage capacity utilization
- Cache hit rates
Panels:
- Kafka lag by consumer group
- Kafka broker health
- Database connections
- Redis memory usage
- Pod CPU/memory usage
- Network I/O
# Alertmanager global settings.
global:
  resolve_timeout: 5m
  # NOTE(review): Alertmanager does not expand ${...} environment variables
  # in its config file. This placeholder must be rendered by the deployment
  # tooling before Alertmanager starts, or replaced with slack_api_url_file.
  slack_api_url: '${SLACK_WEBHOOK_URL}'
# Routing tree: alerts are grouped by name and layer, then sent to receivers.
route:
  group_by: ['alertname', 'layer']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  # Fallback receiver for alerts that match no child route.
  receiver: 'slack-notifications'
  routes:
  # Critical alerts page on-call. `continue: true` lets them also be checked
  # against the following sibling route.
  - match:
      severity: critical
    receiver: 'pagerduty-critical'
    continue: true
  # Slack receives warnings and critical alerts. Matching only `warning`
  # here would make the `continue: true` above a no-op, so criticals would
  # never reach Slack.
  - match_re:
      severity: 'warning|critical'
    receiver: 'slack-notifications'
# Notification targets referenced by the routing tree.
receivers:
- name: 'slack-notifications'
  slack_configs:
  - channel: '#data-ingestion-alerts'
    title: '{{ .GroupLabels.alertname }}'
    # Always print the summary and append the description when an alert has
    # one, so alerts that define only a summary still produce message text.
    text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ if .Annotations.description }} - {{ .Annotations.description }}{{ end }}{{ "\n" }}{{ end }}'
- name: 'pagerduty-critical'
  pagerduty_configs:
  # NOTE(review): the ${...} placeholder must be rendered before Alertmanager
  # loads this file; Alertmanager itself does not expand environment variables.
  - service_key: '${PAGERDUTY_SERVICE_KEY}'
    description: '{{ .GroupLabels.alertname }}'
# Suppress a warning while a critical alert with the same alertname and
# layer is firing.
# NOTE(review): each alert name in the rule files in this document has a
# single severity, so requiring an equal `alertname` means this rule never
# matches them. Confirm whether `equal: ['layer']` was intended.
inhibit_rules:
- source_match:
    severity: 'critical'
  target_match:
    severity: 'warning'
  equal: ['alertname', 'layer']

# Kubernetes restarts the container after 3 consecutive failed checks.
livenessProbe:
  httpGet:
    path: /health/live
    port: 8080
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
  failureThreshold: 3
# The pod is removed from Service endpoints after 3 consecutive failed checks.
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8080
  initialDelaySeconds: 10
  periodSeconds: 5
  timeoutSeconds: 3
  failureThreshold: 3

@app.get("/health/live")
def liveness():
    """Simple liveness check.

    Performs no dependency checks; it only reports that the process is able
    to serve requests.

    Returns:
        dict: Always ``{"status": "alive"}``.
    """
    return {"status": "alive"}
@app.get("/health/ready")
def readiness():
"""Check if service is ready to accept traffic"""
checks = {
"kafka": check_kafka_connection(),
"database": check_database_connection(),
"cache": check_cache_connection()
}
all_healthy = all(checks.values())
status_code = 200 if all_healthy else 503
return JSONResponse(
status_code=status_code,
content={
"status": "ready" if all_healthy else "not_ready",
"checks": checks
}
)slos:
collection_layer:
availability: 99.9% # 43m downtime/month
latency_p95: 500ms
error_rate: < 0.1%
processing_layer:
availability: 99.95% # 21m downtime/month
latency_p95: 2s
throughput: > 10000 events/sec
error_rate: < 0.05%
storage_layer:
availability: 99.99% # 4m downtime/month
write_latency_p95: 100ms
read_latency_p95: 50ms
error_rate: < 0.01%# Availability SLI
# Percentage of requests not answered with a server error (5xx).
# Redirects (3xx) and client errors (4xx) count as available, so this SLI
# and the 5xx-based error-rate SLI add up to 100%.
(
  sum(rate(http_requests_total{code!~"5.."}[5m]))
  /
  sum(rate(http_requests_total[5m]))
) * 100
# Latency SLI (p95)
# Buckets are summed by `le` first so the result is one service-wide
# quantile, matching the aggregation used by the availability and
# error-rate SLIs, rather than one quantile per series.
histogram_quantile(0.95,
  sum by (le) (rate(http_request_duration_seconds_bucket[5m]))
)
# Error Rate SLI
# Percentage of all requests over the last 5 minutes answered with a 5xx status.
(
    sum(rate(http_requests_total{code=~"5.."}[5m]))
  / sum(rate(http_requests_total[5m]))
) * 100
- High Error Rate in Collector
  - Check API credentials and connectivity
  - Verify that rate limits have not been exceeded
  - Check Kafka broker availability
- Processing Pipeline Slow
  - Check Flink resource allocation
  - Review enrichment API performance
  - Analyze backpressure indicators
- Storage Write Failures
  - Verify database connectivity
  - Check storage capacity
  - Review connection pool settings
- Query API Slow Response
  - Check database query performance
  - Verify cache hit rates
  - Check for connection pool exhaustion