diff --git a/spec/publishing/publish_metadata_spec.cr b/spec/publishing/publish_metadata_spec.cr index 7c0d68a..bc9df82 100644 --- a/spec/publishing/publish_metadata_spec.cr +++ b/spec/publishing/publish_metadata_spec.cr @@ -37,7 +37,7 @@ module PlaceOS::Source mock = MockModel.new(id: "hello", some_data: "edkh") router.publish_metadata(zone, mock) - sleep 0.1 + sleep 1000.milliseconds router.@publisher_managers.first.messages.should be_empty end @@ -52,7 +52,7 @@ module PlaceOS::Source router = Dummy.new router.publish_metadata(zone, mock) - sleep 0.1 + sleep 1000.milliseconds message = router.@publisher_managers.first.messages.first? message.should_not be_nil diff --git a/spec/status_events_spec.cr b/spec/status_events_spec.cr index 09d8a11..2ac3f5f 100644 --- a/spec/status_events_spec.cr +++ b/spec/status_events_spec.cr @@ -14,13 +14,13 @@ module PlaceOS::Source events = StatusEvents.new(mock_mappings, managers) spawn { events.start } - sleep 0.1 + sleep 1000.milliseconds Redis.open(url: REDIS_URL) do |client| client.publish("status/#{module_id}/#{status_key}", expected_payload("on")) end - sleep 0.1 + sleep 1000.milliseconds message = mock_publisher_manager.messages.first? message.should_not be_nil @@ -45,14 +45,14 @@ module PlaceOS::Source events = StatusEvents.new(mock_mappings, managers) spawn { events.start } - sleep 0.1 + sleep 1000.milliseconds Redis.open(url: REDIS_URL) do |client| client.publish("status/#{module_id}/#{status_key}", expected_payload("on")) client.publish("status/#{module_id}/#{status_key}", expected_payload("off")) end - sleep 0.1 + sleep 100.milliseconds mock_publisher_manager.messages.size.should eq(1) message = mock_publisher_manager.messages.first? message.should_not be_nil diff --git a/src/app.cr b/src/app.cr index 2f402b8..273f41a 100644 --- a/src/app.cr +++ b/src/app.cr @@ -58,6 +58,14 @@ module PlaceOS::Source end end + # Configure the database connection. First check if PG_DATABASE_URL environment variable + # is set. If not, assume database configuration are set via individual environment variables + if pg_url = ENV["PG_DATABASE_URL"]? + PgORM::Database.parse(pg_url) + else + PgORM::Database.configure { |_| } + end + publisher_managers = [] of PublisherManager publisher_managers << MqttBrokerManager.new @@ -71,14 +79,6 @@ module PlaceOS::Source publisher_managers << InfluxManager.new(influx_host, influx_api_key) end - # Configure the database connection. First check if PG_DATABASE_URL environment variable - # is set. If not, assume database configuration are set via individual environment variables - if pg_url = ENV["PG_DATABASE_URL"]? - PgORM::Database.parse(pg_url) - else - PgORM::Database.configure { |_| } - end - # Start application manager manager = Manager.new(publisher_managers) manager.start diff --git a/src/source/status_events.cr b/src/source/status_events.cr index 4297595..8d19df1 100644 --- a/src/source/status_events.cr +++ b/src/source/status_events.cr @@ -11,7 +11,11 @@ module PlaceOS::Source class StatusEvents Log = ::Log.for(self) - STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*" + STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*" + MAX_CONTAINER_SIZE = 50_000 + BATCH_SIZE = 100 + PROCESSING_INTERVAL = 100.milliseconds + CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8 private getter! redis : Redis private getter mappings : Mappings @@ -20,6 +24,7 @@ module PlaceOS::Source private property? stopped : Bool = true private getter sync_lock = Mutex.new(:reentrant) + private getter event_available = Channel(Nil).new(1) alias EventKey = NamedTuple(source: Symbol, mod_id: String, status: String) alias EventValue = NamedTuple(pattern: String, payload: String) @@ -72,15 +77,13 @@ module PlaceOS::Source store = PlaceOS::Driver::RedisStorage.new(module_id) store.each do |key, value| status_updated += 1_u64 - begin - synchronize { event_container[{source: :db, mod_id: module_id, status: key}] = {pattern: pattern, payload: value} } - rescue error - Log.error(exception: error) { { - message: "publishing initial state", - pattern: pattern, - module_id: module_id, - status: key, - } } + add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value}) + + # Backpressure if event container is growing too fast + if event_container.size >= MAX_CONTAINER_SIZE / 2 + until event_container.size < MAX_CONTAINER_SIZE / 4 + sleep 10.milliseconds + end end end end @@ -103,24 +106,53 @@ module PlaceOS::Source } } return end + add_event({source: :redis, mod_id: module_id, status: status}, {pattern: pattern, payload: payload}) + end - synchronize { event_container[{source: :redis, mod_id: module_id, status: status}] = {pattern: pattern, payload: payload} } + private def add_event(key : EventKey, value : EventValue) + synchronize do + if event_container.size >= MAX_CONTAINER_SIZE + Log.warn { "Event container full! Possible processing backlog. Skipping #{key[:source]} - #{key[:mod_id]}" } + return + end + event_container[key] = value + event_available.send(nil) if event_container.size == 1 + end end protected def process_events - loop do - break if stopped? - task = synchronize { event_container.shift? } - unless task - sleep 100.milliseconds - next + until stopped? + batch = build_batch + process_batch(batch) unless batch.empty? + + if event_container.empty? + select + when event_available.receive? + when timeout(PROCESSING_INTERVAL) + end + elsif event_container.size >= CONTAINER_WARNING_THRESHOLD + sleep PROCESSING_INTERVAL / 10 + else + sleep PROCESSING_INTERVAL end - key = task.first - value = task.last - process_pevent(value[:pattern], key[:mod_id], key[:status], value[:payload]) rescue nil end end + private def build_batch : Array({EventKey, EventValue}) + synchronize do + keys = event_container.keys.first(BATCH_SIZE) + keys.map { |key| {key, event_container.delete(key).not_nil!} } + end + end + + private def process_batch(batch) + batch.each do |(key, value)| + process_pevent(value[:pattern], key[:mod_id], key[:status], value[:payload]) + end + rescue error + Log.error(exception: error) { "Error processing event batch" } + end + protected def process_pevent(pattern : String, module_id : String, status : String, payload : String) events = mappings.status_events?(module_id, status)