Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions spec/publishing/publish_metadata_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module PlaceOS::Source
mock = MockModel.new(id: "hello", some_data: "edkh")
router.publish_metadata(zone, mock)

sleep 0.1
sleep 1000.milliseconds

router.@publisher_managers.first.messages.should be_empty
end
Expand All @@ -52,7 +52,7 @@ module PlaceOS::Source
router = Dummy.new
router.publish_metadata(zone, mock)

sleep 0.1
sleep 1000.milliseconds

message = router.@publisher_managers.first.messages.first?
message.should_not be_nil
Expand Down
8 changes: 4 additions & 4 deletions spec/status_events_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ module PlaceOS::Source
events = StatusEvents.new(mock_mappings, managers)
spawn { events.start }

sleep 0.1
sleep 1000.milliseconds

Redis.open(url: REDIS_URL) do |client|
client.publish("status/#{module_id}/#{status_key}", expected_payload("on"))
end

sleep 0.1
sleep 1000.milliseconds

message = mock_publisher_manager.messages.first?
message.should_not be_nil
Expand All @@ -45,14 +45,14 @@ module PlaceOS::Source
events = StatusEvents.new(mock_mappings, managers)
spawn { events.start }

sleep 0.1
sleep 1000.milliseconds

Redis.open(url: REDIS_URL) do |client|
client.publish("status/#{module_id}/#{status_key}", expected_payload("on"))
client.publish("status/#{module_id}/#{status_key}", expected_payload("off"))
end

sleep 0.1
sleep 100.milliseconds
mock_publisher_manager.messages.size.should eq(1)
message = mock_publisher_manager.messages.first?
message.should_not be_nil
Expand Down
16 changes: 8 additions & 8 deletions src/app.cr
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ module PlaceOS::Source
end
end

# Configure the database connection. First check if PG_DATABASE_URL environment variable
# is set. If not, assume database configuration are set via individual environment variables
if pg_url = ENV["PG_DATABASE_URL"]?
PgORM::Database.parse(pg_url)
else
PgORM::Database.configure { |_| }
end

publisher_managers = [] of PublisherManager

publisher_managers << MqttBrokerManager.new
Expand All @@ -71,14 +79,6 @@ module PlaceOS::Source
publisher_managers << InfluxManager.new(influx_host, influx_api_key)
end

# Configure the database connection. First check if PG_DATABASE_URL environment variable
# is set. If not, assume database configuration are set via individual environment variables
if pg_url = ENV["PG_DATABASE_URL"]?
PgORM::Database.parse(pg_url)
else
PgORM::Database.configure { |_| }
end

# Start application manager
manager = Manager.new(publisher_managers)
manager.start
Expand Down
72 changes: 52 additions & 20 deletions src/source/status_events.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ module PlaceOS::Source
class StatusEvents
Log = ::Log.for(self)

# Redis pattern subscribed to for module status updates, e.g. "status/mod-*".
# NOTE: the diff scrape duplicated this constant (old + new side); one copy kept.
STATUS_CHANNEL_PATTERN = "status/#{Model::Module.table_name}-*"

# Hard cap on buffered events; `add_event` drops new events once reached.
MAX_CONTAINER_SIZE = 50_000

# Maximum number of events drained from the container per processing pass.
BATCH_SIZE = 100

# Base pause between processing passes (also the wake-up timeout when idle).
PROCESSING_INTERVAL = 100.milliseconds

# Above this size the processing loop shortens its pause to catch up.
CONTAINER_WARNING_THRESHOLD = MAX_CONTAINER_SIZE * 0.8

private getter! redis : Redis
private getter mappings : Mappings
Expand All @@ -20,6 +24,7 @@ module PlaceOS::Source
private property? stopped : Bool = true

private getter sync_lock = Mutex.new(:reentrant)
private getter event_available = Channel(Nil).new(1)

