diff --git a/lib/rage-rb.rb b/lib/rage-rb.rb index b711f9f9..d2047e98 100644 --- a/lib/rage-rb.rb +++ b/lib/rage-rb.rb @@ -40,6 +40,12 @@ def self.events Rage::Events end + # Shorthand to access {Rage::SSE Rage::SSE}. + # @return [Rage::SSE] + def self.sse + Rage::SSE + end + # Configure routes for the Rage application. # @return [Rage::Router::DSL::Handler] # @example @@ -185,6 +191,7 @@ module ActiveRecord autoload :OpenAPI, "rage/openapi/openapi" autoload :Deferred, "rage/deferred/deferred" autoload :Events, "rage/events/events" + autoload :SSE, "rage/sse/sse" end module RageController diff --git a/lib/rage/cable/cable.rb b/lib/rage/cable/cable.rb index 9b74dade..5e66131c 100644 --- a/lib/rage/cable/cable.rb +++ b/lib/rage/cable/cable.rb @@ -44,7 +44,8 @@ def self.application application = ->(env) do Rage::Telemetry.tracer.span_cable_websocket_handshake(env:) do - if env["rack.upgrade?"] == :websocket + if env["HTTP_UPGRADE"] == "websocket" || env["HTTP_UPGRADE"]&.downcase == "websocket" + env["rack.upgrade?"] = :websocket env["rack.upgrade"] = handler accept_response else diff --git a/lib/rage/configuration.rb b/lib/rage/configuration.rb index f275fe64..a10eb126 100644 --- a/lib/rage/configuration.rb +++ b/lib/rage/configuration.rb @@ -998,7 +998,7 @@ def __finalize middleware.insert_before(::Rack::Events, Rage::BodyFinalizer) end - Rage::Telemetry.__setup if @telemetry + Rage::Telemetry.__setup(@telemetry.handlers_map) if @telemetry end end diff --git a/lib/rage/controller/api.rb b/lib/rage/controller/api.rb index 5efa870d..c357ae1b 100644 --- a/lib/rage/controller/api.rb +++ b/lib/rage/controller/api.rb @@ -484,23 +484,31 @@ def session # # @param json [String, Object] send a json response to the client; objects like arrays will be serialized automatically # @param plain [String] send a text response to the client + # @param sse [#each, Proc, #to_json] send an SSE response to the client # @param status [Integer, Symbol] set a response status - # @example + # @example Render a JSON object # render json: { hello: "world" } - # @example + # @example Set a response status # render status: :ok - # @example - # render plain: "hello world", status: 201 + # @example Render an SSE stream + # render sse: "hello world".each_char + # @example Render a one-off SSE update + # render sse: { message: "hello world" } + # @example Write to an SSE connection manually + # render sse: ->(connection) do + # connection.write("data: Hello, World!\n\n") + # connection.close + # end # @note `render` doesn't terminate execution of the action, so if you want to exit an action after rendering, you need to do something like `render(...) and return`. - def render(json: nil, plain: nil, status: nil) - raise "Render was called multiple times in this action" if @__rendered + def render(json: nil, plain: nil, sse: nil, status: nil) + raise "Render was called multiple times in this action." if @__rendered @__rendered = true if json || plain @__body << if json json.is_a?(String) ? json : json.to_json else - headers["content-type"] = "text/plain; charset=utf-8" + @__headers["content-type"] = "text/plain; charset=utf-8" plain.to_s end @@ -514,6 +522,20 @@ def render(json: nil, plain: nil, status: nil) status end end + + if sse + raise ArgumentError, "Cannot render both a standard body and an SSE stream." unless @__body.empty? + + if status + return if @__status == 204 + raise ArgumentError, "SSE responses only support 200 and 204 statuses." if @__status != 200 + end + + @__env["rack.upgrade?"] = :sse + @__env["rack.upgrade"] = Rage::SSE::Application.new(sse) + @__status = 200 + @__headers["content-type"] = "text/event-stream; charset=utf-8" + end end # Send a response with no body. diff --git a/lib/rage/fiber_scheduler.rb b/lib/rage/fiber_scheduler.rb index 0c74c773..5bec50c0 100644 --- a/lib/rage/fiber_scheduler.rb +++ b/lib/rage/fiber_scheduler.rb @@ -97,7 +97,7 @@ def block(_blocker, timeout = nil) unless fulfilled fulfilled = true ::Iodine.defer { ::Iodine.unsubscribe(channel) } - f.resume + f.resume if f.alive? end end diff --git a/lib/rage/sse/application.rb b/lib/rage/sse/application.rb new file mode 100644 index 00000000..19b3e555 --- /dev/null +++ b/lib/rage/sse/application.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +# @private +# This class is responsible for handling the lifecycle of an SSE connection. +# It determines the type of SSE stream and manages the data flow. +# +class Rage::SSE::Application + def initialize(stream) + @stream = stream + + @type = if @stream.is_a?(Enumerator) + :stream + elsif @stream.is_a?(Proc) + :manual + else + :single + end + + @log_tags, @log_context = Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] + end + + def on_open(connection) + @type == :single ? send_data(connection) : start_stream(connection) + end + + private + + def send_data(connection) + Rage::Telemetry.tracer.span_sse_stream_process(connection:, type: @type) do + connection.write(Rage::SSE.__serialize(@stream)) + connection.close + end + end + + def start_stream(connection) + Fiber.schedule do + Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] = @log_tags, @log_context + Rage::Telemetry.tracer.span_sse_stream_process(connection:, type: @type) do + @type == :stream ? start_formatted_stream(connection) : start_raw_stream(connection) + end + rescue => e + Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}") + end + end + + def start_formatted_stream(connection) + @stream.each do |event| + break if !connection.open? + connection.write(Rage::SSE.__serialize(event)) if event + end + ensure + connection.close + end + + def start_raw_stream(connection) + @stream.call(Rage::SSE::ConnectionProxy.new(connection)) + end +end diff --git a/lib/rage/sse/connection_proxy.rb b/lib/rage/sse/connection_proxy.rb new file mode 100644 index 00000000..a7286cd5 --- /dev/null +++ b/lib/rage/sse/connection_proxy.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +## +# This class acts as a proxy for the underlying SSE connection, providing a simplified and safe interface for interacting with the stream. +# It ensures that operations are only performed on an open connection and abstracts away the direct connection handling. +# +# Example: +# +# ```ruby +# render sse: ->(connection) do +# # `connection` is an instance of Rage::SSE::ConnectionProxy +# end +# ``` +# +class Rage::SSE::ConnectionProxy + # @private + def initialize(connection) + @connection = connection + end + + # Writes data to the SSE stream. + # @param data [#to_s] + # @raise [IOError] if the stream is already closed. + def write(data) + raise IOError, "closed stream" unless @connection.open? + @connection.write(data.to_s) + end + + alias_method :<<, :write + + # Closes the SSE stream. + def close + @connection.close + end + + alias_method :close_write, :close + + # Checks if the SSE stream is closed. + # @return [Boolean] + def closed? + !@connection.open? + end + + # A no-op method to maintain interface compatibility. + # Flushing is handled by the underlying connection. + # @raise [IOError] if the stream is already closed. + def flush + raise IOError, "closed stream" unless @connection.open? + end + + # A no-op method to maintain interface compatibility. + # Reading from an SSE stream is not supported on the server side. + def read(...) + end + + # A no-op method to maintain interface compatibility. + # Reading from an SSE stream is not supported on the server side. + def close_read + end +end diff --git a/lib/rage/sse/message.rb b/lib/rage/sse/message.rb new file mode 100644 index 00000000..27287d96 --- /dev/null +++ b/lib/rage/sse/message.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +# Represents a single Server-Sent Event. This class allows you to define the `id`, `event`, and `retry` fields for an SSE message. +# +# @!attribute id +# @return [String] The `id` field for the SSE event. This can be used to track messages. +# @!attribute event +# @return [String] The `event` field for the SSE event. This can be used to define custom event types. +# @!attribute retry +# @return [Integer] The `retry` field for the SSE event, in milliseconds. This value is a suggestion for the client about how long to wait before reconnecting. +# @!attribute data +# @return [String, #to_json] The `data` field for the SSE event. If the object provided is not a string, it will be serialized to JSON. +Rage::SSE::Message = Struct.new(:id, :event, :retry, :data, keyword_init: true) do + def to_s + data_entry = if !data.is_a?(String) + "data: #{data.to_json}\n" + elsif data.include?("\n") + data.split("\n").map { |d| "data: #{d}\n" }.join + else + "data: #{data}\n" + end + + "#{data_entry}#{"id: #{id}\n" if id}#{"event: #{event}\n" if event}#{"retry: #{self.retry.to_i}\n" if self.retry && self.retry.to_i > 0}\n" + end +end diff --git a/lib/rage/sse/sse.rb b/lib/rage/sse/sse.rb new file mode 100644 index 00000000..017450d3 --- /dev/null +++ b/lib/rage/sse/sse.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module Rage::SSE + # A factory method for creating Server-Sent Events. + # + # @param data [String, #to_json] The `data` field for the SSE event. If the object provided is not a string, it will be serialized to JSON. + # @param id [String, nil] The `id` field for the SSE event. This can be used to track messages. + # @param event [String, nil] The `event` field for the SSE event. This can be used to define custom event types. + # @param retry [Integer, nil] The `retry` field for the SSE event, in milliseconds. This value is used to instruct the client how long to wait before attempting to reconnect. + # @return [Message] The formatted SSE event. + # @example + # render sse: Rage::SSE.message(current_user.profile, id: current_user.id) + def self.message(data, id: nil, event: nil, retry: nil) + Message.new(data:, id:, event:, retry:) + end + + # @private + def self.__serialize(data) + if data.is_a?(String) + data.include?("\n") ? Message.new(data:).to_s : "data: #{data}\n\n" + elsif data.is_a?(Message) + data.to_s + else + "data: #{data.to_json}\n\n" + end + end +end + +require_relative "application" +require_relative "connection_proxy" +require_relative "message" diff --git a/lib/rage/telemetry/spans/dispatch_fiber.rb b/lib/rage/telemetry/spans/dispatch_fiber.rb index aa468c78..c5a5c959 100644 --- a/lib/rage/telemetry/spans/dispatch_fiber.rb +++ b/lib/rage/telemetry/spans/dispatch_fiber.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true ## -# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests and deferred tasks. +# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests, deferred tasks, or SSE streams. # # This span is started when a system fiber begins processing and ends when the fiber has completed processing. # See {handle handle} for the list of arguments passed to handler methods. diff --git a/lib/rage/telemetry/spans/process_sse_stream.rb b/lib/rage/telemetry/spans/process_sse_stream.rb new file mode 100644 index 00000000..1c306aea --- /dev/null +++ b/lib/rage/telemetry/spans/process_sse_stream.rb @@ -0,0 +1,52 @@ +# frozen_string_literal: true + +## +# The **sse.stream.process** span wraps the processing of an SSE stream. +# +# This span starts when a connection is opened and ends when the stream is finished. +# See {handle handle} for the list of arguments passed to handler methods. +# +# @see Rage::Telemetry::Handler Rage::Telemetry::Handler +# +class Rage::Telemetry::Spans::ProcessSSEStream + class << self + # @private + def id + "sse.stream.process" + end + + # @private + def span_parameters + %w[connection: type:] + end + + # @private + def handler_arguments + { + name: '"SSE.process"', + env: "connection.env", + type: "type" + } + end + + # @!parse [ruby] + # # @param id ["sse.stream.process"] ID of the span + # # @param name ["SSE.process"] human-readable name of the operation + # # @param env [Hash] the Rack environment associated with the connection + # # @param type [:stream, :single, :manual] the type of the SSE stream + # # @yieldreturn [Rage::Telemetry::SpanResult] + # # + # # @example + # # class MyTelemetryHandler < Rage::Telemetry::Handler + # # handle "sse.stream.process", with: :my_handler + # # + # # def my_handler(id:, name:, env:, type:) + # # yield + # # end + # # end + # # @note Rage automatically detects which parameters your handler method accepts and only passes those parameters. + # # You can omit any of the parameters described here. + # def handle(id:, name:, env:, type:) + # end + end +end diff --git a/lib/rage/telemetry/telemetry.rb b/lib/rage/telemetry/telemetry.rb index 4b69ec2a..6962cf3d 100644 --- a/lib/rage/telemetry/telemetry.rb +++ b/lib/rage/telemetry/telemetry.rb @@ -46,12 +46,13 @@ def self.__registry # @private def self.tracer - @tracer ||= Tracer.new(__registry, Rage.config.telemetry.handlers_map) + @tracer ||= Tracer.new(__registry) end # @private - def self.__setup - tracer.setup + # @param handlers_map [Hash{String => Array}] + def self.__setup(handlers_map) + tracer.setup(handlers_map) end ## @@ -81,6 +82,7 @@ def self.__setup # | `deferred.task.process` | {ProcessDeferredTask} | Wraps the processing of deferred tasks | # | `events.event.publish` | {PublishEvent} | Wraps the publishing of events via {Rage::Events Rage::Events} | # | `events.subscriber.process` | {ProcessEventSubscriber} | Wraps the processing of events by subscribers | + # | `sse.stream.process` | {ProcessSSEStream} | Wraps the processing of an SSE stream | # module Spans end diff --git a/lib/rage/telemetry/tracer.rb b/lib/rage/telemetry/tracer.rb index 1f2f392e..fda6bca4 100644 --- a/lib/rage/telemetry/tracer.rb +++ b/lib/rage/telemetry/tracer.rb @@ -6,20 +6,18 @@ class Rage::Telemetry::Tracer private_constant :DEFAULT_SPAN_RESULT # @param spans_registry [Hash{String => Rage::Telemetry::Spans}] - # @param handlers_map [Hash{String => Array}] - def initialize(spans_registry, handlers_map) + def initialize(spans_registry) @spans_registry = spans_registry - @handlers_map = handlers_map - - @all_handler_refs = handlers_map.values.flatten @spans_registry.each do |_, span| setup_noop(span) end end - def setup - @handlers_map.each do |span_id, handler_refs| + def setup(handlers_map) + @all_handler_refs = handlers_map.values.flatten + + handlers_map.each do |span_id, handler_refs| setup_tracer(@spans_registry[span_id], handler_refs) end end diff --git a/rage.gemspec b/rage.gemspec index d29dda12..33fb805a 100644 --- a/rage.gemspec +++ b/rage.gemspec @@ -29,7 +29,7 @@ Gem::Specification.new do |spec| spec.add_dependency "thor", "~> 1.0" spec.add_dependency "rack", "< 4" - spec.add_dependency "rage-iodine", "~> 4.3" + spec.add_dependency "rage-iodine", "~> 5.2" spec.add_dependency "zeitwerk", "~> 2.6" spec.add_dependency "rack-test", "~> 2.1" spec.add_dependency "rake", ">= 12.0" diff --git a/spec/controller/api/after_actions_spec.rb b/spec/controller/api/after_actions_spec.rb index a94b66b8..9df3c65a 100644 --- a/spec/controller/api/after_actions_spec.rb +++ b/spec/controller/api/after_actions_spec.rb @@ -294,7 +294,7 @@ def index let(:klass) { ControllerApiAfterActionsSpec::TestController9 } it "correctly runs after actions" do - expect { run_action(klass, :index) }.to raise_error("Render was called multiple times in this action") + expect { run_action(klass, :index) }.to raise_error(/Render was called multiple times in this action/) end end diff --git a/spec/controller/api/sse_spec.rb b/spec/controller/api/sse_spec.rb new file mode 100644 index 00000000..9a78e80a --- /dev/null +++ b/spec/controller/api/sse_spec.rb @@ -0,0 +1,164 @@ +# frozen_string_literal: true + +module ControllerApiSSESpec + class TestControllerSSE < RageController::API + def index + render sse: "hello world" + end + end + + class TestControllerSSEAnd204 < RageController::API + def index + render sse: "hello world", status: 204 + end + end + + class TestControllerSSEAnd401 < RageController::API + def index + render sse: "hello world", status: 401 + end + end + + class TestControllerSSEAndNoContent < RageController::API + def index + render sse: "hello world", status: :no_content + end + end + + class TestControllerTwoRenders < RageController::API + def index + render plain: "hello world" + render sse: "hello world" + end + end + + class TestControllerSSEAndPlain < RageController::API + def index + render sse: "hello world", plain: "hello world" + end + end + + class TestControllerSSEAndCustomContentType < RageController::API + def index + headers["content-type"] = "text/plain" + render sse: "hello world" + end + end +end + +RSpec.describe RageController::API do + let(:env) { {} } + + subject { run_action(klass, :index, env:) } + + context "with SSE" do + let(:klass) { ControllerApiSSESpec::TestControllerSSE } + + it "returns an SSE response" do + expect(subject).to match([200, { "content-type" => "text/event-stream; charset=utf-8" }, []]) + end + + it "upgrades the request" do + subject + expect(env["rack.upgrade?"]).to eq(:sse) + expect(env["rack.upgrade"]).to be_a(Rage::SSE::Application) + end + end + + context "with 204 response status" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAnd204 } + + it "returns an SSE response" do + status, headers, body = subject + + expect(status).to eq(204) + expect(headers["content-type"]).not_to include("text/event-stream") + expect(body).to be_empty + end + + it "doesn't upgrade the request" do + subject + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with 401 response status" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAnd401 } + + it "raises an error" do + expect { subject }.to raise_error(/SSE responses only support 200 and 204 statuses/) + end + + it "doesn't upgrade the request" do + subject rescue nil + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with :no_content response status" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAndNoContent } + + it "returns an SSE response" do + status, headers, body = subject + + expect(status).to eq(204) + expect(headers["content-type"]).not_to include("text/event-stream") + expect(body).to be_empty + end + + it "doesn't upgrade the request" do + subject + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with two renders" do + let(:klass) { ControllerApiSSESpec::TestControllerTwoRenders } + + it "raises an error" do + expect { subject }.to raise_error(/Render was called multiple times/) + end + + it "doesn't upgrade the request" do + subject rescue nil + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with double render" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAndPlain } + + it "raises an error" do + expect { subject }.to raise_error(/Cannot render both a standard body and an SSE stream/) + end + + it "doesn't upgrade the request" do + subject rescue nil + expect(env["rack.upgrade"]).to be_nil + end + end + + context "with custom content type" do + let(:klass) { ControllerApiSSESpec::TestControllerSSEAndCustomContentType } + + it "ensures correct content type" do + _, headers, _ = subject + expect(headers["content-type"]).to eq("text/event-stream; charset=utf-8") + end + end + + context "with a non-SSE request" do + let(:klass) { ControllerApiSSESpec::TestControllerSSE } + let(:env) { { "HTTP_ACCEPT" => "application/json" } } + + it "returns an SSE response" do + expect(subject).to match([200, { "content-type" => "text/event-stream; charset=utf-8" }, []]) + end + + it "upgrades the request" do + subject + expect(env["rack.upgrade?"]).to eq(:sse) + expect(env["rack.upgrade"]).to be_a(Rage::SSE::Application) + end + end +end diff --git a/spec/integration/file_server_spec.rb b/spec/integration/file_server_spec.rb index 12ee9273..f7fcb6a4 100644 --- a/spec/integration/file_server_spec.rb +++ b/spec/integration/file_server_spec.rb @@ -128,5 +128,23 @@ expect(subject.code).to eq(404) end end + + context "with shadowing an upgrade endpoint" do + let(:url) { "http://localhost:3000/sse/stream" } + + before do + FileUtils.mkdir_p("spec/integration/test_app/public/sse") + File.write("spec/integration/test_app/public/sse/stream", "static stream file") + end + + after do + FileUtils.rm_r("spec/integration/test_app/public/sse") + end + + it "returns correct response" do + expect(subject.code).to eq(200) + expect(subject.to_s).to eq("static stream file") + end + end end end diff --git a/spec/integration/sse_spec.rb b/spec/integration/sse_spec.rb new file mode 100644 index 00000000..3fe32f87 --- /dev/null +++ b/spec/integration/sse_spec.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require "http" + +RSpec.describe "SSE" do + before :all do + skip("skipping end-to-end tests") unless ENV["ENABLE_EXTERNAL_TESTS"] == "true" + end + + before :all do + launch_server + end + + after :all do + stop_server + end + + describe "headers" do + it "returns correct content-type" do + response = HTTP.headers(accept: "text/event-stream").get("http://localhost:3000/sse/object") + expect(response.headers["content-type"]).to eq("text/event-stream; charset=utf-8") + end + + it "returns 200 status" do + response = HTTP.headers(accept: "text/event-stream").get("http://localhost:3000/sse/object") + expect(response.code).to eq(200) + end + + context "with non-SSE content type" do + it "returns 200 status" do + response = HTTP.headers(accept: "application/json").get("http://localhost:3000/sse/object") + expect(response.code).to eq(200) + end + end + end + + describe "object mode" do + it "correctly serializes objects" do + response = HTTP.persistent("http://localhost:3000").get("/sse/object") + + data = response.to_s + expect(data.delete_prefix!("data: ")).not_to be_nil + expect(data.chomp!("\n\n")).not_to be_nil + + expect(JSON.parse(data)).to eq({ "status" => "ok", "count" => 42 }) + end + end + + describe "stream mode" do + it "correctly streams responses" do + response = HTTP.persistent("http://localhost:3000").get("/sse/stream") + + chunks = response.to_s.split("\n\n") + expect(chunks.size).to eq(3) + + expect(chunks[0]).to eq("data: first") + expect(chunks[1]).to eq("data: second\nid: 2\nevent: update") + expect(JSON.parse(chunks[2].delete_prefix("data: "))).to eq({ "data" => "third" }) + end + + it "doesn't buffer responses" do + response = HTTP.persistent("http://localhost:3000").get("/sse/stream") + + chunks_arrive_timestamps = response.body.filter_map do |chunk| + Time.now.to_f unless chunk.empty? + end + + expect(chunks_arrive_timestamps.size).to eq(3) + + chunks_arrive_timestamps.each_cons(2) do |timestamp_a, timestamp_b| + expect(timestamp_b - timestamp_a).to be > 0.08 + end + end + end + + describe "raw mode" do + it "correctly streams responses" do + response = HTTP.persistent("http://localhost:3000").get("/sse/proc") + + chunks = response.to_s.split("\n\n") + expect(chunks.size).to eq(2) + + expect(chunks[0]).to eq("data: hello") + expect(chunks[1]).to eq("data: world") + end + end + + describe "POST request" do + it "correctly processes request" do + response = HTTP.persistent("http://localhost:3000").post("/sse/object") + + expect(response.code).to eq(200) + expect(response.to_s).to match(/"count":42/) + end + end +end diff --git a/spec/integration/telemetry_spec.rb b/spec/integration/telemetry_spec.rb index 0f6d502d..0c021dac 100644 --- a/spec/integration/telemetry_spec.rb +++ b/spec/integration/telemetry_spec.rb @@ -41,4 +41,10 @@ expect(telemetry_logs.size).to eq(1) end + + it "correctly processes handlers from different configure calls" do + response = HTTP.persistent("http://localhost:3000").get("/sse/object") + expect(response.code).to eq(200) + expect(logs).to include(/starting test sse stream/) + end end diff --git a/spec/integration/test_app/app/controllers/sse_controller.rb b/spec/integration/test_app/app/controllers/sse_controller.rb new file mode 100644 index 00000000..0a51ce06 --- /dev/null +++ b/spec/integration/test_app/app/controllers/sse_controller.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +class SseController < RageController::API + def object + render sse: { status: "ok", count: 42 } + end + + def stream + stream = Enumerator.new do |y| + y << "first" + sleep 0.1 + + y << Rage::SSE.message("second", id: "2", event: "update") + sleep 0.1 + + y << nil + sleep 0.1 + + y << { data: "third" } + sleep 0.1 + end + + render sse: stream + end + + def proc + render sse: ->(conn) { + conn.write("data: hello\n\n") + conn.write(Rage::SSE.message("world")) + conn.close + } + end +end diff --git a/spec/integration/test_app/config/application.rb b/spec/integration/test_app/config/application.rb index f9f95a02..4665d7d7 100644 --- a/spec/integration/test_app/config/application.rb +++ b/spec/integration/test_app/config/application.rb @@ -62,4 +62,19 @@ def call(env) end end +class TestSseObserver < Rage::Telemetry::Handler + handle "sse.stream.process", with: :monitor_stream + + def self.monitor_stream + Rage.logger.info "starting test sse stream" + yield + end +end + +Rage.configure do + if ENV["ENABLE_TELEMETRY"] + config.telemetry.use TestSseObserver + end +end + require "rage/setup" diff --git a/spec/integration/test_app/config/routes.rb b/spec/integration/test_app/config/routes.rb index 501fb6ac..0d7923ad 100644 --- a/spec/integration/test_app/config/routes.rb +++ b/spec/integration/test_app/config/routes.rb @@ -32,6 +32,11 @@ post "deferred/create_file", to: "deferred#create" + get "sse/object", to: "sse#object" + post "sse/object", to: "sse#object" + get "sse/proc", to: "sse#proc" + get "sse/stream", to: "sse#stream" + mount ->(_) { [200, {}, ""] }, at: "/admin" namespace :api do diff --git a/spec/sse/message_spec.rb b/spec/sse/message_spec.rb new file mode 100644 index 00000000..e2854a97 --- /dev/null +++ b/spec/sse/message_spec.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +RSpec.describe Rage::SSE::Message do + describe "#to_s" do + context "with data only" do + it "formats simple string data" do + message = described_class.new(data: "hello") + expect(message.to_s).to eq("data: hello\n\n") + end + + it "formats multiline string data" do + message = described_class.new(data: "line1\nline2\nline3") + expect(message.to_s).to eq("data: line1\ndata: line2\ndata: line3\n\n") + end + + it "formats object data as JSON" do + message = described_class.new(data: { name: "test", count: 42 }) + expect(message.to_s).to eq("data: {\"name\":\"test\",\"count\":42}\n\n") + end + + it "formats array data as JSON" do + message = described_class.new(data: [1, 2, 3]) + expect(message.to_s).to eq("data: [1,2,3]\n\n") + end + end + + context "with id" do + it "includes the id field" do + message = described_class.new(data: "hello", id: "123") + expect(message.to_s).to eq("data: hello\nid: 123\n\n") + end + + it "excludes id when nil" do + message = described_class.new(data: "hello", id: nil) + expect(message.to_s).to eq("data: hello\n\n") + end + end + + context "with event" do + it "includes the event field" do + message = described_class.new(data: "hello", event: "update") + expect(message.to_s).to eq("data: hello\nevent: update\n\n") + end + + it "excludes event when nil" do + message = described_class.new(data: "hello", event: nil) + expect(message.to_s).to eq("data: hello\n\n") + end + end + + context "with retry" do + it "includes the retry field for positive values" do + message = described_class.new(data: "hello", retry: 3000) + expect(message.to_s).to eq("data: hello\nretry: 3000\n\n") + end + + it "excludes retry when zero" do + message = described_class.new(data: "hello", retry: 0) + expect(message.to_s).to eq("data: hello\n\n") + end + + it "excludes retry when negative" do + message = described_class.new(data: "hello", retry: -1000) + expect(message.to_s).to eq("data: hello\n\n") + end + + it "excludes retry when nil" do + message = described_class.new(data: "hello", retry: nil) + expect(message.to_s).to eq("data: hello\n\n") + end + + it "converts float retry to integer" do + message = described_class.new(data: "hello", retry: 2500.7) + expect(message.to_s).to eq("data: hello\nretry: 2500\n\n") + end + end + + context "with all fields" do + it "includes all fields in the correct order" do + message = described_class.new(data: "hello", id: "456", event: "message", retry: 5000) + expect(message.to_s).to eq("data: hello\nid: 456\nevent: message\nretry: 5000\n\n") + end + + it "handles multiline data with all fields" do + message = described_class.new(data: "line1\nline2", id: "789", event: "multi", retry: 1000) + expect(message.to_s).to eq("data: line1\ndata: line2\nid: 789\nevent: multi\nretry: 1000\n\n") + end + + it "handles JSON data with all fields" do + message = described_class.new(data: { status: "ok" }, id: "1", event: "status", retry: 2000) + expect(message.to_s).to eq("data: {\"status\":\"ok\"}\nid: 1\nevent: status\nretry: 2000\n\n") + end + end + end + + describe "attributes" do + it "supports keyword initialization" do + message = described_class.new(id: "1", event: "test", retry: 1000, data: "hello") + expect(message.id).to eq("1") + expect(message.event).to eq("test") + expect(message.retry).to eq(1000) + expect(message.data).to eq("hello") + end + + it "allows attribute assignment" do + message = described_class.new + message.id = "2" + message.event = "update" + message.retry = 500 + message.data = "world" + + expect(message.id).to eq("2") + expect(message.event).to eq("update") + expect(message.retry).to eq(500) + expect(message.data).to eq("world") + end + end +end diff --git a/spec/sse/sse_spec.rb b/spec/sse/sse_spec.rb new file mode 100644 index 00000000..106b5cb4 --- /dev/null +++ b/spec/sse/sse_spec.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +RSpec.describe Rage::SSE do + describe ".message" do + it "creates a Message with all fields" do + message = described_class.message("hello", id: "1", event: "update", retry: 3000) + + expect(message).to be_a(Rage::SSE::Message) + expect(message.data).to eq("hello") + expect(message.id).to eq("1") + expect(message.event).to eq("update") + expect(message.retry).to eq(3000) + end + + it "creates a Message with only data" do + message = described_class.message("hello") + + expect(message.data).to eq("hello") + expect(message.id).to be_nil + expect(message.event).to be_nil + expect(message.retry).to be_nil + end + end + + describe ".__serialize" do + context "with string data" do + it "wraps in data field" do + result = described_class.__serialize("hello") + expect(result).to eq("data: hello\n\n") + end + + it "handles multiline data in messages" do + result = described_class.__serialize("line1\nline2") + expect(result).to eq("data: line1\ndata: line2\n\n") + end + + it "ignores escaped new line characters" do + result = described_class.__serialize({ message: "hel\nlo" }.to_json) + expect(result).to eq("data: {\"message\":\"hel\\nlo\"}\n\n") + end + end + + context "with Message data" do + it "calls to_s on the message" do + message = Rage::SSE::Message.new(data: "hello", id: "1") + result = described_class.__serialize(message) + expect(result).to eq("data: hello\nid: 1\n\n") + end + + it "handles multiline data in messages" do + message = Rage::SSE::Message.new(data: "line1\nline2", event: "multi") + result = described_class.__serialize(message) + expect(result).to eq("data: line1\ndata: line2\nevent: multi\n\n") + end + end + + context "with object data" do + it "serializes hash as JSON" do + result = described_class.__serialize({ name: "test", count: 42 }) + expect(result).to eq("data: {\"name\":\"test\",\"count\":42}\n\n") + end + + it "serializes array as JSON" do + result = described_class.__serialize([1, 2, 3]) + expect(result).to eq("data: [1,2,3]\n\n") + end + + it "serializes numbers" do + result = described_class.__serialize(42) + expect(result).to eq("data: 42\n\n") + end + + it "serializes booleans" do + expect(described_class.__serialize(true)).to eq("data: true\n\n") + expect(described_class.__serialize(false)).to eq("data: false\n\n") + end + end + end +end diff --git a/spec/telemetry/spans_spec.rb b/spec/telemetry/spans_spec.rb index bfceacee..12982288 100644 --- a/spec/telemetry/spans_spec.rb +++ b/spec/telemetry/spans_spec.rb @@ -16,9 +16,8 @@ def self.test_span(**) end before do - allow(Rage.config.telemetry).to receive(:handlers_map).and_return(handlers_map) allow(handler).to receive(:verifier).and_return(verifier) - subject.setup + subject.setup(handlers_map) end around do |example| @@ -425,4 +424,51 @@ def appear Rage::Cable.application.call(env) end end + + describe described_class::ProcessSSEStream do + before do + allow(Fiber).to receive(:schedule).and_yield + end + + let(:connection) { double(env: { "rack.upgrade?" => :sse }, write: nil, close: nil, open?: true) } + + context "with enumerator" do + it "passes correct arguments" do + expect(verifier).to receive(:call).with({ + id: "sse.stream.process", + name: "SSE.process", + env: connection.env, + type: :stream + }) + + Rage::SSE::Application.new([].each).on_open(connection) + end + end + + context "with proc" do + it "passes correct arguments" do + expect(verifier).to receive(:call).with({ + id: "sse.stream.process", + name: "SSE.process", + env: connection.env, + type: :manual + }) + + Rage::SSE::Application.new(proc {}).on_open(connection) + end + end + + context "with object" do + it "passes correct arguments" do + expect(verifier).to receive(:call).with({ + id: "sse.stream.process", + name: "SSE.process", + env: connection.env, + type: :single + }) + + Rage::SSE::Application.new({}).on_open(connection) + end + end + end end diff --git a/spec/telemetry/telemetry_spec.rb b/spec/telemetry/telemetry_spec.rb index 80fef0db..f78e6cd6 100644 --- a/spec/telemetry/telemetry_spec.rb +++ b/spec/telemetry/telemetry_spec.rb @@ -39,19 +39,20 @@ it "correctly initializes Tracer" do allow(described_class).to receive(:__registry).and_return(:test_span_registry) - allow(Rage.config.telemetry).to receive(:handlers_map).and_return(:test_handlers_map) - expect(described_class::Tracer).to receive(:new).with(:test_span_registry, :test_handlers_map) + expect(described_class::Tracer).to receive(:new).with(:test_span_registry) described_class.tracer end end describe ".__setup" do + let(:handlers_map) { double } + it "calls Tracer#setup" do allow(described_class).to receive(:tracer).and_return(double) - expect(described_class.tracer).to receive(:setup) + expect(described_class.tracer).to receive(:setup).with(handlers_map) - described_class.__setup + described_class.__setup(handlers_map) end end diff --git a/spec/telemetry/tracer_spec.rb b/spec/telemetry/tracer_spec.rb index cceeffd6..f56d9981 100644 --- a/spec/telemetry/tracer_spec.rb +++ b/spec/telemetry/tracer_spec.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true RSpec.describe Rage::Telemetry::Tracer do - subject { described_class.new(spans_registry, handlers_map) } + subject { described_class.new(spans_registry) } let(:handler_arguments) { {} } let(:spans_registry) do @@ -66,7 +66,7 @@ def self.test_instrumentation before do allow(handler).to receive(:verifier).and_return(verifier) - subject.setup + subject.setup(handlers_map) end it "correctly builds tracer" do @@ -368,7 +368,7 @@ def self.test_instrumentation_3(id:, data:) before do allow(handler_1).to receive(:verifier).and_return(verifier) allow(handler_2).to receive(:verifier).and_return(verifier) - subject.setup + subject.setup(handlers_map) end it "calls handlers in the correct order" do