Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/rage-rb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ def self.events
Rage::Events
end

# Shorthand to access {Rage::SSE Rage::SSE}.
# @return [Rage::SSE]
def self.sse
Rage::SSE
end

# Configure routes for the Rage application.
# @return [Rage::Router::DSL::Handler]
# @example
Expand Down Expand Up @@ -185,6 +191,7 @@ module ActiveRecord
autoload :OpenAPI, "rage/openapi/openapi"
autoload :Deferred, "rage/deferred/deferred"
autoload :Events, "rage/events/events"
autoload :SSE, "rage/sse/sse"
end

module RageController
Expand Down
3 changes: 2 additions & 1 deletion lib/rage/cable/cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def self.application

application = ->(env) do
Rage::Telemetry.tracer.span_cable_websocket_handshake(env:) do
if env["rack.upgrade?"] == :websocket
if env["HTTP_UPGRADE"] == "websocket" || env["HTTP_UPGRADE"]&.downcase == "websocket"
env["rack.upgrade?"] = :websocket
env["rack.upgrade"] = handler
accept_response
else
Expand Down
2 changes: 1 addition & 1 deletion lib/rage/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ def __finalize
middleware.insert_before(::Rack::Events, Rage::BodyFinalizer)
end

Rage::Telemetry.__setup if @telemetry
Rage::Telemetry.__setup(@telemetry.handlers_map) if @telemetry
end
end

Expand Down
36 changes: 29 additions & 7 deletions lib/rage/controller/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -484,23 +484,31 @@ def session
#
# @param json [String, Object] send a json response to the client; objects like arrays will be serialized automatically
# @param plain [String] send a text response to the client
# @param sse [#each, Proc, #to_json] send an SSE response to the client
# @param status [Integer, Symbol] set a response status
# @example
# @example Render a JSON object
# render json: { hello: "world" }
# @example
# @example Set a response status
# render status: :ok
# @example
# render plain: "hello world", status: 201
# @example Render an SSE stream
# render sse: "hello world".each_char
# @example Render a one-off SSE update
# render sse: { message: "hello world" }
# @example Write to an SSE connection manually
# render sse: ->(connection) do
# connection.write("data: Hello, World!\n\n")
# connection.close
# end
# @note `render` doesn't terminate execution of the action, so if you want to exit an action after rendering, you need to do something like `render(...) and return`.
def render(json: nil, plain: nil, status: nil)
raise "Render was called multiple times in this action" if @__rendered
def render(json: nil, plain: nil, sse: nil, status: nil)
raise "Render was called multiple times in this action." if @__rendered
@__rendered = true

if json || plain
@__body << if json
json.is_a?(String) ? json : json.to_json
else
headers["content-type"] = "text/plain; charset=utf-8"
@__headers["content-type"] = "text/plain; charset=utf-8"
plain.to_s
end

Expand All @@ -514,6 +522,20 @@ def render(json: nil, plain: nil, status: nil)
status
end
end

if sse
raise ArgumentError, "Cannot render both a standard body and an SSE stream." unless @__body.empty?

if status
return if @__status == 204
raise ArgumentError, "SSE responses only support 200 and 204 statuses." if @__status != 200
end

@__env["rack.upgrade?"] = :sse
@__env["rack.upgrade"] = Rage::SSE::Application.new(sse)
@__status = 200
@__headers["content-type"] = "text/event-stream; charset=utf-8"
end
end

# Send a response with no body.
Expand Down
2 changes: 1 addition & 1 deletion lib/rage/fiber_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def block(_blocker, timeout = nil)
unless fulfilled
fulfilled = true
::Iodine.defer { ::Iodine.unsubscribe(channel) }
f.resume
f.resume if f.alive?
end
end

Expand Down
58 changes: 58 additions & 0 deletions lib/rage/sse/application.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# frozen_string_literal: true

# @private
# This class is responsible for handling the lifecycle of an SSE connection.
# It determines the type of SSE stream and manages the data flow.
#
class Rage::SSE::Application
def initialize(stream)
@stream = stream

@type = if @stream.is_a?(Enumerator)
:stream
elsif @stream.is_a?(Proc)
:manual
else
:single
end

@log_tags, @log_context = Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context]
end

def on_open(connection)
@type == :single ? send_data(connection) : start_stream(connection)
end

private

