-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuf_receiver.h
More file actions
53 lines (44 loc) · 1.68 KB
/
Copy pathuf_receiver.h
File metadata and controls
53 lines (44 loc) · 1.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// uf_receiver.h - the "worker that drains the inbox" half.
//
// One looping task (Drain): pop one Msg, dispatch to the Add or Sub step by its
// operator, then loop back to pop the next. When the queue empties the task
// Done()s and the module parks; the sender relaunches it on the next burst.
// State the steps share (the current job, counters) lives on the flow and is
// reached from the steps via flow().
#pragma once
#include "globals.h"
#include <string>
// Phase the receiver is currently in, as reported by Flow_Receiver::State().
// The numeric values are written out explicitly; they are the same 0..3 the
// compiler would assign implicitly.
enum class RecvState
{
    Idle        = 0,  // the value Flow_Receiver::state_ starts out with
    Dispatching = 1,  // NOTE(review): presumably set while Step1_TakeNext picks a step - confirm in the .cpp
    Adding      = 2,  // NOTE(review): presumably set by Step2_Add - confirm in the .cpp
    Subtracting = 3,  // NOTE(review): presumably set by Step3_Sub - confirm in the .cpp
};
// Maps a RecvState value to a C string. Only the declaration appears in this
// header; the definition is not visible here.
// NOTE(review): presumably yields the enumerator's name for display/logging,
// and presumably a string literal the caller must not free - confirm against
// the definition.
const char* ToString(RecvState s);
// Receiver flow: holds the drain task together with the state that the task's
// steps share. The uniflow::Uniflow base template is instantiated with this
// class itself.
class Flow_Receiver : public uniflow::Uniflow<Flow_Receiver>
{
public:
    // Constructed from a runtime reference; the constructor body is defined
    // outside this header.
    explicit Flow_Receiver(uniflow::Runtime& rt);

    // The one and only drain task. It lives in the public section so the
    // sender is able to start it again (task.StartFlow()) once it has parked
    // and another burst of messages arrives.
    struct Task_Drain : uniflow::Task<Flow_Receiver>
    {
        // Task entry point: forwards directly to the first step.
        StepResult Entry() override
        {
            return Step1_TakeNext();
        }

    private:
        // Step bodies are defined out of line.
        StepResult Step1_TakeNext();  // take one job off the queue, choose a step by its operator
        StepResult Step2_Add();       // a + b
        StepResult Step3_Sub();       // a - b
    } task_drain_;

    // Const accessors used by the snapshot step. Per the original author's
    // note they run on the same pump thread, so no lock is taken.

    // Current receiver phase.
    RecvState State() const
    {
        return state_;
    }

    // Value of the processed_ counter.
    int Processed() const
    {
        return processed_;
    }

    // Reference to the stored result text; no copy is made.
    const std::string& LastResult() const
    {
        return last_result_;
    }

    // Copy of the job currently held in current_.
    Msg Current() const
    {
        return current_;
    }

private:
    // State owned by the flow; the steps get at it through flow().member_.
    RecvState   state_{RecvState::Idle};
    Msg         current_{};
    int         processed_{0};
    std::string last_result_;
};