-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathAutoFlow_2.py
More file actions
341 lines (263 loc) · 15 KB
/
Copy pathAutoFlow_2.py
File metadata and controls
341 lines (263 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
import time
import pandas as pd
from scipy.stats import qmc
import ansys.fluent.core as pyfluent
import os
import subprocess
# import functions
import Function_py_to_FC_spreadsheet
import Function_autoassembly
import Function_py_to_Spaceclaim
import Funciton_automesh
import Function_AutoFlent
import Function_saveData
import Function_BO_3
# ================= initial settings 初始化设置 =================
bounds = [(0.2, 2.0), (0.001, 1.0)] # channel width: 0.2 - 2mm , mass flow rate = 0.01 - 1 kg/s
initial_samples_size = 10
CFD_calcuate_iteration = 250
max_total_iterations = 50 # total iteration - LHS + BO iteration
# saving data in spreadsheet, and assign name for them
raw_data_name = "CFD_Raw_Data_run12_fixiter50.csv" # line 184
proc_data_name ="CFD_Final_Result_run12_fixiter50.csv" # line 184
time_log_name = "Time_Log_Realtime_run12_fixiter50.csv" # line 219
time_history = [] # dictonary for save running time
# --- Launch Fluent (only lauch once, outside of the main loop 只启动一次,放在循环外!) ---
print("🚀 Launching Fluent Solver...")
solver = pyfluent.launch_fluent(mode="solver", precision="double", processor_count=4, ui_mode="no_gui", cleanup_on_exit=False)
# ================= Generate LHS 样本 (计划单) =================
# sampler = qmc.LatinHypercube(d=2, seed=42) # for replicable results
sampler = qmc.LatinHypercube(d=2)
sample = sampler.random(initial_samples_size)
scaled = qmc.scale(sample, [bounds[0][0], bounds[1][0]], [bounds[0][1], bounds[1][1]])
# create LHS pending list "待运行队列" list of tuples: [(w1, f1), (w2, f2)...]
pending_runs = []
for i in range(initial_samples_size):
w = round(scaled[i, 0], 2)
f = round(scaled[i, 1], 4)
pending_runs.append((w, f))
print(f"✅ LHS generated {len(pending_runs)} samples.")
# ================= create data list 结果存储列表 =================
width_result_list = []
flow_result_list = []
entropy_result_list = []
# =============== initial stoppig conditions =====================================
best_entropy_so_far = float('inf') # infinite big value, make sure first sample always win
best_params_so_far = None # store best (width, flow)
no_improve_count = 0 # counter
PATIENCE_LIMIT = 5 # after 5 iteration no improvement, stop entire loop
max_total_iterations = 50 # total iteration - LHS + BO iteration
# ================= start main loop !!!!! 进入自动化大循环 =================
iteration = 0
t_bo = 0
while iteration < max_total_iterations:
current_iter_start_time = time.perf_counter()
print("\n" + "="*60)
print(f"🔄 Iteration {iteration + 1} / {max_total_iterations}")
# --- decide to do initial samples or BO ---
if len(pending_runs) > 0:
# pahse I: if pneding list is not emptyu, do LHS samples 如果待运行队列里还有 LHS 的点,就取出来跑
print("👉 Mode: Initial Sampling (LHS)")
channelwidth, mass_flow = pending_runs.pop(0) # alwasy use first pair in pending list and delet 取出第一个,并在列表中删除
else:
# phase II: if pending list is empty, use BO to explore point to sample 队列空了,说明 LHS 跑完了,开始呼叫 BO
print("👉 Mode: Bayesian Optimization (BO)")
# construct DataFrame
"""
strings in in/out put should be same as df's
"""
df = pd.DataFrame({
"width": width_result_list,
"flowrate": flow_result_list,
"entropy": entropy_result_list
})
input_col = ["width", "flowrate"]
output_col = "entropy"
# 2. run BO and obtain next sample 运行 BO 获取建议点
bo_start = time.perf_counter()
next_sample_dict = Function_BO_3.runBO(df, input_col, output_col, bounds)
t_bo = time.perf_counter() - bo_start
# 3. BO result BO 结果
channelwidth = round(next_sample_dict["width"], 2)
mass_flow = round(next_sample_dict["flowrate"], 4)
print(f"🤖 BO suggests next point: Width={channelwidth}, Flow={mass_flow}")
print(f"BO time: {time.perf_counter() - bo_start:.2f}s")
print(f"🧪 Testing: Width = {channelwidth} mm, Flow Rate = {mass_flow} kg/s")
print("="*60)
# --- CAD -> Mesh -> CFD ---
t_cad = t_assem = t_sc = t_mesh = t_cfd = 0.0
try:
# CAD Modification
t0 = time.perf_counter()
filepath = r"C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\CAD_files\Coldplate_spreadsheet.FCStd"
msg, pitch_value = Function_py_to_FC_spreadsheet.coldplate_size(filepath, channelwidth)
t_cad = time.perf_counter() - t0
print(f"✅ CAD Modified ({t_cad:.2f}s): Pitch = {pitch_value}")
# Assembly
t0 = time.perf_counter()
CP_path = r"C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\CAD_files\Export_Coldplate_spreadsheet.step"
lets_path = r"C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\CAD_files\InOutlet_MaxWidth_step.SLDPRT.STEP"
chips_path = r"C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\CAD_files\chip1_step.STEP"
save_path = r"C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\CAD_files\Export_autoAssem_coldplate.step"
result = Function_autoassembly.autoassem_coldplate(CP_path, lets_path, chips_path, output=save_path)
t_assem = time.perf_counter() - t0
print(f"✅ Assembly Completed ({t_assem:.2f}s)")
# SpaceClaim - Volume Extract/share topology
t0 = time.perf_counter()
Function_py_to_Spaceclaim.autoVolumeShare()
t_sc = time.perf_counter() - t0
print(f"✅ Volume Extracted ({t_sc:.2f}s)")
# 👇 make sure Spaceclaim release file 加入这两行:确保 SC 彻底松开文件,防止死锁
subprocess.run(["taskkill", "/F", "/IM", "SpaceClaim.exe", "/T"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
time.sleep(10)
# Meshing
t0 = time.perf_counter()
cad_path = r'C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\CAD_files\Export_autoVolumeShare_coldplate.scdocx'
mesh_data = None
max_mesh_iteraiton = 4
sleeptime = 10
for attempt in range(max_mesh_iteraiton):
print(f"\n⏳ mesh generation trail: (Attempt {attempt + 1}/{max_mesh_iteraiton})...")
# Generate mesh 生成网格
mesh_data = Funciton_automesh.automesh(cad_path)
if mesh_data is not None:
# 成功了!直接跳出重试循环
break
else:
print(f"⚠️ mesh generation fail at {attempt + 1} th trail。")
subprocess.run(["taskkill", "/F", "/IM", "AnsysPrimeServer.exe", "/T"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
if attempt < max_mesh_iteraiton - 1:
print("🔄 wait for 5 sceond for next trail...")
time.sleep(sleeptime)
# All retry complete, shut donw main loop 重试网格构建全失败后的终极判
if mesh_data is None:
print("\n" + "!"*60)
print(f"🚨 after {max_mesh_iteraiton} trails,Mesh generation failed.")
print("Skipping this iteration.")
print(f"💥 Failed parameters -> Width: {channelwidth} mm, Flow Rate: {mass_flow} kg/s")
print("!"*60 + "\n")
break # 跳出当前 iteration,进入下一轮大循环
# ----------- clean the background Primemesh process(just in case it hasn't been shutdone in Functinon_automesh) ----------
print("Force killing any lingering AnsysPrimeServer processes...")
try:
subprocess.run(["taskkill", "/F", "/IM", "AnsysPrimeServer.exe", "/T"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
except Exception:
pass # 如果没有残留进程或由于权限问题报错,忽略即可
# print mesh quality report
print("\n" + "="*40)
print(" MESH GENERATION REPORT ")
print("="*40)
print(f"Time to generate mesh")
print(f" {mesh_data['generation_time']:.2f} seconds")
print(f"1. Skewness Quality:")
print(f" - Maximum Skewness: {mesh_data['skewness_max']:.4f} (Target < 0.9)")
print(f" - Bad Cells (>0.9): {mesh_data['skewness_bad_count']}")
print(f"\n2. Inverse Orthogonal Quality:")
print(f" - Maximum I.O.: {mesh_data['io_max']:.4f} (Target < 0.9)")
print(f" - Bad Cells (>0.9): {mesh_data['io_bad_count']}")
print(f"\n3. Aspect Ratio:")
print(f" - Maximum A.R.: {mesh_data['ar_max']:.2f} (Target < 50)")
print(f" - Bad Cells (>50): {mesh_data['ar_bad_count']}")
print("-" * 40)
print(f"📂 Export Location:\n{mesh_data['file_path']}")
print("=" * 40)
t_mesh = time.perf_counter() - t0
print(f"✅ Mesh Generated ({t_mesh:.2f}s)")
# CFD Simulation
"""
This auto Fluent setting is specific for the 7-channel simple coldplate. Modify Function_AutoFlent if needed
.runFluent(Fluent solver, flow rate, distance of channel center to center, mesh file path, how many iter in Fluent )
"""
t0 = time.perf_counter()
mesh_filename = r'C:\Users\lichengd\OneDrive - Oregon State University\1. PhD\Chiller\automation\Chiller_auto_Python\Fluent_work_dir\Meshsurface_case\Export_Automesh_coldplate_proximity.msh'
CFD_results = Function_AutoFlent.runFluent(solver, mass_flow, pitch_value, mesh_filename, cal_iter=CFD_calcuate_iteration)
t_cfd = time.perf_counter() - t0
print(f"✅ CFD Completed ({t_cfd:.2f}s)")
# ---------------- save CFD results 数据保存与闭环 -------------------
if CFD_results:
# create dictionary 准备数据字典
data_row = {"width": channelwidth, "flowrate": mass_flow}
data_row.update(CFD_results)
# save in spreadsheet 保存到 CSV
proc_data = Function_saveData.save_coldplate_result(data_row, raw_name=raw_data_name, proc_name=proc_data_name)
# [Attention] append current sample to the list for next BO
# the keys should be same as df's
# [关键]将结果存入内存列表,供下一轮 BO 学习
# 注意:这里要确保 key 和上面 df 创建时的名字一致
width_result_list.append(proc_data["width"])
flow_result_list.append(proc_data["flowrate"])
entropy_result_list.append(proc_data["entropy"])
print(f"💾 Data Saved. Current Entropy: {proc_data['entropy']:.4f}")
else:
print("❌ CFD failed to return results.")
# 可以在这里添加重试逻辑,或者填入一个惩罚值
except Exception as e:
print(f"\n❌ Error in iteration {iteration}: {e}")
# =============== Save running time for each stage and total time for each iteration ================
# =============== 记录每个阶段时间 ================
time_used = {"CAD time": t_cad,
"post CAD time": t_sc,
"mesh time":t_mesh,
"CFD time":t_cfd,
"optimization time":t_bo,
"totoal time":t_cad+t_sc+t_mesh+t_cfd+t_bo}
time_history.append(time_used)
# convert dictionary to DataFrame
df_current_step = pd.DataFrame([time_used])
# save as csv (mode='a' -> append model)
csv_filename = time_log_name
if not os.path.isfile(csv_filename):
# if file doesn't exsit, writ a header for it 如果文件不存在,写入表头 (header=True)
df_current_step.to_csv(csv_filename, index=False, mode='w', header=True)
else:
# if already exsit, no header 如果文件已存在,直接追加内容,不写表头 (header=False)
df_current_step.to_csv(csv_filename, index=False, mode='a', header=False)
print(f"⏱️ {iteration}th iteration running time saved")
# ================= stopping condition =================
# stopping condition #2: no improvement in 5 iteration
# obtain current entropy and variables
current_entropy = proc_data["entropy"]
current_params = (proc_data["width"], proc_data["flowrate"])
# --- best so far vs. current result 最强vs目前 ---
if current_entropy < best_entropy_so_far:
# found new best sample
print(f"🌟 new best sample found!! Entropy drops to {best_entropy_so_far:.4f} from {current_entropy:.4f}")
best_entropy_so_far = current_entropy # update current best Entropy
best_params_so_far = current_params # update current best variable
no_improve_count = 0
else:
# no improve and not in initial sampling
if len(entropy_result_list) > initial_samples_size:
no_improve_count += 1
print(f"😐 no improve, keep searching: {no_improve_count}/{PATIENCE_LIMIT}")
# --- cheeck meet stopping condition? 是否触发停止条件 ---
if no_improve_count >= PATIENCE_LIMIT and len(entropy_result_list) > initial_samples_size:
print("\n" + "!"*60)
print(f"🛑 stopping condition met: no improvement in {PATIENCE_LIMIT} iterations ")
print(f"🏆 Final optimal Entropy = {best_entropy_so_far:.4f}")
print(f"📐 correspoding variables: Width = {best_params_so_far[0]}, Flow = {best_params_so_far[1]}")
print("!"*60 + "\n")
# break # 👈 只要这一句 break,整个大循环就停了
# --- Finish current iteration ---
iteration += 1
total_time = time.perf_counter() - current_iter_start_time
print(f"⏱️ Iteration Total Time: {total_time:.2f}s\n")
print(f"length of result list is :{len(width_result_list)}\n")
# ----------- clean the background Primemesh process(just in case it hasn't been shutdone in Functinon_automesh) ----------
print("Force killing any lingering AnsysPrimeServer processes...")
try:
subprocess.run(["taskkill", "/F", "/IM", "AnsysPrimeServer.exe", "/T"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL)
except Exception:
pass # 如果没有残留进程或由于权限问题报错,忽略即可
# ================= 5. 结束清理 =================
print("🎉 Optimization Loop Finished.")
print("\n" + "!"*60)
print(f"🛑 Heads up: no improvement in {no_improve_count} iterations ")
print(f"🏆 Final optimal Entropy = {best_entropy_so_far:.4f}")
print(f"📐 correspoding variables: Width = {best_params_so_far[0]}, Flow = {best_params_so_far[1]}")
print("!"*60 + "\n")
solver.exit()