-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransitions.py
More file actions
106 lines (96 loc) · 4.62 KB
/
transitions.py
File metadata and controls
106 lines (96 loc) · 4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
from agent import *
from math import inf
class TransitionEvent(Event):
"""
An event for a state transition. An optional callback function may be
provided to be executed after the state has changed
"""
def __init__(self, time, rule):
"""
initialize a transition event
:param time: the event time
:param rule: the state change rule
"""
super().__init__(time)
self.rule = rule
def handle(self, sim):
if self.rule.from_state.match(self.owner):
if self.rule.to_change is not None:
yes = self.rule.to_change(self.time, sim, self.owner, self.rule.to_state)
if not yes:
return
sim.set_state( self.time, self.owner, self.rule.to_state)
if self.rule.changed is not None:
self.rule.changed(self.time, sim, self.owner, self.rule.from_state)
class ContactEvent(Event):
"""
An event for a state transition. An optional callback function may be
provided to be executed after the state has changed
"""
def __init__(self, time, contact, rule):
"""
initialize a transition event
:param time: the event time
:param contact: the agent to contact
:param rule: the contact rule
"""
super().__init__(time)
self.contact = contact
self.rule = rule
def handle(self, sim):
if self.rule.from_state[0].match(self.owner) and self.rule.from_state[1].match(self.contact):
yes = True if self.rule.to_change is None else \
self.rule.to_change(self.time, sim, (self.owner, self.contact), self.rule.to_state)
if yes:
if not self.rule.to_state[1].match(self.contact):
sim.set_state(self.time, self.contact, self.rule.to_state[1])
if not self.rule.to_state[0].match(self.owner):
sim.set_state(self.time, self.owner, self.rule.to_state[0])
if self.rule.changed is not None:
self.rule.changed(self.time, sim, (self.owner, self.contact), self.rule.from_state)
self.rule.schedule(self.time, self.owner)
class Transition:
def __init__(self, from_state, to_state, waiting_time, to_change_callback=None, changed_callback=None, contact = None):
"""
:param from_state:
:param to_state:
:param waiting_time:
:param to_change_callback: a function to be called before the stqte change happens. It returns
a boolean value. If True, the change will happen; if False, the change will not happen.
It has the following arguments:
time: the time that the transition happened
simulation: the current simulation
agent: an Agent for the state change of an agent, or a tuple of agents for the state changes
caused by a contact (the first initiates the contact, the second is its contact).
to_state: the state(s) that the agent(s) will change to
:param changed_callback: a function to be called after the state change has happened. It returns
no value, and has the following arguments:
time: the time that the transition happened
simulation: the current simulation
agent: an Agent whose state has changes, or a tuple of agents whose state has changed due
to contact.
from_state: the state(s) that the agent(s) changed from
:param contact: If this transition is caused by a contact, this points the contact object
"""
if not isinstance(from_state, State) and not isinstance(from_state, States):
self.from_state = State(from_state)
else:
self.from_state = from_state
if not isinstance(to_state, State) and not isinstance(to_state, States):
self.to_state = State(to_state)
else:
self.to_state = to_state
self.waiting_time = waiting_time
self.to_change = to_change_callback
self.changed = changed_callback
self.contact = contact
def schedule(self, current_time, agent):
if self.contact is None:
if self.from_state.match(agent):
time = self.waiting_time(current_time) + current_time
agent.schedule(TransitionEvent(time, self))
elif self.from_state[0].match(agent):
for c in self.contact.contact(agent):
time = self.waiting_time(current_time) + current_time
if time is not inf:
agent.schedule(ContactEvent(time, c, self))