-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathSkyzeMessageBusService.py
More file actions
133 lines (115 loc) · 4.73 KB
/
Copy pathSkyzeMessageBusService.py
File metadata and controls
133 lines (115 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Created on 12/10/2017
@author: michaelnew"""
# Third Party Imports
from datetime import datetime
from time import sleep
import json
import sys
# Python 2 compatibility
PY2 = sys.version_info[0] == 2
PY3 = (sys.version_info[0] >= 3)
if PY2:
import Queue as queue
else: # PY3
import queue
# Skyze Imports
from Skyze_Messaging_Service import settings
from Skyze_Standard_Library.SkyzeServiceAbstract import *
from Skyze_Messaging_Service.Messages.SkyzeMessageTypes import *
class SkyzeMessageBusService(SkyzeServiceAbstract):
"""Skyze inter-service message bus"""
def __init__(self):
"""Constructor"""
path_to_service = "Skyze_Messaging_Service"
super().__init__(log_path=path_to_service)
self.__continue_processing = True
self._message_bus = queue.Queue()
# Start up message
status_message = MessageServiceStatus(self.getType(), "Started")
self.publishMessage(status_message)
def setServices(self, market_data_cleaner, market_data_updater,
message_logger, notifier, scheduler, screener):
"""Sets the services pub/sub to the bus"""
self.__market_data_cleaner_service = market_data_cleaner
self.__market_data_updater_service = market_data_updater
self.__message_logger_service = message_logger
self.__notifier_service = notifier
self.__scheduler_service = scheduler
self.__screener_service = screener
def getCreated(self):
"""Getter"""
return self.__created
def getContinueProcessing(self):
"""Getter"""
return self.__continue_processing
def getCreated(self):
"""Getter"""
return self.__created
def getJSON(self):
"""Getter"""
return json.dumps(self.__dict__)
def setContinueProcessing(self, continue_processing):
"""Setter"""
self.__continue_processing = continue_processing
def publishMessage(self, published_message):
"""Publishes Messages"""
# puts the message onto the queue
self._message_bus.put(published_message)
def __route_message(self, message):
"""Sends message to apprpriate receiver"""
# All messages are logged
# log_msg = "Message Bus Service::__route_message::" + message.getJSON()
self.__message_logger_service.receiveMessage(message)
print("route message")
# Route to appropriate service
message_type = message.getMessageType()
if message_type == SkyzeMessageType.NEW_MARKET_DATA \
or message_type == SkyzeMessageType.SCREENER_RUN:
# Route to the Screener Service
self.__screener_service.receiveMessage(message)
elif message_type == SkyzeMessageType.MARKET_DATA_UPDATER_RUN \
or message_type == SkyzeMessageType.MARKET_DATA_UPDATER_RUN_ALL:
# Route to the Market Data Updater Service
self.__market_data_updater_service.receiveMessage(message)
elif message_type == SkyzeMessageType.NOTIFICATION\
or message_type == SkyzeMessageType.MARKET_DATA_UPDATER_RUN_COMPLETE:
# Route to the Notifier Service
self.__notifier_service.receiveMessage(message)
elif message_type == SkyzeMessageType.SCHEDULER_RUN \
or message_type == SkyzeMessageType.SCHEDULER_TEST:
# Route to the Scheduler Service
self.__scheduler_service.receiveMessage(message)
elif message_type == SkyzeMessageType.ORDER_MARKET \
or message_type == SkyzeMessageType.SCHEDULER_TEST \
or message_type == SkyzeMessageType.ORDER_ENTRY_STOP \
or message_type == SkyzeMessageType.ORDER_ENTRY_LIMIT \
or message_type == SkyzeMessageType.ORDER_EXIT_STOP_LOSS \
or message_type == SkyzeMessageType.ORDER_EXIT_TAKE_PROFIT \
or message_type == SkyzeMessageType.ORDER_EXIT_TRAILING_STOP \
or message_type == SkyzeMessageType.ORDER_CANCEL:
# Route to the Execution Service
self.__scheduler_service.receiveMessage(message)
elif message_type == SkyzeMessageType.SERVICE_STATUS:
# Unused messages types - probably just for logging
# All messages are routed to the logger service already
pass
else:
# raise exception - Message not identified
self._unknownMessageTypeError(message)
def process_messages(self):
"""Infinite loop to process messages on the messuage bus"""
# Messaging handline loop
print("try get next message from infinite loop")
while self.__continue_processing:
try:
print('.', end='')
# get next message off message bus
next_message = self._message_bus.get(False)
except queue.Empty:
# if no message then start the scheduler after this there
# will always be a message as scheduler will generate messages
# and sleep in between
sleep(5)
else:
print("routing from infinite loop")
self.__route_message(next_message)