diff --git a/.gitignore b/.gitignore
index 3667575..3908a5f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
+shared
+
 # Binaries for programs and plugins
 *.exe
 *.exe~
diff --git a/docker-compose.yml b/docker-compose.yml
index 44e0cd0..d6e16e9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,11 +1,12 @@
-version: '3.8'
-
 # Include all services from separate files
 include:
   - docker-compose/mysql.yml
   - docker-compose/event-ingest.yml
   - docker-compose/shard-splitter.yml
   - docker-compose/shard-workers.yml
-  - docker-compose/balance-daemon.yml
-  - docker-compose/cache-service.yml
-  - docker-compose/tests.yml
+  - docker-compose/nginx.yml
+  - docker-compose/pipeline-api.yml
+  - docker-compose/mysql-sender.yml
+# - docker-compose/balance-daemon.yml
+# - docker-compose/cache-service.yml
+# - docker-compose/tests.yml
diff --git a/docker-compose/.gitignore b/docker-compose/.gitignore
new file mode 100644
index 0000000..cd3d225
--- /dev/null
+++ b/docker-compose/.gitignore
@@ -0,0 +1 @@
+logs
\ No newline at end of file
diff --git a/docker-compose/event-ingest.yml b/docker-compose/event-ingest.yml
index c435ff7..bbf8cc2 100644
--- a/docker-compose/event-ingest.yml
+++ b/docker-compose/event-ingest.yml
@@ -8,3 +8,5 @@ services:
       - "8080:8080"
     volumes:
       - ../shared:/shared
+    networks:
+      - network
\ No newline at end of file
diff --git a/docker-compose/mysql-sender.yml b/docker-compose/mysql-sender.yml
new file mode 100644
index 0000000..f00c003
--- /dev/null
+++ b/docker-compose/mysql-sender.yml
@@ -0,0 +1,22 @@
+services:
+  mysql-sender:
+    build:
+      context: ../services/mysql-sender
+      dockerfile: Dockerfile
+    container_name: shop-mysql-sender
+    environment:
+      - PIPELINE_API_URL=http://pipeline-apid:8082/api/v1/pipeline
+      - MYSQL_SHARD_0_HOST=mysql-shard-0
+      - MYSQL_SHARD_1_HOST=mysql-shard-1
+      - MYSQL_PORT=3306
+      - MYSQL_USER=shop_user
+      - MYSQL_PASSWORD=shop_password
+      - MYSQL_DATABASE_PREFIX=shop_shard_
+      - POLL_INTERVAL=5
+    volumes:
+      - ../shared:/shared
+    networks:
+      - network
+    depends_on:
+      - pipeline-apid-impl
+    restart: unless-stopped
\ No newline at end of file
diff --git a/docker-compose/mysql.yml b/docker-compose/mysql.yml
index 7a1ba8a..bcb924a 100644
--- a/docker-compose/mysql.yml
+++ b/docker-compose/mysql.yml
@@ -13,6 +13,8 @@ services:
       - ../db/ddl.sql:/docker-entrypoint-initdb.d/01-ddl.sql
       - mysql_shard_0_data:/var/lib/mysql
     command: --default-authentication-plugin=mysql_native_password
+    networks:
+      - network
 
   mysql-shard-1:
     image: mysql:8.0
@@ -28,6 +30,8 @@ services:
       - ../db/ddl.sql:/docker-entrypoint-initdb.d/01-ddl.sql
       - mysql_shard_1_data:/var/lib/mysql
     command: --default-authentication-plugin=mysql_native_password
+    networks:
+      - network
 
   mysql-pipeline:
     image: mysql:8.0
@@ -43,6 +47,8 @@ services:
       - ../db/pipeline_ddl.sql:/docker-entrypoint-initdb.d/01-pipeline-ddl.sql
       - mysql_pipeline_data:/var/lib/mysql
     command: --default-authentication-plugin=mysql_native_password
+    networks:
+      - network
 
 volumes:
   mysql_shard_0_data:
   mysql_shard_1_data:
