Skip to content

feat: WebSocket client v2 (ws_v2)#151

Draft
Voyz wants to merge 73 commits into
masterfrom
feat/ws_v2
Draft

feat: WebSocket client v2 (ws_v2)#151
Voyz wants to merge 73 commits into
masterfrom
feat/ws_v2

Conversation

@Voyz

@Voyz Voyz commented May 3, 2026

Copy link
Copy Markdown
Owner

Full refactor of the WebSocket handling in IBind.

Work is still in progress, no changes are final.

Available as 0.2.1rc16.

New documentation can be found in: https://github.com/Voyz/ibind/tree/feat/ws_v2/docs/websocket

Overview:

The IbkrWsClient V1 implementation - existing until now - had several unfavourable angles:

  • The connectivity and authentication issues were handled with aggressive thread restarts which was usually unnecessary, leaving dangling threads and WebSocket connections.
  • All functionalities were carried out on a single thread, coupling WebSocket processing with event propagation and lifecycle management.
  • Subscription API expected user to understand the somewhat-unintuitive requirements of IBKR WebSocket payloads.
  • Subscription API was non-idempotent and blocking

Although usable, it required close familiarity with the documentation, presented slightly awkward usage, and its threading model lead to complex logic, difficult testability and race conditions.

The IbkrWsClient V2 at its core introduces better threading and lifecycle management. It is implemented using three threads:

  • ws_runtime_thread - main thread of the client, handling the lifecycle management
  • ws_transport_thread - specialised thread, handling the WebSocket communication
  • async_sink_thread - specialised thread, handling event propagation

As a result, most connectivity and authentication issues are handled gracefully without need of restarting threads or recreating the WebSocketApp instance.

This refactor was also taken as an opportunity to re-evaluate the API, resulting in a number of improvements to the subscription API, event consumption and connection health. Some responsibilities previously expected of the user are now handled by the client out of the box.