def send_data(connection)
Rage::Telemetry.tracer.span_sse_stream_process(connection:, type: @type) do
connection.write(Rage::SSE.__serialize(@stream))
connection.close
end
end

def start_stream(connection)
Fiber.schedule do
Fiber[:__rage_logger_tags], Fiber[:__rage_logger_context] = @log_tags, @log_context
Rage::Telemetry.tracer.span_sse_stream_process(connection:, type: @type) do
@type == :stream ? start_formatted_stream(connection) : start_raw_stream(connection)
end
rescue => e
Rage.logger.error("SSE stream failed with exception: #{e.class} (#{e.message}):\n#{e.backtrace.join("\n")}")
end
end

def start_formatted_stream(connection)
@stream.each do |event|
break if !connection.open?
connection.write(Rage::SSE.__serialize(event)) if event
end
ensure
connection.close
end

def start_raw_stream(connection)
@stream.call(Rage::SSE::ConnectionProxy.new(connection))
end
end
60 changes: 60 additions & 0 deletions lib/rage/sse/connection_proxy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# frozen_string_literal: true

##
# This class acts as a proxy for the underlying SSE connection, providing a simplified and safe interface for interacting with the stream.
# It ensures that operations are only performed on an open connection and abstracts away the direct connection handling.
#
# Example:
#
# ```ruby
# render sse: ->(connection) do
# # `connection` is an instance of Rage::SSE::ConnectionProxy
# end
# ```
#
class Rage::SSE::ConnectionProxy
# @private
def initialize(connection)
@connection = connection
end

# Writes data to the SSE stream.
# @param data [#to_s]
# @raise [IOError] if the stream is already closed.
def write(data)
raise IOError, "closed stream" unless @connection.open?
@connection.write(data.to_s)
end

alias_method :<<, :write

# Closes the SSE stream.
def close
@connection.close
end

alias_method :close_write, :close

# Checks if the SSE stream is closed.
# @return [Boolean]
def closed?
!@connection.open?
end

# A no-op method to maintain interface compatibility.
# Flushing is handled by the underlying connection.
# @raise [IOError] if the stream is already closed.
def flush
raise IOError, "closed stream" unless @connection.open?
end

# A no-op method to maintain interface compatibility.
# Reading from an SSE stream is not supported on the server side.
def read(...)
end

# A no-op method to maintain interface compatibility.
# Reading from an SSE stream is not supported on the server side.
def close_read
end
end
25 changes: 25 additions & 0 deletions lib/rage/sse/message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# frozen_string_literal: true

# Represents a single Server-Sent Event. This class allows you to define the `id`, `event`, and `retry` fields for an SSE message.
#
# @!attribute id
# @return [String] The `id` field for the SSE event. This can be used to track messages.
# @!attribute event
# @return [String] The `event` field for the SSE event. This can be used to define custom event types.
# @!attribute retry
# @return [Integer] The `retry` field for the SSE event, in milliseconds. This value is a suggestion for the client about how long to wait before reconnecting.
# @!attribute data
# @return [String, #to_json] The `data` field for the SSE event. If the object provided is not a string, it will be serialized to JSON.
Rage::SSE::Message = Struct.new(:id, :event, :retry, :data, keyword_init: true) do
def to_s
data_entry = if !data.is_a?(String)
"data: #{data.to_json}\n"
elsif data.include?("\n")
data.split("\n").map { |d| "data: #{d}\n" }.join
else
"data: #{data}\n"
end

"#{data_entry}#{"id: #{id}\n" if id}#{"event: #{event}\n" if event}#{"retry: #{self.retry.to_i}\n" if self.retry && self.retry.to_i > 0}\n"
end
end
31 changes: 31 additions & 0 deletions lib/rage/sse/sse.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

module Rage::SSE
# A factory method for creating Server-Sent Events.
#
# @param data [String, #to_json] The `data` field for the SSE event. If the object provided is not a string, it will be serialized to JSON.
# @param id [String, nil] The `id` field for the SSE event. This can be used to track messages.
# @param event [String, nil] The `event` field for the SSE event. This can be used to define custom event types.
# @param retry [Integer, nil] The `retry` field for the SSE event, in milliseconds. This value is used to instruct the client how long to wait before attempting to reconnect.
# @return [Message] The formatted SSE event.
# @example
# render sse: Rage::SSE.message(current_user.profile, id: current_user.id)
def self.message(data, id: nil, event: nil, retry: nil)
Message.new(data:, id:, event:, retry:)
end

