Language: 한국어 | English
uniflow.py 의 Python다운 투어. 각 챕터는 작은 실행 가능한 프로그램이다. Python 포트는 C++ 멘탈모델을 유지하고 공개 이름까지 그대로 미러링하되(Task-Level Syntax) 기교(매크로/템플릿)는 버린다. 모듈 은 uniflow.Uniflow 를 상속해 하나 이상의 task 를 소유하고, task 는 uniflow.Task 를 상속해 자기 step 메서드를 소유한다. step 은 의도를 반환하는 메서드다 - self.Next(...) / self.Stay() / self.Done() / self.Fail().
API 레퍼런스: PYTHON_PORT.md. 더 깊은 개념(단일 펌프 모델, observer 훅, 크로스-런타임)은 C++ 튜토리얼 ../cpp/TUTORIAL.md 가 같은 아이디어를 더 자세히 다룬다.
모든 예제는 uniflow.py 가 import 가능하다고 가정한다(표준 라이브러리만 쓰는 단일 파일):
import uniflow예제. 여기서 언급하는 여섯 개의 예제는 python/examples 에 있다 -
simulator.py,shared_ostream.py,message_dispatch.py,pick_and_place.py,queue_drain.py,city_traffic.py. ../cpp/examples 의 C++ 세트, ../cs/examples 의 C# 세트와 짝을 이루므로 같은 프로그램이 세 언어에서 똑같이 읽힌다.
단위. Python 포트의 시간 단위는 초 다(C++ 포트는 밀리초).
rt.clock.Now(),UFTimer,StayTimeout/StayUntil마감이 모두 초 단위다.
가장 작은 단위는 uniflow.Uniflow 를 상속하고 task 하나를 소유하는 모듈이다. task 는 uniflow.Task 를 상속해 Entry() 로 첫 step 을 지정하고, step 은 Done() 을 반환한다.
import uniflow
class Flow_Hello(uniflow.Uniflow):
def __init__(self, rt):
super().__init__(rt, name="Flow_Hello")
self.ctx = self.Task_Hello() # 모듈의 단일 task
self.AddTask(self.ctx) # flow() 역참조 연결; 아무것도 시작 안 함
class Task_Hello(uniflow.Task):
def Entry(self): # 첫 step 지정
return self.Step1_Greet()
def Step1_Greet(self):
print("hello from a step")
return self.Done()
rt = uniflow.Runtime()
h = Flow_Hello(rt)
h.ctx.StartFlow() # task 런칭; 첫 step 은 다음 라운드에 실행
rt.WaitUntilIdle() # 모든 모듈이 idle 될 때까지 블록
rt.stop()uniflow.Runtime()이 펌프 스레드 1개를 띄움.Flow_Hello(rt)가 모듈을 attach; 펌프가 매 라운드 방문.AddTask(self.ctx)가 task 를 바인딩해서 그 step 들이self.flow()로 모듈에 닿게 함.h.ctx.StartFlow()가 task 를 런칭;Entry()가 다음 라운드에 실행. 반환값은StartResult.Ok, 이미 task 가 돌고 있으면StartResult.Busy.self.Done()반환 시 모듈은 idle 로 복귀.
step 본문에서 블로킹
time.sleep(...)은 금지한다 - 펌프 전체가 멈춘다.Stay()(챕터 3) 또는SubmitAsync(챕터 5)를 사용한다.
실제 task 는 self.Next(self.다음step) 으로 N개를 잇는다. step 들은 같은 task 의 형제 메서드이고, 위에서 아래로 읽힌다.
import uniflow
class Flow_Greet(uniflow.Uniflow):
def __init__(self, rt):
super().__init__(rt, name="Flow_Greet")
self.ctx = self.Task_Greet()
self.AddTask(self.ctx)
class Task_Greet(uniflow.Task):
def Entry(self):
return self.Step1_Hi()
def Step1_Hi(self):
print("1. hi there")
return self.Next(self.Step2_Nice)
def Step2_Nice(self):
print("2. nice to see you")
return self.Next(self.Step3_Bye)
def Step3_Bye(self):
print("3. see you again")
return self.Done()
rt = uniflow.Runtime()
g = Flow_Greet(rt)
g.ctx.StartFlow()
rt.WaitUntilIdle(); rt.stop()각 Next 는 다음 step 을 다음 라운드 에 예약한다. Next 는 현재 task 를 벗어나지 않고 형제 step 으로만 진행한다. step 경계가 async 결과를 끼우는 자리다(챕터 5).
인자도 넘길 수 있다: self.Next(self.Step2_Wait, job_id) 는 다음 라운드에 Step2_Wait(self, job_id) 를 호출한다 - 제출 step 에서 폴링 step 으로 AsyncId 를 넘기는 정석이다.
self.Stay() 를 반환하면 다음 라운드에 같은 step 을 다시 실행한다 - 플래그 폴링이나 다른 모듈 대기용. 펌프는 모두-Stay 라운드 사이에 stay_sleep_sec(기본 20ms) 쉰다.
step 안에서는 sleep 을 할 수 없으므로 시간 은 폴링하는 타이머로 표현한다. 모든 모듈은 (런타임 시계에 바인딩되어) 매 step 전환마다 재무장되는 내장 self.uf_timer 를 가진다 - Next, StayTimeout / StayUntil 타임아웃, task 전환, flow 시작에서 재무장하되 Stay 에서는 하지 않으므로 한 step 안에서 폴링하는 동안에는 계속 누적된다. 대기 step 에선 self.flow() 로 모듈에서 읽는다. 직접 선언한 멤버 타이머도 같은 방식으로 auto-reset 하려면 self.NewAutoTimer() 로 만들고, 셀프로 리셋할 타이머는 평범한 UFTimer 를 소유해 OnEnter() 에서 무장한다:
import uniflow
def hardware_ready():
... # 센서/플래그 읽기
class Flow_WaitReady(uniflow.Uniflow):
def __init__(self, rt):
super().__init__(rt, name="Flow_WaitReady")
self.ctx = self.Task_Wait()
self.AddTask(self.ctx)
class Task_Wait(uniflow.Task):
def OnEnter(self):
# 진입 시 per-task 상태 재무장; rt.clock(배속/정지) 에 바인딩
self.settle = uniflow.UFTimer(self.flow()._rt.clock)
def Entry(self):
return self.Step1_Wait()
def Step1_Wait(self):
# HeldFor: 조건이 0.05s 동안 연속으로 참이면 True (settling/디바운스).
# 한 번이라도 false면 누적이 리셋된다.
if self.settle.HeldFor(hardware_ready, 0.05):
return self.Next(self.Step2_Go)
return self.Stay()
def Step2_Go(self):
return self.Done()timer.Passed(d)- 타이머 무장 후d초가 지났나?timer.HeldFor(cond, d)-cond가d동안 연속으로 참이었나? (한 번이라도 false면 리셋)timer.Elapsed()- 원시 경과 초. 페이싱/진행률에.
위의 settle 대기 패턴(조건 폴링, d 동안 유지, 아니면 포기)은 흔해서 StayUntil 이 한 호출로 접어준다: 대기 조건 + settle 윈도우 + 타임아웃 catch. 조건을 매 라운드 폴링해 settle_sec 동안 유지되면 success 로 가고, 그 전에 timeout_sec 가 지나면 타임아웃 타깃으로 간다. 인자 순서는 condition, settle_sec, success, timeout_sec, timeout_step:
def Step1_Wait(self):
# hardware_ready 대기, 0.05s 유지로 정착; 5s 후 포기
return self.StayUntil(hardware_ready, 0.05, self.Step2_Go,
5.0, self.Step_Timeout)(성공 경로를 본문이 직접 정하는 평범한 타임아웃 탈출은 StayTimeout(timeout_sec, timeout_step) 을 쓴다 - 챕터 6 참고.)
OnEnter() 는 task 가 진입될 때마다 첫 step 직전 1회 실행된다 - per-task 상태 재무장 자리. Entry() 는 override 해서 첫 step 을 지정한다.
여러 모듈을 같은 Runtime 에 attach 하면 전부 같은 펌프 스레드에서 라운드로빈으로 돈다. 모듈 간 공유 상태에 락이 필요 없다 - 한 스레드이기 때문이다. shared_ostream.py 예제의 축소판이다.
import uniflow
class SharedState: # sink 하나 + turn 플래그, 펌프 스레드만 건드림
log = []
turn = 0
class Flow_Writer(uniflow.Uniflow):
def __init__(self, rt, text, count, turn_id):
super().__init__(rt, name="Flow_Writer")
self.text = text
self.remaining = count
self.turn_id = turn_id # 0 또는 1; 내 차례에만 쓴다
self.ctx = self.Task_Write()
self.AddTask(self.ctx)
class Task_Write(uniflow.Task):
def Entry(self):
return self.Step1_Loop()
def Step1_Loop(self):
f = self.flow()
if f.remaining <= 0:
return self.Done()
if SharedState.turn != f.turn_id:
return self.Stay() # 내 차례 아님 - 다음 라운드 재폴링
SharedState.log.append(f.text) # 공유 sink, 락 없음
SharedState.turn = 1 - SharedState.turn
f.remaining -= 1
return self.Stay()
rt = uniflow.Runtime(observer=uniflow.Observer()) # 조용한 observer
hello = Flow_Writer(rt, "Hello ", 3, 0)
world = Flow_Writer(rt, "World. ", 3, 1)
hello.ctx.StartFlow(); world.ctx.StartFlow()
hello.WaitUntilIdle(); world.WaitUntilIdle(); rt.stop()
print("".join(SharedState.log)) # Hello World. Hello World. Hello World.SharedState 를 두 모듈에서 락 없이 만질 수 있는 것은 둘 다 그 하나의 펌프 스레드에서 돌기 때문이다. self.flow() 는 task step 안에서 소유 모듈에 닿는다.
observer=uniflow.Observer()를 넘기면 조용한 observer 가 설치된다(베이스 클래스가 no-op). 기본값은 모든 이벤트를 찍는ConsoleObserver로, 배우는 동안엔 유용하지만 프로그램이 stdout 을 소유하면 끈다. 챕터 7 참고.
느린 함수를 step 에서 직접 부르면 펌프가 멈춘다. self.SubmitAsync(...) 로 스레드풀에 넘기면 AsyncId 를 즉시 돌려받는다. 그 id 를 뒤 step 으로 넘겨 self.AsyncResult(id) 로 폴링 한다. 펌프는 절대 막히지 않는다.
import time
import uniflow
def slow_square(n):
time.sleep(0.5) # 펌프가 아니라 풀 스레드에서 실행
return n * n
class Flow_Worker(uniflow.Uniflow):
def __init__(self, rt):
super().__init__(rt, name="Flow_Worker")
self.ctx = self.Task_Work()
self.AddTask(self.ctx)
class Task_Work(uniflow.Task):
def Entry(self):
return self.Step1_Submit()
def Step1_Submit(self):
print("느린 잡 제출 (펌프는 안 막힘)")
# (fn, label, timeout_sec, *args); timeout_sec=None 이면 타임아웃 없음.
# fn 은 모듈 레벨/정적 함수 - task 에 접근하지 못한다.
aid = self.SubmitAsync(slow_square, "slow_square", None, 9)
if aid == 0:
return self.Fail(reason="rejected: in-flight cap reached")
return self.Next(self.Step2_Wait, aid) # id 를 앞으로 넘김
def Step2_Wait(self, aid):
r = self.AsyncResult(aid) # AsyncOutcome 스냅샷
if r.pending():
return self.Stay() # 아직 진행 중 - 재폴링
if not r.ok():
return self.Fail(reason="job failed or timed out")
print("결과:", r.return_value)
return self.Done()잡이 도는 동안 펌프는 다른 모듈을 계속 돌린다. 잡이 끝나면 펌프를 깨우므로, 다음 Step2_Wait 폴링이 stay_sleep_sec 만큼 기다리지 않고 결과를 바로 캐치한다.
AsyncResult(id) 는 .state 와 다음 술어를 가진 AsyncOutcome 을 반환한다: .ok()(Done - .return_value 유효), .pending()(진행 중), .failed()(워커가 예외), .is_timeout()(마감 초과), .found()(id 가 살아있는 슬롯에 매칭). 잘못된/지워진 id(0 포함)는 NotFound 로 읽힌다. 모듈은 self.AnyAsyncPending() 과 self.ClearAsync()(모든 진행 중 워커 폐기)도 제공한다.
잡에 마감을 주려면 timeout_sec(실제 초, 3번째 인자)을 넘긴다: self.SubmitAsync(fn, "label", 2.0, *args). 마감 후 결과는 is_timeout() 으로 읽히고 워커는 폐기된다(버려진 결과로 계속 돌긴 함).
GIL 주의: I/O 바운드(네트워크/디스크)는 블로킹 중 GIL 을 풀어 실제 동시성을 얻는다. CPU 바운드는 GIL 때문에 병렬화되지 않지만, offload 하면 펌프는 계속 반응한다.
SubmitAsync 는 잡 이 늦어진 경우를 처리한다. 한편 잡을 기다리는 게 아닌 경우도 있다 - 하드웨어에 명령을 내려놓고 "완료" 플래그나 센서를 Stay() 로 폴링하는 경우로, 신호가 끝내 오지 않을 수 있다. 축이 끼이거나 밸브가 멈추면 맨 Stay() 루프는 무한히 폴링하고, 라인은 에러도 없이 멈춰 선다. 실제 장비에선 심각한 고장 양상이다.
self.StayTimeout(timeout_sec, on_timeout) 은 마감이 달린 Stay() 다: 이 step 을 계속 폴링하되, step 에 진입한 시점 부터 논리 시간 으로 timeout_sec 이 지나면 on_timeout 으로 빠진다. 그 step 이 catch 역할을 한다 - 정해진 복구 경로로의 보장된 탈출구다. 성공 경로는 여전히 본문이 소유한다(본문이 직접 Next/Done 반환). 대기 조건과 settle 윈도우까지 한 호출로 접으려면 챕터 3의 StayUntil 을 쓴다.
전체 패턴 - 명령, 마감 걸린 대기, 복구:
import uniflow
class Flow_Move(uniflow.Uniflow):
def __init__(self, rt, axis, target_mm):
super().__init__(rt, name="Flow_Move")
self.axis = axis
self.target = target_mm
self.tries = 0
self.ctx = self.Task_Move()
self.AddTask(self.ctx)
class Task_Move(uniflow.Task):
def Entry(self):
return self.Step1_Command()
def Step1_Command(self):
f = self.flow()
f.axis.move_to(f.target) # 명령 발사 (논블로킹)
return self.Next(self.Step2_WaitInPos)
def Step2_WaitInPos(self):
if self.flow().axis.in_position(): # 정상 경로
return self.Next(self.Step3_Clamp)
# 아직 이동 중 - 폴링하되 2s 넘게 멈춰있으면 포기
return self.StayTimeout(2.0, self.Step_Stalled)
# 대기 step 진입 후 2s 안에 in_position 이 끝내 true 가 안 된 경우에만 도달.
# 흐름은 멈춰 설 수 없다 - 항상 정의된 어딘가로 도착한다.
def Step_Stalled(self):
self.flow().axis.abort()
print("axis stalled before reaching target")
return self.Fail(reason="stalled")
def Step3_Clamp(self):
return self.Done()StayTimeout 없이 Step2_WaitInPos 가 맨 Stay() 를 반환했다면, 누군가 라인이 죽은 걸 알아챌 때까지 무한히 돈다. 이걸 쓰면 이동이 안 끝날 경우 Step_Stalled 에 반드시 도달한다.
복구 step 도 하나의 step 이므로 어디로든 라우팅할 수 있다. 흔한 형태가 재시도 후 포기 다:
def Step2_WaitInPos(self):
if self.flow().axis.in_position():
return self.Next(self.Step3_Clamp)
return self.StayTimeout(2.0, self.Step_Retry)
def Step_Retry(self):
f = self.flow()
f.tries += 1
if f.tries >= 3:
print("axis failed after 3 tries")
return self.Fail(reason="gave up")
f.axis.abort()
return self.Next(self.Step1_Command) # 재발행 -> 대기 step 재진입,
# 2s 창이 다시 시작됨Step1_Command -> Step2_WaitInPos 재진입은 새 step 진입이므로, 시도마다 2s 마감이 새로 시작된다 - 수동 타이머 관리가 필요 없다. 마감은 step 진입 기준이라 반복되는 Stay 틱이 시계를 뒤로 밀지 않는다.
가상 시계. 타이머와
StayTimeout/StayUntil마감은rt.clock위에서 돈다 - 배속/정지 가능한 시계다.rt.clock.SetScale(10)은 전체 흐름을 10배속 재생하고,rt.clock.Freeze()/.Resume()은 모든 논리 타임아웃을 정지한다(예: e-stop 중 2s 타임아웃이 멈춤 동안 안 터지게). async/IO 데드라인은 실제 시간을 유지한다.simulator.py예제가 이 전부를 키보드로 실시간 구동한다.
모든 이벤트(flow/step/async/예외/종료)는 Observer 로 흐른다. 기본 ConsoleObserver 는 stdout 에 찍는다. 조용히 하거나 커스텀 로깅하려면 상속해서 넘긴다:
import uniflow
class Flow_Observed(uniflow.Observer): # 빈 Observer -> 출력 없음 (조용)
pass
class MyObserver(uniflow.Observer):
def OnStepChanged(self, obj, prev_step, next_step, description,
step_ordinal, elapsed_ms, ticks):
print(f"{obj}: {prev_step} -> {next_step} {description} ({elapsed_ms:.1f}ms)")
def OnFlowEnded(self, obj, terminal_action, final_step_ordinal, wall_ms, reason):
if terminal_action is uniflow.StepAction.FAIL:
print(f"{obj} FAILED: {reason}")
rt = uniflow.Runtime(observer=MyObserver()) # rt 의 모든 모듈이 이걸 씀description 은 step 이 self.Describe(...) 로 적은 한 줄로, "지금 뭘 하는지" 를 로그에 남기는 용도다. step 전환 시 한 번 찍히고 비워진다:
def Step2_WaitInPos(self):
self.Describe("approaching ", self.flow().target, " mm") # OnStepChanged 에 나타남
if self.flow().axis.in_position():
return self.Next(self.Step3_Clamp)
return self.Stay()module.CurrentStepDescription() 로 실시간 조회도 된다 (CurrentStepName() / CurrentStepOrdinal() 로 흐름이 지금 어디인지도). 렌더러 flow 가 이 읽기를 사용한다 - simulator.py 와 message_dispatch.py 참고.
전체 observer 표면은 PYTHON_PORT.md 에 있다:
OnFlowStarted,OnStepChanged,OnStepThrew,OnAsyncSubmitted,OnAsyncCompleted,OnAsyncAbandoned,OnAsyncHighWater,OnFlowEnded. 관심 있는 이벤트만 override 하면 된다. 훅 예외가 펌프를 죽이지 못한다.
task 는 보통 펌프가 아닌 무언가가 런칭한다 - 이벤트 스레드, 소켓 콜백, main 스레드. StartFlow()(과 StartTask)는 어느 스레드에서나 안전하고 펌프를 즉시 깨우므로, 첫 step 이 다음 20ms 폴링이 아니라 즉시 돈다.
# 어느 스레드에서나: task 런칭은 안전하고 펌프를 즉시 깨운다.
def on_message(msg):
handler.current = msg
handler.ctx.StartFlow() # 런칭되면 Ok, 이미 돌고 있으면 Busy
# 자기 채널로 상태를 바꾼 뒤 펌프를 직접 깨울 수도 있습니다:
rt.Wake()오케스트레이터 가 이 패턴을 라인 규모로 구동한다: 하나의 영속 task 가 매 라운드 모든 모듈의 IsIdle() 을 확인하고, 평범한 멤버 읽기로 그 모듈의 다음 task 를 런칭한다 - 워커 모듈은 스스로 시퀀싱하지 않는다. pick_and_place.py 의 형태다.
생명주기 제어:
module.IsIdle- 비었나? 오케스트레이터가 다른 모듈의 다음 task 를 런칭하기 전에 확인. (CurrentStepName()/CurrentStepOrdinal()/CurrentStepDescription()으로 도는 흐름의 위치 조회; idle 이면""/-1.)module.WaitUntilIdle(timeout=None)- 이 모듈 이 idle 될 때까지 호출 스레드 블록.rt.WaitUntilIdle(timeout=None)- 모든 모듈이 idle 될 때까지 블록(main이 종료 전 기다리는 법). 둘 다 step 안에서는 부르지 마세요.module.Cancel()- 도는 흐름을 협력적으로 종료(이유"cancelled"로 fail 표시);rt.CancelAll()은 전체에.rt.stop()- 펌프 정지 + 풀 종료.Runtime은 컨텍스트 매니저(with uniflow.Runtime() as rt:)이기도 해서 종료 시 자동 stop.
흔한 협력적 종료(모든 콘솔 예제가 쓰는)는 모듈 레벨 "stop" 플래그다 - 각 step 이 이를 확인해 Done() 을 반환하면 WaitUntilIdle() 이 반환되고 rt.stop() 이 전체를 정리한다.
- PYTHON_PORT.md - 전체 API 표와 설계 결정.
- python/examples - 여섯 개의 예제(../cpp/examples, ../cs/examples 와 미러).
- ../cpp/TUTORIAL.md - C++ 튜토리얼; 같은 개념을 단일 펌프 모델·크로스-런타임까지 더 깊이.
- ../README.md - 프로젝트 개요.