BREAKING CHANGE:

  • Subscription API no longer accepts channel and data in favour of new Pydantic Subscription classes which encapsulate necessary parameters for each topic, reducing ambiguity and necessity to depend on IBKR docs (as discussed in IbkrWsKey should have conid for market data subscriptions #18)
  • Event consumption implemented using EventSink protocol instead of depending on the QueueAccessor API. Currently implemented are CallbackSink (propagating events into user-specified callbacks), QueueSink (supporting the QueueAccessor API), CompositeSink (allowing to mix multiple sinks together) and LogSink (logging all events, useful for debugging). Custom sinks that implement EventSink protocal are accepted.
  • Events are now represented as Pydantic WsEvent class events, replacing propagating dict objects
  • Subscription API is now idempotent, expressing an intent that is later carried out by the client asynchronously. Behind the scenes, subscriptions are internally represented as Binding objects, used for managing the lifecycle of each subscription. As a result, IbkrWsClient.subscribe() and IbkrWsClient.unsubscribe() are no longer blocking and instead they return a SubscriptionHandle that can be waited on using .wait()
  • New threading model implemented from scratch, changing the WebSocket health and lifecycle handling
  • Removed SubscriptionProcessor class - instead Subscription objects define the subscribe and unsubscribe payload generation
  • Removed restart_on_close and restart_on_critical parameters - both are now fixed as true: the WebSocketApp will always reconnect on close, and the transport thread will always recreate a new WebSocketApp on critical.
  • Removed connected, ready and running properties - now replaced by is_running() and get_state()
  • Removed IbkrWsKey - identifying subscriptions and events now uses concrete WsEvent class types instead of the enum
  • Deprecated the QueueAccessor - currently still available through QueueSink.new_queue_accessor() for backwards compatibility. Instead QueueSink serves as a 1-1 replacement for QueueAccessor, passing appropriate queue-identifying key to its get() and empty() methods.

Feature:

  • Subscriptions accept expiry_seconds allowing to periodically resubscribe (fixes Feature request: auto-refresh IBKR WS smd subscriptions on 15-min server-side expiry #145)
  • Lifecycle state changes (eg. STARTED, CONNECTING, OPEN, AUTHENTICATED, DEGRADED, etc.) are now propagated as events under LIFECYCLE key, facilitating responding to lifecycle changes in user applications
  • Additional information on status of subscriptions (eg. NEW, ACTIVE, FAILED, UNSUBSCRIBED, etc.) is exposed through the API
  • MarketHistory unsubscribing with server IDs is now natively handled by the client, removing the necessity for boilerplate payload generation with a custom SubscriptionProcessor (as demonstrated in https://github.com/Voyz/ibind/blob/master/examples/ws_03_market_history.py)
  • Authentication with IBKR is validated using the session id / cookie, fixing authentication issues where a dangling connection was kept open for a session that lost authentication.

Refactor:

  • Logging of all WebSocket-related logs is now sent through a single ibind.ibkr_ws_client logger.

Chore:

Behind The Scenes

  • Removed QueueController
  • Rewrote and largely simplified SubscriptionController
  • Removed mixin usage in IbkrWsClient (still present in IbkrClient)

Implementation Details

Event Lifecycle

Most events received by the WebSocket go through the following flow:

Step Thread Data Type Description
1 ws_transport_thread raw data Received by the WebSocketApp
2 ws_transport_thread raw data Propagated to appropriate transport handler
3 ws_transport_thread raw data Parsed into TransportEvent and enqueued in runtime transport queue
4 ws_runtime_thread TransportEvent Reads from transport queue and calls appropriate handlers (retries up to 5 times on failure)
5 ws_runtime_thread TransportEvent Parsed by IbkrRouter into WsEvent instances (multiple instances can be generated from a single message)
6 ws_runtime_thread WsEvent Emitted to outgoing sink (AsyncSink by default, or directly to user sink if disabled)
7 async_sink_thread WsEvent Propagated to the user-provided sink

Two event classes TransportEvent and WsEvent - rather than just a single common Event - are used to create clear separation of concerns. TransportEvent instances are used only for WebSocket -> Client internal communication, while WsEvent instances are used for both inner-client and Client -> User Application communication.

Event Propagation

Events are now propagated using a dedicated thread. Once received by the client, they're parsed and enqueued into an outgoing queue. The new thread then consumes and propagates the events through sinks.

This ensures that slow user application code does not affect the functionality of the client.

This method can be disabled using a parameter, causing event propagation to be carried out by the runtime thread, which may be useful for debugging.

Example Usage

In V1:

from ibind import IbkrWsKey, IbkrWsClient

ws_client = IbkrWsClient(cacert=cacert, account_id=account_id)

ws_client.start()

requests = [
    {'channel': 'md+265598', 'data': {'fields': ['55', '71', '84', '86', '88', '85', '87', '7295', '7296', '70']}},
    {'channel': 'or'},
    {'channel': 'tr'},
    {'channel': f'sd+{account_id}'},
    {'channel': f'ld+{account_id}'},
    {'channel': 'pl'},
]
queue_accessors = [
    ws_client.new_queue_accessor(IbkrWsKey.TRADES),
    ws_client.new_queue_accessor(IbkrWsKey.MARKET_DATA),
    ws_client.new_queue_accessor(IbkrWsKey.ORDERS),
    ws_client.new_queue_accessor(IbkrWsKey.ACCOUNT_SUMMARY),
    ws_client.new_queue_accessor(IbkrWsKey.ACCOUNT_LEDGER),
    ws_client.new_queue_accessor(IbkrWsKey.PNL),
]

for request in requests:
    while not ws_client.subscribe(**request):
        time.sleep(1)

while ws_client.running:
    try:
        for qa in queue_accessors:
            while not qa.empty():
                print(str(qa), qa.get())

        time.sleep(1)
    except KeyboardInterrupt:
        print('KeyboardInterrupt')
        break

for request in requests:
    ws_client.unsubscribe(**request)

ws_client.shutdown()

In V2:

from ibind import IbkrWsClientV2, QueueSink
from ibind.subscriptions import MarketDataSubscription, OrdersSubscription, AccountLedgerSubscription, AccountSummarySubscription, PnlSubscription, TradesSubscription, MarketHistorySubscription, SubscriptionHandle

queue_sink = QueueSink()
ws_client = IbkrWsClientV2(cacert=cacert, account_id=account_id, sink=queue_sink)

ws_client.start()

subs = [
    TradesSubscription(),
    MarketDataSubscription(conid='265598', fields=('55', '71', '84', '86', '88', '85', '87', '7295', '7296', '70'), expiry_seconds=14 * 60),
    OrdersSubscription(),
    AccountSummarySubscription(account_id=account_id),
    AccountLedgerSubscription(account_id=account_id),
    PnlSubscription(),
]

sub_handles: List[SubscriptionHandle] = []
for sub in subs:
    handle = ws_client.subscribe(sub)
    sub_handles.append(handle)

ws_client.wait_all(sub_handles, timeout=10) # returns list of failed handles or empty list if all succeed

try:
    while ws_client.is_running():
        for sub in subs:
            while not queue_sink.empty(sub.event_type):
                ev = queue_sink.get(sub.event_type)
                print(ev)
        time.sleep(1)
except KeyboardInterrupt:
    print('Interrupt')

unsub_handles: List[SubscriptionHandle] = []
for sub in subs:
    handle = ws_client.unsubscribe(sub)
    unsub_handles.append(handle)

ws_client.wait_all(unsub_handles, timeout=10)

ws_client.shutdown()

Sink types:

from ibind import events, IbkrWsClientV2, LogSink, QueueSink, CallbackSink, CompositeSink

# Queue Sink - queue-based event consumer
queue_sink = QueueSink()

# Callback Sink - callback-based event consumer
callback_sink = CallbackSink()

def on_market_data(event: events.MarketData):
    print(event)

def on_market_history(event: events.MarketHistory):
    print(event)

def on_lifecycle(event: events.LifecycleEvent):
    print(event)

callback_sink.on(events.MarketData, on_market_data)
callback_sink.on(events.MarketHistory, on_market_data)
callback_sink.on(events.WsOpen, on_lifecycle)
callback_sink.on(events.WsClose, on_lifecycle)
callback_sink.on(events.WsError, on_lifecycle)
callback_sink.on(events.WsAuthenticated, on_lifecycle)
callback_sink.on(events.WsReady, on_lifecycle)
callback_sink.on(events.WsDegraded, on_lifecycle)

# Log Sink - useful for debugging
log_sink = LogSink()

# Composite Sink - allows us to use all above sinks at once
composite_sink = CompositeSink(callback_sink, queue_sink, log_sink)

ws_client = IbkrWsClientV2(cacert=cacert, account_id=account_id, sink=composite_sink)

Roadmap:

  • WebSocket client refactor
  • Tests refactor
  • New documentation (in progress...)
  • Migration guide

I'm very much open to feedback - please feel free to share it here if it is related to the refactor, or as a new issue if you'd like to suggest other changes/enhancements.

You can install and test this version out as 0.2.1rc16.

Voyz added 23 commits April 29, 2026 18:16
…hreading/lifecycle model, making (un)subscribing actions idempotent and introducing Pydantic models at input and output
…and changed subscription_controller._bindings key from Subscription to new the binding_key.

- added SubscriptionResolver which allow SubscriptionController to automatically detect binding_keys that need confirmation on (un)subscriptions
- finished implementing ibkr_subscriptions
# Conflicts:
#	ibind/ibkr_ws_v2/ibkr_events.py
…amed 'channel' to 'topic', implemented QueueSink as replacement of QueueController/Accessor

chore(ws_v2): cleaned up ws_runtime and ws_transport
fix(ws_v2): fixed TransportEvent attempts
refactor(ws_v2): renamed ClientInternalEvents to LifecycleEvents
Voyz added 5 commits May 10, 2026 13:46
…xport `WsEvent`,

and relocate IBKR topic-to-event resolution into the router for
clearer ownership

fix(ws_v2): harden runtime shutdown and transport state

improve runtime stop/close flow to set closed state consistently,
separate graceful vs unexpected disconnects, and mark transport
degraded when thread shutdown fails

reduce websocket lifecycle log noise by moving thread start/stop logs
to debug while keeping key connection, auth, and send events visible
…cate privacy, expecting imports to point at ibind.events. Also added docstrings
Voyz added 30 commits May 16, 2026 13:59
…empotent and added CallbackSink.has_callback

Also updated tests.
…added _preprocess_orders removing bgColor and fgColor1
…eue by event.received_at to ensure correct order of order emission/processing
…rrectly shared between instances

Also updated tests.
…ondition

Use a condition to coordinate WebSocketApp replacement during reset.
Wait for close/recreate transitions under the same lock and notify when
`_wsa` is set or cleared in the transport loop.

This prevents race conditions where reset could abandon or overwrite a
newly recreated socket instance while the transport thread is running.

Expand unit coverage for condition-based waiting, timeout paths, and
threaded reset behaviour.
When subscribe/unsubscribe is called again for a binding in FAILED
state, treat it as an explicit retry request by resetting attempts and
last_attempt and moving status back to NEW.

Also raise default subscription_retries and subscription_timeout to give
more time for transient websocket failures, and add unit tests covering
failed-binding reset behaviour for both subscribe and unsubscribe paths.
Update unit tests for new runtime worker flow.
Make `AUTHENTICATED` the only readiness state in `WsRuntime` and
consistently gate start/send/subscription reconciliation on it.
When auth is lost, transition back to `OPEN` and invalidate
subscriptions instead of degrading the connection state.

Update unit tests to reflect the new authentication lifecycle and
state transitions.
… ws_v2.runtime.

Moved internal_sink from ibkr_ws_client_v2 to ws_runtime. Added WsStarting, WsStopping and WsStopped events.

Also added unit tests for all these new modules.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Feature request: auto-refresh IBKR WS smd subscriptions on 15-min server-side expiry

2 participants