-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
338 lines (289 loc) · 10.7 KB
/
main.py
File metadata and controls
338 lines (289 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uvicorn
import httpx
import json
import os
import asyncio
import datetime
import pymysql
from datetime import datetime, timedelta
from dotenv import load_dotenv
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
from fastapi import Request # 如果之前没 import Request,请加上
# 加载环境变量
load_dotenv()
def get_db_connection():
"""获取MySQL数据库连接"""
return pymysql.connect(
host=os.getenv("DB_HOST", "localhost"),
user=os.getenv("DB_USER", "root"),
password=os.getenv("DB_PASSWORD", ""),
database=os.getenv("DB_NAME", "oracle_matrix"),
charset='utf8mb4'
)
def verify_and_use_cdkey(cd_key: str):
# 保留管理员测试通道(.env 中的静态密码依然可用)
valid_keys = os.getenv("VALID_CDKEYS", "oracle888,vip2026").split(",")
if cd_key in valid_keys:
return True
try:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("SELECT first_used_at FROM cdkeys WHERE `key`=%s", (cd_key,))
row = cursor.fetchone()
if not row:
conn.close()
raise HTTPException(status_code=403, detail="激活码无效,请检查是否输入错误。")
first_used_at = row[0]
now = datetime.now()
if first_used_at is None:
# 首次激活,记录当前时间
cursor.execute("UPDATE cdkeys SET first_used_at=%s WHERE `key`=%s", (now.isoformat(), cd_key))
conn.commit()
conn.close()
return True
else:
# 检查是否过期(24小时内有效)
used_time = datetime.fromisoformat(first_used_at)
if now - used_time > timedelta(hours=24):
conn.close()
raise HTTPException(status_code=403, detail="该激活码已过期(激活后24小时内有效)。请重新购买。")
conn.close()
return True
except Exception as e:
raise HTTPException(status_code=500, detail=f"数据库连接错误: {str(e)}")
# 导入我们的八字排盘和提示词模块
from lunar import get_bazi_result
from prompt import generate_analysis_prompt
# 创建 FastAPI 应用实例
app = FastAPI(
title="OracleMatrix 八字命理分析 API",
description="基于传统八字命理学的智能分析引擎",
version="1.0.0"
)
# --- 新增:初始化 IP 限流器 ---
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# 配置 CORS 跨域中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 挂载静态文件目录
app.mount("/static", StaticFiles(directory="static"), name="static")
# 数据模型定义
class UserInfo(BaseModel):
"""用户信息数据模型"""
year: int
month: int
day: int
hour: int
minute: int
longitude: float
gender: str
class AIAnalysisRequest(BaseModel):
"""AI 解析请求数据模型"""
bazi_data: Dict[str, Any]
gender: str = "男"
cd_key: str
class ChatRequest(BaseModel):
"""聊天追问请求数据模型"""
bazi_data: Dict[str, Any]
gender: str
question: str
cd_key: str
class VerifyRequest(BaseModel):
"""回测验证请求数据模型"""
bazi_data: Dict[str, Any]
gender: str
# 接口一:八字排盘计算 API
@app.post("/api/calculate")
@limiter.limit("20/day") # 每天最多 20 次
async def calculate_bazi(request: Request, user_info: UserInfo):
"""
计算八字排盘数据
Args:
user_info: 用户信息,包含出生时间、经纬度等
Returns:
Dict: 完整的八字排盘结果
"""
try:
# 调用八字计算函数
result = get_bazi_result(
year=user_info.year,
month=user_info.month,
day=user_info.day,
hour=user_info.hour,
minute=user_info.minute,
longitude=user_info.longitude,
latitude=39.9, # 默认北京纬度
gender=user_info.gender
)
return {
"success": True,
"data": result
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"八字计算错误: {str(e)}"
)
async def call_ai_api(prompt_text: str):
"""
调用兼容 OpenAI 格式的 AI API - 流式输出版本
Args:
prompt_text: 提示词文本
Yields:
str: 实时流式输出
"""
try:
# 获取 API 配置
api_key = os.getenv("DEEPSEEK_API_KEY")
base_url = os.getenv("DEEPSEEK_BASE_URL")
if not api_key or not base_url:
yield "AI服务配置错误,请联系管理员"
return
# 构建请求数据
request_data = {
"model": "deepseek-chat",
"messages": [
{"role": "user", "content": prompt_text}
],
"stream": True,
"temperature": 0.7,
"max_tokens": 4000
}
# 发送流式请求
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
f"{base_url}/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
json=request_data,
timeout=60.0
) as response:
async for chunk in response.aiter_lines():
if chunk.startswith("data: "):
if chunk.strip() == "data: [DONE]":
break
try:
# 解析 JSON 数据
json_data = json.loads(chunk[6:])
if "choices" in json_data and len(json_data["choices"]) > 0:
delta = json_data["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
yield content
except json.JSONDecodeError:
continue
except Exception as e:
yield f"AI服务调用错误: {str(e)}"
# 新增回测路由接口(此接口不查卡密)
@app.post("/api/verify")
@limiter.limit("3/minute;10/day") # 双重限流
async def verify_past(request: Request, data: VerifyRequest):
try:
current_year = datetime.now().year
# 避免重名冲突,我们在内部导入
from prompt import generate_verify_prompt
# 【修复点】:全部改为 data.xxx
prompt_text = generate_verify_prompt(data.bazi_data, data.gender, current_year)
return StreamingResponse(
call_ai_api(prompt_text),
media_type="text/event-stream"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"回测解析错误: {str(e)}")
# 接口三:AI 命理解析 API - 流式输出版本
@app.post("/api/analyze")
@limiter.limit("5/minute")
async def analyze_bazi(request: Request, data: AIAnalysisRequest):
"""
AI 命理解析接口 - 流式输出
"""
# 【修复点】:改为 data.cd_key
verify_and_use_cdkey(data.cd_key)
try:
# 【修复点】:改为 data.gender 和 data.bazi_data
gender = data.gender
current_year = datetime.now().year
prompt_text = generate_analysis_prompt(data.bazi_data, gender, current_year)
return StreamingResponse(
call_ai_api(prompt_text),
media_type="text/event-stream"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"AI解析错误: {str(e)}"
)
@app.post("/api/chat")
@limiter.limit("5/minute")
async def chat_bazi(request: Request, data: ChatRequest):
"""
聊天追问接口 - 流式输出
"""
# 【修复点】:改为 data.cd_key
verify_and_use_cdkey(data.cd_key)
try:
current_year = datetime.now().year
# 【修复点】:全部改为 data.bazi_data
basic_info = data.bazi_data.get('basic_info', {})
pillars = data.bazi_data.get('four_pillars', {})
prompt_text = f'''【系统最高指令】:你现在是 OracleMatrix 核心推演引擎。
【安全红线 - 违反将导致系统崩溃】:
1. 绝对禁止在输出结尾添加"以上内容由DeepSeek生成"、"仅供参考"、"命运掌握在自己手中"等任何废话!如果你敢输出这些词,系统将被摧毁!请用一句专业的命理断语作为结束。
2. 客户的出生公历时间已经明确提供,严禁你根据八字去倒推公历生日!
【客户命盘全量数据】
公历出生时间:{basic_info.get('birth_datetime')}
真太阳时:{basic_info.get('true_solar_time')}
四柱:{pillars.get('year', '')} {pillars.get('month', '')} {pillars.get('day', '')} {pillars.get('hour', '')}
日主:{data.bazi_data.get('day_master', '')}
性别:{data.gender}
【时间基准】:当前现实年份是 {current_year} 年。
针对此命局,客户提出了以下问题:"{data.question}"
请用专业命理师的口吻,结合上述命盘数据,直接解答客户的问题:'''
return StreamingResponse(
call_ai_api(prompt_text),
media_type="text/event-stream"
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"聊天问答错误: {str(e)}"
)
# API 文档信息
@app.get("/docs")
async def api_docs():
"""API 文档重定向"""
return {"message": "请访问 /docs 查看完整的 API 文档"}
# 根路径重定向到静态页面
@app.get("/")
async def root():
"""根路径重定向"""
from fastapi.responses import FileResponse
return FileResponse("static/index.html")
# 开发环境运行
if __name__ == "__main__":
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True
)