-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsetup_database.py
More file actions
140 lines (115 loc) · 3.47 KB
/
setup_database.py
File metadata and controls
140 lines (115 loc) · 3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python3
"""
Database setup script for videogen.
Creates SQLite database for WorkingBlock storage using the latest schema.
"""
import sys
from pathlib import Path
from datetime import datetime
# Add project root to PYTHONPATH
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from videogen.dao.working_block_dao import WorkingBlockDAO
from videogen.pipeline.working_block import WorkingBlock, WorkingBlockStatus
REQUIRED_COLUMNS = [
"id",
"project_name",
"method_name",
"status",
"retries",
"prev_ids",
"output_path",
"accumulated_duration_sec",
"block_id",
"action_index",
"config_json",
"result_json",
"priority",
"last_scheduled_at",
"create_time",
"modify_time"
]
def setup_database():
print("Initializing SQLite database for WorkingBlock storage...\n")
# ensure db directory exists
db_dir = Path("db")
db_dir.mkdir(exist_ok=True)
dao = WorkingBlockDAO()
print(f"Database file: {dao.db_path}")
# ---------------------------------------------------------
# Validate schema (ensure all required columns exist)
# ---------------------------------------------------------
import sqlite3
conn = sqlite3.connect(dao.db_path)
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(working_blocks)")
cols = [row[1] for row in cursor.fetchall()]
conn.close()
print("\nChecking schema...")
missing = [c for c in REQUIRED_COLUMNS if c not in cols]
if missing:
print(f"⚠️ WARNING: Missing columns detected: {missing}")
print("DAO will auto-migrate these fields on next startup.")
else:
print("Schema validation OK — all required columns present.")
print("\nRunning CRUD tests...")
now = datetime.utcnow().isoformat(timespec="seconds") + "Z"
test_block = WorkingBlock(
id="test_working_id",
project_name="test_project",
method_name="remotion_picture",
status=WorkingBlockStatus.PENDING,
retries=0,
prev_ids=["upstream_block"],
output_path="./output/test.mp4",
accumulated_duration_sec=1.5,
block_id="L1",
action_index=0,
config_json='{"prompt": "hello"}',
result_json='{"status": "pending"}',
# 🚀 new scheduling fields
priority=5,
last_scheduled_at=123456.0,
create_time=now,
modify_time=now,
)
# clean before test
dao.delete(test_block.id)
# CREATE
if dao.insert(test_block):
print("CREATE OK")
else:
print("❌ CREATE FAILED")
return False
# READ
rb = dao.get_by_id(test_block.id)
if rb and rb.priority == 5:
print("READ OK")
else:
print("❌ READ FAILED")
return False
# UPDATE
test_block.status = WorkingBlockStatus.SUCCESS
test_block.result_json = '{"status":"done"}'
test_block.priority = 1
if dao.update(test_block):
print("UPDATE OK")
else:
print("❌ UPDATE FAILED")
return False
# DELETE
if dao.delete(test_block.id):
print("DELETE OK")
else:
print("❌ DELETE FAILED")
return False
print("\nAll CRUD tests passed successfully!")
return True
if __name__ == "__main__":
success = setup_database()
if success:
print("\n🎉 Database setup completed successfully.")
sys.exit(0)
else:
print("\n❌ Database setup encountered errors.")
sys.exit(1)