43 changes: 43 additions & 0 deletions .env.example
@@ -0,0 +1,43 @@
# Copy this file to .env and fill in your values.
# Never commit the .env file.

# Kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_TELEMETRY_TOPIC=drone-telemetry
KAFKA_PROCESSED_TOPIC=drone-processed
KAFKA_ALERTS_TOPIC=drone-alerts
KAFKA_CONSUMER_GROUP=drone-pipeline

# Spark
SPARK_APP_NAME=DroneDeliveryPipeline
SPARK_MASTER=local[*]
STREAMING_TRIGGER_INTERVAL=10 seconds
STREAMING_CHECKPOINT_LOCATION=/tmp/drone-pipeline-checkpoints

# Collision detection thresholds
COLLISION_SAFE_DISTANCE_M=50.0
COLLISION_SAFE_ALTITUDE_M=10.0

# Route optimisation
LOW_BATTERY_THRESHOLD=20.0
MAX_PAYLOAD_WEIGHT_KG=5.0

# AWS
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=

# S3
S3_BUCKET=drone-telemetry-data
S3_RAW_PREFIX=raw/
S3_PROCESSED_PREFIX=processed/
S3_ALERTS_PREFIX=alerts/

# Redshift
REDSHIFT_HOST=
REDSHIFT_PORT=5439
REDSHIFT_DATABASE=drone_analytics
REDSHIFT_USER=
REDSHIFT_PASSWORD=
REDSHIFT_IAM_ROLE=
REDSHIFT_SCHEMA=public
41 changes: 41 additions & 0 deletions .gitignore
@@ -0,0 +1,41 @@
# ---- Python ----
__pycache__/
*.py[cod]
*.pyo
*.pyd
.Python
*.egg-info/
dist/
build/
.eggs/
.venv/
venv/
env/

# ---- Pytest ----
.pytest_cache/
.coverage
htmlcov/
coverage.xml

# ---- Spark ----
/tmp/
derby.log
metastore_db/
spark-warehouse/

# ---- IDE ----
.idea/
.vscode/
*.swp
*.swo

# ---- Environment ----
.env
*.env.local

# ---- Docker ----
*.log

# ---- AWS ----
.aws/credentials
16 changes: 16 additions & 0 deletions Dockerfile
@@ -0,0 +1,16 @@
FROM python:3.11-slim

# Install Java (required by PySpark)
RUN apt-get update && \
apt-get install -y --no-install-recommends default-jdk-headless && \
rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV PYTHONPATH=/app
ENV JAVA_HOME=/usr/lib/jvm/default-java
202 changes: 200 additions & 2 deletions README.md
@@ -1,2 +1,200 @@
# drone-delivery-optimization
Real-time drone delivery optimization using Apache Kafka, Spark, and AWS for route efficiency and collision avoidance.
# Drone Delivery Optimization – Big Data Pipeline

Real-time drone delivery optimization using **Apache Kafka**, **Apache Spark Structured Streaming**, **AWS S3**, and **AWS Redshift** for route efficiency and collision avoidance.

---

## Architecture

```
Drone Fleet
│ (JSON telemetry, 1 Hz per drone)
┌─────────────────────────┐
│ Kafka Producer │ src/kafka/drone_producer.py
│ Topic: drone-telemetry│
└──────────┬──────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Spark Structured Streaming │
│ │
│ ┌─────────────────────┐ ┌───────────────────────────┐ │
│ │ TelemetryProcessor │ │ CollisionDetector │ │
│ │ (schema, parse) │──▶│ (self-join, safe radius) │ │
│ └────────┬────────────┘ └─────────────┬─────────────┘ │
│ │ │ alerts │
│ ┌────────▼────────────┐ ▼ │
│ │ RouteOptimizer │ Kafka topic: drone-alerts │
│ │ (haversine, score) │ │
│ └────────┬────────────┘ │
└───────────┼─────────────────────────────────────────────────┘
│ Parquet (partitioned by date)
┌─────────────────────────┐
│ AWS S3 │ src/aws/s3_handler.py
│ s3://bucket/processed/ │
└──────────┬──────────────┘
│ Redshift COPY
┌─────────────────────────┐
│ AWS Redshift │ src/aws/redshift_handler.py
│ drone_analytics DB │
│ • drone_telemetry │
│ • drone_routes │
│ • collision_alerts │
└─────────────────────────┘
```

---

## Project Structure

```
drone-delivery-optimization/
├── config/
│ ├── kafka_config.py # Kafka broker / topic settings
│ ├── spark_config.py # Spark / streaming settings
│ └── aws_config.py # S3 & Redshift settings
├── src/
│ ├── models/
│ │ └── drone_telemetry.py # Telemetry dataclass + (de)serialisation
│ ├── kafka/
│ │ ├── drone_producer.py # Telemetry simulator & Kafka producer
│ │ └── drone_consumer.py # Kafka consumer with pluggable handler
│ ├── spark/
│ │ ├── telemetry_processor.py # Structured Streaming read/write
│ │ ├── route_optimizer.py # Haversine distance + action scoring
│ │ └── collision_detector.py # Self-join proximity alerts
│ └── aws/
│ ├── s3_handler.py # S3 upload / download / list helpers
│ └── redshift_handler.py # Redshift DDL, bulk inserts, COPY, queries
├── tests/
│ ├── test_drone_telemetry.py
│ ├── test_drone_producer.py
│ ├── test_drone_consumer.py
│ ├── test_route_optimizer.py
│ ├── test_collision_detector.py
│ ├── test_s3_handler.py
│ └── test_redshift_handler.py
├── scripts/
│ ├── run_pipeline.py # Spark pipeline entrypoint
│ └── start_pipeline.sh # Docker Compose helper
├── docker-compose.yml # ZooKeeper, Kafka, Schema Registry, Kafka UI
├── Dockerfile
└── requirements.txt
```

---

## Quick Start

### Prerequisites

| Tool | Version |
|------|---------|
| Python | ≥ 3.11 |
| Docker & Docker Compose | ≥ 24 |
| Java (JDK) | ≥ 11 (required by PySpark) |

### 1 – Install dependencies

```bash
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
```

### 2 – Configure environment

Copy `.env.example` to `.env` and fill in your AWS credentials:

```bash
cp .env.example .env
```

Key variables:

| Variable | Description | Default |
|----------|-------------|---------|
| `KAFKA_BOOTSTRAP_SERVERS` | Kafka broker address | `localhost:9092` |
| `AWS_REGION` | AWS region | `us-east-1` |
| `S3_BUCKET` | Target S3 bucket | `drone-telemetry-data` |
| `REDSHIFT_HOST` | Redshift cluster endpoint | _(required for Redshift)_ |
| `REDSHIFT_USER` / `REDSHIFT_PASSWORD` | Redshift credentials | _(required for Redshift)_ |
| `COLLISION_SAFE_DISTANCE_M` | Horizontal safety radius (m) | `50` |
| `COLLISION_SAFE_ALTITUDE_M` | Vertical safety clearance (m) | `10` |

### 3 – Start infrastructure

```bash
./scripts/start_pipeline.sh
```

This starts ZooKeeper, Kafka, Schema Registry, and the Kafka UI at http://localhost:8080.

### 4 – Run the producer (simulated drones)

```bash
python -m src.kafka.drone_producer
```

### 5 – Run the Spark pipeline

```bash
python -m scripts.run_pipeline
```

---

## Running Tests

```bash
pytest tests/ -v --cov=src --cov-report=term-missing
```

Tests that exercise Spark (route optimiser, collision detector) spin up a local `SparkSession` and do **not** require a running cluster.

---

## Key Components

### Telemetry Model (`src/models/drone_telemetry.py`)

A `@dataclass` capturing: `drone_id`, `latitude`, `longitude`, `altitude`, `speed`, `heading`, `battery_level`, `status`, `timestamp`, `destination_lat/lon`, `payload_weight`. Includes JSON serialisation helpers.
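
A minimal sketch of such a dataclass, using the field names listed above (the `to_json` / `from_json` helper names and the string timestamp are illustrative assumptions, not necessarily the module's exact API):

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class DroneTelemetry:
    drone_id: str
    latitude: float
    longitude: float
    altitude: float
    speed: float
    heading: float
    battery_level: float
    status: str
    timestamp: str          # ISO-8601 string here; the real model may use epoch seconds
    destination_lat: float
    destination_lon: float
    payload_weight: float

    def to_json(self) -> str:
        """Serialise the record to a JSON string for Kafka."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload: str) -> "DroneTelemetry":
        """Rebuild a record from its JSON representation."""
        return cls(**json.loads(payload))
```

A round trip through `to_json`/`from_json` should reproduce the original record exactly, which keeps producer and consumer symmetric.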

### Kafka Producer (`src/kafka/drone_producer.py`)

Simulates a configurable number of drones publishing telemetry at a configurable rate. Uses `confluent-kafka` with GZIP compression, durable delivery (`acks=all` with retries), and automatic topic creation.
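
The core of the simulator can be sketched as below. The config dict mirrors `config/kafka_config.py`; the `simulate_reading` random walk and the `publish` helper are illustrative assumptions, and the actual `confluent_kafka.Producer` call is left commented since it needs a running broker:

```python
import json
import random
import time

# Producer settings mirroring config/kafka_config.py
PRODUCER_CONFIG = {
    "bootstrap.servers": "localhost:9092",
    "acks": "all",
    "compression.type": "gzip",
}


def simulate_reading(drone_id: str) -> dict:
    """Generate one plausible telemetry record (random placeholder values)."""
    return {
        "drone_id": drone_id,
        "latitude": 40.7 + random.uniform(-0.05, 0.05),
        "longitude": -74.0 + random.uniform(-0.05, 0.05),
        "altitude": random.uniform(30, 120),
        "speed": random.uniform(0, 20),
        "battery_level": random.uniform(20, 100),
        "timestamp": time.time(),
    }


def publish(producer, topic: str, record: dict) -> None:
    """Key by drone_id so each drone's readings land in one partition, in order."""
    producer.produce(topic, key=record["drone_id"], value=json.dumps(record))


# With a broker running:
# from confluent_kafka import Producer
# p = Producer(PRODUCER_CONFIG)
# publish(p, "drone-telemetry", simulate_reading("drone-001"))
# p.flush()
```

Keying on `drone_id` is what preserves per-drone ordering across partitions.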

### Spark Telemetry Processor (`src/spark/telemetry_processor.py`)

Reads from the `drone-telemetry` Kafka topic, applies the telemetry schema, and writes Parquet files to S3 partitioned by date. Also forwards enriched records to `drone-processed`.

### Route Optimizer (`src/spark/route_optimizer.py`)

Adds four columns to the streaming DataFrame:
- `distance_to_dest_m` – great-circle distance to destination via the Haversine formula
- `estimated_flight_time_s` – ETA at current speed
- `recommended_action` – one of `continue`, `return_to_base`, `emergency_land`, `reduce_speed`, `optimal`
- `optimisation_score` – 0–100 efficiency score (battery × 0.5, speed × 0.3, payload × 0.2)
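
The geometry behind `distance_to_dest_m` and the weighted score can be sketched in plain Python (in the module these run as Spark column expressions; the exact scoring terms are an assumption beyond the 0.5/0.3/0.2 weights stated above):

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in metres between two (lat, lon) points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def optimisation_score(battery_pct: float, speed_ms: float,
                       max_speed_ms: float, payload_kg: float,
                       max_payload_kg: float = 5.0) -> float:
    """0-100 score weighted battery x 0.5, speed x 0.3, payload headroom x 0.2."""
    battery_term = battery_pct                         # already on a 0-100 scale
    speed_term = 100 * speed_ms / max_speed_ms         # faster = closer to optimal
    payload_term = 100 * (1 - payload_kg / max_payload_kg)  # lighter = better
    return 0.5 * battery_term + 0.3 * speed_term + 0.2 * payload_term
```

Dividing `haversine_m` by the current speed gives `estimated_flight_time_s` for non-zero speeds.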

### Collision Detector (`src/spark/collision_detector.py`)

Performs a self-join on each micro-batch to find drone pairs whose **horizontal** separation is below `COLLISION_SAFE_DISTANCE_M` **and/or** whose **vertical** separation is below `COLLISION_SAFE_ALTITUDE_M`. Publishes `WARNING` / `CRITICAL` alerts to the `drone-alerts` Kafka topic.
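
The pairwise check behind the self-join can be illustrated in plain Python. This sketch flags a conflict only when *both* clearances are violated; the module's exact combination of the horizontal and vertical conditions (and its WARNING/CRITICAL cutoffs) may differ:

```python
from itertools import combinations

SAFE_DISTANCE_M = 50.0   # COLLISION_SAFE_DISTANCE_M
SAFE_ALTITUDE_M = 10.0   # COLLISION_SAFE_ALTITUDE_M


def find_conflicts(drones: dict, horizontal_m) -> list:
    """Return (id_a, id_b) pairs violating both clearance thresholds.

    `drones` maps drone_id -> (x_m, y_m, altitude_m) in a local planar frame;
    `horizontal_m(p, q)` computes horizontal separation between two points
    (e.g. haversine on lat/lon, or Euclidean on projected coordinates).
    """
    conflicts = []
    for a, b in combinations(sorted(drones), 2):  # every unordered pair once
        xa, ya, za = drones[a]
        xb, yb, zb = drones[b]
        too_close = horizontal_m((xa, ya), (xb, yb)) < SAFE_DISTANCE_M
        too_level = abs(za - zb) < SAFE_ALTITUDE_M
        if too_close and too_level:
            conflicts.append((a, b))
    return conflicts
```

In Spark this runs as `df.alias("a").join(df.alias("b"), on=distance_condition)` per micro-batch, with `a.drone_id < b.drone_id` to avoid double-counting pairs.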

### AWS S3 Handler (`src/aws/s3_handler.py`)

Provides `upload_json_records`, `upload_file`, `download_json_records`, `list_objects`, and `delete_object` helpers backed by `boto3`. Supports Hive-style date partitioning (`year=/month=/day=`).
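
The Hive-style partitioning can be sketched as a key builder (the `partitioned_key` helper is an illustrative assumption, not the handler's actual API; the `boto3` upload is commented since it needs credentials and a real bucket):

```python
import json
from datetime import datetime, timezone


def partitioned_key(prefix: str, ts: datetime, filename: str) -> str:
    """Build a Hive-style year=/month=/day= object key under `prefix`."""
    return (
        f"{prefix}year={ts.year:04d}/month={ts.month:02d}/"
        f"day={ts.day:02d}/{filename}"
    )


# With boto3 and valid AWS credentials:
# import boto3
# s3 = boto3.client("s3")
# key = partitioned_key("processed/", datetime.now(timezone.utc), "batch-0001.json")
# s3.put_object(Bucket="drone-telemetry-data", Key=key,
#               Body=json.dumps(records).encode("utf-8"))
```

Keys shaped this way let Redshift Spectrum, Athena, and Spark prune partitions by date without listing the whole bucket.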

### AWS Redshift Handler (`src/aws/redshift_handler.py`)

- Auto-creates `drone_telemetry`, `drone_routes`, and `collision_alerts` tables with `DISTKEY` / `SORTKEY` optimisations.
- Bulk inserts via `psycopg2` `execute_values`.
- `COPY … FROM S3` for high-throughput Parquet loads.
- Analytics helpers: `get_low_battery_drones`, `get_recent_alerts`.
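
The COPY path likely renders a statement of roughly this shape (table and role names follow this README; the builder function itself is an illustrative assumption, executed via a `psycopg2` cursor in the handler):

```python
def build_copy_sql(schema: str, table: str, s3_path: str, iam_role: str) -> str:
    """Render a Redshift COPY statement for Parquet files on S3."""
    return (
        f"COPY {schema}.{table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        f"FORMAT AS PARQUET;"
    )
```

COPY with `FORMAT AS PARQUET` loads column-by-column in parallel across the cluster, which is why it is preferred over row-at-a-time inserts for the streaming output.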

---

## License

MIT
Empty file added config/__init__.py
Empty file.
36 changes: 36 additions & 0 deletions config/aws_config.py
@@ -0,0 +1,36 @@
"""AWS configuration."""

import os

# ------------------------------------------------------------------
# Credentials (prefer IAM roles / environment variables in production)
# ------------------------------------------------------------------

AWS_REGION: str = os.environ.get("AWS_REGION", "us-east-1")
AWS_ACCESS_KEY_ID: str = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_ACCESS_KEY: str = os.environ.get("AWS_SECRET_ACCESS_KEY", "")

# ------------------------------------------------------------------
# S3
# ------------------------------------------------------------------

S3_BUCKET: str = os.environ.get("S3_BUCKET", "drone-telemetry-data")
S3_RAW_PREFIX: str = os.environ.get("S3_RAW_PREFIX", "raw/")
S3_PROCESSED_PREFIX: str = os.environ.get("S3_PROCESSED_PREFIX", "processed/")
S3_ALERTS_PREFIX: str = os.environ.get("S3_ALERTS_PREFIX", "alerts/")

# ------------------------------------------------------------------
# Redshift
# ------------------------------------------------------------------

REDSHIFT_HOST: str = os.environ.get("REDSHIFT_HOST", "")
REDSHIFT_PORT: int = int(os.environ.get("REDSHIFT_PORT", "5439"))
REDSHIFT_DATABASE: str = os.environ.get("REDSHIFT_DATABASE", "drone_analytics")
REDSHIFT_USER: str = os.environ.get("REDSHIFT_USER", "")
REDSHIFT_PASSWORD: str = os.environ.get("REDSHIFT_PASSWORD", "")
REDSHIFT_IAM_ROLE: str = os.environ.get("REDSHIFT_IAM_ROLE", "")

REDSHIFT_SCHEMA: str = os.environ.get("REDSHIFT_SCHEMA", "public")
REDSHIFT_TELEMETRY_TABLE: str = "drone_telemetry"
REDSHIFT_ROUTES_TABLE: str = "drone_routes"
REDSHIFT_ALERTS_TABLE: str = "collision_alerts"
46 changes: 46 additions & 0 deletions config/kafka_config.py
@@ -0,0 +1,46 @@
"""Kafka configuration."""

import os

# ------------------------------------------------------------------
# Broker connection
# ------------------------------------------------------------------

KAFKA_BOOTSTRAP_SERVERS: str = os.environ.get(
"KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"
)

# ------------------------------------------------------------------
# Topics
# ------------------------------------------------------------------

TELEMETRY_TOPIC: str = os.environ.get("KAFKA_TELEMETRY_TOPIC", "drone-telemetry")
PROCESSED_TOPIC: str = os.environ.get("KAFKA_PROCESSED_TOPIC", "drone-processed")
ALERTS_TOPIC: str = os.environ.get("KAFKA_ALERTS_TOPIC", "drone-alerts")

# ------------------------------------------------------------------
# Producer defaults
# ------------------------------------------------------------------

PRODUCER_CONFIG: dict = {
"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
"client.id": os.environ.get("KAFKA_CLIENT_ID", "drone-producer"),
"acks": "all",
"retries": 3,
"batch.size": 16384,
"linger.ms": 5,
"compression.type": "gzip",
}

# ------------------------------------------------------------------
# Consumer defaults
# ------------------------------------------------------------------

CONSUMER_CONFIG: dict = {
"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
"group.id": os.environ.get("KAFKA_CONSUMER_GROUP", "drone-pipeline"),
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"session.timeout.ms": 30000,
"max.poll.interval.ms": 300000,
}