# example.py -- forked from DeepAI-Research/Distributask
import os
import sys
import subprocess
import time
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), "./"))
from distributaur.monitoring import start_monitoring_server
from distributaur.distributaur import Distributaur
def example_function(arg1, arg2):
    """Return a result string containing the sum of the two arguments.

    Args:
        arg1: First operand; must support ``+`` with ``arg2``.
        arg2: Second operand.

    Returns:
        The string ``"Result: <arg1 + arg2>"``.
    """
    return f"Result: {arg1 + arg2}"
# Module-level Distributaur instance; the __main__ section below uses it to
# read configuration (get_env) and to submit work (execute_function).
distributaur = Distributaur()
# Register example_function with the instance. The driver later refers to it
# by the string name "example_function" -- presumably registration is what
# makes that name resolvable; confirm against Distributaur.register_function.
distributaur.register_function(example_function)
if __name__ == "__main__":
    # Script entry point: start a local Celery worker and the monitoring
    # server, submit one example task, poll until it reports ready, and
    # always shut the worker down on the way out.

    # Fail fast when the Vast API key is not configured.
    api_key = distributaur.get_env("VAST_API_KEY")
    if not api_key:
        raise ValueError("Vast API key not found in configuration.")

    # Only "task_func" and "task_params" are read below; the remaining keys
    # are not used by this script.
    job_config = {
        "max_price": 0.10,
        "max_nodes": 10,
        "docker_image": "your-docker-image",
        "task_func": "example_function",
        "task_params": {"arg1": 1, "arg2": 2},
    }

    # Launch the Celery worker as a child process (argument list, no shell).
    worker_cmd = [
        "celery",
        "-A",
        "example_worker",
        "worker",
        "--loglevel=info",
        "--concurrency=1",
    ]
    worker_process = subprocess.Popen(worker_cmd)
    print("Worker process started.")

    try:
        # Started inside the try block so that a failure here still reaches
        # the worker cleanup in `finally` instead of leaking the child process.
        start_monitoring_server()
        print(
            "Monitoring server started. Visit http://localhost:5555 to monitor the job."
        )

        print("Submitting tasks...")
        tasks = [
            distributaur.execute_function(
                job_config["task_func"], job_config["task_params"]
            )
        ]
        print("Tasks submitted to queue. Waiting for tasks to complete...")

        while True:
            # Take one readiness snapshot per iteration so both status lines
            # describe the same state and each task is polled only once.
            ready_flags = [task.ready() for task in tasks]
            if all(ready_flags):
                break
            print("Tasks completed: " + str(ready_flags))
            print(
                "Tasks remaining: "
                + str([task for task, done in zip(tasks, ready_flags) if not done])
            )
            # Sleep for a few seconds between polls.
            time.sleep(5)
    finally:
        # Ask the worker to stop; escalate to kill if it does not exit in
        # time, so the script can never hang forever on shutdown.
        worker_process.terminate()
        try:
            worker_process.wait(timeout=10)
        except subprocess.TimeoutExpired:
            worker_process.kill()
            worker_process.wait()
        print("Worker process terminated.")