diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..629d4e4 --- /dev/null +++ b/.env.example @@ -0,0 +1,66 @@ +# .env.example — نموذج متغيرات البيئة (لا يحتوي أسراراً) +# ضع القيم الحقيقية في Render -> Environment (أو ملف .env محلياً) + +# ---------- Database & Cache ---------- +DATABASE_URL= +DB_POOL_SIZE=10 +DB_TIMEOUT=30 + +REDIS_URL= +REDIS_TOKEN= +REDIS_CACHE_TTL=5 + +# ---------- Data Feed ---------- +# Dukascopy (أو استخدم بروكسي dukascopy-api-websocket) +DUKASCOPY_USER= +DUKASCOPY_PASS= +DUKASCOPY_WS_URL=wss://datafeed.dukascopy.com/ + +# Fallback providers +TWELVEDATA_API_KEY= +OANDA_API_KEY= +OANDA_ACCOUNT_ID= + +# Symbols and timeframes +SYMBOLS=XAUUSD,EURUSD,GBPUSD,USDJPY,USDCHF,AUDUSD,USDCAD,US30,NAS100,GER40,USOIL +DEFAULT_SYMBOL=XAUUSD +DEFAULT_TIMEFRAME=1m +AVAILABLE_TIMEFRAMES=1m,3m,5m,15m,30m,1h,4h,1d + +# ---------- MT5/Execution (optional) ---------- +BROKER_ENVIRONMENT=paper +MT5_SERVER_IP= +MT5_SERVER_PORT= +MT5_ACCOUNT_LOGIN= +MT5_ACCOUNT_PASSWORD= +MAX_RISK_PER_TRADE_PERCENT=2.0 + +# ---------- AI Engine ---------- +MODEL_ENABLED=true +MODEL_TYPE=xgboost +MODEL_PATH=models/market_model.pkl +USE_GPU_ACCELERATION=false +CONFIDENCE_THRESHOLD=0.65 +PREDICTION_INTERVAL_SECONDS=60 + +# ---------- API & Performance ---------- +API_HOST=0.0.0.0 +API_PORT=8000 +API_WORKERS=2 +TICK_BATCH_SIZE=500 +BATCH_INTERVAL=1.0 +MAX_QUEUE=20000 +WORKER_THREADS=4 + +# ---------- Security ---------- +ENV=production +CORS_ORIGINS=* # مؤقتًا أثناء التطوير؛ غيّره للـ production لاحقًا +API_KEY= +JWT_SECRET= +ENCRYPTION_KEY= +RATE_LIMIT_REQUESTS=120 +RATE_LIMIT_WINDOW_SECONDS=60 + +# ---------- Monitoring ---------- +LOG_LEVEL=INFO +SENTRY_DSN= diff --git a/.gitignore b/.gitignore index 6b63e9f..f0fcb1b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,45 +1,29 @@ -HELP.md -target/ -!.mvn/wrapper/maven-wrapper.jar -!**/src/main/**/target/ -!**/src/test/**/target/ -.mvn - -### STS ### -.apt_generated -.classpath -.factorypath -.project -.settings -.springBeans -.sts4-cache - -### IntelliJ IDEA ### -.idea -*.iws -*.iml -*.ipr - -# batch files -*.bat - -# svn -.svn - -# mvn -mvnw -mvnw.cmd - - -### NetBeans ### -/nbproject/private/ -/nbbuild/ -/dist/ -/nbdist/ -/.nb-gradle/ -build/ -!**/src/main/**/build/ -!**/src/test/**/build/ - -### VS Code ### -.vscode/ +# Environment files +.env +.env.local +.env.production +.env.development + +# Python +__pycache__/ +*.pyc +.pytest_cache/ + +# Node / frontend +node_modules/ +.next/ + +# Data / ML artifacts (do not upload large datasets or model weights) +data/*.csv +data/*.parquet +data/*.json +data/ticks/ + +models/*.pkl +models/*.h5 +models/*.pt +models/*.onnx + +# logs +*.log +logs/ diff --git a/Dockerfile b/Dockerfile index 6f15179..c68c8cc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,40 @@ -FROM openjdk:17-jdk-alpine -EXPOSE 7080 -EXPOSE 7081 -ARG JAR_FILE=target/dukascopy-api-websocket-1.0.war -ADD ${JAR_FILE} dukascopy-api-websocket.war -ENTRYPOINT ["java","-jar","dukascopy-api-websocket.war","--dukascopy.credential-username=username", "--dukascopy.credential-password=password"] \ No newline at end of file +# ============================ +# مرحلة البناء (Build) باستخدام Java 11 +# ============================ +FROM maven:3.8-openjdk-11 AS builder +WORKDIR /build + +# نسخ ملف POM أولاً للاستفادة من التخزين المؤقت للتبعيات +COPY pom.xml . +RUN mvn dependency:go-offline -B || true + +# نسخ باقي الكود المصدري +COPY src ./src + +# بناء التطبيق (إنشاء ملف WAR) +RUN mvn clean package -DskipTests + +# ============================ +# مرحلة التشغيل (Runtime) باستخدام Eclipse Temurin (Java 11) +# ============================ +FROM eclipse-temurin:11-jre +WORKDIR /app + +# نسخ ملف WAR الناتج من مرحلة البناء +COPY --from=builder /build/target/*.war app.war + +# تعريف المنفذ (سيتم تجاوزه بواسطة Render عبر متغير PORT) +EXPOSE 8080 + +# متغيرات البيئة الافتراضية (يتم استبدالها بقيم من خدمة Render) +ENV SERVER_PORT=${PORT:-8080} \ + DUKE_USERNAME=${DUKASCOPY_USER} \ + DUKE_PASSWORD=${DUKASCOPY_PASS} \ + DB_URL=${DATABASE_URL} + +# نقطة الدخول: تشغيل التطبيق مع تمرير المتغيرات +CMD java -jar app.war \ + --server.port=$SERVER_PORT \ + --dukascopy.credential-username=$DUKE_USERNAME \ + --dukascopy.credential-password=$DUKE_PASSWORD \ + --spring.datasource.url=$DB_URL diff --git a/api/main.py b/api/main.py new file mode 100644 index 0000000..307ed4b --- /dev/null +++ b/api/main.py @@ -0,0 +1,56 @@ +# api/main.py +import os, asyncio, json +from fastapi import FastAPI, WebSocket, Depends, HTTPException +import asyncpg +from fastapi.middleware.cors import CORSMiddleware +from dotenv import load_dotenv + +load_dotenv() +app = FastAPI(title="Market API") + +origins = os.getenv("CORS_ORIGINS","*") +if origins == "*": + allow_origins = ["*"] +else: + allow_origins = [o.strip() for o in origins.split(",")] + +app.add_middleware( + CORSMiddleware, + allow_origins=allow_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +DB_DSN = os.getenv("DATABASE_URL") + +@app.on_event("startup") +async def startup(): + app.state.pool = await asyncpg.create_pool(dsn=DB_DSN, min_size=1, max_size=10) + app.state.subscribers = set() + +@app.on_event("shutdown") +async def shutdown(): + await app.state.pool.close() + +@app.get("/api/v1/candles/{symbol}/{timeframe}") +async def get_candles(symbol: str, timeframe: str, limit: int = 200): + pool = app.state.pool + rows = await pool.fetch( + "SELECT timestamp, open, high, low, close, volume FROM candles WHERE symbol=$1 AND timeframe=$2 ORDER BY timestamp DESC LIMIT $3", + symbol, timeframe, limit + ) + return [dict(r) for r in rows] + +@app.websocket("/ws/market") +async def ws_market(ws: WebSocket): + await ws.accept() + app.state.subscribers.add(ws) + try: + while True: + await asyncio.sleep(60) # placeholder; actual pushes from collector via Redis/pubsub + except Exception: + pass + finally: + app.state.subscribers.remove(ws) + await ws.close() diff --git a/data_feed/market_stream.py b/data_feed/market_stream.py new file mode 100644 index 0000000..3c03555 --- /dev/null +++ b/data_feed/market_stream.py @@ -0,0 +1,102 @@ +# data_feed/market_stream.py +import os, asyncio, json, traceback, time +from datetime import datetime, timezone +import asyncpg +import websockets +from dotenv import load_dotenv + +load_dotenv() + +PROXY_WS = os.getenv("PROXY_WS_URL") or os.getenv("DUKASCOPY_WS_URL") +DB_DSN = os.getenv("DATABASE_URL") +BATCH_SIZE = int(os.getenv("TICK_BATCH_SIZE", "500")) +BATCH_INTERVAL = float(os.getenv("BATCH_INTERVAL", "1.0")) +MAX_QUEUE = int(os.getenv("MAX_QUEUE", "20000")) + +queue = asyncio.Queue(maxsize=MAX_QUEUE) + +def now_iso(): + return datetime.now(timezone.utc).isoformat() + +def normalize(msg_text): + try: + d = json.loads(msg_text) + except Exception: + return None + # try many shapes + symbol = d.get("symbol") or d.get("instrument") or d.get("pair") or d.get("s") + if symbol and "/" in symbol: symbol = symbol.replace("/", "") + bid = d.get("bid") or d.get("b") or d.get("price") + ask = d.get("ask") or d.get("a") or (d.get("price") and d.get("price")) + vol = d.get("volume") or d.get("v") or d.get("size") or 0 + ts = d.get("timestamp") or d.get("ts") or d.get("time") or None + try: + if ts: + ts = int(ts) + ts_iso = datetime.fromtimestamp(ts/1000, tz=timezone.utc).isoformat() if ts>10**12 else datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() + else: + ts_iso = now_iso() + except: + ts_iso = now_iso() + if not symbol or (bid is None and ask is None): + return None + if bid is None: bid = ask + if ask is None: ask = bid + try: + bid = float(bid); ask = float(ask); vol = float(vol) + except: + pass + return (symbol, ts_iso, bid, ask, vol) + +async def pg_pool(): + return await asyncpg.create_pool(dsn=DB_DSN, min_size=1, max_size=10) + +async def writer_worker(pool): + buffer = [] + last_flush = time.time() + while True: + item = await queue.get() + buffer.append(item) + if len(buffer) >= BATCH_SIZE or (time.time()-last_flush)>=BATCH_INTERVAL: + records = [(r[0], r[1], r[2], r[3], r[4]) for r in buffer] + buffer = [] + last_flush = time.time() + async with pool.acquire() as conn: + try: + await conn.copy_records_to_table('ticks', records=records, columns=['symbol','timestamp','bid','ask','volume']) + except Exception: + for rec in records: + try: + await conn.execute("INSERT INTO ticks(symbol,timestamp,bid,ask,volume) VALUES($1,$2,$3,$4,$5)", *rec) + except Exception as e: + print("insert err:", e) + queue.task_done() + +async def ws_consumer(): + pool = await pg_pool() + workers = [asyncio.create_task(writer_worker(pool)) for _ in range(2)] + backoff = 1 + while True: + try: + async with websockets.connect(PROXY_WS, max_size=None) as ws: + print("connected to", PROXY_WS) + # optional subscribe message + sub = os.getenv("PROXY_SUBSCRIBE_MESSAGE") + if sub: + await ws.send(sub) + async for msg in ws: + n = normalize(msg) + if n: + try: + await queue.put(n) + except asyncio.QueueFull: + _ = queue.get_nowait() # drop oldest + await queue.put(n) + except Exception as e: + print("ws err:", e) + traceback.print_exc() + await asyncio.sleep(backoff) + backoff = min(backoff*2, 60) + +if __name__ == "__main__": + asyncio.run(ws_consumer()) diff --git a/prediction/predictor.py b/prediction/predictor.py new file mode 100644 index 0000000..20252d3 --- /dev/null +++ b/prediction/predictor.py @@ -0,0 +1,19 @@ +# prediction/predictor.py +import os, joblib, numpy as np +from typing import Dict + +MODEL_PATH = os.getenv("MODEL_PATH","models/market_model.pkl") +_model = None + +def load_model(): + global _model + if _model is None: + _model = joblib.load(MODEL_PATH) + return _model + +def predict(features: Dict): + model = load_model() + X = np.array([features[k] for k in sorted(features.keys())]).reshape(1,-1) + proba = model.predict_proba(X) if hasattr(model, "predict_proba") else None + pred = model.predict(X) + return {"pred":float(pred[0]), "proba": proba.tolist() if proba is not None else None} diff --git a/processing/candle_builder.py b/processing/candle_builder.py new file mode 100644 index 0000000..d12abef --- /dev/null +++ b/processing/candle_builder.py @@ -0,0 +1,45 @@ +# processing/candle_builder.py +from datetime import datetime, timezone +from collections import defaultdict +import asyncpg +import os + +DB_DSN = os.getenv("DATABASE_URL") +TIMEFRAME_SECONDS = { + "1m":60, "3m":180, "5m":300, "15m":900, "30m":1800, "1h":3600, "4h":14400, "1d":86400 +} + +class CandleBuilder: + def __init__(self, pool): + self.pool = pool + self.candles = defaultdict(dict) # key=(symbol,tf) -> candle dict + + def bucket(self, ts, tf_s): + epoch = int(ts.timestamp()) + return epoch - (epoch % tf_s) + + async def process_tick(self, symbol, ts_iso, price, vol): + ts = datetime.fromisoformat(ts_iso) + for tf, s in TIMEFRAME_SECONDS.items(): + key = (symbol, tf) + b = self.bucket(ts, s) + c = self.candles.get(key) + if not c or c['bucket']!=b: + if c: + await self.save_candle(symbol, tf, c) + self.candles[key] = {'bucket':b, 'open':price,'high':price,'low':price,'close':price,'vol':vol} + else: + c['high']=max(c['high'], price) + c['low']=min(c['low'], price) + c['close']=price + c['vol'] += vol + + async def save_candle(self, symbol, tf, c): + ts_iso = datetime.fromtimestamp(c['bucket'], tz=timezone.utc).isoformat() + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO candles(symbol, timeframe, timestamp, open, high, low, close, volume) + VALUES($1,$2,$3,$4,$5,$6,$7,$8) + ON CONFLICT (symbol,timeframe,timestamp) DO UPDATE + SET open=EXCLUDED.open, high=EXCLUDED.high, low=EXCLUDED.low, close=EXCLUDED.close, volume=EXCLUDED.volume + """, symbol, tf, ts_iso, c['open'], c['high'], c['low'], c['close'], c['vol']) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ba94bcc --- /dev/null +++ b/requirements.txt @@ -0,0 +1,15 @@ +asyncpg==0.27.0 +websockets==10.4 +aiohttp==3.8.4 +redis==5.0.1 +python-dotenv==1.0.0 +ujson==5.7.0 +numpy==1.26.4 +pandas==2.2.2 +fastapi==0.100.0 +uvicorn==0.22.0 +xgboost==1.8.4 +joblib==1.3.2 +ta==0.12.1 +python-jose==3.3.0 +psycopg2-binary==2.9.10