-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcelery_app.py
More file actions
90 lines (76 loc) · 3.2 KB
/
Copy pathcelery_app.py
File metadata and controls
90 lines (76 loc) · 3.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Celery 애플리케이션 설정
백그라운드 작업(검색, 크롤링, 분석, 폴링, 텔레그램 수집)을 큐 기반으로 처리하기 위한 Celery 설정을 제공합니다.
큐는 토픽(Topic) 교환을 사용하며, 작업 종류별로 라우팅 키를 분리합니다.
"""
import os
from celery import Celery
from kombu import Queue, Exchange
import tasks
# Broker and backend from environment
BROKER_URL = os.getenv("CELERY_BROKER_URL", os.getenv("RABBITMQ_URL", "amqp://guest:guest@localhost:5672//"))
RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", None) # optional
app = Celery(
"retriever",
broker=BROKER_URL,
backend=RESULT_BACKEND,
include=[
tasks.analyze_module_name,
tasks.crawl_module_name,
tasks.poll_gemini_module_name,
tasks.search_module_name,
tasks.telegram_module_name,
],
)
default_exchange = Exchange("celery", type="topic", durable=True, )
def setup_celery():
"""Celery 전역 설정을 초기화합니다.
구성 요소:
- 큐: search, crawl, analyze, poll, telegram, default
- 라우팅: 작업명별로 라우팅 키를 부여하여 관심 큐로 전달
- beat 스케줄: Gemini 배치 결과 폴링을 60초 간격으로 실행
"""
# Basic config
app.conf.update(
task_default_exchange="celery",
task_default_exchange_type="topic",
task_queues=(
Queue("search", default_exchange, routing_key="search.#", durable=True,),
Queue("crawl", default_exchange, routing_key="crawl.#", durable=True,),
Queue("analyze",default_exchange, routing_key="analyze.#", durable=True,),
Queue("poll", default_exchange, routing_key="poll.#", durable=True,),
Queue("telegram", default_exchange, routing_key="telegram.#", durable=True,),
Queue("default",default_exchange, routing_key="default", durable=True,),
),
task_default_queue="default",
task_time_limit=60 * 10,
worker_prefetch_multiplier=1,
)
# 등록된 task를 .delay() 등으로 직접 호출할 때 발행할 routing key를 설정.
# queue를 추가로 설정함으로써 큐를 자동 선언
app.conf.task_routes = {
# 검색
tasks.names.SEARCH_TASK_NAME: {"queue": "search", "routing_key": "search.start"},
# 크롤링
tasks.names.CRAWL_TASK_NAME: {"queue": "crawl", "routing_key": "crawl.page"},
# gemini로 분석
tasks.names.ANALYSIS_TASK_NAME: {"queue": "analyze", "routing_key": "analyze.gemini.batch"},
# 배치 폴링
tasks.names.POLL_GEMINI_TASK_NAME: {"queue": "poll", "routing_key": "poll.gemini.batch"},
# 텔레그램 채널 및 메시지 수집
tasks.names.TELEGRAM_CHANNEL_TASK_NAME: {"queue": "telegram", "routing_key": "telegram"},
}
# Periodic tasks: poll Gemini batches every minute
app.conf.beat_schedule = {
"poll-gemini-batches": {
"task": tasks.names.POLL_GEMINI_TASK_NAME,
"schedule": 60.0, # every 60 seconds
}
}
setup_celery()
@app.task(bind=True)
def ping():
"""상태 확인을 위한 테스트 태스크.
Returns:
str: "pong"
"""
return "pong"