Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,13 @@ SENSITIVE_FIELDS=password,token,secret,api_key,authorization,credit_card,ssn
# Development default: http://localhost:3000,http://localhost:5173,http://127.0.0.1:5173
# Production example: https://yourdomain.com,https://app.yourdomain.com
CORS_ORIGINS=http://localhost:3000,http://localhost:5173,http://127.0.0.1:5173

# Event streaming (Kafka-compatible, supports Apache Kafka and Redpanda)
ENABLE_EVENT_STREAMING=false
EVENT_STREAM_BACKEND=kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CLIENT_ID=flexiroaster-backend
TOPIC_PIPELINE_CREATED=pipeline.created
TOPIC_EXECUTION_STARTED=execution.started
TOPIC_EXECUTION_FAILED=execution.failed
TOPIC_EXECUTION_COMPLETED=execution.completed
22 changes: 19 additions & 3 deletions backend/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,35 @@ cp backend/.env.example backend/.env

## Event-Driven Architecture (Advanced Setup)

FlexiRoaster supports Kafka-backed domain events for loose coupling, high scalability, audit-friendly workflows, and real-time analytics.
FlexiRoaster supports Kafka-compatible domain events (Apache Kafka or Redpanda) for loose coupling, high scalability, audit-friendly workflows, and real-time analytics.


#### Apache Kafka
- Event-driven triggers
- High-throughput ingestion
- Real-time pipeline activation

#### Redpanda
- Kafka-compatible
- Lower operational complexity

Use this layer when:
- Pipelines should trigger from events
- You need real-time monitoring
- You process millions of records

### Published topics
- `pipeline.created`
- `execution.started`
- `execution.failed`
- `execution.completed`

### Enable Kafka publishing
### Enable event streaming
Set the following environment variables in `backend/.env`:

```env
ENABLE_EVENT_STREAMING=true
EVENT_STREAM_BACKEND=kafka
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_CLIENT_ID=flexiroaster-backend
TOPIC_PIPELINE_CREATED=pipeline.created
Expand All @@ -157,7 +173,7 @@ TOPIC_EXECUTION_FAILED=execution.failed
TOPIC_EXECUTION_COMPLETED=execution.completed
```

If Kafka is unavailable, the backend falls back to structured application logs for events so local development continues to work.
If Kafka/Redpanda is unavailable, the backend falls back to structured application logs for events so local development continues to work.

## Next Steps

Expand Down
5 changes: 3 additions & 2 deletions backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from pydantic_settings import BaseSettings
from pydantic import field_validator
from typing import List, Optional, Union
from typing import List, Literal, Optional, Union


class Settings(BaseSettings):
Expand Down Expand Up @@ -40,8 +40,9 @@ class Settings(BaseSettings):
KEYCLOAK_ENABLED: bool = False
KEYCLOAK_ISSUER: Optional[str] = None
KEYCLOAK_CLIENT_ID: Optional[str] = None
# Event-driven architecture (Kafka)
# Event-driven architecture (Kafka / Redpanda)
ENABLE_EVENT_STREAMING: bool = False
EVENT_STREAM_BACKEND: Literal["kafka", "redpanda"] = "kafka"
KAFKA_BOOTSTRAP_SERVERS: Union[str, List[str]] = "localhost:9092"
KAFKA_CLIENT_ID: str = "flexiroaster-backend"

Expand Down
7 changes: 4 additions & 3 deletions backend/events/publisher.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Kafka-backed event publisher with graceful fallback for local/dev environments."""
"""Kafka/Redpanda-backed event publisher with graceful fallback for local/dev environments."""

from __future__ import annotations

Expand Down Expand Up @@ -36,7 +36,7 @@ def _ensure_producer(self) -> Optional[Any]:
return self._producer

if KafkaProducer is None:
logger.warning("Event streaming enabled but kafka-python is not installed.")
logger.warning("Event streaming enabled for %s but kafka-python is not installed.", settings.EVENT_STREAM_BACKEND)
return None

try:
Expand All @@ -48,13 +48,14 @@ def _ensure_producer(self) -> Optional[Any]:
)
return self._producer
except Exception as exc: # pragma: no cover - depends on external broker
logger.warning("Failed to initialize Kafka producer: %s", exc)
logger.warning("Failed to initialize %s producer: %s", settings.EVENT_STREAM_BACKEND, exc)
return None

def publish(self, topic: str, key: Optional[str], payload: Dict[str, Any]) -> None:
"""Publish a single event with standard envelope metadata."""
event = {
"event_type": topic,
"stream_backend": settings.EVENT_STREAM_BACKEND,
"published_at": datetime.now().isoformat(),
"payload": payload,
}
Expand Down
38 changes: 38 additions & 0 deletions backend/tests/test_event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from backend.events.publisher import EventPublisher


class _StubFuture:
pass


class _StubProducer:
def __init__(self, *args, **kwargs):
self.sent = []

def send(self, topic, key=None, value=None):
self.sent.append({"topic": topic, "key": key, "value": value})
return _StubFuture()

def flush(self, timeout=None):
return None


def test_publish_adds_stream_backend(monkeypatch):
    """Every published envelope must carry the configured stream backend name."""
    monkeypatch.setattr("backend.events.publisher.KafkaProducer", _StubProducer)
    monkeypatch.setattr("backend.events.publisher.settings.ENABLE_EVENT_STREAMING", True)
    monkeypatch.setattr("backend.events.publisher.settings.EVENT_STREAM_BACKEND", "redpanda")

    publisher = EventPublisher()
    publisher.publish("pipeline.created", "pipeline-1", {"id": "pipeline-1"})

    recorded = publisher._producer.sent  # type: ignore[attr-defined]
    first_event = recorded[0]
    assert first_event["topic"] == "pipeline.created"
    assert first_event["value"]["stream_backend"] == "redpanda"


def test_publish_noop_when_streaming_disabled(monkeypatch):
    """With streaming disabled, publish must be a no-op and never build a producer."""
    monkeypatch.setattr("backend.events.publisher.settings.ENABLE_EVENT_STREAMING", False)

    publisher = EventPublisher()
    publisher.publish("execution.started", "execution-1", {"id": "execution-1"})

    # The lazily-created producer must never have been initialized.
    assert publisher._producer is None
Loading