# @private
def self.__serialize(data)
if data.is_a?(String)
data.include?("\n") ? Message.new(data:).to_s : "data: #{data}\n\n"
elsif data.is_a?(Message)
data.to_s
else
"data: #{data.to_json}\n\n"
end
end
end

require_relative "application"
require_relative "connection_proxy"
require_relative "message"
2 changes: 1 addition & 1 deletion lib/rage/telemetry/spans/dispatch_fiber.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

##
# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests and deferred tasks.
# The **core.fiber.dispatch** span tracks the scheduling and processing of system-level fibers created by the framework to process requests, deferred tasks, or SSE streams.
#
# This span is started when a system fiber begins processing and ends when the fiber has completed processing.
# See {handle handle} for the list of arguments passed to handler methods.
Expand Down
52 changes: 52 additions & 0 deletions lib/rage/telemetry/spans/process_sse_stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

##
# The **sse.stream.process** span wraps the processing of an SSE stream.
#
# This span starts when a connection is opened and ends when the stream is finished.
# See {handle handle} for the list of arguments passed to handler methods.
#
# @see Rage::Telemetry::Handler Rage::Telemetry::Handler
#
class Rage::Telemetry::Spans::ProcessSSEStream
class << self
# @private
def id
"sse.stream.process"
end

# @private
def span_parameters
%w[connection: type:]
end

# @private
def handler_arguments
{
name: '"SSE.process"',
env: "connection.env",
type: "type"
}
end

# @!parse [ruby]
# # @param id ["sse.stream.process"] ID of the span
# # @param name ["SSE.process"] human-readable name of the operation
# # @param env [Hash] the Rack environment associated with the connection
# # @param type [:stream, :single, :manual] the type of the SSE stream
# # @yieldreturn [Rage::Telemetry::SpanResult]
# #
# # @example
# # class MyTelemetryHandler < Rage::Telemetry::Handler
# # handle "sse.stream.process", with: :my_handler
# #
# # def my_handler(id:, name:, env:, type:)
# # yield
# # end
# # end
# # @note Rage automatically detects which parameters your handler method accepts and only passes those parameters.
# # You can omit any of the parameters described here.
# def handle(id:, name:, env:, type:)
# end
end
end
8 changes: 5 additions & 3 deletions lib/rage/telemetry/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ def self.__registry

# @private
def self.tracer
@tracer ||= Tracer.new(__registry, Rage.config.telemetry.handlers_map)
@tracer ||= Tracer.new(__registry)
end

# @private
def self.__setup
tracer.setup
# @param handlers_map [Hash{String => Array<Rage::Telemetry::HandlerRef>}]
def self.__setup(handlers_map)
tracer.setup(handlers_map)
end

##
Expand Down Expand Up @@ -81,6 +82,7 @@ def self.__setup
# | `deferred.task.process` | {ProcessDeferredTask} | Wraps the processing of deferred tasks |
# | `events.event.publish` | {PublishEvent} | Wraps the publishing of events via {Rage::Events Rage::Events} |
# | `events.subscriber.process` | {ProcessEventSubscriber} | Wraps the processing of events by subscribers |
# | `sse.stream.process` | {ProcessSSEStream} | Wraps the processing of an SSE stream |
#
module Spans
end
Expand Down
12 changes: 5 additions & 7 deletions lib/rage/telemetry/tracer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ class Rage::Telemetry::Tracer
private_constant :DEFAULT_SPAN_RESULT

# @param spans_registry [Hash{String => Rage::Telemetry::Spans}]
# @param handlers_map [Hash{String => Array<Rage::Telemetry::HandlerRef>}]
def initialize(spans_registry, handlers_map)
def initialize(spans_registry)
@spans_registry = spans_registry
@handlers_map = handlers_map

@all_handler_refs = handlers_map.values.flatten

@spans_registry.each do |_, span|
setup_noop(span)
end
end

def setup
@handlers_map.each do |span_id, handler_refs|
def setup(handlers_map)
@all_handler_refs = handlers_map.values.flatten

handlers_map.each do |span_id, handler_refs|
setup_tracer(@spans_registry[span_id], handler_refs)
end
end
Expand Down
Loading
Loading