diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 92f296f3b8c..39b11732792 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -4356,4 +4356,65 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_subscriptions_for_the_same_client_identity() -> anyhow::Result<()> { + let db = relational_db()?; + + let identity = identity_from_u8(7); + let client_id_for_a = ClientActorId { + identity, + connection_id: connection_id_from_u8(1), + name: ClientName(1), + }; + let client_id_for_b = ClientActorId { + identity, + connection_id: connection_id_from_u8(2), + name: ClientName(2), + }; + + let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a, &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b, &db); + + let auth_for_a = AuthCtx::new(db.owner_identity(), client_id_for_a.identity); + let auth_for_b = AuthCtx::new(db.owner_identity(), client_id_for_b.identity); + + let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); + let table_id = db.create_table_for_test("t", &[("a", AlgebraicType::U8)], &[])?; + let schema = ProductType::from([AlgebraicType::U8]); + + let mut query_ids = 0; + subscribe_multi( + &subs, + auth_for_a, + &["select * from t where a = 1"], + tx_for_a, + &mut query_ids, + ) + .await?; + subscribe_multi( + &subs, + auth_for_b, + &["select * from t where a = 2"], + tx_for_b, + &mut query_ids, + ) + .await?; + + assert!(matches!( + rx_for_a.recv().await, + Some(OutboundMessage::V1(SerializableMessage::Subscription(_))) + )); + assert!(matches!( + rx_for_b.recv().await, + Some(OutboundMessage::V1(SerializableMessage::Subscription(_))) + )); + + commit_tx(&db, &subs, [], [(table_id, product![1_u8]), (table_id, product![2_u8])])?; + + assert_tx_update_for_table(rx_for_a.recv(), table_id, &schema, [product![1_u8]], []).await; + assert_tx_update_for_table(rx_for_b.recv(), table_id, &schema, [product![2_u8]], []).await; + + Ok(()) + } }