Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion backend/coreAdmin/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import os
import urllib.parse
from pathlib import Path

import colorlog
import environ
import sys
from kombu import Exchange, Queue

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
Expand Down Expand Up @@ -221,6 +222,7 @@
"skybot",
"des",
"newsletter",
"predict_occultation",
]

MIDDLEWARE = [
Expand Down Expand Up @@ -502,6 +504,17 @@
"disable_existing_loggers": False,
"formatters": {
"standard": {"format": "%(asctime)s [%(levelname)s] %(message)s"},
'colored': {
'()': colorlog.ColoredFormatter,
'format': '%(log_color)s[%(levelname)s] %(message)s',
'log_colors': {
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'bold_red,bg_white',
},
}
},
"handlers": {
"file": {
Expand All @@ -511,6 +524,12 @@
"maxBytes": 1024 * 1024 * 5, # 5 MB
"backupCount": 5,
},
'console': {
'level': LOGGING_LEVEL,
'class': 'logging.StreamHandler',
'formatter': 'colored', # Use the colored formatter
'stream': sys.stdout, # Direct to stdout
},
# "proccess": {
# "level": LOGGING_LEVEL,
# "class": "logging.handlers.RotatingFileHandler",
Expand Down Expand Up @@ -601,6 +620,32 @@
"filename": os.path.join(LOG_DIR, "asteroid_cache.log"),
"formatter": "standard",
},
# ------------------------------------------
# Prediction Occultation Workers
"predict_occ_prepare_worker": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
"maxBytes": 1024 * 1024 * 5, # 5 MB
"backupCount": 5,
"filename": os.path.join(LOG_DIR, "predict_occ_prepare_worker.log"),
"formatter": "standard",
},
"predict_occ_submit_worker": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
"maxBytes": 1024 * 1024 * 5, # 5 MB
"backupCount": 5,
"filename": os.path.join(LOG_DIR, "predict_occ_submit_worker.log"),
"formatter": "standard",
},
"predict_occ_ingest_worker": {
"level": LOGGING_LEVEL,
"class": "logging.handlers.RotatingFileHandler",
"maxBytes": 1024 * 1024 * 5, # 5 MB
"backupCount": 5,
"filename": os.path.join(LOG_DIR, "predict_occ_ingest_worker.log"),
"formatter": "standard",
},
},
"loggers": {
"django": {
Expand Down Expand Up @@ -663,5 +708,22 @@
"level": LOGGING_LEVEL,
"propagate": False,
},
# ------------------------------------------
# Prediction Occultation Workers
"predict_occ_prepare_worker": {
"handlers": ["predict_occ_prepare_worker", "console"],
"level": LOGGING_LEVEL,
"propagate": False,
},
"predict_occ_submit_worker": {
"handlers": ["predict_occ_submit_worker", "console"],
"level": LOGGING_LEVEL,
"propagate": False,
},
"predict_occ_ingest_worker": {
"handlers": ["predict_occ_ingest_worker", "console"],
"level": LOGGING_LEVEL,
"propagate": False,
},
},
}
8 changes: 8 additions & 0 deletions backend/coreAdmin/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
UserViewSet,
)

from predict_occultation.api.views import (
PredictionTaskViewSet,
WorkersHeartbeatViewSet,
)

if settings.DEBUG:
router = DefaultRouter()
else:
Expand Down Expand Up @@ -97,6 +102,9 @@
router.register(r"submission", SubmissionViewSet, basename="submission")


router.register(r"predict_occultation/task", PredictionTaskViewSet, basename="predict_occultation_task")
router.register(r"predict_occultation/worker", WorkersHeartbeatViewSet, basename="predict_occultation_worker")

