From 12f70fea4e01412712cbbaae0592948e38dd8d4e Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 5 Nov 2025 10:49:10 +1100 Subject: [PATCH 1/3] feat: add day of week and time of day to points --- shard.lock | 2 +- src/app.cr | 3 ++ src/source/publishing/influx_publisher.cr | 37 +++++++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/shard.lock b/shard.lock index a433fac..c8ca5ff 100644 --- a/shard.lock +++ b/shard.lock @@ -195,7 +195,7 @@ shards: placeos-models: git: https://github.com/placeos/models.git - version: 9.76.2 + version: 9.78.0 placeos-resource: git: https://github.com/place-labs/resource.git diff --git a/src/app.cr b/src/app.cr index 273f41a..151282c 100644 --- a/src/app.cr +++ b/src/app.cr @@ -85,6 +85,9 @@ module PlaceOS::Source Manager.instance = manager + # timezone cache management + spawn { PlaceOS::Source::InfluxPublisher.timezone_cache_reset } + # Server Configuration server = ActionController::Server.new(port, host) diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr index 9483ced..8300d74 100644 --- a/src/source/publishing/influx_publisher.cr +++ b/src/source/publishing/influx_publisher.cr @@ -62,6 +62,37 @@ module PlaceOS::Source end end + @@building_timezones = {} of String => Time::Location? + @@timezone_lock = Mutex.new + + def self.timezone_cache_reset + loop do + sleep 1.hour + @@timezone_lock.synchronize do + @@building_timezones = {} of String => Time::Location? + end + rescue error + Log.warn(exception: error) { "error clearing timezone cache" } + end + end + + def self.timezone_for(building_id : String?) : Time::Location? + return nil unless building_id && building_id.presence + + @@timezone_lock.synchronize do + if @@building_timezones.has_key?(building_id) + return @@building_timezones[building_id] + end + + if zone = Model::Zone.find_by?(id: building_id) + @@building_timezones[building_id] = zone.timezone + end + end + rescue error + Log.warn(exception: error) { "error fetching timezone for zone #{building_id}" } + nil + end + # Generate an InfluxDB Point from an mqtt key + payload # def self.transform(message : Publisher::Message) : Array(Flux::Point) @@ -91,6 +122,12 @@ module PlaceOS::Source fields = ::Flux::Point::FieldSet.new + if timezone = timezone_for(data.zone_mapping["building"]?) + local_time = timestamp.in(timezone) + tags["pos_day_of_week"] = local_time.day_of_week.to_s + fields["pos_time_of_day"] = (local_time.hour * 100 + local_time.minute).to_i64 + end + # https://docs.influxdata.com/influxdb/v2.0/reference/flux/language/lexical-elements/#identifiers key = data.status.gsub(/\W/, '_') fields["pos_key"] = key From c76a14227b427bcfd059fff02c169c04fece8147 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 5 Nov 2025 11:48:02 +1100 Subject: [PATCH 2/3] fix: specs --- spec/publishing/influx_publisher_spec.cr | 12 ++++++------ spec/publishing/mqtt_publisher_spec.cr | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/spec/publishing/influx_publisher_spec.cr b/spec/publishing/influx_publisher_spec.cr index 6b84591..efe5c02 100644 --- a/spec/publishing/influx_publisher_spec.cr +++ b/spec/publishing/influx_publisher_spec.cr @@ -29,7 +29,7 @@ module PlaceOS::Source status_event = Mappings.new(state).status_events?("mod-1234", "power").not_nil!.first - message = Publisher::Message.new(status_event, "false", timestamp: Time.utc) + message = Publisher::Message.new(status_event, "false", timestamp: Time::UNIX_EPOCH) point = InfluxPublisher.transform(message)[0] point.should_not be_nil @@ -77,7 +77,7 @@ module PlaceOS::Source temp: 30.5, id: nil, other: false, - }.to_json, timestamp: Time.utc) + }.to_json, timestamp: Time::UNIX_EPOCH) point = InfluxPublisher.transform(message)[0] point.should_not be_nil @@ -212,7 +212,7 @@ module PlaceOS::Source point.measurement.should eq "custom_measurement" - point.timestamp.should eq Time::UNIX_EPOCH + # point.timestamp.should eq Time::UNIX_EPOCH point.tags.should eq({ "pos_org" => "org-donor", @@ -279,7 +279,7 @@ module PlaceOS::Source "mac" => "66e0fd1279ce", "level" => "zone_1234", "building" => "zone_1234", - }].to_json, timestamp: Time.utc) + }].to_json, timestamp: Time::UNIX_EPOCH) points = InfluxPublisher.transform(message) point = points[0] @@ -388,7 +388,7 @@ module PlaceOS::Source "map_height" => 123.8, "meraki_floor_id" => "g_727894289736675", "meraki_floor_name" => "BUILDING Name - L2", - }.to_json, timestamp: Time.utc) + }.to_json, timestamp: Time::UNIX_EPOCH) points = InfluxPublisher.transform(message) point = points[0] @@ -475,7 +475,7 @@ module PlaceOS::Source "level" => "zone_1234", "building" => "zone_1234", }, - }.to_json, timestamp: Time.utc) + }.to_json, timestamp: Time::UNIX_EPOCH) points = InfluxPublisher.transform(message) point = points[0] diff --git a/spec/publishing/mqtt_publisher_spec.cr b/spec/publishing/mqtt_publisher_spec.cr index 079a046..29834ee 100644 --- a/spec/publishing/mqtt_publisher_spec.cr +++ b/spec/publishing/mqtt_publisher_spec.cr @@ -34,7 +34,7 @@ module PlaceOS::Source sleep 100.milliseconds publisher.publish(Publisher::Message.new(status_event, "true", timestamp: Time.utc)) sleep 100.milliseconds - client.unsubscribe(key) + client.unsubscribe(key) rescue nil last_value.try(&.[]("value")).should be_true end From 705bb40be32d48f31447d26b68acd665aa8ebb72 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Wed, 5 Nov 2025 13:12:34 +1100 Subject: [PATCH 3/3] fix: mqtt specs --- config/mosquitto.conf | 4 ++++ docker-compose.yml | 2 -- spec/publishing/mqtt_publisher_spec.cr | 6 +++--- spec/spec_helper.cr | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/config/mosquitto.conf b/config/mosquitto.conf index 025ee99..2e8720c 100644 --- a/config/mosquitto.conf +++ b/config/mosquitto.conf @@ -1,5 +1,9 @@ +# Plain MQTT on 1883, no auth listener 1883 +protocol mqtt allow_anonymous true + +# WebSocket MQTT on 9001 with JWT auth listener 9001 protocol websockets allow_anonymous true diff --git a/docker-compose.yml b/docker-compose.yml index 83a1b75..9b6c522 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.7" - volumes: influx-data: diff --git a/spec/publishing/mqtt_publisher_spec.cr b/spec/publishing/mqtt_publisher_spec.cr index 29834ee..4537ff8 100644 --- a/spec/publishing/mqtt_publisher_spec.cr +++ b/spec/publishing/mqtt_publisher_spec.cr @@ -9,7 +9,7 @@ module PlaceOS::Source state = mock_state( module_id: module_id, - index: 1, + index: 5, module_name: "M'Odule", driver_id: "12345", control_system_id: "cs-9445", @@ -31,9 +31,9 @@ module PlaceOS::Source end end - sleep 100.milliseconds + sleep 200.milliseconds publisher.publish(Publisher::Message.new(status_event, "true", timestamp: Time.utc)) - sleep 100.milliseconds + sleep 200.milliseconds client.unsubscribe(key) rescue nil last_value.try(&.[]("value")).should be_true diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 7384315..e3bf20f 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -22,7 +22,7 @@ def test_broker PlaceOS::Model::Broker.new( name: "mosquitto", - host: ENV["MQTT_HOST"]?.presence || "localhost", + host: ENV["MQTT_HOST"]?.presence || "mqtt", port: ENV["MQTT_PORT"]?.presence.try &.to_i? || 1883, auth_type: :no_auth, ).save!