forked from DigiScore/wolff1
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
137 lines (118 loc) · 4.83 KB
/
main.py
File metadata and controls
137 lines (118 loc) · 4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import logging
from time import time, sleep
import art
from pathlib import Path
import config
from modules.conducter import Conducter
from modules.ai_robot_data_writer import AIRobotDataWriter
from nebula.hivemind import DataBorg
from nebula.nebula import Nebula
# Switch read from config.data_logging; when truthy, Main creates a log
# directory per experiment and runs an AIRobotDataWriter on it.
DATA_LOGGING = config.data_logging
# Root path read from config.path; per-session/per-experiment log directories
# are built underneath it.
MAIN_PATH = config.path
class Main:
    """
    Main kick-starts the IMPROV2 experiment process.

    It builds the shared DataBorg object and the Nebula AI object once, then
    iterates through a series of experiments (one practice run followed by AI
    runs). When data logging is enabled in config, each experiment gets its
    own log directory and data writer. Moving on from one experiment to the
    next is gated by an input prompt in main_loop.
    """

    def __init__(self) -> None:
        """
        Set up logging, the shared data object and the Nebula AI object.

        Blocks on a console prompt before Nebula is created.
        """
        # Logging for all modules
        logging.basicConfig(level=logging.WARNING)

        # Shared data object; main_loop reads its session_date and drives
        # its MASTER_RUNNING / running flags.
        self.hivemind = DataBorg()
        self.eda = None  # not used anywhere in this class

        # Per-experiment state, declared here so every attribute exists even
        # if main_loop() is never reached.
        self.robot = None
        self.master_path = None
        self.first_time_through = True

        ###################
        # Start Nebula AI
        ###################
        art.tprint("IMPROV2")
        input("Click enter when you are ready to go, after STARTING CLOCK")

        # Init Nebula; main_loop() and terminate_listener() are called on it
        # later.
        self.nebula = Nebula()

    def main_loop(self, experiments: int = 4) -> None:
        """
        Manage the experiment loop.

        Runs ``experiments`` experiments back to back. The first is announced
        as practice mode, the remaining ones as AI experiments. Each one gets
        a fresh Conducter, is started once through wolff1_main(), and is
        stopped once config.duration_of_piece seconds have elapsed. After the
        last experiment everything is shut down via terminate_all().

        :param experiments: number of experiments to run. Defaults to 4, the
            value that used to be hard-coded.
        """
        for index in range(experiments):
            # A fresh Conducter for every experiment.
            self.robot = Conducter()

            if index == 0:
                print("========================================= Running PRACTICE MODE")
            else:
                print(f"========================================= Running AI. Experiment number {index}")

            # reset variables
            self.hivemind.MASTER_RUNNING = True
            self.first_time_through = True
            initial_time = time()

            while self.hivemind.MASTER_RUNNING:
                if self.first_time_through:
                    # First pass of a new experiment: prepare its log
                    # directory (if logging) and start all systems.
                    self.master_path = self._prepare_log_dir(index)
                    self.wolff1_main()
                    self.first_time_through = False
                else:
                    # Nothing to do but wait; poll once per second.
                    sleep(1)

                if time() - initial_time > config.duration_of_piece:
                    self.hivemind.running = False
                    self.robot.terminate()
                    # Leave the loop explicitly rather than relying on another
                    # component clearing the shared MASTER_RUNNING flag.
                    # NOTE(review): the original had no explicit exit here -
                    # confirm nothing depended on looping past this point.
                    break

            print(f"========================================= Completed experiment {index}.")

            # No prompt after the final experiment.
            if index < experiments - 1:
                input("Next Experiment?")
            self.first_time_through = True

        print("TERMINATING experiment mode.")

        # end of experiments so close things down
        self.hivemind.MASTER_RUNNING = False
        self.hivemind.running = False

        # close everything like a grown up
        self.terminate_all()
        print("Close the clock window")

    def _prepare_log_dir(self, index: int):
        """
        Create and return the log directory for experiment ``index``.

        Returns None without touching the file system when DATA_LOGGING is
        off. The directory name follows
        <MAIN_PATH>/<session_date>/IMPROV2_block_<index>_performance_<index+1>_mode_AI.
        """
        if not DATA_LOGGING:
            return None
        log_dir = Path(
            f"{MAIN_PATH}/{self.hivemind.session_date}/IMPROV2_block_{index}_performance_{index + 1}_mode_AI"
        )
        self.makenewdir(log_dir)
        return log_dir

    def terminate_all(self) -> None:
        """
        Terminates all active agents and threads.

        Calls terminate() on the current Conducter (if one was ever created)
        and terminate_listener() on Nebula.
        """
        if self.robot is not None:
            self.robot.terminate()
        self.nebula.terminate_listener()

    def wolff1_main(self) -> None:
        """
        Start all systems for a single experiment.

        In order: builds the data writer on self.master_path (only when
        DATA_LOGGING is on), sets Nebula's end time to now plus
        config.duration_of_piece, marks the hivemind as running, then calls
        main_loop() on the Conducter, on Nebula and finally on the data
        writer.

        Parameters are to be modified in config.py.
        """
        # Init data writer (only when logging is enabled)
        aidw = AIRobotDataWriter(self.master_path) if DATA_LOGGING else None

        self.nebula.endtime = time() + config.duration_of_piece
        self.hivemind.running = True

        # Conducter first, then Nebula, then the writer - same order as before.
        # NOTE(review): presumably these main_loop() calls start background
        # work and return; confirm against the respective modules.
        self.robot.main_loop()
        self.nebula.main_loop()

        if aidw is not None:
            aidw.main_loop()

    def makenewdir(self, timestamp: Path) -> None:
        """
        Create the directory ``timestamp`` including any missing parents.

        An already existing directory is reused silently instead of being
        reported as an error. Any other OS-level failure is reported on the
        console and otherwise ignored (best effort, as before).

        :param timestamp: path of the directory to create.
        """
        try:
            timestamp.mkdir(parents=True, exist_ok=True)
        except OSError:
            print(f"OS Make error. Could not make {timestamp}")
        else:
            print(f"Created dir {timestamp}")
# Script entry point: build the application object and run every experiment.
if __name__ == "__main__":
    app = Main()
    app.main_loop()