8-step B2B Lead Generation Pipeline — architektura Blackboard Pattern.
n8n triggeruje pipeline przez webhook → Scrap.io enrichment → Google Maps scraping → LLM qualification → Exception triage (Redis DLQ) → HubSpot + Google Sheets export → Slack report.
n8n Trigger
│
▼
Step 1: Flask Webhook Receiver
│ ThreadPoolExecutor (graceful shutdown)
│ Status tracking via Redis
▼
Step 2: Scrap.io Enrichment ← firmografika (optional, requires API key)
▼
Step 3: ICP Search Params Builder ← YAML profile → query params
▼
Step 4: Hybrid Scraper ← Google Maps API → Playwright fallback
▼
Step 5: LLM Parser ← OpenAI → OpenRouter → Ollama fallback
│ Rate limiting + retry on 429
▼
Step 6: Exception Handler ← Triage: valid / cold pool / DLQ
│ Zwraca list[EnrichedLead] (type-safe)
▼
Step 7: CRM Exporter ← HubSpot (primary) + Google Sheets (backup)
│ HubSpot 409 → PATCH (nie crash)
▼
Step 8: Report Generator ← Slack webhook + conversion metrics
# 1. Klonowanie
git clone https://github.com/Gruszkoland/leadgen-comet-pipeline
cd leadgen-comet-pipeline
# 2. Instalacja
pip install -e ".[dev]"
playwright install chromium
# 3. Konfiguracja
cp .env.example .env
# edytuj .env — wpisz klucze API
# 4. Uruchomienie (dev — bez API calls)
make run-dry
# 5. Docker (produkcja)
make upmake test # wszystkie testy
make test-cov # + HTML coverage report
make test-step6 # tylko exception handler
make test-orchestrator| Profile | Opis |
|---|---|
usa_dental_medspa |
USA — dentyści i medspa, 1-20 pracowników |
pl_logistyka_wielkopolska |
PL — firmy logistyczne, Wielkopolska |
# LLM
OPENAI_API_KEY=sk-...
OPENROUTER_API_KEY=sk-or-...
LLM_INTER_CALL_DELAY_S=0.5 # rate limiting między callami LLM
# CRM
HUBSPOT_API_KEY=pat-...
GOOGLE_SERVICE_ACCOUNT_JSON=/path/to/sa.json # lub JSON string bezpośrednio
# Redis DLQ
REDIS_URL=redis://localhost:6379/0
DLQ_RETRY_INTERVAL_S=3600
DLQ_BATCH_SIZE=20
# Scrap.io
SCRAPIO_API_KEY=...
# Webhook
WEBHOOK_TOKEN=twoj-tajny-token
FLASK_PORT=8080
# Produkcja przez Docker Compose
make up # uruchamia: postgres + redis + webhook + dlq-worker
make logs # tail logs
# Status pipeline z n8n / curl:
curl http://localhost:8080/webhook/status/CP-20240101120000- ✅ BUG-01: Scrap.io (Step 2) prawidłowo wdrożony w orchestratorze
- ✅ BUG-03: Type safety —
ExceptionHandler.triage()zwracalist[EnrichedLead] - ✅ BUG-04:
Dockerfile.webhookstworzony - ✅ BUG-05: Polskie szablony email (
cold_intro_pl,followup_pl,breakup_pl) - ✅ WARN-01:
threading.Thread(daemon)→ThreadPoolExecutorzatexit - ✅ WARN-02:
/webhook/status/<id>— pełna implementacja przez Redis - ✅ WARN-04: Rate limiting LLM —
INTER_CALL_DELAY_S+ retry na 429 - ✅ WARN-05: DLQ retry worker —
tasks/dlq_retry.py+ osobny kontener - ✅ WARN-06: Naprawiony
orw step8 →if state.enriched_leads - ✅ WARN-07: SQLAlchemy models (
PipelineRun,Lead) —models.py - ✅ WARN-08:
AWAITING_ENRICHMENT— zaimplementowany jako aktywny status step2 - ✅ Testy: step1, step4, step6, step7, orchestrator (5 nowych modułów)
- ✅ Google Sheets credentials: obsługa pliku i JSON string