-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_workflow.py
More file actions
185 lines (156 loc) · 5.58 KB
/
example_workflow.py
File metadata and controls
185 lines (156 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python3
"""
Example workflow demonstrating the use of new database helper functions
for the v2 Video Generation API with storyboard approval.
"""
import sys
from pathlib import Path
# Add backend to path
sys.path.insert(0, str(Path(__file__).parent / "backend"))
from database import (
save_generated_video,
update_job_progress,
get_job,
increment_retry_count,
mark_job_failed,
get_jobs_by_status,
approve_storyboard
)
def example_video_generation_workflow():
"""
Demonstrates a typical video generation workflow with progress tracking
and storyboard approval.
"""
print("=" * 70)
print("Example: Video Generation Workflow with Storyboard Approval")
print("=" * 70)
# Step 1: Create initial job
print("\n[STEP 1] Creating video generation job...")
job_id = save_generated_video(
prompt="A serene sunset over a mountain lake",
video_url="", # Will be populated later
model_id="runway-gen3",
parameters={"duration": 10, "aspect_ratio": "16:9"},
status="pending"
)
print(f"Created job ID: {job_id}")
# Step 2: Update progress - Generating storyboard
print("\n[STEP 2] Generating storyboard...")
update_job_progress(job_id, {
"stage": "storyboard_generation",
"percent": 20,
"message": "AI is creating storyboard frames...",
"current_frame": 1,
"total_frames": 4
})
# Step 3: Storyboard ready for approval
print("\n[STEP 3] Storyboard generated, awaiting approval...")
update_job_progress(job_id, {
"stage": "awaiting_approval",
"percent": 40,
"message": "Storyboard ready for review",
"storyboard_frames": 4
})
# Get the job to show current state
job = get_job(job_id)
print(f"Current progress: {job['progress']}")
print(f"Approved: {job['approved']}")
# Step 4: Approve the storyboard
print("\n[STEP 4] User approves storyboard...")
approve_storyboard(job_id)
job = get_job(job_id)
print(f"Approved: {job['approved']}")
print(f"Approved at: {job['approved_at']}")
# Step 5: Start video generation
print("\n[STEP 5] Starting video generation...")
update_job_progress(job_id, {
"stage": "video_generation",
"percent": 60,
"message": "Generating video from approved storyboard..."
})
# Step 6: Video generation complete
print("\n[STEP 6] Video generation complete!")
update_job_progress(job_id, {
"stage": "completed",
"percent": 100,
"message": "Video generation successful"
})
# Get final job state
job = get_job(job_id)
print(f"Final status: {job['status']}")
print(f"Final progress: {job['progress']}")
return job_id
def example_error_handling_workflow():
"""
Demonstrates error handling and retry logic.
"""
print("\n\n" + "=" * 70)
print("Example: Error Handling and Retry Logic")
print("=" * 70)
# Create a job
print("\n[STEP 1] Creating video generation job...")
job_id = save_generated_video(
prompt="A futuristic cityscape at night",
video_url="",
model_id="runway-gen3",
parameters={"duration": 8},
status="processing"
)
print(f"Created job ID: {job_id}")
# Simulate a failure
print("\n[STEP 2] Simulating API failure...")
retry_count = increment_retry_count(job_id)
print(f"Retry count: {retry_count}")
update_job_progress(job_id, {
"stage": "retrying",
"percent": 30,
"message": f"Retrying after failure (attempt {retry_count}/3)"
})
# Another failure
print("\n[STEP 3] Another failure...")
retry_count = increment_retry_count(job_id)
print(f"Retry count: {retry_count}")
# Final failure - give up
print("\n[STEP 4] Max retries exceeded, marking as failed...")
mark_job_failed(job_id, "API timeout after 3 retry attempts")
job = get_job(job_id)
print(f"Final status: {job['status']}")
print(f"Error message: {job['error_message']}")
print(f"Retry count: {job['download_retries']}")
return job_id
def example_job_monitoring():
"""
Demonstrates job monitoring and status queries.
"""
print("\n\n" + "=" * 70)
print("Example: Job Monitoring and Status Queries")
print("=" * 70)
# Get all pending jobs
print("\n[QUERY 1] Fetching pending jobs...")
pending_jobs = get_jobs_by_status("pending", limit=5)
print(f"Found {len(pending_jobs)} pending job(s)")
for job in pending_jobs[:3]: # Show first 3
print(f" - Job {job['id']}: {job['prompt'][:50]}...")
# Get all failed jobs
print("\n[QUERY 2] Fetching failed jobs...")
failed_jobs = get_jobs_by_status("failed", limit=5)
print(f"Found {len(failed_jobs)} failed job(s)")
for job in failed_jobs[:3]: # Show first 3
print(f" - Job {job['id']}: {job['error_message']}")
# Get all completed jobs
print("\n[QUERY 3] Fetching completed jobs...")
completed_jobs = get_jobs_by_status("completed", limit=5)
print(f"Found {len(completed_jobs)} completed job(s)")
if __name__ == "__main__":
print("\n" + "🎬" * 35)
print("Database Helper Functions - Workflow Examples")
print("🎬" * 35)
# Run example workflows
job1 = example_video_generation_workflow()
job2 = example_error_handling_workflow()
example_job_monitoring()
print("\n\n" + "=" * 70)
print("All workflow examples completed successfully!")
print("=" * 70)
print(f"\nCreated example jobs: {job1}, {job2}")
print("\nThese functions are ready for integration into the v2 API!")