-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdevice_controller.py
More file actions
233 lines (195 loc) · 8.59 KB
/
Copy pathdevice_controller.py
File metadata and controls
233 lines (195 loc) · 8.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""
使用 oBIX 协议和 LLM 智能控制设备
"""
import time
import config
from fsm import OccupancyState
from obix_client import ObixClient
from oBIX.common import DataType
from weather_service import fetch_current_weather
from llm_service import LLMControlService
class DeviceController:
"""
通过 oBIX 协议和 LLM 智能控制设备
"""
def __init__(self):
self.obix_client = ObixClient()
self.llm_service = LLMControlService()
# 设备当前状态
self.light_brightness = 0
self.fan_state = 0
self.ac_mode = 0
# 状态管理
self.previous_state = OccupancyState.VACANT
self.last_llm_request_time = 0
self.current_control_params = None
# 上次推理思维
self.last_llm_reasoning = None
def _set_devices(self, light_brightness, fan_state, ac_mode):
"""设置所有设备状态"""
try:
# 设置灯光开关
self.obix_client.write_point(
config.DEVICE_POINTS["light_switch"],
bool(light_brightness > 0),
DataType.bool
)
# 设置灯光亮度
self.obix_client.write_point(
config.DEVICE_POINTS["light_brightness"],
float(light_brightness/10), # 转换为 0-10 范围
DataType.real
)
# 设置风扇状态
self.obix_client.write_point(
config.DEVICE_POINTS["fan_state"],
bool(fan_state),
DataType.bool
)
# 设置空调模式
# self.obix_client.write_point(
# config.DEVICE_POINTS["ac_mode"],
# int(ac_mode),
# DataType.real
# )
# 设置空调开关
self.obix_client.write_point(
config.DEVICE_POINTS["ac_switch"],
bool(ac_mode > 0),
DataType.bool
)
# 设置空调模式
self.obix_client.write_point(
config.DEVICE_POINTS["ac_mode_1"],
bool(ac_mode == 1),
DataType.bool
)
self.obix_client.write_point(
config.DEVICE_POINTS["ac_mode_2"],
bool(ac_mode == 2),
DataType.bool
)
self.obix_client.write_point(
config.DEVICE_POINTS["ac_mode_3"],
bool(ac_mode == 3),
DataType.bool
)
# 更新内部状态
self.light_brightness = light_brightness
self.fan_state = fan_state
self.ac_mode = ac_mode
ac_mode_names = ["关闭", "制热", "制冷", "除湿"]
print(f"设备控制: 灯光={light_brightness}%, 风扇={'开' if fan_state else '关'}, "
f"空调={ac_mode_names[ac_mode]}")
except Exception as e:
print(f"设备控制错误: {e}")
def _turn_off_all_devices(self):
"""关闭所有设备"""
self._set_devices(0, 0, 0)
print("所有设备已关闭")
def _request_llm_control(self, sensors_data):
"""请求LLM控制决策"""
try:
# 获取天气数据
weather_data = fetch_current_weather()
# 获取LLM控制决策
control_output = self.llm_service.get_control_decision(sensors_data, weather_data)
# 更新控制参数
self.current_control_params = control_output
self.last_llm_request_time = time.time()
self.last_llm_reasoning = control_output.reasoning
# 应用控制参数
self._set_devices(
control_output.light_brightness,
control_output.fan_state,
control_output.ac_mode
)
return control_output
except Exception as e:
print(f"LLM控制请求失败: {e}")
return None
def _should_request_llm(self, current_state):
"""判断是否需要请求LLM"""
current_time = time.time()
# 在有人或离开状态,且距离上次请求间隙
if (current_state in (OccupancyState.OCCUPIED, OccupancyState.LEAVING, OccupancyState.ARRIVING) and
current_time - self.last_llm_request_time >= config.LLM_REQUEST_INTERVAL):
return True
return False
def control(self, state: OccupancyState, sensors: dict):
"""根据占用状态控制设备"""
# 导入shared以避免循环导入
import shared
control_state = shared.control_state
# 检查是否刚从手动模式切换到自动模式
mode_switched_to_auto = control_state.check_and_reset_mode_switch()
if mode_switched_to_auto:
print("检测到从手动模式切换到自动模式")
if state == OccupancyState.VACANT:
print("当前状态为VACANT,关闭所有设备...")
self._turn_off_all_devices()
self.current_control_params = None
self.last_llm_reasoning = None
else:
print(f"当前状态为{state.name},请求LLM智能控制...")
self._request_llm_control(sensors)
return
# 如果是手动模式,应用手动覆盖
if control_state.get_mode() == 'manual':
overrides = control_state.get_manual_overrides()
# 使用手动设置的值,如果没有设置则保持当前值
light_brightness = overrides.get('light_brightness', self.light_brightness)
fan_state = overrides.get('fan_state', self.fan_state)
ac_mode = overrides.get('ac_mode', self.ac_mode)
# 如果有任何手动覆盖值,应用它们
if any(v is not None for v in overrides.values()):
self._set_devices(
light_brightness if overrides['light_brightness'] is not None else self.light_brightness,
fan_state if overrides['fan_state'] is not None else self.fan_state,
ac_mode if overrides['ac_mode'] is not None else self.ac_mode
)
return
# 自动模式:原有逻辑
current_time = time.time()
# 检查状态变化
state_changed = self.previous_state != state
# 状态转换逻辑
if state_changed:
print(f"状态变化: {self.previous_state.name} -> {state.name}")
# 无人 -> 来人:请求LLM控制
if (self.previous_state == OccupancyState.VACANT and
state == OccupancyState.ARRIVING):
print("检测到有人到来,请求LLM智能控制...")
self._request_llm_control(sensors)
if (self.previous_state == OccupancyState.VACANT and
state == OccupancyState.OCCUPIED):
print("检测到有人到来,请求LLM智能控制...")
self._request_llm_control(sensors)
# 离开 -> 无人:关闭所有设备
elif (self.previous_state == OccupancyState.LEAVING and
state == OccupancyState.VACANT):
print("检测到所有人员离开,关闭所有设备...")
self._turn_off_all_devices()
self.current_control_params = None
self.last_llm_reasoning = None
elif (self.previous_state == OccupancyState.OCCUPIED and
state == OccupancyState.VACANT):
print("检测到所有人员离开,关闭所有设备...")
self._turn_off_all_devices()
self.current_control_params = None
self.last_llm_reasoning = None
# 在有人/离开状态时,定期请求LLM更新
if self._should_request_llm(state):
print("定期更新LLM控制参数...")
self._request_llm_control(sensors)
# 更新上一次状态
self.previous_state = state
def get_device_status(self):
"""获取设备当前状态"""
return {
"light_brightness": self.light_brightness,
"fan_state": self.fan_state,
"ac_mode": self.ac_mode,
"last_llm_request": self.last_llm_request_time,
"last_llm_reasoning": self.last_llm_reasoning
}