Note: I will be updating this repo with the code, charts, and relevant docs.
I built this personal project to explore real-time retail data processing at scale. The system processes simulated point-of-sale transactions in real time, handling up to 20 million transactions per day with low-latency analytics.
This project helped me gain hands-on experience with:
- Stream processing with PySpark
- Event streaming architecture with Kafka
- Workflow orchestration using Apache Airflow
- GCP cloud services (Dataproc, BigQuery, Cloud Storage)
- Designing for scalability and fault tolerance
```
[POS Data Generator] → [Kafka] → [PySpark Streaming] → [BigQuery]
                                          ↓
                            [Cloud Storage (checkpoints)]
                                          ↑
                              [Airflow (orchestration)]
```
- Data Producer: Python script that simulates POS transactions (sketched after this list)
- Stream Processing: PySpark jobs for real-time aggregations
- Data Storage: BigQuery tables for analytics queries
- Orchestration: Airflow DAGs for workflow management
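
For reference, the data producer mentioned above (invoked later as `producer.py --rate 1000`) can be sketched roughly as follows. This is illustrative only: it assumes the `kafka-python` client and the same event fields the streaming job parses (`store_id`, `amount`, `timestamp`); the real script may differ in detail.

```python
# Rough sketch of producer.py (illustrative; assumes the kafka-python client)
import argparse
import json
import random
import time
from datetime import datetime, timezone

from kafka import KafkaProducer

parser = argparse.ArgumentParser(description="Simulate POS transactions")
parser.add_argument("--rate", type=int, default=1000, help="events per second")
args = parser.parse_args()

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    event = {
        "store_id": f"store-{random.randint(1, 500)}",
        "amount": round(random.uniform(1.0, 500.0), 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Key by store_id so each store's events stay on one partition
    producer.send("pos-transactions", key=event["store_id"], value=event)
    time.sleep(1.0 / args.rate)
```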
- Python 3.8
- PySpark 3.4
- Kafka 3.3
- Apache Airflow 2.6
- Google Cloud Platform (Dataproc, BigQuery, Cloud Storage)
- Throughput: ~500k events/sec at peak
- Latency: 5-minute aggregation windows
- Accuracy: 95% data quality via validation checks
- Scalability: Handles 30% traffic spikes with Kafka partitioning
```python
# Sample from retail_pipeline.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, sum, col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("RetailAnalyticsPipeline") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Schema of the incoming POS events (field types inferred from how they are used below)
schema = StructType([
    StructField("store_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType())
])

# Read from Kafka
transactions = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "pos-transactions") \
    .load()

# Parse and process transactions
parsed = transactions.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Windowed aggregations
sales_by_store = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("store_id")
    ) \
    .agg(sum("amount").alias("total_sales"))

# Write to BigQuery
query = sales_by_store \
    .writeStream \
    .outputMode("append") \
    .format("bigquery") \
    .option("table", "retail_analytics.sales_by_store") \
    .option("checkpointLocation", "gs://retail-checkpoints/sales-by-store") \
    .start()

query.awaitTermination()
```

```python
# Sample from retail_analytics_pipeline.py
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
from airflow.providers.google.cloud.sensors.bigquery import BigQueryTableExistenceSensor

with DAG(
    'retail_analytics_pipeline',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False
) as dag:

    validate_data = BigQueryTableExistenceSensor(
        task_id='validate_data',
        project_id='retail-analytics',
        dataset_id='retail_analytics',
        table_id='daily_sales'
    )

    submit_spark_job = DataprocSubmitJobOperator(
        task_id='submit_spark_job',
        project_id='retail-analytics',
        region='us-central1',
        job={
            'reference': {'job_id': 'retail-pipeline-{{ds_nodash}}'},
            'placement': {'cluster_name': 'retail-cluster'},
            'pyspark_job': {
                'main_python_file_uri': 'gs://retail-code/retail_pipeline.py',
                'jar_file_uris': ['gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar']
            }
        }
    )

    validate_data >> submit_spark_job
```

Initially, the system could not handle daily traffic spikes. I resolved this by (see the config sketch after this list):
- Increasing Kafka partitions from 5 to 10
- Optimizing PySpark memory configuration
- Implementing backpressure handling
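
A sketch of what those knobs look like in the streaming job. The specific values (executor memory, core count, shuffle partitions, per-trigger cap) are illustrative rather than the exact production settings, and `maxOffsetsPerTrigger` is the Structured Streaming option I mean by backpressure handling on the Kafka source:

```python
from pyspark.sql import SparkSession

# Illustrative tuning; values are examples, not the settings I shipped.
spark = (
    SparkSession.builder
    .appName("RetailAnalyticsPipeline")
    .config("spark.executor.memory", "8g")         # more headroom for windowed state
    .config("spark.executor.cores", "4")
    .config("spark.sql.shuffle.partitions", "10")  # match the Kafka partition count
    .getOrCreate()
)

transactions = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "pos-transactions")
    # Cap records read per micro-batch so traffic spikes queue in Kafka
    # instead of overwhelming the executors.
    .option("maxOffsetsPerTrigger", 100000)
    .load()
)
```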
Early runs showed data quality problems, which I addressed by (see the sketch after this list):
- Adding schema validation in PySpark
- Implementing Airflow data quality checks
- Creating alerting for validation failures
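
On the PySpark side, the schema validation amounts to filtering parsed records against basic sanity checks before aggregation. The predicates below are illustrative rather than the exact checks, and `parsed` refers to the DataFrame from the streaming sample above:

```python
from pyspark.sql.functions import col

# Records that fail to parse against the schema come out of from_json
# with null fields, so null checks catch both missing and malformed data.
valid = parsed.filter(
    col("store_id").isNotNull()
    & col("timestamp").isNotNull()
    & col("amount").isNotNull()
    & (col("amount") > 0)
)

# The complementary filter routes rejects to a side output so they can be
# counted and fed into the validation-failure alerts.
rejected = parsed.filter(
    col("store_id").isNull()
    | col("timestamp").isNull()
    | col("amount").isNull()
    | (col("amount") <= 0)
)
```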
If I continue working on this project, I'd like to:
- Add machine learning for demand forecasting
- Build a real-time dashboard using Grafana
- Implement exactly-once processing semantics
To run this locally (simplified version):
- Start local Kafka:
  ```bash
  docker-compose up -d kafka zookeeper
  ```
- Create the Kafka topic:
  ```bash
  kafka-topics.sh --create --topic pos-transactions --partitions 10 --replication-factor 1 --bootstrap-server localhost:9092
  ```
- Run the transaction simulator:
  ```bash
  python producer.py --rate 1000
  ```
- Submit the PySpark job:
  ```bash
  spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 retail_pipeline.py
  ```

Feel free to reach out with questions or suggestions:
- Portfolio: pvcodes.in
- Email: [email protected]
- LinkedIn: linkedin.com/in/pvcodes