Apache Spark, Delta Lake, and AI-powered analytics for manufacturing data.
The Manufacturing Analytics Platform processes factory sensor data using:
- Apache Spark for distributed data processing
- Delta Lake for ACID transactions and time travel
- MLlib for predictive maintenance clustering
- Structured Streaming for anomaly detection
- AI assistant with web search integration
The platform analyzes manufacturing telemetry including temperature, pressure, vibration, power consumption, production rates, and defect rates across multiple machines and shifts.
- Real-time Processing: Sub-second anomaly detection from streaming IoT sensors
- Delta Lake Time Travel: Query any historical version (versionAsOf, timestampAsOf)
- MLlib Predictive Maintenance: K-means clustering for machine risk assessment
- Graph Analytics: Machine-shift relationship mapping
- AI Assistant: Natural language Q&A with web search (Groq + Tavily)
- ACID Compliance: Schema enforcement, transaction logging, VACUUM support
- Multiple Outputs: Executive dashboards, warehouse exports, quality alerts
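To make the predictive-maintenance idea concrete, here is a minimal pure-Python sketch of K-means grouping machines into three risk levels. It is not the platform's MLlib code: the feature pair (temperature, vibration), the sample readings for machines 103 to 106, the deterministic initialization, and the rule that ranks clusters by the sum of their centroid coordinates are all assumptions made for illustration.

```python
from math import dist

RISK_LABELS = ["Low", "Medium", "High"]


def kmeans(points, k, iterations=20):
    """Plain Lloyd's algorithm on a list of equal-length tuples."""
    # Deterministic init: spread the starting centroids across the sorted points.
    ordered = sorted(points)
    step = (len(ordered) - 1) / (k - 1)
    centroids = [ordered[round(i * step)] for i in range(k)]
    for _ in range(iterations):
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda i: dist(p, centroids[i]))
            clusters[nearest].append(p)
        # Recompute each centroid as the mean of its members (keep it if empty).
        centroids = [
            tuple(sum(axis) / len(c) for axis in zip(*c)) if c else centroids[i]
            for i, c in enumerate(clusters)
        ]
    return centroids


def risk_levels(readings):
    """Map machine_id -> 'Low' / 'Medium' / 'High'."""
    k = len(RISK_LABELS)
    centroids = kmeans(list(readings.values()), k)
    # Assumption: a cluster is "riskier" when its centroid coordinates sum higher.
    # Features are used unscaled here, so temperature dominates the distance;
    # a real pipeline would likely standardize them first.
    ranked = sorted(range(k), key=lambda i: sum(centroids[i]))
    label_of = dict(zip(ranked, RISK_LABELS))
    return {
        machine: label_of[min(range(k), key=lambda i: dist(p, centroids[i]))]
        for machine, p in readings.items()
    }


# Hypothetical (temperature, vibration) readings per machine_id
readings = {
    101: (72.5, 0.32), 102: (74.1, 0.28),
    103: (85.0, 0.61), 104: (86.2, 0.58),
    105: (98.4, 1.10), 106: (99.1, 1.25),
}
levels = risk_levels(readings)
```

In the actual pipeline the same grouping would presumably be done with MLlib on a Spark DataFrame; the sketch only shows how cluster membership can be translated into a risk label.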
- Real-time sensor ingestion from CSV and JSON streaming sources
- 5-stage analytics pipeline: Ingest → Process → ML → Graph → Output
- AI-powered assistant with real-time web search (Groq Llama 3.3 + Tavily)
- Delta Lake Time Travel & Versioning:
  - versionAsOf: Query historical snapshots by version number
  - timestampAsOf: Query data as it existed at any point in time
  - history(): Full version log with operation metadata
  - VACUUM: Physical deletion of old Parquet files (GDPR compliance)
  - 4 versions tracked (0 → 1 → 2 → 3) with complete audit trail
- MLlib predictive maintenance: K-means clustering for risk levels (High/Medium/Low)
- GraphX-style analysis: Machine-shift relationships and performance mapping
- Multiple storage formats: Delta Lake, Parquet, CSV exports
- Executive dashboards: Shift performance, defect rates, OEE metrics
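The machine-shift relationship mapping can be pictured as a weighted bipartite graph, with machines on one side, shifts on the other, and each edge carrying a performance figure. The sketch below is a plain-Python illustration, not the code in graph_analytics.py; the function names, the "Night" shift, and the choice of mean defect rate as the edge weight are assumptions.

```python
from collections import defaultdict


def build_machine_shift_edges(rows):
    """Return {(machine_id, shift): mean defect_rate} as weighted edges."""
    totals = defaultdict(lambda: [0.0, 0])  # edge -> [sum, count]
    for machine_id, shift, defect_rate in rows:
        acc = totals[(machine_id, shift)]
        acc[0] += defect_rate
        acc[1] += 1
    return {edge: total / count for edge, (total, count) in totals.items()}


def worst_shift_per_machine(edges):
    """For each machine, pick the shift whose edge has the highest mean defect rate."""
    worst = {}
    for (machine_id, shift), rate in edges.items():
        if machine_id not in worst or rate > worst[machine_id][1]:
            worst[machine_id] = (shift, rate)
    return worst


# Hypothetical (machine_id, shift, defect_rate) rows
rows = [
    (101, "Morning", 2.1),
    (101, "Morning", 1.9),
    (101, "Night", 3.5),
    (102, "Morning", 1.9),
    (102, "Night", 1.5),
]
edges = build_machine_shift_edges(rows)
worst = worst_shift_per_machine(edges)
```

On Spark the same aggregation would typically be a groupBy over machine_id and shift; the dictionary form just makes the edge structure easy to see.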
- Modular architecture: 12 specialized modules with clear separation of concerns
- Structured Streaming: 30-second windowed anomaly detection
- Schema enforcement: Automatic rejection of mismatched data types
- Comprehensive testing: 15 passing tests with pytest
- Professional project structure: src/ layout with uv package manager
- Real-time web search: Tavily + Groq integration for up-to-date answers
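As a rough illustration of 30-second windowed anomaly detection, the sketch below buckets readings into tumbling 30-second windows per machine and flags any window whose mean vibration exceeds a threshold. It is plain Python rather than Structured Streaming, and the threshold of 0.8, the sample readings, and the choice of vibration as the monitored signal are assumptions.

```python
from collections import defaultdict
from datetime import datetime

WINDOW_SECONDS = 30  # must divide 60 for the flooring below to be valid


def window_start(ts):
    """Floor a timestamp to the start of its tumbling window."""
    return ts.replace(second=ts.second - ts.second % WINDOW_SECONDS, microsecond=0)


def detect_anomalies(readings, threshold):
    """readings: iterable of (machine_id, timestamp, vibration) tuples."""
    buckets = defaultdict(list)
    for machine_id, ts, vibration in readings:
        buckets[(machine_id, window_start(ts))].append(vibration)
    anomalies = []
    for (machine_id, start), values in buckets.items():
        mean = sum(values) / len(values)
        if mean > threshold:
            anomalies.append((machine_id, start, mean))
    return sorted(anomalies)


def parse(ts):
    # Same timestamp layout as the sample CSV rows
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")


# Hypothetical sensor readings
readings = [
    (101, parse("2026-04-21 08:00:05"), 0.30),
    (101, parse("2026-04-21 08:00:20"), 0.34),
    (101, parse("2026-04-21 08:00:35"), 0.90),
    (101, parse("2026-04-21 08:00:50"), 1.10),
    (102, parse("2026-04-21 08:00:10"), 0.28),
]
anomalies = detect_anomalies(readings, threshold=0.8)
```

Only machine 101's second window (starting at 08:00:30) crosses the threshold here. A streaming job would express the same grouping with an event-time window and would also need a watermark policy for late data, which this sketch ignores.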
- Python 3.12 or higher
- uv for dependency management
- 4GB RAM minimum (8GB recommended)
- (Optional) API keys for AI features
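Because the API keys are optional, it can help to check for them before enabling the AI assistant. The helper below is a small sketch under that assumption; `missing_ai_keys` is a hypothetical name and not part of the project, while the two environment variable names are the ones used in the setup steps.

```python
import os

REQUIRED_KEYS = ("TAVILY_API_KEY", "GROQ_API_KEY")


def missing_ai_keys(env=None):
    """Return the names of AI-related keys that are unset or empty."""
    env = os.environ if env is None else env
    return [key for key in REQUIRED_KEYS if not env.get(key)]


missing = missing_ai_keys()
if missing:
    print("AI features disabled; missing:", ", ".join(missing))
```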
# 1. Clone the repository
git clone https://github.com/mahmoudnajmeh/manufacturing-analytics-spark.git
cd manufacturing-analytics-spark
# 2. Install dependencies using uv
uv sync
# 3. Create data directory and add your CSV file
mkdir -p data
# Copy production_metrics.csv to data/
# 4. (Optional) Set up API keys for AI assistant
export TAVILY_API_KEY="your-tavily-key"
export GROQ_API_KEY="your-groq-key"
# 5. Run the complete pipeline
uv run python -m manufacturing_analytics.main
Create data/production_metrics.csv with the following schema:
machine_id,temperature,pressure,vibration,power_consumption,production_rate,defect_rate,timestamp,shift
101,72.5,14.2,0.32,45.6,98.5,2.1,2026-04-21 08:00:00,Morning
102,74.1,14.5,0.28,47.2,97.8,1.9,2026-04-21 08:00:00,Morning
manufacturing_analytics_spark/
├── .venv/
├── chroma_db/
├── data/
│   ├── production_metrics.csv
│   └── sensor_streaming/
├── lake/
│   ├── manufacturing_delta/
│   ├── manufacturing_parquet/
│   ├── manufacturing_warehouse/
│   └── quality_alerts/
├── src/
│   └── manufacturing_analytics/
│       ├── __init__.py
│       ├── ai_assistant.py
│       ├── config.py
│       ├── data_ingestion.py
│       ├── delta_time_travel.py
│       ├── graph_analytics.py
│       ├── main.py
│       ├── ml_analytics.py
│       ├── output.py
│       ├── processing.py
│       ├── streaming.py
│       ├── storage.py
│       └── utils.py
├── tests/
│   ├── __init__.py
│   └── test_manufacturing.py
├── .gitignore
├── .python-version
├── pyproject.toml
├── README.md
└── uv.lock
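Before handing data/production_metrics.csv to the pipeline, the nine-column schema can be checked locally. The sketch below uses only the standard library; the column types are inferred from the two sample rows, the third row is a deliberately broken hypothetical record, and `load_rows` is an illustrative name rather than a function from the project. Delta Lake's own schema enforcement happens at write time, so this is only an analogous pre-check.

```python
import csv
import io
from datetime import datetime

# Column name -> parser. Types are an assumption based on the sample rows.
SCHEMA = {
    "machine_id": int,
    "temperature": float,
    "pressure": float,
    "vibration": float,
    "power_consumption": float,
    "production_rate": float,
    "defect_rate": float,
    "timestamp": lambda s: datetime.strptime(s, "%Y-%m-%d %H:%M:%S"),
    "shift": str,
}


def load_rows(text):
    """Return (parsed_rows, rejected_line_numbers) for CSV text."""
    reader = csv.DictReader(io.StringIO(text))
    if reader.fieldnames != list(SCHEMA):
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    good, rejected = [], []
    # The header is line 1, so data rows start at line 2.
    for line_no, raw in enumerate(reader, start=2):
        try:
            good.append({col: cast(raw[col]) for col, cast in SCHEMA.items()})
        except (TypeError, ValueError):
            rejected.append(line_no)
    return good, rejected


SAMPLE = """\
machine_id,temperature,pressure,vibration,power_consumption,production_rate,defect_rate,timestamp,shift
101,72.5,14.2,0.32,45.6,98.5,2.1,2026-04-21 08:00:00,Morning
102,74.1,14.5,0.28,47.2,97.8,1.9,2026-04-21 08:00:00,Morning
103,hot,14.1,0.30,46.0,98.0,2.0,2026-04-21 08:00:00,Morning
"""
good, rejected = load_rows(SAMPLE)
```

The row for machine 103 is rejected because "hot" cannot be parsed as a float, which mirrors the kind of type mismatch schema enforcement is meant to catch.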
# Run full pipeline
uv run python -m manufacturing_analytics.main
# Run with AI features (requires API keys)
export TAVILY_API_KEY="your-key"
export GROQ_API_KEY="your-key"
uv run python -m manufacturing_analytics.main
# Run all tests
uv run pytest tests/ -v
# Run specific test file
uv run pytest tests/test_manufacturing.py -v
# Run with coverage
uv run pytest tests/ -v --cov=src/manufacturing_analytics --cov-report=html
# Run specific test
uv run pytest tests/test_manufacturing.py::test_version_0_exists -v
# Query version history from within Python
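# Assumes an active SparkSession named `spark` with the Delta Lake extensions enabled
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "lake/manufacturing_delta")
delta_table.history().show()  # show() prints the log and returns None, so there is nothing to assign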
# Time travel to version 0
v0 = spark.read.format("delta").option("versionAsOf", 0).load("lake/manufacturing_delta")
# Run VACUUM (remove old files)
delta_table.vacuum(retentionHours=168) # 7 days default
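Time travel by timestamp works the same way as by version, using the timestampAsOf reader option instead of versionAsOf. The helper below is a hedged sketch that builds the option for either mode; `time_travel_options` and `read_snapshot` are hypothetical names, and `read_snapshot` assumes an existing SparkSession configured for Delta Lake.

```python
def time_travel_options(version=None, timestamp=None):
    """Build Delta reader options for exactly one of versionAsOf / timestampAsOf."""
    if (version is None) == (timestamp is None):
        raise ValueError("pass exactly one of version or timestamp")
    if version is not None:
        return {"versionAsOf": str(version)}
    return {"timestampAsOf": timestamp}


def read_snapshot(spark, path, version=None, timestamp=None):
    """Load a historical snapshot of the Delta table at `path`.

    `spark` is an existing SparkSession; it is not created here.
    """
    options = time_travel_options(version=version, timestamp=timestamp)
    return spark.read.format("delta").options(**options).load(path)
```

For example, `read_snapshot(spark, "lake/manufacturing_delta", timestamp="2026-04-21 08:00:00")` would request the table as of that moment. Snapshots whose files have already been removed by VACUUM can no longer be read.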