Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rust/NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@

### Internal Changes

- Added `ZerobusStream::signal_shutdown` (crate-private), a `&self`-callable
helper that flips `is_closed` and cancels the cancellation token. Lets
`MultiplexedStream` tear down sub-stream background tasks from its poison
path and `Drop` without needing `&mut`. JoinHandle reaping still happens in
`close` or the existing `Drop` impl.

### Breaking Changes

### Deprecations
Expand Down
2 changes: 1 addition & 1 deletion rust/sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ protoc-bin-vendored.workspace = true
default = []
# Arrow Flight is in Beta
arrow-flight = ["dep:arrow-flight", "dep:arrow-array", "dep:arrow-schema", "dep:arrow-ipc", "dep:futures"]
testing = []
testing = ["dep:futures"]
21 changes: 20 additions & 1 deletion rust/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ mod default_token_factory;
mod errors;
mod headers_provider;
mod landing_zone;
#[cfg(feature = "testing")]
Comment thread
danilonajkov-db marked this conversation as resolved.
mod multiplexed_stream;
mod offset_generator;
mod proxy;
mod record_types;
Expand Down Expand Up @@ -92,13 +94,16 @@ pub use callbacks::AckCallback;
pub use default_token_factory::DefaultTokenFactory;
pub use errors::ZerobusError;
pub use headers_provider::{HeadersProvider, OAuthHeadersProvider};
#[cfg(feature = "testing")]
pub use multiplexed_stream::{MessageId, MultiplexedStream};
pub use offset_generator::{OffsetId, OffsetIdGenerator};
pub use proxy::{ConnectorFactory, ProxyConnector};
pub use record_types::{
EncodedBatch, EncodedBatchIter, EncodedRecord, JsonEncodedRecord, JsonString, JsonValue,
ProtoBytes, ProtoEncodedRecord, ProtoMessage,
};
pub use stream_configuration::StreamConfigurationOptions;

#[cfg(feature = "testing")]
pub use tls_config::NoTlsConfig;
pub use tls_config::{SecureTlsConfig, TlsConfig};
Expand Down Expand Up @@ -512,7 +517,6 @@ impl ZerobusSdk {
let mut guard = self.shared_channel.lock().await;

if guard.is_none() {
// Create the channel for the first time.
let endpoint = Endpoint::from_shared(self.zerobus_endpoint.clone())
.map_err(|err| ZerobusError::ChannelCreationError(err.to_string()))?
.user_agent(self.sdk_identifier.as_ref())
Expand Down Expand Up @@ -1847,6 +1851,21 @@ impl ZerobusStream {
.to_string(),
))
}

#[cfg(feature = "testing")]
pub(crate) fn has_capacity(&self) -> bool {
self.landing_zone.len() < self.options.max_inflight_requests
}

// Signal the stream to stop accepting work and tear down its background
// tasks. Unlike `close`, this only needs `&self` — it relies on the
// cancellation token and `is_closed` flag, both of which are already
// interior-mutable. The `JoinHandle`s aren't reaped here; that happens in
// `close` or `Drop`.
pub(crate) fn signal_shutdown(&self) {
self.is_closed.store(true, Ordering::Relaxed);
self.cancellation_token.cancel();
}
}

impl Drop for ZerobusStream {
Expand Down
Loading
Loading