alias EventKey = NamedTuple(source: Symbol, mod_id: String, status: String)
alias EventValue = NamedTuple(pattern: String, payload: String)
Expand Down Expand Up @@ -72,15 +77,13 @@ module PlaceOS::Source
store = PlaceOS::Driver::RedisStorage.new(module_id)
store.each do |key, value|
status_updated += 1_u64
begin
synchronize { event_container[{source: :db, mod_id: module_id, status: key}] = {pattern: pattern, payload: value} }
rescue error
Log.error(exception: error) { {
message: "publishing initial state",
pattern: pattern,
module_id: module_id,
status: key,
} }
add_event({source: :db, mod_id: module_id, status: key}, {pattern: pattern, payload: value})

# Backpressure if event container is growing too fast
if event_container.size >= MAX_CONTAINER_SIZE / 2
until event_container.size < MAX_CONTAINER_SIZE / 4
sleep 10.milliseconds
end
end
end
end
Expand All @@ -103,24 +106,53 @@ module PlaceOS::Source
} }
return
end
add_event({source: :redis, mod_id: module_id, status: status}, {pattern: pattern, payload: payload})
end

synchronize { event_container[{source: :redis, mod_id: module_id, status: status}] = {pattern: pattern, payload: payload} }
# Queue a status event for asynchronous processing.
#
# Drops the event (logging a warning) when the container is at capacity,
# and wakes the processing fiber when the container transitions from
# empty to non-empty.
private def add_event(key : EventKey, value : EventValue)
  synchronize do
    unless event_container.size < MAX_CONTAINER_SIZE
      Log.warn { "Event container full! Possible processing backlog. Skipping #{key[:source]} - #{key[:mod_id]}" }
      return
    end

    event_container[key] = value

    # First entry after being empty: signal the processing loop.
    if event_container.size == 1
      event_available.send(nil)
    end
  end
end

# Main processing loop: drains queued events in batches until stopped.
#
# NOTE(review): the scraped diff merged the pre-refactor loop body
# (`loop do` / `event_container.shift?` / `task.first` …) into this hunk;
# this is the reconstructed post-refactor implementation.
#
# Pacing between passes:
# - container empty  -> block on `event_available` so new events are
#   handled promptly, with a PROCESSING_INTERVAL timeout so `stopped?`
#   is still re-checked periodically
# - container nearly full (>= CONTAINER_WARNING_THRESHOLD) -> short
#   pause (PROCESSING_INTERVAL / 10) to drain the backlog faster
# - otherwise        -> regular PROCESSING_INTERVAL pause
protected def process_events
  until stopped?
    batch = build_batch
    process_batch(batch) unless batch.empty?

    if event_container.empty?
      select
      when event_available.receive?
      when timeout(PROCESSING_INTERVAL)
      end
    elsif event_container.size >= CONTAINER_WARNING_THRESHOLD
      sleep PROCESSING_INTERVAL / 10
    else
      sleep PROCESSING_INTERVAL
    end
  end
end

# Atomically remove up to BATCH_SIZE events from the container and
# return them as {key, value} tuples, preserving insertion order.
private def build_batch : Array({EventKey, EventValue})
  synchronize do
    batch = [] of {EventKey, EventValue}
    event_container.keys.first(BATCH_SIZE).each do |key|
      # Key was just read under the same lock, so delete cannot return nil.
      batch << {key, event_container.delete(key).not_nil!}
    end
    batch
  end
end

# Publish each drained event via `process_pevent`.
#
# Error handling is per-event: `build_batch` has already removed these
# entries from the container, so a method-level rescue would silently
# drop every event after the first failure. Each event is attempted and
# failures are logged individually instead.
private def process_batch(batch)
  batch.each do |(key, value)|
    begin
      process_pevent(value[:pattern], key[:mod_id], key[:status], value[:payload])
    rescue error
      Log.error(exception: error) { "Error processing event batch" }
    end
  end
end

protected def process_pevent(pattern : String, module_id : String, status : String, payload : String)
events = mappings.status_events?(module_id, status)

Expand Down
Loading