diff --git a/docker-compose/nginx.conf b/docker-compose/nginx.conf
new file mode 100644
index 0000000..ff87957
--- /dev/null
+++ b/docker-compose/nginx.conf
@@ -0,0 +1,80 @@
+worker_processes auto;
+error_log /var/log/nginx/error.log warn;
+pid /var/run/nginx.pid;
+
+events {
+    worker_connections 1024;
+}
+
+http {
+    include /etc/nginx/mime.types;
+    default_type application/octet-stream;
+
+    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
+                    '$status $body_bytes_sent "$http_referer" '
+                    '"$http_user_agent" "$http_x_forwarded_for"';
+
+    access_log /var/log/nginx/access.log main;
+
+    sendfile on;
+    tcp_nopush on;
+    tcp_nodelay on;
+    keepalive_timeout 65;
+    types_hash_max_size 2048;
+
+    upstream event_ingest {
+        server event-ingest:8080;
+    }
+
+    upstream pipeline_apid_impl {
+        server pipeline-apid-impl:8083;
+    }
+
+    server {
+        listen 8082;
+        server_name _;
+
+        location /health {
+            proxy_pass http://event_ingest;
+            proxy_set_header Host $host;
+            proxy_set_header X-Real-IP $remote_addr;
+            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+            proxy_set_header X-Forwarded-Proto $scheme;
+            proxy_read_timeout 300;
+            proxy_connect_timeout 300;
+            proxy_send_timeout 300;
+        }
+
+        location /ready {
+            proxy_pass http://event_ingest;
+            proxy_set_header Host $host;
+            proxy_set_header X-Real-IP $remote_addr;
+            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+            proxy_set_header X-Forwarded-Proto $scheme;
+            proxy_read_timeout 300;
+            proxy_connect_timeout 300;
+            proxy_send_timeout 300;
+        }
+
+        location /api/v1/events/ {
+            proxy_pass http://event_ingest;
+            proxy_set_header Host $host;
+            proxy_set_header X-Real-IP $remote_addr;
+            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+            proxy_set_header X-Forwarded-Proto $scheme;
+            proxy_read_timeout 300;
+            proxy_connect_timeout 300;
+            proxy_send_timeout 300;
+        }
+
+        location /api/v1/pipeline/ {
+            proxy_pass http://pipeline_apid_impl;
+            proxy_set_header Host $host;
+            proxy_set_header X-Real-IP $remote_addr;
+            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+            proxy_set_header X-Forwarded-Proto $scheme;
+            proxy_read_timeout 300;
+            proxy_connect_timeout 300;
+            proxy_send_timeout 300;
+        }
+    }
+}
\ No newline at end of file
diff --git a/docker-compose/nginx.yml b/docker-compose/nginx.yml
new file mode 100644
index 0000000..ea494ca
--- /dev/null
+++ b/docker-compose/nginx.yml
@@ -0,0 +1,20 @@
+services:
+  pipeline-apid:
+    image: nginx:1.23-alpine
+    container_name: shop-nginx
+    ports:
+      - "8082:8082"
+    volumes:
+      - ./nginx.conf:/etc/nginx/nginx.conf:ro
+      - ./logs:/var/log/nginx
+      - ../shared:/shared:ro
+    networks:
+      - network
+    depends_on:
+      - event-ingest
+      - pipeline-apid-impl
+      - shard-splitter
+      - mysql-pipeline
+      - mysql-shard-0
+      - mysql-shard-1
+    restart: unless-stopped
\ No newline at end of file
diff --git a/docker-compose/pipeline-api.yml b/docker-compose/pipeline-api.yml
new file mode 100644
index 0000000..898fa46
--- /dev/null
+++ b/docker-compose/pipeline-api.yml
@@ -0,0 +1,23 @@
+services:
+  pipeline-apid-impl:
+    build:
+      context: ../services/pipeline-apid
+      dockerfile: Dockerfile
+    container_name: shop-pipeline-apid-impl
+    ports:
+      - "8083:8083"
+    environment:
+      - MYSQL_HOST=mysql-pipeline
+      - MYSQL_PORT=3306
+      - MYSQL_USER=shop_user
+      - MYSQL_PASSWORD=shop_password
+      - MYSQL_DATABASE=pipeline_db
+    volumes:
+      - ../shared:/shared
+    networks:
+      - network
+    restart: unless-stopped
+
+networks:
+  network:
+    driver: bridge
\ No newline at end of file
diff --git a/docker-compose/shard-splitter.yml b/docker-compose/shard-splitter.yml
index f076ffc..586e99a 100644
--- a/docker-compose/shard-splitter.yml
+++ b/docker-compose/shard-splitter.yml
@@ -6,3 +6,6 @@ services:
     container_name: shop-shard-splitter
     volumes:
       - ../shared:/shared
+    networks:
+      - network
+
diff --git a/docker-compose/shard-workers.yml b/docker-compose/shard-workers.yml
index 1feb5c4..eef46bf 100644
--- a/docker-compose/shard-workers.yml
+++ b/docker-compose/shard-workers.yml
@@ -12,7 +12,8 @@ services:
       - ../shared:/shared
     depends_on:
       - mysql-shard-0
-      - balance-daemon
+    networks:
+      - network
 
   shard-worker-1:
     build:
@@ -27,4 +28,5 @@ services:
       - ../shared:/shared
     depends_on:
       - mysql-shard-1