urlpatterns = [
path("admin/", admin.site.urls),
re_path(r"^api/", include(router.urls)),
Expand Down
116 changes: 116 additions & 0 deletions backend/predict_occultation/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
## Como usar

1. Criar algumas tasks:

```bash
python manage.py enqueue_predictions --count 3
```

2. Rodar os workers em terminais diferentes:

```bash
python manage.py prepare_worker
python manage.py submit_worker
python manage.py ingest_worker
```

Ciclo normal (sucesso)

`PENDING → PREPARING → READY_FOR_RUN → SUBMITTING → QUEUED → RUNNING → WAITING_RESULTS → INGESTING → DONE`

FAILED é só transição temporária → reentra em PENDING se ainda pode tentar.

STALLED é falha terminal → só ação manual (ex: reset).

ABORTED pode ocorrer em qualquer etapa → terminal.

O retry/backoff está ligado ao FAILED → PENDING.


## Fluxo de retry automático

- Se falhar em qualquer etapa:

- Vai para FAILED.

- Se attempt_count < max_retries, a task retorna para PENDING após delay com exponential backoff e incrementa as tentativas.

- Se attempt_count >= max_retries, vai para STALLED.

- Retry manual via admin ou retry_prediction zera contador e força reprocessar.

- exponential backoff
1ª falha → next_retry_at = agora + 1 min

2ª falha → next_retry_at = agora + 2 min

3ª falha → next_retry_at = agora + 4 min

4ª falha → STALLED definitivo

## Abort

Uma task em qualquer estado (exceto DONE, STALLED) pode ir para ABORTED.

Abort força término imediato → nunca retorna.


## TODO: Soft kill
Tratar o que acontece quando uma task é interrompida: no momento ela fica presa no status intermediário, sem avançar para a próxima etapa e sem ser considerada falha. Deveria mudar o status para falha e reiniciar, ou voltar para o status inicial.

## Diagrama em Mermaid

```mermaid
stateDiagram-v2
[*] --> PENDING

PENDING --> PREPARING: start
PREPARING --> READY_FOR_RUN: success
READY_FOR_RUN --> SUBMITTING: start
SUBMITTING --> QUEUED: success
QUEUED --> RUNNING: start
RUNNING --> WAITING_RESULTS: finished
WAITING_RESULTS --> INGESTING: start
INGESTING --> DONE: success

%% Falhas
PREPARING --> FAILED: error
SUBMITTING --> FAILED: error
QUEUED --> FAILED: error
RUNNING --> FAILED: error
WAITING_RESULTS --> FAILED: error
INGESTING --> FAILED: error

FAILED --> PENDING: retry (backoff, attempts < max)
FAILED --> STALLED: attempts >= max

%% Abort pode ocorrer em quase todos
PENDING --> ABORTED: abort
PREPARING --> ABORTED: abort
READY_FOR_RUN --> ABORTED: abort
SUBMITTING --> ABORTED: abort
QUEUED --> ABORTED: abort
RUNNING --> ABORTED: abort
WAITING_RESULTS --> ABORTED: abort
INGESTING --> ABORTED: abort

DONE --> [*]
STALLED --> [*]
ABORTED --> [*]

```

## State Machine

- PENDING = Tarefa recém-criada aguardando para ser iniciada.
- PREPARING = Primeira etapa do pipeline foi iniciada: criação de diretório e inputs.
- READY_FOR_RUN = Está pronta para a segunda etapa, que é a submissão ao Slurm.
- SUBMITTING = Início da segunda etapa; a tarefa será enviada para a API do Slurm.
- QUEUED = Task foi inserida na fila do Slurm.
- RUNNING = Uma das CPUs do cluster iniciou a tarefa.
- WAITING_RESULTS = A tarefa executada no cluster finalizou e os resultados estão prontos para serem ingeridos.
- INGESTING = Início da terceira etapa: registro dos resultados no banco de dados.
- DONE = Indica que a tarefa foi finalizada com sucesso.
- FAILED = Indica que a tarefa falhou em qualquer uma das etapas, mas que será executada novamente pelo retry. (A task não vai ter o status FAILED explícito, mas sim o status PENDING + next_retry_at != None + attempt_count > 0.)
- STALLED = Indica que a tarefa esgotou os retries e continuou com falha, e não será mais executada.
- ABORTED = Indica que a tarefa foi cancelada pelo usuário e não será executada, independente da etapa em que ela estiver.
Empty file.
38 changes: 38 additions & 0 deletions backend/predict_occultation/admin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from django.contrib import admin
from .models import PredictionTask, PredictionState, WorkersHeartbeat


@admin.register(PredictionTask)
class PredictionTaskAdmin(admin.ModelAdmin):
    """Admin page for ``PredictionTask`` with bulk abort and retry actions."""

    list_display = (
        "id",
        "asteroid_id",
        "state",
        "priority",
        "attempt_count",
        "max_retries",
        "workdir",
        "created_at",
        "updated_at",
        "aborted",
        "debug",
    )
    list_filter = ("state", "priority", "aborted")
    search_fields = ("asteroid_id",)
    ordering = ("-created_at",)
    actions = ["abort_tasks", "retry_tasks"]

    def abort_tasks(self, request, queryset):
        """Call ``mark_aborted()`` on each selected task not yet DONE or ABORTED.

        Reports the number of affected tasks back to the admin user.
        """
        count = 0
        for task in queryset:
            if task.state not in (PredictionState.DONE, PredictionState.ABORTED):
                task.mark_aborted()
                count += 1
        self.message_user(request, f"{count} tasks abortadas.")

    abort_tasks.short_description = "Abortar tasks selecionadas"

    def retry_tasks(self, request, queryset):
        """Call ``retry()`` on each selected task in a retryable state.

        STALLED is included because the app README describes it as the
        terminal failure state that only a manual action can recover, while
        FAILED is described as a transient state that is not stored
        explicitly. Without STALLED this action could not re-queue tasks that
        exhausted their automatic retries.
        NOTE(review): assumes ``PredictionState.STALLED`` exists -- confirm
        against the model definition.
        """
        retryable_states = (
            PredictionState.FAILED,
            PredictionState.STALLED,
            PredictionState.ABORTED,
        )
        count = 0
        for task in queryset:
            if task.state in retryable_states:
                task.retry()
                count += 1
        self.message_user(request, f"{count} tasks re-enfileiradas.")

    retry_tasks.short_description = "Retry nas tasks selecionadas"


@admin.register(WorkersHeartbeat)
class WorkersHeartbeatAdmin(admin.ModelAdmin):
    """Admin listing for ``WorkersHeartbeat`` records, filterable by worker."""

    # Columns shown in the change list.
    list_display = ("id", "worker", "started_at", "updated_at", "uptime")
    # Both the search box and the sidebar filter work on the worker field.
    search_fields = ("worker",)
    list_filter = ("worker",)

Empty file.
27 changes: 27 additions & 0 deletions backend/predict_occultation/api/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from rest_framework import serializers
from predict_occultation.models import PredictionTask
from predict_occultation.models import WorkersHeartbeat


class PredictionTaskSerializer(serializers.ModelSerializer):
    """Model serializer exposing every field of ``PredictionTask``."""

    class Meta:
        fields = "__all__"
        model = PredictionTask


class PredictionTaskDetailSerializer(serializers.ModelSerializer):
    """Detail serializer for ``PredictionTask``; currently exposes every model field."""

    class Meta:
        fields = "__all__"
        model = PredictionTask


class WorkersHeartbeatSerializer(serializers.ModelSerializer):
    """Serialize every ``WorkersHeartbeat`` field plus a computed ``status``."""

    # Read-only field whose value is produced by ``get_status``.
    status = serializers.SerializerMethodField()

    class Meta:
        fields = "__all__"
        model = WorkersHeartbeat

    def get_status(self, obj):
        """Return whatever ``obj.status()`` reports for this heartbeat."""
        current_status = obj.status()
        return current_status

35 changes: 35 additions & 0 deletions backend/predict_occultation/api/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

from rest_framework.viewsets import ModelViewSet
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework import status
from django.db.models import Count
from predict_occultation.models import PredictionTask
from predict_occultation.models import WorkersHeartbeat

from predict_occultation.api.serializers import PredictionTaskSerializer
from predict_occultation.api.serializers import PredictionTaskDetailSerializer
from predict_occultation.api.serializers import WorkersHeartbeatSerializer

class PredictionTaskViewSet(ModelViewSet):
    """Model viewset for ``PredictionTask`` with an extra per-state count route."""

    queryset = PredictionTask.objects.all()
    serializer_class = PredictionTaskSerializer

    # Filtering, search and ordering options.
    # NOTE(review): these only take effect if the matching filter backends
    # are enabled in the project's DRF settings -- confirm.
    filterset_fields = ["id", "asteroid_id", "state", "slurm_job_id", "aborted"]
    search_fields = ["id", "asteroid_id"]
    ordering_fields = ["created_at", "updated_at", "priority", "state"]
    ordering = ["-created_at", "priority"]

    def get_serializer_class(self):
        """Use the detail serializer for ``retrieve`` and the default one otherwise."""
        is_detail_request = self.action == "retrieve"
        return PredictionTaskDetailSerializer if is_detail_request else PredictionTaskSerializer

    @action(detail=False, methods=["get"])
    def group_by_state(self, request):
        """Respond with one ``{state, count}`` entry per state, ordered by state.

        The counts are computed over ``get_queryset()`` directly, so request
        filters are not applied here.
        """
        per_state_counts = (
            self.get_queryset()
            .values("state")
            .annotate(count=Count("state"))
            .order_by("state")
        )
        return Response(per_state_counts, status=status.HTTP_200_OK)

class WorkersHeartbeatViewSet(ModelViewSet):
    """Model viewset over all ``WorkersHeartbeat`` records."""

    queryset = WorkersHeartbeat.objects.all()
    serializer_class = WorkersHeartbeatSerializer
7 changes: 7 additions & 0 deletions backend/predict_occultation/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.apps import AppConfig


class PredictOccultationConfig(AppConfig):
    """Application configuration for the ``predict_occultation`` Django app."""

    name = "predict_occultation"
    verbose_name = "Predict Occultation"
    # Primary-key field type used for models that do not declare one.
    default_auto_field = "django.db.models.BigAutoField"
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from django.core.management.base import BaseCommand
from predict_occultation.models import PredictionTask, PredictionState


class Command(BaseCommand):
    """Management command that aborts prediction tasks selected by asteroid id."""

    help = "Aborta tasks em andamento ou na fila"

    def add_arguments(self, parser):
        """Declare the positional ``asteroid_id`` argument (one or more values)."""
        # TODO: provide an option to abort every task currently in progress?
        parser.add_argument(
            "asteroid_id",
            nargs="+",
            help="ID(s) dos asteroids para abortar",
        )

    def handle(self, *args, **options):
        """Abort each matching task unless it is already DONE or ABORTED.

        Writes one line per matching task to stdout: a warning-styled message
        when the task is aborted, a plain message when it is left untouched.
        """
        untouchable_states = [PredictionState.DONE, PredictionState.ABORTED]
        matching_tasks = PredictionTask.objects.filter(
            asteroid_id__in=options["asteroid_id"]
        )
        for task in matching_tasks:
            if task.state in untouchable_states:
                self.stdout.write(f"Task {task.asteroid_id} já finalizada")
                continue
            task.mark_aborted()
            self.stdout.write(
                self.style.WARNING(f"Task {task.asteroid_id} abortada")
            )
Loading
Loading