From 5363f9950c15d2ab63629cdfe674b955087b48a8 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 25 Feb 2026 14:39:20 +0000 Subject: [PATCH 01/23] Add SSE support --- lib/rage-rb.rb | 8 ++ lib/rage/controller/api.rb | 17 +++- lib/rage/pubsub/adapters/base.rb | 23 ++++++ lib/rage/pubsub/adapters/redis.rb | 128 ++++++++++++++++++++++++++++++ lib/rage/pubsub/pubsub.rb | 6 ++ lib/rage/request.rb | 4 + lib/rage/sse/application.rb | 58 ++++++++++++++ lib/rage/sse/connection_proxy.rb | 34 ++++++++ lib/rage/sse/message.rb | 22 +++++ lib/rage/sse/sse.rb | 56 +++++++++++++ lib/rage/sse/stream.rb | 11 +++ rage.gemspec | 2 +- 12 files changed, 366 insertions(+), 3 deletions(-) create mode 100644 lib/rage/pubsub/adapters/base.rb create mode 100644 lib/rage/pubsub/adapters/redis.rb create mode 100644 lib/rage/pubsub/pubsub.rb create mode 100644 lib/rage/sse/application.rb create mode 100644 lib/rage/sse/connection_proxy.rb create mode 100644 lib/rage/sse/message.rb create mode 100644 lib/rage/sse/sse.rb create mode 100644 lib/rage/sse/stream.rb diff --git a/lib/rage-rb.rb b/lib/rage-rb.rb index b711f9f9..f0f7d962 100644 --- a/lib/rage-rb.rb +++ b/lib/rage-rb.rb @@ -40,6 +40,12 @@ def self.events Rage::Events end + # Shorthand to access {Rage::SSE Rage::SSE}. + # @return [Rage::SSE] + def self.sse + Rage::SSE + end + # Configure routes for the Rage application. # @return [Rage::Router::DSL::Handler] # @example @@ -185,6 +191,8 @@ module ActiveRecord autoload :OpenAPI, "rage/openapi/openapi" autoload :Deferred, "rage/deferred/deferred" autoload :Events, "rage/events/events" + autoload :SSE, "rage/sse/sse" + autoload :PubSub, "rage/pubsub/pubsub" end module RageController diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index 5efa870d..f17a0d0b 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -492,7 +492,7 @@ def session # @example # render plain: "hello world", status: 201 # @note `render` doesn't terminate execution of the action, so if you want to exit an action after rendering, you need to do something like `render(...) and return`. - def render(json: nil, plain: nil, status: nil) + def render(json: nil, plain: nil, sse: nil, status: nil) raise "Render was called multiple times in this action" if @__rendered @__rendered = true @@ -500,7 +500,7 @@ def render(json: nil, plain: nil, status: nil) @__body << if json json.is_a?(String) ? json : json.to_json else - headers["content-type"] = "text/plain; charset=utf-8" + @__headers["content-type"] = "text/plain; charset=utf-8" plain.to_s end @@ -514,6 +514,19 @@ def render(json: nil, plain: nil, status: nil) status end end + + if sse + raise "already rendered" unless @__body.empty? + + unless @__env["rack.upgrade?"] == :sse + @__status = 406 + @__body << "Bad Request: Expected an SSE connection" + return + end + + @__env["rack.upgrade"] = Rage::SSE::Application.new(sse) + @__status = 0 # TODO render 204 + end end # Send a response with no body. diff --git a/lib/rage/pubsub/adapters/base.rb b/lib/rage/pubsub/adapters/base.rb new file mode 100644 index 00000000..5f308c2f --- /dev/null +++ b/lib/rage/pubsub/adapters/base.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +class Rage::PubSub::Adapters::Base + def pick_a_worker(&block) + _lock, lock_path = Tempfile.new.yield_self { |file| [file, file.path] } + + caller = -> do + if File.new(lock_path).flock(File::LOCK_EX | File::LOCK_NB) + if Rage.logger.debug? + puts "INFO: #{Process.pid} is managing #{self.class.name.split("::").last} subscriptions." + end + block.call + end + end + + # TODO: move to root + if Iodine.running? + caller.call + else + Iodine.on_state(:on_start, &caller) + end + end +end diff --git a/lib/rage/pubsub/adapters/redis.rb b/lib/rage/pubsub/adapters/redis.rb new file mode 100644 index 00000000..280050f6 --- /dev/null +++ b/lib/rage/pubsub/adapters/redis.rb @@ -0,0 +1,128 @@ +# frozen_string_literal: true + +require "securerandom" + +if !defined?(RedisClient) + fail <<~ERR + + Redis adapter depends on the `redis-client` gem. Add the following line to your Gemfile: + gem "redis-client" + + ERR +end + +class Rage::PubSub::Adapters::Redis < Rage::PubSub::Adapters::Base + DEFAULT_REDIS_OPTIONS = { reconnect_attempts: [0.05, 0.1, 0.5] } + REDIS_MIN_VERSION_SUPPORTED = Gem::Version.create(6) + + def initialize(stream_name, broadcaster, config) + @redis_stream = if (prefix = config.delete(:channel_prefix)) + "#{prefix}:#{stream_name}" + else + stream_name + end + + @broadcaster = broadcaster + @redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(config)) + @server_uuid = SecureRandom.uuid + + redis_version = get_redis_version + if redis_version < REDIS_MIN_VERSION_SUPPORTED + raise "Redis adapter only supports Redis 6+. Detected Redis version: #{redis_version}." + end + + @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid + + pick_a_worker { poll } + end + + def publish(stream_name, data) + message_uuid = SecureRandom.uuid + + publish_redis.call( + "XADD", + @redis_stream, + trimming_method, "~", trimming_value, + "*", + "1", stream_name, + "2", data.to_json, + "3", @server_uuid, + "4", message_uuid + ) + end + + private + + def publish_redis + @publish_redis ||= @redis_config.new_client + end + + def trimming_method + @trimming_strategy == :maxlen ? "MAXLEN" : "MINID" + end + + def trimming_value + @trimming_strategy == :maxlen ? "10000" : ((Time.now.to_f - 5 * 60) * 1000).to_i + end + + def get_redis_version + service_redis = @redis_config.new_client + version = service_redis.call("INFO").match(/redis_version:([[:graph:]]+)/)[1] + + Gem::Version.create(version) + + rescue RedisClient::Error => e + puts "FATAL: Couldn't connect to Redis - all broadcasts will be limited to the current server." + puts e.backtrace.join("\n") + REDIS_MIN_VERSION_SUPPORTED + + ensure + service_redis.close + end + + def error_backoff_intervals + @error_backoff_intervals ||= Enumerator.new do |y| + y << 0.2 << 0.5 << 1 << 2 << 5 + loop { y << 10 } + end + end + + def poll + unless Fiber.scheduler + Fiber.set_scheduler(Rage::FiberScheduler.new) + end + + Iodine.on_state(:start_shutdown) do + @stopping = true + end + + Fiber.schedule do + read_redis = @redis_config.new_client + last_id = (Time.now.to_f * 1000).to_i + last_message_uuid = nil + + loop do + data = read_redis.blocking_call(5, "XREAD", "COUNT", "100", "BLOCK", "5000", "STREAMS", @redis_stream, last_id) + + if data + data[@redis_stream].each do |id, (_, stream_name, _, serialized_data, _, broadcaster_uuid, _, message_uuid)| + if broadcaster_uuid != @server_uuid && message_uuid != last_message_uuid + @broadcaster.broadcast(stream_name, JSON.parse(serialized_data)) + end + + last_id = id + last_message_uuid = message_uuid + end + end + + rescue RedisClient::Error => e + Rage.logger.error("Subscriber error: #{e.message} (#{e.class})") + sleep error_backoff_intervals.next + rescue => e + @stopping ? break : raise(e) + else + error_backoff_intervals.rewind + end + end + end +end diff --git a/lib/rage/pubsub/pubsub.rb b/lib/rage/pubsub/pubsub.rb new file mode 100644 index 00000000..c6b801a3 --- /dev/null +++ b/lib/rage/pubsub/pubsub.rb @@ -0,0 +1,6 @@ +module Rage::PubSub + module Adapters + autoload :Base, "rage/pubsub/adapters/base" + autoload :Redis, "rage/pubsub/adapters/redis" + end +end diff --git a/lib/rage/request.rb b/lib/rage/request.rb index 13aa273a..c9934f3e 100644 --- a/lib/rage/request.rb +++ b/lib/rage/request.rb @@ -220,6 +220,10 @@ def route_uri_pattern end end + def sse? + @env["rack.upgrade?"] == :sse + end + private def rack_request diff --git a/lib/rage/sse/application.rb b/lib/rage/sse/application.rb new file mode 100644 index 00000000..41c057de --- /dev/null +++ b/lib/rage/sse/application.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +# @private +class Rage::SSE::Application + def initialize(stream) + @stream = stream + + @type = if stream.is_a?(Enumerator) + @streamer = create_enum_streamer + :enum + elsif stream.is_a?(Proc) + @streamer = create_proc_streamer + :proc + elsif stream.is_a?(Rage::SSE::Stream) + :stream + else + :object + end + end + + def on_open(connection) + case @type + when :enum, :proc + @streamer.resume(connection) + when :stream + connection.subscribe("sse:#{@stream.id}") # TODO: hash? # TODO: broadcast right away? + when :object + connection.write(Rage::SSE.__serialize(@stream)) + connection.close + end + end + + private + + def create_enum_streamer + Fiber.schedule do + connection = Fiber.yield + + @stream.each do |event| + break if !connection.open? + connection.write(Rage::SSE.__serialize(event)) if event + end + rescue => e + Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") + ensure + connection.close + end + end + + def create_proc_streamer + Fiber.schedule do + connection = Fiber.yield + @stream.call(Rage::SSE::ConnectionProxy.new(connection)) + rescue => e + Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") + end + end +end diff --git a/lib/rage/sse/connection_proxy.rb b/lib/rage/sse/connection_proxy.rb new file mode 100644 index 00000000..0f78b6e7 --- /dev/null +++ b/lib/rage/sse/connection_proxy.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +class Rage::SSE::ConnectionProxy + def initialize(connection) + @connection = connection + end + + def write(data) + raise IOError, "closed stream" unless @connection.open? + @connection.write(data) + end + + alias_method :<<, :write + + def close + @connection.close + end + + alias_method :close_write, :close + + def closed? + !@connection.open? + end + + def flush + raise IOError, "closed stream" unless @connection.open? + end + + def read(...) + end + + def close_read + end +end diff --git a/lib/rage/sse/message.rb b/lib/rage/sse/message.rb new file mode 100644 index 00000000..6b252469 --- /dev/null +++ b/lib/rage/sse/message.rb @@ -0,0 +1,22 @@ +# A class representing an SSE event. Use it to specify the `id`, `event`, and `retry` fields in an SSE. +# +# @!attribute id +# @return [String] The `id` field of the SSE event. +# @!attribute event +# @return [String] The `event` field of the SSE event. +# @!attribute retry +# @return [Integer] The `retry` field of the SSE event, in milliseconds. +# @!attribute data +# @return [String, #to_json] The `data` field of the SSE event. If it's an object, it will be serialized to JSON. +Rage::SSE::Message = Struct.new(:id, :event, :retry, :data, keyword_init: true) do + def to_s + str = "" + + str << "id: #{id}\n" if id + str << "event: #{event}\n" if event + str << "retry: #{self.retry}\n" if self.retry + str << "data: #{data.is_a?(String) ? data : data.to_json}\n" if data # TODO: multiline + + str + "\n" + end +end diff --git a/lib/rage/sse/sse.rb b/lib/rage/sse/sse.rb new file mode 100644 index 00000000..dc7f52f9 --- /dev/null +++ b/lib/rage/sse/sse.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +module Rage::SSE + # Factory method to create SSE events. + # + # @param data [String, #to_json] The `data` field of the SSE event. If it's an object, it will be serialized to JSON. + # @param id [String, nil] The `id` field of the SSE event. + # @param event [String, nil] The `event` field of the SSE event. + # @param retry [Integer, nil] The `retry` field of the SSE event, in milliseconds. + # @return [Message] The created SSE event. + # @example + # Rage::SSE.message(current_user.profile, id: current_user.id) + def self.message(data, id: nil, event: nil, retry: nil) + Message.new(data:, id:, event:, retry:) + end + + def self.__serialize(data) + if data.is_a?(String) + "data: #{data}\n\n" + elsif data.is_a?(Message) + data.to_s + else + "data: #{data.to_json}\n\n" + end + end + + # @private + def self.__adapter=(adapter) + @__adapter = adapter + end + + # @private + def self.__adapter + @__adapter + end + + # TODO: telemetry + def self.broadcast(stream, data) + InternalBroadcast.broadcast(stream, data) + __adapter&.publish(stream, data) + end + + # @private + module InternalBroadcast + def self.broadcast(stream, data) + Iodine.publish("sse:#{stream}", Rage::SSE.__serialize(data)) + end + end +end + +require_relative "application" +require_relative "connection_proxy" +require_relative "message" +require_relative "stream" + +# Rage::SSE.__adapter = Rage::PubSub::Adapters::Redis.new("rage:sse:messages", Rage::SSE::InternalBroadcast, {}) diff --git a/lib/rage/sse/stream.rb b/lib/rage/sse/stream.rb new file mode 100644 index 00000000..37c1a14b --- /dev/null +++ b/lib/rage/sse/stream.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class Rage::SSE::Stream + attr_reader :id + + # TODO: close + def initialize(id) + # TODO: composite keys + @id = id + end +end diff --git a/rage.gemspec b/rage.gemspec index d29dda12..ea1eaf15 100644 --- a/rage.gemspec +++ b/rage.gemspec @@ -29,7 +29,7 @@ Gem::Specification.new do |spec| spec.add_dependency "thor", "~> 1.0" spec.add_dependency "rack", "< 4" - spec.add_dependency "rage-iodine", "~> 4.3" + spec.add_dependency "rage-iodine", "~> 5.0" spec.add_dependency "zeitwerk", "~> 2.6" spec.add_dependency "rack-test", "~> 2.1" spec.add_dependency "rake", ">= 12.0" From 40058a0aea58075741796af9468d9c3f98956685 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 25 Feb 2026 15:04:43 +0000 Subject: [PATCH 02/23] Don't resume dead fibers fixes the case when fiber is manually killed while waiting --- lib/rage/fiber_scheduler.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 0c74c773..5bec50c0 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -97,7 +97,7 @@ def block(_blocker, timeout = nil) unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } - f.resume + f.resume if f.alive? end end From c36b8f19613a30b54888cfe71201699352aa958e Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 25 Feb 2026 16:26:09 +0000 Subject: [PATCH 03/23] Ensure correct content-type is set --- lib/rage/controller/api.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index f17a0d0b..ba1b28d0 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -526,6 +526,7 @@ def render(json: nil, plain: nil, sse: nil, status: nil) @__env["rack.upgrade"] = Rage::SSE::Application.new(sse) @__status = 0 # TODO render 204 + @__headers["content-type"] = "text/event-stream" end end From f1c8d1edcbe5c30f4de2a546c3ebf1117b43c295 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Thu, 26 Feb 2026 17:28:14 +0000 Subject: [PATCH 04/23] Update message generation logic --- lib/rage/sse/message.rb | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/lib/rage/sse/message.rb b/lib/rage/sse/message.rb index 6b252469..4168a5bc 100644 --- a/lib/rage/sse/message.rb +++ b/lib/rage/sse/message.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + # A class representing an SSE event. Use it to specify the `id`, `event`, and `retry` fields in an SSE. # # @!attribute id @@ -10,13 +12,14 @@ # @return [String, #to_json] The `data` field of the SSE event. If it's an object, it will be serialized to JSON. Rage::SSE::Message = Struct.new(:id, :event, :retry, :data, keyword_init: true) do def to_s - str = "" - - str << "id: #{id}\n" if id - str << "event: #{event}\n" if event - str << "retry: #{self.retry}\n" if self.retry - str << "data: #{data.is_a?(String) ? data : data.to_json}\n" if data # TODO: multiline + data_entry = if !data.is_a?(String) + "data: #{data.to_json}\n" + elsif data.include?("\n") + data.split("\n").map { |d| "data: #{d}\n" }.join + else + "data: #{data}\n" + end - str + "\n" + "#{data_entry}#{"id: #{id}\n" if id}#{"event: #{event}\n" if event}#{"retry: #{self.retry}\n" if self.retry}\n" end end From 1356d00a38a7f4ac2700e0cad5fecc842f469f9e Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Sat, 28 Feb 2026 15:51:48 +0000 Subject: [PATCH 05/23] Remove SSE::Stream functionality reducing the scope - streams will go into a separate PR --- lib/rage-rb.rb | 1 - lib/rage/pubsub/adapters/base.rb | 23 ------ lib/rage/pubsub/adapters/redis.rb | 128 ------------------------------ lib/rage/pubsub/pubsub.rb | 6 -- lib/rage/sse/application.rb | 4 - lib/rage/sse/sse.rb | 26 ------ lib/rage/sse/stream.rb | 11 --- 7 files changed, 199 deletions(-) delete mode 100644 lib/rage/pubsub/adapters/base.rb delete mode 100644 lib/rage/pubsub/adapters/redis.rb delete mode 100644 lib/rage/pubsub/pubsub.rb delete mode 100644 lib/rage/sse/stream.rb diff --git a/lib/rage-rb.rb b/lib/rage-rb.rb index f0f7d962..d2047e98 100644 --- a/lib/rage-rb.rb +++ b/lib/rage-rb.rb @@ -192,7 +192,6 @@ module ActiveRecord autoload :Deferred, "rage/deferred/deferred" autoload :Events, "rage/events/events" autoload :SSE, "rage/sse/sse" - autoload :PubSub, "rage/pubsub/pubsub" end module RageController diff --git a/lib/rage/pubsub/adapters/base.rb b/lib/rage/pubsub/adapters/base.rb deleted file mode 100644 index 5f308c2f..00000000 --- a/lib/rage/pubsub/adapters/base.rb +++ /dev/null @@ -1,23 +0,0 @@ -# frozen_string_literal: true - -class Rage::PubSub::Adapters::Base - def pick_a_worker(&block) - _lock, lock_path = Tempfile.new.yield_self { |file| [file, file.path] } - - caller = -> do - if File.new(lock_path).flock(File::LOCK_EX | File::LOCK_NB) - if Rage.logger.debug? - puts "INFO: #{Process.pid} is managing #{self.class.name.split("::").last} subscriptions." - end - block.call - end - end - - # TODO: move to root - if Iodine.running? - caller.call - else - Iodine.on_state(:on_start, &caller) - end - end -end diff --git a/lib/rage/pubsub/adapters/redis.rb b/lib/rage/pubsub/adapters/redis.rb deleted file mode 100644 index 280050f6..00000000 --- a/lib/rage/pubsub/adapters/redis.rb +++ /dev/null @@ -1,128 +0,0 @@ -# frozen_string_literal: true - -require "securerandom" - -if !defined?(RedisClient) - fail <<~ERR - - Redis adapter depends on the `redis-client` gem. Add the following line to your Gemfile: - gem "redis-client" - - ERR -end - -class Rage::PubSub::Adapters::Redis < Rage::PubSub::Adapters::Base - DEFAULT_REDIS_OPTIONS = { reconnect_attempts: [0.05, 0.1, 0.5] } - REDIS_MIN_VERSION_SUPPORTED = Gem::Version.create(6) - - def initialize(stream_name, broadcaster, config) - @redis_stream = if (prefix = config.delete(:channel_prefix)) - "#{prefix}:#{stream_name}" - else - stream_name - end - - @broadcaster = broadcaster - @redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(config)) - @server_uuid = SecureRandom.uuid - - redis_version = get_redis_version - if redis_version < REDIS_MIN_VERSION_SUPPORTED - raise "Redis adapter only supports Redis 6+. Detected Redis version: #{redis_version}." - end - - @trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid - - pick_a_worker { poll } - end - - def publish(stream_name, data) - message_uuid = SecureRandom.uuid - - publish_redis.call( - "XADD", - @redis_stream, - trimming_method, "~", trimming_value, - "*", - "1", stream_name, - "2", data.to_json, - "3", @server_uuid, - "4", message_uuid - ) - end - - private - - def publish_redis - @publish_redis ||= @redis_config.new_client - end - - def trimming_method - @trimming_strategy == :maxlen ? "MAXLEN" : "MINID" - end - - def trimming_value - @trimming_strategy == :maxlen ? "10000" : ((Time.now.to_f - 5 * 60) * 1000).to_i - end - - def get_redis_version - service_redis = @redis_config.new_client - version = service_redis.call("INFO").match(/redis_version:([[:graph:]]+)/)[1] - - Gem::Version.create(version) - - rescue RedisClient::Error => e - puts "FATAL: Couldn't connect to Redis - all broadcasts will be limited to the current server." - puts e.backtrace.join("\n") - REDIS_MIN_VERSION_SUPPORTED - - ensure - service_redis.close - end - - def error_backoff_intervals - @error_backoff_intervals ||= Enumerator.new do |y| - y << 0.2 << 0.5 << 1 << 2 << 5 - loop { y << 10 } - end - end - - def poll - unless Fiber.scheduler - Fiber.set_scheduler(Rage::FiberScheduler.new) - end - - Iodine.on_state(:start_shutdown) do - @stopping = true - end - - Fiber.schedule do - read_redis = @redis_config.new_client - last_id = (Time.now.to_f * 1000).to_i - last_message_uuid = nil - - loop do - data = read_redis.blocking_call(5, "XREAD", "COUNT", "100", "BLOCK", "5000", "STREAMS", @redis_stream, last_id) - - if data - data[@redis_stream].each do |id, (_, stream_name, _, serialized_data, _, broadcaster_uuid, _, message_uuid)| - if broadcaster_uuid != @server_uuid && message_uuid != last_message_uuid - @broadcaster.broadcast(stream_name, JSON.parse(serialized_data)) - end - - last_id = id - last_message_uuid = message_uuid - end - end - - rescue RedisClient::Error => e - Rage.logger.error("Subscriber error: #{e.message} (#{e.class})") - sleep error_backoff_intervals.next - rescue => e - @stopping ? break : raise(e) - else - error_backoff_intervals.rewind - end - end - end -end diff --git a/lib/rage/pubsub/pubsub.rb b/lib/rage/pubsub/pubsub.rb deleted file mode 100644 index c6b801a3..00000000 --- a/lib/rage/pubsub/pubsub.rb +++ /dev/null @@ -1,6 +0,0 @@ -module Rage::PubSub - module Adapters - autoload :Base, "rage/pubsub/adapters/base" - autoload :Redis, "rage/pubsub/adapters/redis" - end -end diff --git a/lib/rage/sse/application.rb b/lib/rage/sse/application.rb index 41c057de..3dcc1123 100644 --- a/lib/rage/sse/application.rb +++ b/lib/rage/sse/application.rb @@ -11,8 +11,6 @@ def initialize(stream) elsif stream.is_a?(Proc) @streamer = create_proc_streamer :proc - elsif stream.is_a?(Rage::SSE::Stream) - :stream else :object end @@ -22,8 +20,6 @@ def on_open(connection) case @type when :enum, :proc @streamer.resume(connection) - when :stream - connection.subscribe("sse:#{@stream.id}") # TODO: hash? # TODO: broadcast right away? when :object connection.write(Rage::SSE.__serialize(@stream)) connection.close diff --git a/lib/rage/sse/sse.rb b/lib/rage/sse/sse.rb index dc7f52f9..900605e0 100644 --- a/lib/rage/sse/sse.rb +++ b/lib/rage/sse/sse.rb @@ -23,34 +23,8 @@ def self.__serialize(data) "data: #{data.to_json}\n\n" end end - - # @private - def self.__adapter=(adapter) - @__adapter = adapter - end - - # @private - def self.__adapter - @__adapter - end - - # TODO: telemetry - def self.broadcast(stream, data) - InternalBroadcast.broadcast(stream, data) - __adapter&.publish(stream, data) - end - - # @private - module InternalBroadcast - def self.broadcast(stream, data) - Iodine.publish("sse:#{stream}", Rage::SSE.__serialize(data)) - end - end end require_relative "application" require_relative "connection_proxy" require_relative "message" -require_relative "stream" - -# Rage::SSE.__adapter = Rage::PubSub::Adapters::Redis.new("rage:sse:messages", Rage::SSE::InternalBroadcast, {}) diff --git a/lib/rage/sse/stream.rb b/lib/rage/sse/stream.rb deleted file mode 100644 index 37c1a14b..00000000 --- a/lib/rage/sse/stream.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -class Rage::SSE::Stream - attr_reader :id - - # TODO: close - def initialize(id) - # TODO: composite keys - @id = id - end -end From 871bb91c05390b88dd2b48699068d50833c6003d Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Sat, 28 Feb 2026 16:01:27 +0000 Subject: [PATCH 06/23] Ensure string data with raw connections --- lib/rage/sse/connection_proxy.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rage/sse/connection_proxy.rb b/lib/rage/sse/connection_proxy.rb index 0f78b6e7..72f70f26 100644 --- a/lib/rage/sse/connection_proxy.rb +++ b/lib/rage/sse/connection_proxy.rb @@ -7,7 +7,7 @@ def initialize(connection) def write(data) raise IOError, "closed stream" unless @connection.open? - @connection.write(data) + @connection.write(data.to_s) end alias_method :<<, :write From 6e36f7274c1710dea155503723e8ade65ca98e7a Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Sat, 28 Feb 2026 17:36:46 +0000 Subject: [PATCH 07/23] Correctly handle response status codes --- lib/rage/controller/api.rb | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index ba1b28d0..081514af 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -493,7 +493,7 @@ def session # render plain: "hello world", status: 201 # @note `render` doesn't terminate execution of the action, so if you want to exit an action after rendering, you need to do something like `render(...) and return`. def render(json: nil, plain: nil, sse: nil, status: nil) - raise "Render was called multiple times in this action" if @__rendered + raise "Render was called multiple times in this action." if @__rendered @__rendered = true if json || plain @@ -516,7 +516,12 @@ def render(json: nil, plain: nil, sse: nil, status: nil) end if sse - raise "already rendered" unless @__body.empty? + raise ArgumentError, "Cannot render both a standard body and an SSE stream." unless @__body.empty? + + if status + return if @__status == 204 + raise ArgumentError, "SSE responses only support 200 and 204 statuses." if @__status != 200 + end unless @__env["rack.upgrade?"] == :sse @__status = 406 @@ -525,7 +530,7 @@ def render(json: nil, plain: nil, sse: nil, status: nil) end @__env["rack.upgrade"] = Rage::SSE::Application.new(sse) - @__status = 0 # TODO render 204 + @__status = 0 @__headers["content-type"] = "text/event-stream" end end From 4b68ce6782b846609d10549d0e9d502c6546e752 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 2 Mar 2026 16:59:04 +0000 Subject: [PATCH 08/23] Treat `retry` field as integer --- lib/rage/sse/message.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rage/sse/message.rb b/lib/rage/sse/message.rb index 4168a5bc..c05ab47c 100644 --- a/lib/rage/sse/message.rb +++ b/lib/rage/sse/message.rb @@ -20,6 +20,6 @@ def to_s "data: #{data}\n" end - "#{data_entry}#{"id: #{id}\n" if id}#{"event: #{event}\n" if event}#{"retry: #{self.retry}\n" if self.retry}\n" + "#{data_entry}#{"id: #{id}\n" if id}#{"event: #{event}\n" if event}#{"retry: #{self.retry.to_i}\n" if self.retry && self.retry.to_i > 0}\n" end end From c0df2e587361108bd5cf931c9bf557d658994cf0 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 2 Mar 2026 16:59:20 +0000 Subject: [PATCH 09/23] Add message tests --- spec/sse/message_spec.rb | 118 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 spec/sse/message_spec.rb diff --git a/spec/sse/message_spec.rb b/spec/sse/message_spec.rb new file mode 100644 index 00000000..e2854a97 --- /dev/null +++ b/spec/sse/message_spec.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +RSpec.describe Rage::SSE::Message do + describe "#to_s" do + context "with data only" do + it "formats simple string data" do + message = described_class.new(data: "hello") + expect(message.to_s).to eq("data: hello\n\n") + end + + it "formats multiline string data" do + message = described_class.new(data: "line1\nline2\nline3") + expect(message.to_s).to eq("data: line1\ndata: line2\ndata: line3\n\n") + end + + it "formats object data as JSON" do + message = described_class.new(data: { name: "test", count: 42 }) + expect(message.to_s).to eq("data: {\"name\":\"test\",\"count\":42}\n\n") + end + + it "formats array data as JSON" do + message = described_class.new(data: [1, 2, 3]) + expect(message.to_s).to eq("data: [1,2,3]\n\n") + end + end + + context "with id" do + it "includes the id field" do + message = described_class.new(data: "hello", id: "123") + expect(message.to_s).to eq("data: hello\nid: 123\n\n") + end + + it "excludes id when nil" do + message = described_class.new(data: "hello", id: nil) + expect(message.to_s).to eq("data: hello\n\n") + end + end + + context "with event" do + it "includes the event field" do + message = described_class.new(data: "hello", event: "update") + expect(message.to_s).to eq("data: hello\nevent: update\n\n") + end + + it "excludes event when nil" do + message = described_class.new(data: "hello", event: nil) + expect(message.to_s).to eq("data: hello\n\n") + end + end + + context "with retry" do + it "includes the retry field for positive values" do + message = described_class.new(data: "hello", retry: 3000) + expect(message.to_s).to eq("data: hello\nretry: 3000\n\n") + end + + it "excludes retry when zero" do + message = described_class.new(data: "hello", retry: 0) + expect(message.to_s).to eq("data: hello\n\n") + end + + it "excludes retry when negative" do + message = described_class.new(data: "hello", retry: -1000) + expect(message.to_s).to eq("data: hello\n\n") + end + + it "excludes retry when nil" do + message = described_class.new(data: "hello", retry: nil) + expect(message.to_s).to eq("data: hello\n\n") + end + + it "converts float retry to integer" do + message = described_class.new(data: "hello", retry: 2500.7) + expect(message.to_s).to eq("data: hello\nretry: 2500\n\n") + end + end + + context "with all fields" do + it "includes all fields in the correct order" do + message = described_class.new(data: "hello", id: "456", event: "message", retry: 5000) + expect(message.to_s).to eq("data: hello\nid: 456\nevent: message\nretry: 5000\n\n") + end + + it "handles multiline data with all fields" do + message = described_class.new(data: "line1\nline2", id: "789", event: "multi", retry: 1000) + expect(message.to_s).to eq("data: line1\ndata: line2\nid: 789\nevent: multi\nretry: 1000\n\n") + end + + it "handles JSON data with all fields" do + message = described_class.new(data: { status: "ok" }, id: "1", event: "status", retry: 2000) + expect(message.to_s).to eq("data: {\"status\":\"ok\"}\nid: 1\nevent: status\nretry: 2000\n\n") + end + end + end + + describe "attributes" do + it "supports keyword initialization" do + message = described_class.new(id: "1", event: "test", retry: 1000, data: "hello") + expect(message.id).to eq("1") + expect(message.event).to eq("test") + expect(message.retry).to eq(1000) + expect(message.data).to eq("hello") + end + + it "allows attribute assignment" do + message = described_class.new + message.id = "2" + message.event = "update" + message.retry = 500 + message.data = "world" + + expect(message.id).to eq("2") + expect(message.event).to eq("update") + expect(message.retry).to eq(500) + expect(message.data).to eq("world") + end + end +end From c31d42a24741f381ba15a2acf18932c61f294def Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 2 Mar 2026 18:26:07 +0000 Subject: [PATCH 10/23] Add SSE tests --- spec/controller/api/sse_spec.rb | 166 ++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 spec/controller/api/sse_spec.rb diff --git a/spec/controller/api/sse_spec.rb b/spec/controller/api/sse_spec.rb new file mode 100644 index 00000000..757d71b3 --- /dev/null +++ b/spec/controller/api/sse_spec.rb @@ -0,0 +1,166 @@ +# frozen_string_literal: true + +module ControllerApiSSESpec + class TestControllerSSE < RageController::API + def index + render sse: "hello world" + end + end + + class TestControllerSSEAnd204 < RageController::API + def index + render sse: "hello world", status: 204 + end + end + + class TestControllerSSEAnd401 < RageController::API + def index + render sse: "hello world", status: 401 + end + end + + class TestControllerSSEAndNoContent < RageController::API + def index + render sse: "hello world", status: :no_content + end + end + + class TestControllerTwoRenders < RageController::API + def index + render plain: "hello world" + render sse: "hello world" + end + end + + class TestControllerSSEAndPlain < RageController::API + def index + render sse: "hello world", plain: "hello world" + end + end + + class TestControllerSSEAndCustomContentType < RageController::API + def index + headers["content-type"] = "text/plain" + render sse: "hello world" + end + end +end + +RSpec.describe RageController::API do + let(:env) { { "rack.upgrade?" => :sse } } + + subject { run_action(klass, :index, env:) } + + context "with SSE" do + let(:klass) { ControllerApiSSESpec::TestControllerSSE } + + it "returns an SSE response" do + expect(subject).to match([0, { "content-type" => "text/event-stream" }, []]) + end + + it "upgrades the request" do + subject + expect(env["rack.upgrade"]).to be_a(Rage::SSE::Application) + end + end + + context "with 204 response status" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAnd204 } + + it "returns an SSE response" do + status, headers, body = subject + + expect(status).to eq(204) + expect(headers["content-type"]).not_to include("text/event-stream") + expect(body).to be_empty + end + + it "doesn't upgrade the request" do + subject + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with 401 response status" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAnd401 } + + it "raises an error" do + expect { subject }.to raise_error(/SSE responses only support 200 and 204 statuses/) + end + + it "doesn't upgrade the request" do + subject rescue nil + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with :no_content response status" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAndNoContent } + + it "returns an SSE response" do + status, headers, body = subject + + expect(status).to eq(204) + expect(headers["content-type"]).not_to include("text/event-stream") + expect(body).to be_empty + end + + it "doesn't upgrade the request" do + subject + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with two renders" do + let(:klass) { ControllerApiSSESpec::TestControllerTwoRenders } + + it "raises an error" do + expect { subject }.to raise_error(/Render was called multiple times/) + end + + it "doesn't upgrade the request" do + subject rescue nil + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with double render" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAndPlain } + + it "raises an error" do + expect { subject }.to raise_error(/Cannot render both a standard body and an SSE stream/) + end + + it "doesn't upgrade the request" do + subject rescue nil + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with custom content type" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAndCustomContentType } + + it "ensures correct content type" do + _, headers, _ = subject + expect(headers["content-type"]).to eq("text/event-stream") + end + end + + context "with a non-SSE request" do + let(:klass) { ControllerApiSSESpec::TestControllerSSE } + let(:env) { {} } + + it "returns an error" do + status, headers, body = subject + + expect(status).to eq(406) + expect(headers["content-type"]).not_to include("text/event-stream") + expect(body[0]).to match(/Expected an SSE connection/) + end + + it "doesn't upgrade the request" do + subject + expect(env["rack.upgrade"]).to be_nil + end + end +end From b4dce2a1acc3da8a1c9baac850cb1942224ec480 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Mon, 2 Mar 2026 18:56:49 +0000 Subject: [PATCH 11/23] Update error message --- spec/controller/api/after_actions_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/controller/api/after_actions_spec.rb b/spec/controller/api/after_actions_spec.rb index a94b66b8..9df3c65a 100644 --- a/spec/controller/api/after_actions_spec.rb +++ b/spec/controller/api/after_actions_spec.rb @@ -294,7 +294,7 @@ def index let(:klass) { ControllerApiAfterActionsSpec::TestController9 } it "correctly runs after actions" do - expect { run_action(klass, :index) }.to raise_error("Render was called multiple times in this action") + expect { run_action(klass, :index) }.to raise_error(/Render was called multiple times in this action/) end end From be35c92ad9692ee90d239f3f0199e93ca9f12ca1 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 3 Mar 2026 20:19:19 +0000 Subject: [PATCH 12/23] Add integration tests --- spec/integration/sse_spec.rb | 96 +++++++++++++++++++ .../app/controllers/sse_controller.rb | 33 +++++++ spec/integration/test_app/config/routes.rb | 5 + 3 files changed, 134 insertions(+) create mode 100644 spec/integration/sse_spec.rb create mode 100644 spec/integration/test_app/app/controllers/sse_controller.rb diff --git a/spec/integration/sse_spec.rb b/spec/integration/sse_spec.rb new file mode 100644 index 00000000..822cfd77 --- /dev/null +++ b/spec/integration/sse_spec.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require "http" + +RSpec.describe "SSE" do + before :all do + skip("skipping end-to-end tests") unless ENV["ENABLE_EXTERNAL_TESTS"] == "true" + end + + before :all do + launch_server + end + + after :all do + stop_server + end + + describe "headers" do + it "returns correct content-type" do + response = HTTP.headers(accept: "text/event-stream").get("http://localhost:3000/sse/object") + expect(response.headers["content-type"]).to eq("text/event-stream") + end + + it "returns 200 status" do + response = HTTP.headers(accept: "text/event-stream").get("http://localhost:3000/sse/object") + expect(response.code).to eq(200) + end + + context "with non-SSE content type" do + it "returns 406 status" do + response = HTTP.get("http://localhost:3000/sse/object") + expect(response.code).to eq(406) + end + end + end + + describe "object mode" do + it "correctly serializes objects" do + response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/object") + + data = response.to_s + expect(data.delete_prefix!("data: ")).not_to be_nil + expect(data.chomp!("\n\n")).not_to be_nil + + expect(JSON.parse(data)).to eq({ "status" => "ok", "count" => 42 }) + end + end + + describe "stream mode" do + it "correctly streams responses" do + response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/stream") + + chunks = response.to_s.split("\n\n") + expect(chunks.size).to eq(3) + + expect(chunks[0]).to eq("data: first") + expect(chunks[1]).to eq("data: second\nid: 2\nevent: update") + expect(JSON.parse(chunks[2].delete_prefix("data: "))).to eq({ "data" => "third" }) + end + + it "doesn't buffer responses" do + response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/stream") + + chunks_arrive_timestamps = response.body.filter_map do |chunk| + Time.now.to_f unless chunk.empty? + end + + expect(chunks_arrive_timestamps.size).to eq(3) + + chunks_arrive_timestamps.each_cons(2) do |timestamp_a, timestamp_b| + expect(timestamp_b - timestamp_a).to be > 0.08 + end + end + end + + describe "raw mode" do + it "correctly streams responses" do + response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/proc") + + chunks = response.to_s.split("\n\n") + expect(chunks.size).to eq(2) + + expect(chunks[0]).to eq("data: hello") + expect(chunks[1]).to eq("data: world") + end + end + + describe "POST request" do + it "correctly processes request" do + response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").post("/sse/object") + + expect(response.code).to eq(200) + expect(response.to_s).to match(/"count":42/) + end + end +end diff --git a/spec/integration/test_app/app/controllers/sse_controller.rb b/spec/integration/test_app/app/controllers/sse_controller.rb new file mode 100644 index 00000000..0a51ce06 --- /dev/null +++ b/spec/integration/test_app/app/controllers/sse_controller.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +class SseController < RageController::API + def object + render sse: { status: "ok", count: 42 } + end + + def stream + stream = Enumerator.new do |y| + y << "first" + sleep 0.1 + + y << Rage::SSE.message("second", id: "2", event: "update") + sleep 0.1 + + y << nil + sleep 0.1 + + y << { data: "third" } + sleep 0.1 + end + + render sse: stream + end + + def proc + render sse: ->(conn) { + conn.write("data: hello\n\n") + conn.write(Rage::SSE.message("world")) + conn.close + } + end +end diff --git a/spec/integration/test_app/config/routes.rb b/spec/integration/test_app/config/routes.rb index 7743d177..4a48809d 100644 --- a/spec/integration/test_app/config/routes.rb +++ b/spec/integration/test_app/config/routes.rb @@ -32,6 +32,11 @@ post "deferred/create_file", to: "deferred#create" + get "sse/object", to: "sse#object" + post "sse/object", to: "sse#object" + get "sse/proc", to: "sse#proc" + get "sse/stream", to: "sse#stream" + mount ->(_) { [200, {}, ""] }, at: "/admin" namespace :api do From 6fe24e92ebe9b299ff6a38b8d946de4b3841e817 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 4 Mar 2026 17:24:43 +0000 Subject: [PATCH 13/23] Let the framework choose the upgrade flow --- lib/rage/cable/cable.rb | 3 ++- lib/rage/controller/api.rb | 7 +------ lib/rage/request.rb | 4 ---- rage.gemspec | 2 +- spec/controller/api/sse_spec.rb | 18 ++++++++---------- spec/integration/sse_spec.rb | 16 ++++++++-------- 6 files changed, 20 insertions(+), 30 deletions(-) diff --git a/lib/rage/cable/cable.rb b/lib/rage/cable/cable.rb index e552343f..54531cbb 100644 --- a/lib/rage/cable/cable.rb +++ b/lib/rage/cable/cable.rb @@ -44,7 +44,8 @@ def self.application application = ->(env) do Rage::Telemetry.tracer.span_cable_websocket_handshake(env:) do - if env["rack.upgrade?"] == :websocket + if env["HTTP_UPGRADE"] == "websocket" || env["HTTP_UPGRADE"]&.downcase == "websocket" + env["rack.upgrade?"] = :websocket env["rack.upgrade"] = handler accept_response else diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index 081514af..c889180f 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -523,12 +523,7 @@ def render(json: nil, plain: nil, sse: nil, status: nil) raise ArgumentError, "SSE responses only support 200 and 204 statuses." if @__status != 200 end - unless @__env["rack.upgrade?"] == :sse - @__status = 406 - @__body << "Bad Request: Expected an SSE connection" - return - end - + @__env["rack.upgrade?"] = :sse @__env["rack.upgrade"] = Rage::SSE::Application.new(sse) @__status = 0 @__headers["content-type"] = "text/event-stream" diff --git a/lib/rage/request.rb b/lib/rage/request.rb index c9934f3e..13aa273a 100644 --- a/lib/rage/request.rb +++ b/lib/rage/request.rb @@ -220,10 +220,6 @@ def route_uri_pattern end end - def sse? - @env["rack.upgrade?"] == :sse - end - private def rack_request diff --git a/rage.gemspec b/rage.gemspec index ea1eaf15..c596a530 100644 --- a/rage.gemspec +++ b/rage.gemspec @@ -29,7 +29,7 @@ Gem::Specification.new do |spec| spec.add_dependency "thor", "~> 1.0" spec.add_dependency "rack", "< 4" - spec.add_dependency "rage-iodine", "~> 5.0" + spec.add_dependency "rage-iodine", "~> 5.1" spec.add_dependency "zeitwerk", "~> 2.6" spec.add_dependency "rack-test", "~> 2.1" spec.add_dependency "rake", ">= 12.0" diff --git a/spec/controller/api/sse_spec.rb b/spec/controller/api/sse_spec.rb index 757d71b3..10904742 100644 --- a/spec/controller/api/sse_spec.rb +++ b/spec/controller/api/sse_spec.rb @@ -47,7 +47,7 @@ def index end RSpec.describe RageController::API do - let(:env) { { "rack.upgrade?" => :sse } } + let(:env) { {} } subject { run_action(klass, :index, env:) } @@ -60,6 +60,7 @@ def index it "upgrades the request" do subject + expect(env["rack.upgrade?"]).to eq(:sse) expect(env["rack.upgrade"]).to be_a(Rage::SSE::Application) end end @@ -148,19 +149,16 @@ def index context "with a non-SSE request" do let(:klass) { ControllerApiSSESpec::TestControllerSSE } - let(:env) { {} } + let(:env) { { "HTTP_ACCEPT" => "application/json" } } - it "returns an error" do - status, headers, body = subject - - expect(status).to eq(406) - expect(headers["content-type"]).not_to include("text/event-stream") - expect(body[0]).to match(/Expected an SSE connection/) + it "returns an SSE response" do + expect(subject).to match([0, { "content-type" => "text/event-stream" }, []]) end - it "doesn't upgrade the request" do + it "upgrades the request" do subject - expect(env["rack.upgrade"]).to be_nil + expect(env["rack.upgrade?"]).to eq(:sse) + expect(env["rack.upgrade"]).to be_a(Rage::SSE::Application) end end end diff --git a/spec/integration/sse_spec.rb b/spec/integration/sse_spec.rb index 822cfd77..d2160bea 100644 --- a/spec/integration/sse_spec.rb +++ b/spec/integration/sse_spec.rb @@ -27,16 +27,16 @@ end context "with non-SSE content type" do - it "returns 406 status" do - response = HTTP.get("http://localhost:3000/sse/object") - expect(response.code).to eq(406) + it "returns 200 status" do + response = HTTP.headers(accept: "application/json").get("http://localhost:3000/sse/object") + expect(response.code).to eq(200) end end end describe "object mode" do it "correctly serializes objects" do - response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/object") + response = HTTP.persistent("http://localhost:3000").get("/sse/object") data = response.to_s expect(data.delete_prefix!("data: ")).not_to be_nil @@ -48,7 +48,7 @@ describe "stream mode" do it "correctly streams responses" do - response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/stream") + response = HTTP.persistent("http://localhost:3000").get("/sse/stream") chunks = response.to_s.split("\n\n") expect(chunks.size).to eq(3) @@ -59,7 +59,7 @@ end it "doesn't buffer responses" do - response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/stream") + response = HTTP.persistent("http://localhost:3000").get("/sse/stream") chunks_arrive_timestamps = response.body.filter_map do |chunk| Time.now.to_f unless chunk.empty? @@ -75,7 +75,7 @@ describe "raw mode" do it "correctly streams responses" do - response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").get("/sse/proc") + response = HTTP.persistent("http://localhost:3000").get("/sse/proc") chunks = response.to_s.split("\n\n") expect(chunks.size).to eq(2) @@ -87,7 +87,7 @@ describe "POST request" do it "correctly processes request" do - response = HTTP.headers(accept: "text/event-stream").persistent("http://localhost:3000").post("/sse/object") + response = HTTP.persistent("http://localhost:3000").post("/sse/object") expect(response.code).to eq(200) expect(response.to_s).to match(/"count":42/) From ed17daa9d6a0bc9c92fe31453dd8dd0e331362dd Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 4 Mar 2026 17:53:34 +0000 Subject: [PATCH 14/23] Add a static file test --- spec/integration/file_server_spec.rb | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/spec/integration/file_server_spec.rb b/spec/integration/file_server_spec.rb index 12ee9273..f7fcb6a4 100644 --- a/spec/integration/file_server_spec.rb +++ b/spec/integration/file_server_spec.rb @@ -128,5 +128,23 @@ expect(subject.code).to eq(404) end end + + context "with shadowing an upgrade endpoint" do + let(:url) { "http://localhost:3000/sse/stream" } + + before do + FileUtils.mkdir_p("spec/integration/test_app/public/sse") + File.write("spec/integration/test_app/public/sse/stream", "static stream file") + end + + after do + FileUtils.rm_r("spec/integration/test_app/public/sse") + end + + it "returns correct response" do + expect(subject.code).to eq(200) + expect(subject.to_s).to eq("static stream file") + end + end end end From e75cdd905f3c198fe74e867136bf9028efbe5464 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:54:34 +0000 Subject: [PATCH 15/23] Schedule SSE fibers from root --- lib/rage/sse/application.rb | 53 +++++++++++----------- lib/rage/telemetry/spans/dispatch_fiber.rb | 2 +- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/lib/rage/sse/application.rb b/lib/rage/sse/application.rb index 3dcc1123..49cf332a 100644 --- a/lib/rage/sse/application.rb +++ b/lib/rage/sse/application.rb @@ -5,50 +5,51 @@ class Rage::SSE::Application def initialize(stream) @stream = stream - @type = if stream.is_a?(Enumerator) - @streamer = create_enum_streamer - :enum - elsif stream.is_a?(Proc) - @streamer = create_proc_streamer - :proc + @type = if @stream.is_a?(Enumerator) + :stream + elsif @stream.is_a?(Proc) + :raw else :object end + + @log_tags, @log_context = Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] end def on_open(connection) - case @type - when :enum, :proc - @streamer.resume(connection) - when :object + @type == :object ? send_data(connection) : start_stream(connection) + end + + private + + def send_data(connection) + Rage::Telemetry.tracer.span_sse_stream_process(connection:) do connection.write(Rage::SSE.__serialize(@stream)) connection.close end end - private - - def create_enum_streamer + def start_stream(connection) Fiber.schedule do - connection = Fiber.yield - - @stream.each do |event| - break if !connection.open? - connection.write(Rage::SSE.__serialize(event)) if event + Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] = @log_tags, @log_context + Rage::Telemetry.tracer.span_sse_stream_process(connection:) do + @type == :stream ? start_formatted_stream(connection) : start_raw_stream(connection) end rescue => e Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") - ensure - connection.close end end - def create_proc_streamer - Fiber.schedule do - connection = Fiber.yield - @stream.call(Rage::SSE::ConnectionProxy.new(connection)) - rescue => e - Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") + def start_formatted_stream(connection) + @stream.each do |event| + break if !connection.open? + connection.write(Rage::SSE.__serialize(event)) if event end + ensure + connection.close + end + + def start_raw_stream(connection) + @stream.call(Rage::SSE::ConnectionProxy.new(connection)) end end diff --git a/lib/rage/telemetry/spans/dispatch_fiber.rb b/lib/rage/telemetry/spans/dispatch_fiber.rb index aa468c78..baa9a849 100644 --- a/lib/rage/telemetry/spans/dispatch_fiber.rb +++ b/lib/rage/telemetry/spans/dispatch_fiber.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true ## -# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests and deferred tasks. +# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests, deferred tasks, or streams. # # This span is started when a system fiber begins processing and ends when the fiber has completed processing. # See {handle handle} for the list of arguments passed to handler methods. From 7fd0df9a8b31eea5728691698837b9a0ffb3f196 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 10 Mar 2026 16:58:32 +0000 Subject: [PATCH 16/23] Add `sse.stream.process` span --- .../telemetry/spans/process_sse_stream.rb | 50 +++++++++++++++++++ lib/rage/telemetry/telemetry.rb | 1 + spec/telemetry/spans_spec.rb | 44 ++++++++++++++++ 3 files changed, 95 insertions(+) create mode 100644 lib/rage/telemetry/spans/process_sse_stream.rb diff --git a/lib/rage/telemetry/spans/process_sse_stream.rb b/lib/rage/telemetry/spans/process_sse_stream.rb new file mode 100644 index 00000000..3b4f7b22 --- /dev/null +++ b/lib/rage/telemetry/spans/process_sse_stream.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +## +# The **sse.stream.process** span wraps the processing of an SSE stream. +# +# This span starts when a connection is opened and ends when the stream is finished. +# See {handle handle} for the list of arguments passed to handler methods. +# +# @see Rage::Telemetry::Handler Rage::Telemetry::Handler +# +class Rage::Telemetry::Spans::ProcessSSEStream + class << self + # @private + def id + "sse.stream.process" + end + + # @private + def span_parameters + %w[connection:] + end + + # @private + def handler_arguments + { + name: '"SSE.process"', + env: "connection.env" + } + end + + # @!parse [ruby] + # # @param id ["sse.stream.process"] ID of the span + # # @param name ["SSE.process"] human-readable name of the operation + # # @param env [Hash] the Rack environment associated with the connection + # # @yieldreturn [Rage::Telemetry::SpanResult] + # # + # # @example + # # class MyTelemetryHandler < Rage::Telemetry::Handler + # # handle "sse.stream.process", with: :my_handler + # # + # # def my_handler(id:, name:, env:) + # # yield + # # end + # # end + # # @note Rage automatically detects which parameters your handler method accepts and only passes those parameters. + # # You can omit any of the parameters described here. + # def handle(id:, name:, env:) + # end + end +end diff --git a/lib/rage/telemetry/telemetry.rb b/lib/rage/telemetry/telemetry.rb index 4b69ec2a..94ad1482 100644 --- a/lib/rage/telemetry/telemetry.rb +++ b/lib/rage/telemetry/telemetry.rb @@ -81,6 +81,7 @@ def self.__setup # | `deferred.task.process` | {ProcessDeferredTask} | Wraps the processing of deferred tasks | # | `events.event.publish` | {PublishEvent} | Wraps the publishing of events via {Rage::Events Rage::Events} | # | `events.subscriber.process` | {ProcessEventSubscriber} | Wraps the processing of events by subscribers | + # | `sse.stream.process` | {ProcessSSEStream} | Wraps the processing of an SSE stream | # module Spans end diff --git a/spec/telemetry/spans_spec.rb b/spec/telemetry/spans_spec.rb index bfceacee..bd0bac54 100644 --- a/spec/telemetry/spans_spec.rb +++ b/spec/telemetry/spans_spec.rb @@ -425,4 +425,48 @@ def appear Rage::Cable.application.call(env) end end + + describe described_class::ProcessSSEStream do + before do + allow(Fiber).to receive(:schedule).and_yield + end + + let(:connection) { double(env: { "rack.upgrade?" => :sse }, write: nil, close: nil, open?: true) } + + context "with enumerator" do + it "passes correct arguments" do + expect(verifier).to receive(:call).with({ + id: "sse.stream.process", + name: "SSE.process", + env: connection.env + }) + + Rage::SSE::Application.new([].each).on_open(connection) + end + end + + context "with proc" do + it "passes correct arguments" do + expect(verifier).to receive(:call).with({ + id: "sse.stream.process", + name: "SSE.process", + env: connection.env + }) + + Rage::SSE::Application.new(proc {}).on_open(connection) + end + end + + context "with object" do + it "passes correct arguments" do + expect(verifier).to receive(:call).with({ + id: "sse.stream.process", + name: "SSE.process", + env: connection.env + }) + + Rage::SSE::Application.new({}).on_open(connection) + end + end + end end From 390ce414b06935f99365f48ea45b9d5091cc21d0 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:14:30 +0000 Subject: [PATCH 17/23] Update SSE response code and content type --- lib/rage/controller/api.rb | 4 ++-- spec/controller/api/sse_spec.rb | 6 +++--- spec/integration/sse_spec.rb | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index c889180f..5888a8aa 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -525,8 +525,8 @@ def render(json: nil, plain: nil, sse: nil, status: nil) @__env["rack.upgrade?"] = :sse @__env["rack.upgrade"] = Rage::SSE::Application.new(sse) - @__status = 0 - @__headers["content-type"] = "text/event-stream" + @__status = 200 + @__headers["content-type"] = "text/event-stream; charset=utf-8" end end diff --git a/spec/controller/api/sse_spec.rb b/spec/controller/api/sse_spec.rb index 10904742..9a78e80a 100644 --- a/spec/controller/api/sse_spec.rb +++ b/spec/controller/api/sse_spec.rb @@ -55,7 +55,7 @@ def index let(:klass) { ControllerApiSSESpec::TestControllerSSE } it "returns an SSE response" do - expect(subject).to match([0, { "content-type" => "text/event-stream" }, []]) + expect(subject).to match([200, { "content-type" => "text/event-stream; charset=utf-8" }, []]) end it "upgrades the request" do @@ -143,7 +143,7 @@ def index it "ensures correct content type" do _, headers, _ = subject - expect(headers["content-type"]).to eq("text/event-stream") + expect(headers["content-type"]).to eq("text/event-stream; charset=utf-8") end end @@ -152,7 +152,7 @@ def index let(:env) { { "HTTP_ACCEPT" => "application/json" } } it "returns an SSE response" do - expect(subject).to match([0, { "content-type" => "text/event-stream" }, []]) + expect(subject).to match([200, { "content-type" => "text/event-stream; charset=utf-8" }, []]) end it "upgrades the request" do diff --git a/spec/integration/sse_spec.rb b/spec/integration/sse_spec.rb index d2160bea..3fe32f87 100644 --- a/spec/integration/sse_spec.rb +++ b/spec/integration/sse_spec.rb @@ -18,7 +18,7 @@ describe "headers" do it "returns correct content-type" do response = HTTP.headers(accept: "text/event-stream").get("http://localhost:3000/sse/object") - expect(response.headers["content-type"]).to eq("text/event-stream") + expect(response.headers["content-type"]).to eq("text/event-stream; charset=utf-8") end it "returns 200 status" do From 74fa81721c05950b52bf2a83cd5bfa03b9e48093 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:04:41 +0000 Subject: [PATCH 18/23] Load telemetry handlers dynamically --- lib/rage/configuration.rb | 2 +- lib/rage/telemetry/telemetry.rb | 7 ++++--- lib/rage/telemetry/tracer.rb | 12 +++++------- spec/integration/telemetry_spec.rb | 6 ++++++ spec/integration/test_app/config/application.rb | 15 +++++++++++++++ spec/telemetry/spans_spec.rb | 3 +-- spec/telemetry/telemetry_spec.rb | 9 +++++---- spec/telemetry/tracer_spec.rb | 6 +++--- 8 files changed, 40 insertions(+), 20 deletions(-) diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index f275fe64..a10eb126 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -998,7 +998,7 @@ def __finalize middleware.insert_before(::Rack::Events, Rage::BodyFinalizer) end - Rage::Telemetry.__setup if @telemetry + Rage::Telemetry.__setup(@telemetry.handlers_map) if @telemetry end end diff --git a/lib/rage/telemetry/telemetry.rb b/lib/rage/telemetry/telemetry.rb index 94ad1482..6962cf3d 100644 --- a/lib/rage/telemetry/telemetry.rb +++ b/lib/rage/telemetry/telemetry.rb @@ -46,12 +46,13 @@ def self.__registry # @private def self.tracer - @tracer ||= Tracer.new(__registry, Rage.config.telemetry.handlers_map) + @tracer ||= Tracer.new(__registry) end # @private - def self.__setup - tracer.setup + # @param handlers_map [Hash{String => Array}] + def self.__setup(handlers_map) + tracer.setup(handlers_map) end ## diff --git a/lib/rage/telemetry/tracer.rb b/lib/rage/telemetry/tracer.rb index 1f2f392e..fda6bca4 100644 --- a/lib/rage/telemetry/tracer.rb +++ b/lib/rage/telemetry/tracer.rb @@ -6,20 +6,18 @@ class Rage::Telemetry::Tracer private_constant :DEFAULT_SPAN_RESULT # @param spans_registry [Hash{String => Rage::Telemetry::Spans}] - # @param handlers_map [Hash{String => Array}] - def initialize(spans_registry, handlers_map) + def initialize(spans_registry) @spans_registry = spans_registry - @handlers_map = handlers_map - - @all_handler_refs = handlers_map.values.flatten @spans_registry.each do |_, span| setup_noop(span) end end - def setup - @handlers_map.each do |span_id, handler_refs| + def setup(handlers_map) + @all_handler_refs = handlers_map.values.flatten + + handlers_map.each do |span_id, handler_refs| setup_tracer(@spans_registry[span_id], handler_refs) end end diff --git a/spec/integration/telemetry_spec.rb b/spec/integration/telemetry_spec.rb index 0f6d502d..0c021dac 100644 --- a/spec/integration/telemetry_spec.rb +++ b/spec/integration/telemetry_spec.rb @@ -41,4 +41,10 @@ expect(telemetry_logs.size).to eq(1) end + + it "correctly processes handlers from different configure calls" do + response = HTTP.persistent("http://localhost:3000").get("/sse/object") + expect(response.code).to eq(200) + expect(logs).to include(/starting test sse stream/) + end end diff --git a/spec/integration/test_app/config/application.rb b/spec/integration/test_app/config/application.rb index f9f95a02..4665d7d7 100644 --- a/spec/integration/test_app/config/application.rb +++ b/spec/integration/test_app/config/application.rb @@ -62,4 +62,19 @@ def call(env) end end +class TestSseObserver < Rage::Telemetry::Handler + handle "sse.stream.process", with: :monitor_stream + + def self.monitor_stream + Rage.logger.info "starting test sse stream" + yield + end +end + +Rage.configure do + if ENV["ENABLE_TELEMETRY"] + config.telemetry.use TestSseObserver + end +end + require "rage/setup" diff --git a/spec/telemetry/spans_spec.rb b/spec/telemetry/spans_spec.rb index bd0bac54..e421d3a7 100644 --- a/spec/telemetry/spans_spec.rb +++ b/spec/telemetry/spans_spec.rb @@ -16,9 +16,8 @@ def self.test_span(**) end before do - allow(Rage.config.telemetry).to receive(:handlers_map).and_return(handlers_map) allow(handler).to receive(:verifier).and_return(verifier) - subject.setup + subject.setup(handlers_map) end around do |example| diff --git a/spec/telemetry/telemetry_spec.rb b/spec/telemetry/telemetry_spec.rb index 80fef0db..f78e6cd6 100644 --- a/spec/telemetry/telemetry_spec.rb +++ b/spec/telemetry/telemetry_spec.rb @@ -39,19 +39,20 @@ it "correctly initializes Tracer" do allow(described_class).to receive(:__registry).and_return(:test_span_registry) - allow(Rage.config.telemetry).to receive(:handlers_map).and_return(:test_handlers_map) - expect(described_class::Tracer).to receive(:new).with(:test_span_registry, :test_handlers_map) + expect(described_class::Tracer).to receive(:new).with(:test_span_registry) described_class.tracer end end describe ".__setup" do + let(:handlers_map) { double } + it "calls Tracer#setup" do allow(described_class).to receive(:tracer).and_return(double) - expect(described_class.tracer).to receive(:setup) + expect(described_class.tracer).to receive(:setup).with(handlers_map) - described_class.__setup + described_class.__setup(handlers_map) end end diff --git a/spec/telemetry/tracer_spec.rb b/spec/telemetry/tracer_spec.rb index cceeffd6..f56d9981 100644 --- a/spec/telemetry/tracer_spec.rb +++ b/spec/telemetry/tracer_spec.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true RSpec.describe Rage::Telemetry::Tracer do - subject { described_class.new(spans_registry, handlers_map) } + subject { described_class.new(spans_registry) } let(:handler_arguments) { {} } let(:spans_registry) do @@ -66,7 +66,7 @@ def self.test_instrumentation before do allow(handler).to receive(:verifier).and_return(verifier) - subject.setup + subject.setup(handlers_map) end it "correctly builds tracer" do @@ -368,7 +368,7 @@ def self.test_instrumentation_3(id:, data:) before do allow(handler_1).to receive(:verifier).and_return(verifier) allow(handler_2).to receive(:verifier).and_return(verifier) - subject.setup + subject.setup(handlers_map) end it "calls handlers in the correct order" do From 99167924af875ff127c1ef36d2e3d5ae467e01fc Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:05:26 +0000 Subject: [PATCH 19/23] Add SSE test --- spec/sse/sse_spec.rb | 74 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 spec/sse/sse_spec.rb diff --git a/spec/sse/sse_spec.rb b/spec/sse/sse_spec.rb new file mode 100644 index 00000000..e00ddab0 --- /dev/null +++ b/spec/sse/sse_spec.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +RSpec.describe Rage::SSE do + describe ".message" do + it "creates a Message with all fields" do + message = described_class.message("hello", id: "1", event: "update", retry: 3000) + + expect(message).to be_a(Rage::SSE::Message) + expect(message.data).to eq("hello") + expect(message.id).to eq("1") + expect(message.event).to eq("update") + expect(message.retry).to eq(3000) + end + + it "creates a Message with only data" do + message = described_class.message("hello") + + expect(message.data).to eq("hello") + expect(message.id).to be_nil + expect(message.event).to be_nil + expect(message.retry).to be_nil + end + end + + describe ".__serialize" do + context "with string data" do + it "wraps in data field" do + result = described_class.__serialize("hello") + expect(result).to eq("data: hello\n\n") + end + + it "does not split multiline strings" do + result = described_class.__serialize("line1\nline2") + expect(result).to eq("data: line1\nline2\n\n") + end + end + + context "with Message data" do + it "calls to_s on the message" do + message = Rage::SSE::Message.new(data: "hello", id: "1") + result = described_class.__serialize(message) + expect(result).to eq("data: hello\nid: 1\n\n") + end + + it "handles multiline data in messages" do + message = Rage::SSE::Message.new(data: "line1\nline2", event: "multi") + result = described_class.__serialize(message) + expect(result).to eq("data: line1\ndata: line2\nevent: multi\n\n") + end + end + + context "with object data" do + it "serializes hash as JSON" do + result = described_class.__serialize({ name: "test", count: 42 }) + expect(result).to eq("data: {\"name\":\"test\",\"count\":42}\n\n") + end + + it "serializes array as JSON" do + result = described_class.__serialize([1, 2, 3]) + expect(result).to eq("data: [1,2,3]\n\n") + end + + it "serializes numbers" do + result = described_class.__serialize(42) + expect(result).to eq("data: 42\n\n") + end + + it "serializes booleans" do + expect(described_class.__serialize(true)).to eq("data: true\n\n") + expect(described_class.__serialize(false)).to eq("data: false\n\n") + end + end + end +end From 58dbfe75a9523c0a50bce1baa6ae75d568378658 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:14:15 +0000 Subject: [PATCH 20/23] Update runtime dependency --- rage.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rage.gemspec b/rage.gemspec index c596a530..33fb805a 100644 --- a/rage.gemspec +++ b/rage.gemspec @@ -29,7 +29,7 @@ Gem::Specification.new do |spec| spec.add_dependency "thor", "~> 1.0" spec.add_dependency "rack", "< 4" - spec.add_dependency "rage-iodine", "~> 5.1" + spec.add_dependency "rage-iodine", "~> 5.2" spec.add_dependency "zeitwerk", "~> 2.6" spec.add_dependency "rack-test", "~> 2.1" spec.add_dependency "rake", ">= 12.0" From ff91a35f53e16b450e8cc0cee2c8c75a1666c448 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Wed, 11 Mar 2026 17:01:07 +0000 Subject: [PATCH 21/23] Pass type to the `sse.stream.process` span --- lib/rage/sse/application.rb | 10 +++++----- lib/rage/telemetry/spans/process_sse_stream.rb | 10 ++++++---- spec/telemetry/spans_spec.rb | 9 ++++++--- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lib/rage/sse/application.rb b/lib/rage/sse/application.rb index 49cf332a..970e1c26 100644 --- a/lib/rage/sse/application.rb +++ b/lib/rage/sse/application.rb @@ -8,22 +8,22 @@ def initialize(stream) @type = if @stream.is_a?(Enumerator) :stream elsif @stream.is_a?(Proc) - :raw + :manual else - :object + :single end @log_tags, @log_context = Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] end def on_open(connection) - @type == :object ? send_data(connection) : start_stream(connection) + @type == :single ? send_data(connection) : start_stream(connection) end private def send_data(connection) - Rage::Telemetry.tracer.span_sse_stream_process(connection:) do + Rage::Telemetry.tracer.span_sse_stream_process(connection:, type: @type) do connection.write(Rage::SSE.__serialize(@stream)) connection.close end @@ -32,7 +32,7 @@ def send_data(connection) def start_stream(connection) Fiber.schedule do Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] = @log_tags, @log_context - Rage::Telemetry.tracer.span_sse_stream_process(connection:) do + Rage::Telemetry.tracer.span_sse_stream_process(connection:, type: @type) do @type == :stream ? start_formatted_stream(connection) : start_raw_stream(connection) end rescue => e diff --git a/lib/rage/telemetry/spans/process_sse_stream.rb b/lib/rage/telemetry/spans/process_sse_stream.rb index 3b4f7b22..1c306aea 100644 --- a/lib/rage/telemetry/spans/process_sse_stream.rb +++ b/lib/rage/telemetry/spans/process_sse_stream.rb @@ -17,14 +17,15 @@ def id # @private def span_parameters - %w[connection:] + %w[connection: type:] end # @private def handler_arguments { name: '"SSE.process"', - env: "connection.env" + env: "connection.env", + type: "type" } end @@ -32,19 +33,20 @@ def handler_arguments # # @param id ["sse.stream.process"] ID of the span # # @param name ["SSE.process"] human-readable name of the operation # # @param env [Hash] the Rack environment associated with the connection + # # @param type [:stream, :single, :manual] the type of the SSE stream # # @yieldreturn [Rage::Telemetry::SpanResult] # # # # @example # # class MyTelemetryHandler < Rage::Telemetry::Handler # # handle "sse.stream.process", with: :my_handler # # - # # def my_handler(id:, name:, env:) + # # def my_handler(id:, name:, env:, type:) # # yield # # end # # end # # @note Rage automatically detects which parameters your handler method accepts and only passes those parameters. # # You can omit any of the parameters described here. - # def handle(id:, name:, env:) + # def handle(id:, name:, env:, type:) # end end end diff --git a/spec/telemetry/spans_spec.rb b/spec/telemetry/spans_spec.rb index e421d3a7..12982288 100644 --- a/spec/telemetry/spans_spec.rb +++ b/spec/telemetry/spans_spec.rb @@ -437,7 +437,8 @@ def appear expect(verifier).to receive(:call).with({ id: "sse.stream.process", name: "SSE.process", - env: connection.env + env: connection.env, + type: :stream }) Rage::SSE::Application.new([].each).on_open(connection) @@ -449,7 +450,8 @@ def appear expect(verifier).to receive(:call).with({ id: "sse.stream.process", name: "SSE.process", - env: connection.env + env: connection.env, + type: :manual }) Rage::SSE::Application.new(proc {}).on_open(connection) @@ -461,7 +463,8 @@ def appear expect(verifier).to receive(:call).with({ id: "sse.stream.process", name: "SSE.process", - env: connection.env + env: connection.env, + type: :single }) Rage::SSE::Application.new({}).on_open(connection) From 28b39ff7e326c7996ae623d0482eade34fd08c4b Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:22:19 +0000 Subject: [PATCH 22/23] Update docs --- lib/rage/controller/api.rb | 16 +++++++++---- lib/rage/sse/application.rb | 3 +++ lib/rage/sse/connection_proxy.rb | 26 ++++++++++++++++++++++ lib/rage/sse/message.rb | 10 ++++----- lib/rage/sse/sse.rb | 15 +++++++------ lib/rage/telemetry/spans/dispatch_fiber.rb | 2 +- 6 files changed, 55 insertions(+), 17 deletions(-) diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index 5888a8aa..c357ae1b 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -484,13 +484,21 @@ def session # # @param json [String, Object] send a json response to the client; objects like arrays will be serialized automatically # @param plain [String] send a text response to the client + # @param sse [#each, Proc, #to_json] send an SSE response to the client # @param status [Integer, Symbol] set a response status - # @example + # @example Render a JSON object # render json: { hello: "world" } - # @example + # @example Set a response status # render status: :ok - # @example - # render plain: "hello world", status: 201 + # @example Render an SSE stream + # render sse: "hello world".each_char + # @example Render a one-off SSE update + # render sse: { message: "hello world" } + # @example Write to an SSE connection manually + # render sse: ->(connection) do + # connection.write("data: Hello, World!\n\n") + # connection.close + # end # @note `render` doesn't terminate execution of the action, so if you want to exit an action after rendering, you need to do something like `render(...) and return`. def render(json: nil, plain: nil, sse: nil, status: nil) raise "Render was called multiple times in this action." if @__rendered diff --git a/lib/rage/sse/application.rb b/lib/rage/sse/application.rb index 970e1c26..19b3e555 100644 --- a/lib/rage/sse/application.rb +++ b/lib/rage/sse/application.rb @@ -1,6 +1,9 @@ # frozen_string_literal: true # @private +# This class is responsible for handling the lifecycle of an SSE connection. +# It determines the type of SSE stream and manages the data flow. +# class Rage::SSE::Application def initialize(stream) @stream = stream diff --git a/lib/rage/sse/connection_proxy.rb b/lib/rage/sse/connection_proxy.rb index 72f70f26..a7286cd5 100644 --- a/lib/rage/sse/connection_proxy.rb +++ b/lib/rage/sse/connection_proxy.rb @@ -1,10 +1,26 @@ # frozen_string_literal: true +## +# This class acts as a proxy for the underlying SSE connection, providing a simplified and safe interface for interacting with the stream. +# It ensures that operations are only performed on an open connection and abstracts away the direct connection handling. +# +# Example: +# +# ```ruby +# render sse: ->(connection) do +# # `connection` is an instance of Rage::SSE::ConnectionProxy +# end +# ``` +# class Rage::SSE::ConnectionProxy + # @private def initialize(connection) @connection = connection end + # Writes data to the SSE stream. + # @param data [#to_s] + # @raise [IOError] if the stream is already closed. def write(data) raise IOError, "closed stream" unless @connection.open? @connection.write(data.to_s) @@ -12,23 +28,33 @@ def write(data) alias_method :<<, :write + # Closes the SSE stream. def close @connection.close end alias_method :close_write, :close + # Checks if the SSE stream is closed. + # @return [Boolean] def closed? !@connection.open? end + # A no-op method to maintain interface compatibility. + # Flushing is handled by the underlying connection. + # @raise [IOError] if the stream is already closed. def flush raise IOError, "closed stream" unless @connection.open? end + # A no-op method to maintain interface compatibility. + # Reading from an SSE stream is not supported on the server side. def read(...) end + # A no-op method to maintain interface compatibility. + # Reading from an SSE stream is not supported on the server side. def close_read end end diff --git a/lib/rage/sse/message.rb b/lib/rage/sse/message.rb index c05ab47c..27287d96 100644 --- a/lib/rage/sse/message.rb +++ b/lib/rage/sse/message.rb @@ -1,15 +1,15 @@ # frozen_string_literal: true -# A class representing an SSE event. Use it to specify the `id`, `event`, and `retry` fields in an SSE. +# Represents a single Server-Sent Event. This class allows you to define the `id`, `event`, and `retry` fields for an SSE message. # # @!attribute id -# @return [String] The `id` field of the SSE event. +# @return [String] The `id` field for the SSE event. This can be used to track messages. # @!attribute event -# @return [String] The `event` field of the SSE event. +# @return [String] The `event` field for the SSE event. This can be used to define custom event types. # @!attribute retry -# @return [Integer] The `retry` field of the SSE event, in milliseconds. +# @return [Integer] The `retry` field for the SSE event, in milliseconds. This value is a suggestion for the client about how long to wait before reconnecting. # @!attribute data -# @return [String, #to_json] The `data` field of the SSE event. If it's an object, it will be serialized to JSON. +# @return [String, #to_json] The `data` field for the SSE event. If the object provided is not a string, it will be serialized to JSON. Rage::SSE::Message = Struct.new(:id, :event, :retry, :data, keyword_init: true) do def to_s data_entry = if !data.is_a?(String) diff --git a/lib/rage/sse/sse.rb b/lib/rage/sse/sse.rb index 900605e0..0a27b471 100644 --- a/lib/rage/sse/sse.rb +++ b/lib/rage/sse/sse.rb @@ -1,19 +1,20 @@ # frozen_string_literal: true module Rage::SSE - # Factory method to create SSE events. + # A factory method for creating Server-Sent Events. # - # @param data [String, #to_json] The `data` field of the SSE event. If it's an object, it will be serialized to JSON. - # @param id [String, nil] The `id` field of the SSE event. - # @param event [String, nil] The `event` field of the SSE event. - # @param retry [Integer, nil] The `retry` field of the SSE event, in milliseconds. - # @return [Message] The created SSE event. + # @param data [String, #to_json] The `data` field for the SSE event. If the object provided is not a string, it will be serialized to JSON. + # @param id [String, nil] The `id` field for the SSE event. This can be used to track messages. + # @param event [String, nil] The `event` field for the SSE event. This can be used to define custom event types. + # @param retry [Integer, nil] The `retry` field for the SSE event, in milliseconds. This value is used to instruct the client how long to wait before attempting to reconnect. + # @return [Message] The formatted SSE event. # @example - # Rage::SSE.message(current_user.profile, id: current_user.id) + # render sse: Rage::SSE.message(current_user.profile, id: current_user.id) def self.message(data, id: nil, event: nil, retry: nil) Message.new(data:, id:, event:, retry:) end + # @private def self.__serialize(data) if data.is_a?(String) "data: #{data}\n\n" diff --git a/lib/rage/telemetry/spans/dispatch_fiber.rb b/lib/rage/telemetry/spans/dispatch_fiber.rb index baa9a849..c5a5c959 100644 --- a/lib/rage/telemetry/spans/dispatch_fiber.rb +++ b/lib/rage/telemetry/spans/dispatch_fiber.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true ## -# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests, deferred tasks, or streams. +# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests, deferred tasks, or SSE streams. # # This span is started when a system fiber begins processing and ends when the fiber has completed processing. # See {handle handle} for the list of arguments passed to handler methods. From ab9032dbea5c2f498f965ddffc2281e71cd9d850 Mon Sep 17 00:00:00 2001 From: Roman Samoilov <2270393+rsamoilov@users.noreply.github.com> Date: Thu, 12 Mar 2026 13:22:44 +0000 Subject: [PATCH 23/23] Ensure multiline messages are correctly serialized --- lib/rage/sse/sse.rb | 2 +- spec/sse/sse_spec.rb | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/rage/sse/sse.rb b/lib/rage/sse/sse.rb index 0a27b471..017450d3 100644 --- a/lib/rage/sse/sse.rb +++ b/lib/rage/sse/sse.rb @@ -17,7 +17,7 @@ def self.message(data, id: nil, event: nil, retry: nil) # @private def self.__serialize(data) if data.is_a?(String) - "data: #{data}\n\n" + data.include?("\n") ? Message.new(data:).to_s : "data: #{data}\n\n" elsif data.is_a?(Message) data.to_s else diff --git a/spec/sse/sse_spec.rb b/spec/sse/sse_spec.rb index e00ddab0..106b5cb4 100644 --- a/spec/sse/sse_spec.rb +++ b/spec/sse/sse_spec.rb @@ -29,9 +29,14 @@ expect(result).to eq("data: hello\n\n") end - it "does not split multiline strings" do + it "handles multiline data in messages" do result = described_class.__serialize("line1\nline2") - expect(result).to eq("data: line1\nline2\n\n") + expect(result).to eq("data: line1\ndata: line2\n\n") + end + + it "ignores escaped new line characters" do + result = described_class.__serialize({ message: "hel\nlo" }.to_json) + expect(result).to eq("data: {\"message\":\"hel\\nlo\"}\n\n") end end