-      - balance-daemon
+    networks:
+      - network
diff --git a/services/balance-daemon/Dockerfile b/services/balance-daemon/Dockerfile
deleted file mode 100644
index 65128e3..0000000
--- a/services/balance-daemon/Dockerfile
+++ /dev/null
@@ -1,12 +0,0 @@
-FROM alpine:latest
-
-RUN apk --no-cache add curl bash
-
-WORKDIR /app
-
-# Service script
-RUN echo '#!/usr/bin/env bash\nset -e\nSERVICE_DIR="/shared/balance-daemon"\nmkdir -p "$SERVICE_DIR"\ncleanup(){ echo "Cleaning $SERVICE_DIR ..."; rm -rf "$SERVICE_DIR"/* || true; }\ntrap cleanup EXIT TERM INT\necho "Balance Daemon Service"\necho "Waiting for databases..."\nsleep 20\necho "Balance Daemon is running..."\nwhile true; do\n echo "Processing balance operations..." | tee -a "$SERVICE_DIR/activity.log"\n sleep 30\ndone\n' > /app/run.sh && chmod +x /app/run.sh
-
-EXPOSE 8081
-
-CMD ["/app/run.sh"]
diff --git a/services/cache-service/Dockerfile b/services/cache-service/Dockerfile
deleted file mode 100644
index a9fb3c6..0000000
--- a/services/cache-service/Dockerfile
+++ /dev/null
@@ -1,12 +0,0 @@
-FROM alpine:latest
-
-RUN apk --no-cache add curl bash
-
-WORKDIR /app
-
-# Service script
-RUN echo '#!/usr/bin/env bash\nset -e\nSERVICE_DIR="/shared/cache-service"\nmkdir -p "$SERVICE_DIR"\ncleanup(){ echo "Cleaning $SERVICE_DIR ..."; rm -rf "$SERVICE_DIR"/* || true; }\ntrap cleanup EXIT TERM INT\necho "Cache Service"\necho "Starting..."\nwhile true; do\n echo "Processing cache operations..." | tee -a "$SERVICE_DIR/activity.log"\n sleep 30\ndone\n' > /app/run.sh && chmod +x /app/run.sh
-
-EXPOSE 8080
-
-CMD ["/app/run.sh"]
diff --git a/services/event-ingest/Dockerfile b/services/event-ingest/Dockerfile
index 7e216f6..e462c41 100644
--- a/services/event-ingest/Dockerfile
+++ b/services/event-ingest/Dockerfile
@@ -2,33 +2,35 @@ FROM golang:1.21-alpine AS builder
 
 WORKDIR /app
 
-# Copy go.mod and go.sum
 COPY go.mod go.sum ./
 RUN go mod download
 
-# Copy the source code
 COPY . .
-
-# Sync dependencies and go.sum
 RUN go mod tidy
 
-# Build the application
 RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main ./cmd/event-ingest
 
 FROM alpine:latest
 
-RUN apk --no-cache add ca-certificates bash
+RUN apk --no-cache add ca-certificates
 
 WORKDIR /root/
 
-# Copy the binary
 COPY --from=builder /app/main .
-
-# Copy the config
 COPY --from=builder /app/config ./config
 
-# Wrapper that prepares the directory and cleans it up on shutdown
-RUN echo '#!/usr/bin/env bash\n\nset -e\nSERVICE_DIR="/shared/events"\nmkdir -p "$SERVICE_DIR"\ncleanup() {\n echo "Cleaning $SERVICE_DIR ..."\n rm -rf "$SERVICE_DIR"/* || true\n}\ntrap cleanup EXIT TERM INT\nexec /root/main\n' > /root/entrypoint.sh && chmod +x /root/entrypoint.sh
+# Build the entrypoint script line by line
+RUN printf '#!/bin/sh\n' > /root/entrypoint.sh && \
+    printf 'set -e\n' >> /root/entrypoint.sh && \
+    printf 'SERVICE_DIR="/shared/events"\n' >> /root/entrypoint.sh && \
+    printf 'mkdir -p "$SERVICE_DIR"\n' >> /root/entrypoint.sh && \
+    printf 'cleanup() {\n' >> /root/entrypoint.sh && \
+    printf ' echo "Cleaning $SERVICE_DIR ..."\n' >> /root/entrypoint.sh && \
+    printf ' rm -rf "$SERVICE_DIR"/* || true\n' >> /root/entrypoint.sh && \
+    printf '}\n' >> /root/entrypoint.sh && \
+    printf 'trap cleanup EXIT TERM INT\n' >> /root/entrypoint.sh && \
+    printf 'exec /root/main\n' >> /root/entrypoint.sh && \
+    chmod +x /root/entrypoint.sh
 
 EXPOSE 8080
 
-CMD ["/root/entrypoint.sh"]
+CMD ["/bin/sh", "/root/entrypoint.sh"]
\ No newline at end of file
diff --git a/services/event-ingest/config/app.toml b/services/event-ingest/config/app.toml
index 7b0744a..e6026f4 100644
--- a/services/event-ingest/config/app.toml
+++ b/services/event-ingest/config/app.toml
@@ -9,7 +9,7 @@ idle_timeout = 120
 max_payload_size = 1048576
 validation_timeout = 5
 max_batch_size = 1000
-generation_interval = 10 # seconds between generations
+generation_interval = 5 # seconds between generations
 events_per_batch = 500 # maximum number of events per batch
 
 [logging]
diff --git a/services/event-ingest/internal/models/event.go b/services/event-ingest/internal/models/event.go
index 2232af9..98c73cd 100644
--- a/services/event-ingest/internal/models/event.go
+++ b/services/event-ingest/internal/models/event.go
@@ -2,7 +2,6 @@ package models
 
 import (
 	"encoding/json"
-	"strconv"
 	"time"
 )
 
diff --git a/services/mysql-sender/Dockerfile b/services/mysql-sender/Dockerfile
new file mode 100644
index 0000000..1f375b8
--- /dev/null
+++ b/services/mysql-sender/Dockerfile
@@ -0,0 +1,25 @@
+# Use the official Python image with the required version
+FROM python:3.11-slim
+
+# Install system dependencies (if needed)
+RUN apt-get update && apt-get install -y \
+    gcc \
+    && rm -rf /var/lib/apt/lists/*
+
+# Create the working directory
+WORKDIR /app
+
+# Copy dependencies
+COPY requirements.txt .
+
+# Install Python dependencies
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the source code
+COPY . .
+
+# Optional: run tests
+# RUN pytest tests/ -v
+
+# Set the startup command
+CMD ["python", "main.py"]
\ No newline at end of file
diff --git a/services/mysql-sender/main.py b/services/mysql-sender/main.py
new file mode 100644
index 0000000..da8202e
--- /dev/null
+++ b/services/mysql-sender/main.py
@@ -0,0 +1,150 @@
+import os
+import json
+import time
+import logging
+import signal
+import shutil
+import schedule
+import requests
+import mysql.connector
+from pathlib import Path
+from dotenv import load_dotenv
+
+load_dotenv()
+
+PIPELINE_API_URL = os.getenv("PIPELINE_API_URL", "http://pipeline-apid:8082/api/v1/pipeline")
+MYSQL_SHARD_0_HOST = os.getenv("MYSQL_SHARD_0_HOST", "mysql-shard-0")
+MYSQL_SHARD_1_HOST = os.getenv("MYSQL_SHARD_1_HOST", "mysql-shard-1")
+MYSQL_PORT = int(os.getenv("MYSQL_PORT", "3306"))
+MYSQL_USER = os.getenv("MYSQL_USER", "shop_user")
+MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD", "shop_password")
+MYSQL_DATABASE_PREFIX = os.getenv("MYSQL_DATABASE_PREFIX", "shop_shard_")
+POLL_INTERVAL = int(os.getenv("POLL_INTERVAL", "5"))
+SHARED_DIR = Path("/shared")
+SERVICE_DIR = SHARED_DIR / "mysql-sender"
+
+SERVICE_DIR.mkdir(parents=True, exist_ok=True)
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.FileHandler(SERVICE_DIR / "activity.log"),
+        logging.StreamHandler()
+    ]
+)
+logger = logging.getLogger(__name__)
+
+def cleanup():
+    if SERVICE_DIR.exists():
+        shutil.rmtree(SERVICE_DIR)
+        logger.info(f"Cleaned up {SERVICE_DIR}")
+
+def signal_handler(signum, frame):
+    cleanup()
+    exit(0)
+
+signal.signal(signal.SIGTERM, signal_handler)
+signal.signal(signal.SIGINT, signal_handler)
+
+mysql_connections = {
+    0: mysql.connector.connect(
+        host=MYSQL_SHARD_0_HOST,
+        port=MYSQL_PORT,
+        user=MYSQL_USER,
+        password=MYSQL_PASSWORD,
+        database=f"{MYSQL_DATABASE_PREFIX}0"
+    ),
+    1: mysql.connector.connect(
+        host=MYSQL_SHARD_1_HOST,
+        port=MYSQL_PORT,
+        user=MYSQL_USER,
+        password=MYSQL_PASSWORD,
+        database=f"{MYSQL_DATABASE_PREFIX}1"
+    )
+}
+
+def get_queue():
+    try:
+        response = requests.get(f"{PIPELINE_API_URL}/queues/mysql-sender", timeout=10)
+        response.raise_for_status()
+        return response.json()["items"]
+    except Exception as e:
+        logger.error(f"Failed to fetch queue: {e}")
+        return []
+
+def update_status(filename, shard, transition):
+    try:
+        response = requests.post(
+            f"{PIPELINE_API_URL}/stages/mysql-sender/{transition}",
+            json={"filename": filename, "shard": shard},
+            timeout=5
+        )
+        response.raise_for_status()
+        logger.info(f"Updated {filename} shard {shard} to {transition}")
+    except Exception as e:
+        logger.error(f"Failed to update status: {e}")
+
+def insert_events(shard, events):
+    conn = mysql_connections[shard]
+    cursor = conn.cursor()
+    try:
+        insert_query = """
+            INSERT INTO events (event_type, shop_id, user_id, timestamp, shard, created_at)
+            VALUES (%s, %s, %s, %s, %s, NOW())
+        """
+        data = []
+        for event in events:
+            data.append((
+                event["event_type"],
+                event.get("shop_id"),
+                event.get("user_id"),
+                event["timestamp"],
+                shard
+            ))
+        cursor.executemany(insert_query, data)
+        conn.commit()
+        logger.info(f"Inserted {len(events)} events into shard {shard}")
+    except Exception:
+        conn.rollback()
+        raise
+    finally:
+        cursor.close()
+
+def process_task(filename, shard):
+    input_file = SHARED_DIR / f"shard-worker-{shard}" / f"{filename}.shard-worker.{shard}.prepared.txt"
+    if not input_file.exists():
+        raise FileNotFoundError(f"Input file not found: {input_file}")
+
+    events = []
+    with open(input_file, 'r') as f:
+        for line in f:
+            line = line.strip()
+            if line:
+                events.append(json.loads(line))
+
+    if events:
+        insert_events(shard, events)
+
+def process_queue():
+    tasks = get_queue()
+    for task in tasks:
+        filename = task["filename"]
+        shard = task["shard"]
+        try:
+            update_status(filename, shard, "start")
+            process_task(filename, shard)
+            update_status(filename, shard, "done")
+        except Exception as e:
+            logger.error(f"Failed to process {filename} shard {shard}: {e}")
+            update_status(filename, shard, "fail")
+
+def main():
+    logger.info("MySQL Sender started")
+    schedule.every(POLL_INTERVAL).seconds.do(process_queue)
+    while True:
+        schedule.run_pending()
+        time.sleep(1)
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/services/mysql-sender/requirements.txt b/services/mysql-sender/requirements.txt
new file mode 100644
index 0000000..a6f43f4
--- /dev/null
+++ b/services/mysql-sender/requirements.txt
@@ -0,0 +1,4 @@
+requests
+mysql-connector-python
+schedule
+python-dotenv
\ No newline at end of file
diff --git a/services/pipeline-apid/Dockerfile b/services/pipeline-apid/Dockerfile
new file mode 100644
index 0000000..1f375b8
--- /dev/null
+++ b/services/pipeline-apid/Dockerfile
@@ -0,0 +1,25 @@
+# Use the official Python image with the required version
+FROM python:3.11-slim
+
+# Install system dependencies (if needed)
+RUN apt-get update && apt-get install -y \
+    gcc \
+    && rm -rf /var/lib/apt/lists/*
+
+# Create the working directory
+WORKDIR /app
+
+# Copy dependencies
+COPY requirements.txt .
+
+# Install Python dependencies
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the source code
+COPY . .
+
+# Optional: run tests
+# RUN pytest tests/ -v
+
+# Set the startup command
+CMD ["python", "main.py"]
\ No newline at end of file
diff --git a/services/pipeline-apid/main.py b/services/pipeline-apid/main.py
new file mode 100644
index 0000000..9ab734a
--- /dev/null
+++ b/services/pipeline-apid/main.py
@@ -0,0 +1,270 @@
+from flask import Flask, request, jsonify
+import pymysql
+import os
+import logging
+
+app = Flask(__name__)
+
+MYSQL_HOST = os.getenv('MYSQL_HOST', 'mysql-pipeline')
+MYSQL_PORT = int(os.getenv('MYSQL_PORT', 3306))
+MYSQL_USER = os.getenv('MYSQL_USER', 'shop_user')
+MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'shop_password')
+MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'pipeline_db')
+
+def get_db():
+    return pymysql.connect(
+        host=MYSQL_HOST,
+        port=MYSQL_PORT,
+        user=MYSQL_USER,
+        password=MYSQL_PASSWORD,
+        database=MYSQL_DATABASE,
+        cursorclass=pymysql.cursors.DictCursor
+    )
+
+@app.route('/api/v1/pipeline/files/register', methods=['POST'])
+def register_file():
+    data = request.get_json()
+    filename = data.get('filename')
+    shards = data.get('shards')
+
+    if not filename or not shards:
+        return jsonify({'error': 'Missing filename or shards'}), 400
+
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            for shard in shards:
+                sql = """
+                    INSERT INTO pipeline_tracking (filename, shard, event_ingest_status)
+                    VALUES (%s, %s, 'started')
+                    ON DUPLICATE KEY UPDATE filename = %s
+                """
+                cursor.execute(sql, (filename, shard, filename))
+        conn.commit()
+        return jsonify({'filename': filename, 'shards': shards, 'created': True}), 200
+    except Exception as e:
+        conn.rollback()
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+@app.route('/api/v1/pipeline/stages/event-ingest/done', methods=['POST'])
+def event_ingest_done():
+    data = request.get_json()
+    filename = data.get('filename')
+
+    if not filename:
+        return jsonify({'error': 'Missing filename'}), 400
+
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            sql = """
+                UPDATE pipeline_tracking
+                SET event_ingest_status = 'done', updated_at = NOW()
+                WHERE filename = %s
+            """
+            cursor.execute(sql, (filename,))
+            updated_rows = cursor.rowcount
+        conn.commit()
+        return jsonify({'filename': filename, 'updated_rows': updated_rows}), 200
+    except Exception as e:
+        conn.rollback()
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+@app.route('/api/v1/pipeline/stages/<stage>/<transition>', methods=['POST'])
+def stage_transition(stage, transition):
+    valid_stages = ['shard-splitter', 'shard-worker', 'mysql-sender']
+    valid_transitions = ['start', 'done', 'fail']
+
+    if stage not in valid_stages:
+        return jsonify({'error': 'Invalid stage'}), 400
+    if transition not in valid_transitions:
+        return jsonify({'error': 'Invalid transition'}), 400
+
+    data = request.get_json()
+    filename = data.get('filename')
+    shard = data.get('shard')
+
+    if filename is None or shard is None:
+        return jsonify({'error': 'Missing filename or shard'}), 400
+
+    stage_column = {
+        'shard-splitter': 'shard_splitter_status',
+        'shard-worker': 'shard_worker_status',
+        'mysql-sender': 'mysql_sender_status'
+    }[stage]
+
+    status_map = {'start': 'started', 'done': 'done', 'fail': 'failed'}
+    new_status = status_map[transition]
+
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            sql = f"""
+                UPDATE pipeline_tracking
+                SET {stage_column} = %s, updated_at = NOW()
+                WHERE filename = %s AND shard = %s
+            """
+            cursor.execute(sql, (new_status, filename, shard))
+            updated_rows = cursor.rowcount
+        conn.commit()
+        if updated_rows == 0:
+            return jsonify({'error': 'No record found'}), 404
+        return jsonify({
+            'filename': filename,
+            'shard': shard,
+            'stage': stage,
+            'status': new_status
+        }), 200
+    except Exception as e:
+        conn.rollback()
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+@app.route('/api/v1/pipeline/stages/<stage>/retry', methods=['POST'])
+def stage_retry(stage):
+    valid_stages = ['shard-splitter', 'shard-worker', 'mysql-sender']
+    if stage not in valid_stages:
+        return jsonify({'error': 'Invalid stage'}), 400
+
+    data = request.get_json()
+    filename = data.get('filename')
+    shard = data.get('shard')
+
+    if filename is None or shard is None:
+        return jsonify({'error': 'Missing filename or shard'}), 400
+
+    stage_column = {
+        'shard-splitter': 'shard_splitter_status',
+        'shard-worker': 'shard_worker_status',
+        'mysql-sender': 'mysql_sender_status'
+    }[stage]
+
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            sql = f"""
+                UPDATE pipeline_tracking
+                SET {stage_column} = 'new', updated_at = NOW()
+                WHERE filename = %s AND shard = %s AND {stage_column} = 'failed'
+            """
+            cursor.execute(sql, (filename, shard))
+            updated_rows = cursor.rowcount
+        conn.commit()
+        if updated_rows == 0:
+            return jsonify({'error': 'No failed record found'}), 404
+        return jsonify({
+            'filename': filename,
+            'shard': shard,
+            'stage': stage,
+            'status': 'new'
+        }), 200
+    except Exception as e:
+        conn.rollback()
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+@app.route('/api/v1/pipeline/queues/<stage>', methods=['GET'])
+def get_queue(stage):
+    valid_stages = ['shard-splitter', 'shard-worker', 'mysql-sender']
+    if stage not in valid_stages:
+        return jsonify({'error': 'Invalid stage'}), 400
+
+    conditions = {
+        'shard-splitter': "event_ingest_status='done' AND shard_splitter_status='new'",
+        'shard-worker': "shard_splitter_status='done' AND shard_worker_status='new'",
+        'mysql-sender': "shard_worker_status='done' AND mysql_sender_status='new'"
+    }
+
+    limit = request.args.get('limit', default=10, type=int)
+    offset = request.args.get('offset', default=0, type=int)
+
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            sql = f"""
+                SELECT filename, shard, created_at
+                FROM pipeline_tracking
+                WHERE {conditions[stage]}
+                ORDER BY created_at
+                LIMIT %s OFFSET %s
+            """
+            cursor.execute(sql, (limit, offset))
+            items = cursor.fetchall()
+        return jsonify({'stage': stage, 'items': items}), 200
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+@app.route('/api/v1/pipeline/files/<filename>', methods=['GET'])
+def get_file_status(filename):
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            sql = """
+                SELECT shard, event_ingest_status, shard_splitter_status,
+                       shard_worker_status, mysql_sender_status, updated_at
+                FROM pipeline_tracking
+                WHERE filename = %s
+                ORDER BY shard
+            """
+            cursor.execute(sql, (filename,))
+            shards = cursor.fetchall()
+        return jsonify({'filename': filename, 'shards': shards}), 200
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+@app.route('/api/v1/pipeline/metrics/summary', methods=['GET'])
+def get_metrics_summary():
+    conn = get_db()
+    try:
+        with conn.cursor() as cursor:
+            cursor.execute("SELECT COUNT(*) as total FROM pipeline_tracking")
+            total = cursor.fetchone()['total']
+
+            stages = ['shard_splitter_status', 'shard_worker_status', 'mysql_sender_status']
+            by_stage = {}
+            for stage in stages:
+                sql = f"""
+                    SELECT
+                        SUM({stage}='new') as new,
+                        SUM({stage}='started') as started,
+                        SUM({stage}='done') as done,
+                        SUM({stage}='failed') as failed
+                    FROM pipeline_tracking
+                """
+                cursor.execute(sql)
+                by_stage[stage] = cursor.fetchone()
+
+            sql = """
+                SELECT shard,
+                       COUNT(*) as total,
+                       SUM(mysql_sender_status='done') as done,
+                       SUM(mysql_sender_status='failed') as failed
+                FROM pipeline_tracking
+                GROUP BY shard
+                ORDER BY shard
+            """
+            cursor.execute(sql)
+            by_shard = cursor.fetchall()
+
+        return jsonify({
+            'total': total,
+            'by_stage': by_stage,
+            'by_shard': by_shard
+        }), 200
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+    finally:
+        conn.close()
+
+if __name__ == '__main__':
+    app.run(host='0.0.0.0', port=8083)
\ No newline at end of file
diff --git a/services/pipeline-apid/requirements.txt b/services/pipeline-apid/requirements.txt
new file mode 100644
index 0000000..c22ae72
--- /dev/null
+++ b/services/pipeline-apid/requirements.txt
@@ -0,0 +1,3 @@
+Flask==2.3.3
+PyMySQL==1.0.2
+requests==2.31.0
\ No newline at end of file
diff --git a/services/shard-splitter/Dockerfile b/services/shard-splitter/Dockerfile
index 7a26755..1f375b8 100644
--- a/services/shard-splitter/Dockerfile
+++ b/services/shard-splitter/Dockerfile
@@ -1,12 +1,25 @@
-FROM alpine:latest
+# Use the official Python image with the required version
+FROM python:3.11-slim
 
-RUN apk --no-cache add curl bash
+# Install system dependencies (if needed)
+RUN apt-get update && apt-get install -y \
+    gcc \
+    && rm -rf /var/lib/apt/lists/*
 
+# Create the working directory
 WORKDIR /app
 
-# Service script
-RUN echo '#!/usr/bin/env bash\nset -e\nSERVICE_DIR="/shared/shard-splitter"\nmkdir -p "$SERVICE_DIR"\ncleanup(){ echo "Cleaning $SERVICE_DIR ..."; rm -rf "$SERVICE_DIR"/* || true; }\ntrap cleanup EXIT TERM INT\necho "Shard Splitter Service"\necho "Starting..."\nwhile true; do\n echo "Processing events..." | tee -a "$SERVICE_DIR/activity.log"\n sleep 30\ndone\n' > /app/run.sh && chmod +x /app/run.sh
"Starting..."\nwhile true; do\n echo "Processing events..." | tee -a "$SERVICE_DIR/activity.log"\n sleep 30\ndone\n' > /app/run.sh && chmod +x /app/run.sh +# Копируем зависимости +COPY requirements.txt . -EXPOSE 8080 +# Устанавливаем Python-зависимости +RUN pip install --no-cache-dir -r requirements.txt -CMD ["/app/run.sh"] +# Копируем исходный код +COPY . . + +# Опционально: запускаем тесты +# RUN pytest tests/ -v + +# Указываем команду запуска +CMD ["python", "main.py"] \ No newline at end of file diff --git a/services/shard-splitter/main.py b/services/shard-splitter/main.py new file mode 100644 index 0000000..af34512 --- /dev/null +++ b/services/shard-splitter/main.py @@ -0,0 +1,76 @@ +import os +import time +import json +import logging +import requests +from pathlib import Path + +PIPELINE_API_URL = os.getenv('PIPELINE_API_URL', 'http://pipeline-apid:8082/api/v1/pipeline') +SHARED_DIR = os.getenv('SHARED_DIR', '/shared') +EVENTS_DIR = os.path.join(SHARED_DIR, 'events') +SPLITTER_DIR = os.path.join(SHARED_DIR, 'shard-splitter') + +Path(SPLITTER_DIR).mkdir(parents=True, exist_ok=True) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +def process_task(filename, shard): + response = requests.post( + f"{PIPELINE_API_URL}/stages/shard-splitter/start", + json={"filename": filename, "shard": shard} + ) + if response.status_code != 200: + logger.error(f"Failed to start shard-splitter for {filename} shard {shard}: {response.text}") + return + + input_file = os.path.join(EVENTS_DIR, f"{filename}.event-ingest.txt") + output_file = os.path.join(SPLITTER_DIR, f"{filename}.shard-splitter.{shard}.txt") + + try: + with open(input_file, 'r') as f: + events = f.readlines() + + filtered_events = [] + for line in events: + event = json.loads(line) + shop_id = event.get('payload', {}).get('shop_id') + if shop_id is not None and shop_id % 2 == shard: + filtered_events.append(line) + + with open(output_file, 'w') as f: + f.writelines(filtered_events) + + response = requests.post( + f"{PIPELINE_API_URL}/stages/shard-splitter/done", + json={"filename": filename, "shard": shard} + ) + if response.status_code != 200: + logger.error(f"Failed to mark shard-splitter done for {filename} shard {shard}: {response.text}") + else: + logger.info(f"Processed {filename} for shard {shard}") + + except Exception as e: + logger.error(f"Error processing {filename} shard {shard}: {e}") + response = requests.post( + f"{PIPELINE_API_URL}/stages/shard-splitter/fail", + json={"filename": filename, "shard": shard} + ) + +def main(): + logger.info("Starting shard-splitter daemon") + while True: + try: + response = requests.get(f"{PIPELINE_API_URL}/queues/shard-splitter") + if response.status_code == 200: + tasks = response.json().get('items', []) + for task in tasks: + process_task(task['filename'], task['shard']) + else: + logger.error(f"Failed to get queue: {response.text}") + except Exception as e: + logger.error(f"Error in main loop: {e}") + time.sleep(5) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/services/shard-splitter/requirements.txt b/services/shard-splitter/requirements.txt new file mode 100644 index 0000000..c22ae72 --- /dev/null +++ b/services/shard-splitter/requirements.txt @@ -0,0 +1,3 @@ +Flask==2.3.3 +PyMySQL==1.0.2 +requests==2.31.0 \ No newline at end of file diff --git a/services/shard-worker/Dockerfile b/services/shard-worker/Dockerfile index af7abb8..1f375b8 100644 --- a/services/shard-worker/Dockerfile +++ b/services/shard-worker/Dockerfile @@ 
@@ -1,12 +1,25 @@
-FROM alpine:latest
+# Use the official Python image with the required version
+FROM python:3.11-slim
 
-RUN apk --no-cache add curl bash
+# Install system dependencies (if needed)
+RUN apt-get update && apt-get install -y \
+    gcc \
+    && rm -rf /var/lib/apt/lists/*
 
+# Create the working directory
 WORKDIR /app
 
-# Service script
-RUN echo '#!/usr/bin/env bash\nset -e\nSERVICE_DIR="/shared/shard-worker-$SHARD_ID"\nmkdir -p "$SERVICE_DIR"\ncleanup(){ echo "Cleaning $SERVICE_DIR ..."; rm -rf "$SERVICE_DIR"/* || true; }\ntrap cleanup EXIT TERM INT\necho "Shard Worker Service - Shard $SHARD_ID"\necho "Waiting for dependencies..."\nsleep 15\necho "Shard Worker is running..."\nwhile true; do\n echo "Processing shard $SHARD_ID events..." | tee -a "$SERVICE_DIR/activity.log"\n sleep 30\ndone\n' > /app/run.sh && chmod +x /app/run.sh
+# Copy dependencies
+COPY requirements.txt .
 
-EXPOSE 8080
+# Install Python dependencies
+RUN pip install --no-cache-dir -r requirements.txt
 
-CMD ["/app/run.sh"]
+# Copy the source code
+COPY . .
+
+# Optional: run tests
+# RUN pytest tests/ -v
+
+# Set the startup command
+CMD ["python", "main.py"]
\ No newline at end of file
diff --git a/services/shard-worker/main.py b/services/shard-worker/main.py
new file mode 100644
index 0000000..76e9f6b
--- /dev/null
+++ b/services/shard-worker/main.py
@@ -0,0 +1,99 @@
+import os
+import time
+import json
+import logging
+import requests
+from pathlib import Path
+from collections import defaultdict
+
+PIPELINE_API_URL = os.getenv('PIPELINE_API_URL', 'http://pipeline-apid:8082/api/v1/pipeline')
+SHARED_DIR = os.getenv('SHARED_DIR', '/shared')
+SHARD_ID = int(os.getenv('SHARD_ID', 0))
+SPLITTER_DIR = os.path.join(SHARED_DIR, 'shard-splitter')
+WORKER_DIR = os.path.join(SHARED_DIR, f'shard-worker-{SHARD_ID}')
+
+Path(WORKER_DIR).mkdir(parents=True, exist_ok=True)
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+cache = defaultdict(set)
+CACHE_TTL = 30
+
+def process_task(filename, shard):
+    response = requests.post(
+        f"{PIPELINE_API_URL}/stages/shard-worker/start",
+        json={"filename": filename, "shard": shard}
+    )
+    if response.status_code != 200:
+        logger.error(f"Failed to start shard-worker for {filename} shard {shard}: {response.text}")
+        return
+
+    input_file = os.path.join(SPLITTER_DIR, f"{filename}.shard-splitter.{shard}.txt")
+    output_file = os.path.join(WORKER_DIR, f"{filename}.shard-worker.{shard}.prepared.txt")
+
+    try:
+        with open(input_file, 'r') as f:
+            events = f.readlines()
+
+        processed_events = []
+        for line in events:
+            event = json.loads(line)
+            event_type = event.get('event_type')
+            payload = event.get('payload', {})
+            timestamp = event.get('timestamp')
+
+            if event_type in [1, 3]:
+                order_id = payload.get('order_id')
+                if order_id is not None:
+                    key = f"{order_id}_{timestamp}"
+                    if key in cache[event_type]:
+                        continue
+                    cache[event_type].add(key)
+
+            processed_event = {
+                'event_type': event_type,
+                'shop_id': payload.get('shop_id'),
+                'user_id': payload.get('user_id'),
+                'timestamp': timestamp,
+                'shard': shard
+            }
+            processed_events.append(json.dumps(processed_event) + '\n')
+
+        with open(output_file, 'w') as f:
+            f.writelines(processed_events)
+
+        response = requests.post(
+            f"{PIPELINE_API_URL}/stages/shard-worker/done",
+            json={"filename": filename, "shard": shard}
+        )
+        if response.status_code != 200:
+            logger.error(f"Failed to mark shard-worker done for {filename} shard {shard}: {response.text}")
+        else:
+            logger.info(f"Processed {filename} for shard {shard}")
+
+    except Exception as e:
+        logger.error(f"Error processing {filename} shard {shard}: {e}")
+        response = requests.post(
+            f"{PIPELINE_API_URL}/stages/shard-worker/fail",
+            json={"filename": filename, "shard": shard}
+        )
+
+def main():
+    logger.info(f"Starting shard-worker daemon for shard {SHARD_ID}")
+    while True:
+        try:
+            response = requests.get(f"{PIPELINE_API_URL}/queues/shard-worker")
+            if response.status_code == 200:
+                tasks = response.json().get('items', [])
+                for task in tasks:
+                    if task['shard'] == SHARD_ID:
+                        process_task(task['filename'], task['shard'])
+            else:
+                logger.error(f"Failed to get queue: {response.text}")
+        except Exception as e:
+            logger.error(f"Error in main loop: {e}")
+        time.sleep(5)
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/services/shard-worker/requirements.txt b/services/shard-worker/requirements.txt
new file mode 100644
index 0000000..c22ae72
--- /dev/null
+++ b/services/shard-worker/requirements.txt
@@ -0,0 +1,3 @@
+Flask==2.3.3
+PyMySQL==1.0.2
+requests==2.31.0
\ No newline at end of file