From b26e3f2e08b7625942188cb393d6f6df0ad32510 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Fri, 19 Apr 2024 16:08:09 +0200 Subject: [PATCH 1/5] Add test for the subscription bug with multiple identities --- crates/core/src/client.rs | 6 +- crates/core/src/client/client_connection.rs | 25 +++-- .../subscription/module_subscription_actor.rs | 92 ++++++++++++++++++- 3 files changed, 108 insertions(+), 15 deletions(-) diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index 68465e0ea5b..9afa5210e69 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -20,10 +20,10 @@ pub struct ClientActorId { impl ClientActorId { #[cfg(test)] - pub fn for_test(identity: Identity) -> Self { + pub fn for_test() -> Self { ClientActorId { - identity, - address: Address::ZERO, + identity: Identity::from_byte_array(rand::random()), + address: Address::from_arr(&rand::random()), name: ClientName(0), } } diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 0e137fbb94c..6cbe6a80b20 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -40,20 +40,27 @@ pub enum ClientSendError { } impl ClientConnectionSender { - pub fn dummy(id: ClientActorId, protocol: Protocol) -> Self { - let (sendtx, _) = mpsc::channel(1); + pub fn dummy_with_channel(id: ClientActorId, protocol: Protocol) -> (Self, mpsc::Receiver) { + let (sendtx, rx) = mpsc::channel(1); // just make something up, it doesn't need to be attached to a real task let abort_handle = match tokio::runtime::Handle::try_current() { Ok(h) => h.spawn(async {}).abort_handle(), Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(), }; - Self { - id, - protocol, - sendtx, - abort_handle, - cancelled: AtomicBool::new(false), - } + ( + Self { + id, + protocol, + sendtx, + abort_handle, + cancelled: AtomicBool::new(false), + }, + rx, + ) + } + + pub fn dummy(id: ClientActorId, protocol: Protocol) -> Self { + Self::dummy_with_channel(id, protocol).0 } pub fn send_message(&self, message: impl Into) -> Result<(), ClientSendError> { diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index e3496ab6d75..c58b75614c4 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -181,7 +181,10 @@ mod tests { use super::ModuleSubscriptions; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; use crate::db::relational_db::tests_utils::TestDB; + use crate::energy::EnergyQuanta; use crate::execution_context::ExecutionContext; + use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; + use crate::host::{ArgsTuple, Timestamp}; use spacetimedb_client_api_messages::client_api::Subscribe; use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity}; use spacetimedb_sats::product; @@ -190,7 +193,6 @@ mod tests { use tokio::sync::mpsc; #[test] - /// Asserts that a subscription holds a tx handle for the entire length of its evaluation. fn test_tx_subscription_ordering() -> ResultTest<()> { let test_db = TestDB::durable()?; @@ -204,8 +206,8 @@ mod tests { })?; let id = Identity::ZERO; - let client = ClientActorId::for_test(id); - let sender = Arc::new(ClientConnectionSender::dummy(client, Protocol::Binary)); + let client_id = ClientActorId::for_test(); + let sender = Arc::new(ClientConnectionSender::dummy(client_id, Protocol::Binary)); let module_subscriptions = ModuleSubscriptions::new(db.clone(), id); let (send, mut recv) = mpsc::unbounded_channel(); @@ -250,4 +252,88 @@ mod tests { Ok(()) } + + #[test] + /// Asserts that a subscription holds a tx handle for the entire length of its evaluation. + fn test_subscriptions_for_the_same_client_identity() -> ResultTest<()> { + let test_db = TestDB::durable()?; + let runtime = test_db.runtime().cloned().unwrap(); + + // Create table with no rows + let db = Arc::new(test_db.db.clone()); + let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?; + + let id = ClientActorId::for_test(); + let sender = Arc::new(ClientConnectionSender::dummy(id, Protocol::Binary)); + let module_subscriptions = ModuleSubscriptions::new(db.clone(), id.identity); + + let client_id0 = ClientActorId::for_test(); + let (client0, mut rx0) = ClientConnectionSender::dummy_with_channel(client_id0, Protocol::Binary); + + let client_id1 = ClientActorId::for_test(); + let (client1, mut rx1) = ClientConnectionSender::dummy_with_channel(client_id1, Protocol::Binary); + + // Subscribing to T should return a single row + let query_strings = vec!["select * from T where a = 1".into()]; + module_subscriptions + .add_subscriber( + Arc::new(client0), + Subscribe { + query_strings, + request_id: 0, + }, + Instant::now(), + None, + ) + .unwrap(); + + let query_strings = vec!["select * from T where a = 2".into()]; + module_subscriptions + .add_subscriber( + Arc::new(client1), + Subscribe { + query_strings, + request_id: 0, + }, + Instant::now(), + None, + ) + .unwrap(); + + let inserts = Arc::new([product![product!(1_u8), product!(2_u8)]]); + let table_update = DatabaseTableUpdate { + table_id, + table_name: Box::from("Foo"), + inserts, + deletes: Arc::new([]), + }; + let database_update = DatabaseUpdate { + tables: vec![table_update], + }; + let event = Arc::new(ModuleEvent { + timestamp: Timestamp::now(), + caller_identity: client_id0.identity, + caller_address: None, + function_call: ModuleFunctionCall { + reducer: "DummyReducer".into(), + args: ArgsTuple::nullary(), + }, + status: EventStatus::Committed(database_update), + energy_quanta_used: EnergyQuanta::ZERO, + host_execution_duration: Duration::default(), + request_id: None, + timer: None, + }); + + let subscriptions = module_subscriptions.subscriptions.read(); + module_subscriptions.blocking_broadcast_event(Some(&sender), &*subscriptions, event); + drop(subscriptions); + + runtime.block_on(async move { + rx0.recv().await.expect("Expected at least one message"); + rx1.recv().await.expect("Expected at least one message"); + }); + + Ok(()) + } } From 98786b3c3bc3a5e23cb161d38c70bdcc19a7246f Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Fri, 19 Apr 2024 16:29:28 +0200 Subject: [PATCH 2/5] clippy --- crates/core/src/subscription/module_subscription_actor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index c58b75614c4..559ba487f69 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -326,7 +326,7 @@ mod tests { }); let subscriptions = module_subscriptions.subscriptions.read(); - module_subscriptions.blocking_broadcast_event(Some(&sender), &*subscriptions, event); + module_subscriptions.blocking_broadcast_event(Some(&sender), &subscriptions, event); drop(subscriptions); runtime.block_on(async move { From 7b7ccec360426c5abdb777e59fa5ce578fcc57e1 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Fri, 19 Apr 2024 16:49:51 +0200 Subject: [PATCH 3/5] Fix test --- .../subscription/module_subscription_actor.rs | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 559ba487f69..79162831a89 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -193,6 +193,7 @@ mod tests { use tokio::sync::mpsc; #[test] + /// Asserts that a subscription holds a tx handle for the entire length of its evaluation. fn test_tx_subscription_ordering() -> ResultTest<()> { let test_db = TestDB::durable()?; @@ -254,7 +255,7 @@ mod tests { } #[test] - /// Asserts that a subscription holds a tx handle for the entire length of its evaluation. + /// checks if multiple clients with the same identity are properly handled fn test_subscriptions_for_the_same_client_identity() -> ResultTest<()> { let test_db = TestDB::durable()?; let runtime = test_db.runtime().cloned().unwrap(); @@ -268,10 +269,8 @@ mod tests { let module_subscriptions = ModuleSubscriptions::new(db.clone(), id.identity); let client_id0 = ClientActorId::for_test(); - let (client0, mut rx0) = ClientConnectionSender::dummy_with_channel(client_id0, Protocol::Binary); - - let client_id1 = ClientActorId::for_test(); - let (client1, mut rx1) = ClientConnectionSender::dummy_with_channel(client_id1, Protocol::Binary); + let (client0, mut rx0) = ClientConnectionSender::dummy_with_channel(client_id0.clone(), Protocol::Binary); + let (client1, mut rx1) = ClientConnectionSender::dummy_with_channel(client_id0, Protocol::Binary); // Subscribing to T should return a single row let query_strings = vec!["select * from T where a = 1".into()]; @@ -325,13 +324,18 @@ mod tests { timer: None, }); - let subscriptions = module_subscriptions.subscriptions.read(); - module_subscriptions.blocking_broadcast_event(Some(&sender), &subscriptions, event); - drop(subscriptions); + runtime.spawn_blocking(move || { + let subscriptions = module_subscriptions.subscriptions.read(); + module_subscriptions.blocking_broadcast_event(Some(&sender), &subscriptions, event); + }); runtime.block_on(async move { - rx0.recv().await.expect("Expected at least one message"); - rx1.recv().await.expect("Expected at least one message"); + tokio::time::timeout(Duration::from_millis(10), async move { + rx0.recv().await.expect("Expected at least one message"); + rx1.recv().await.expect("Expected at least one message"); + }) + .await + .unwrap(); }); Ok(()) From a1ca3beaec49d4780d414dcb353ff0b5685a5ba6 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Mon, 22 Apr 2024 16:50:02 +0200 Subject: [PATCH 4/5] wip --- crates/core/src/client.rs | 6 +- crates/core/src/client/client_connection.rs | 2 +- .../subscription/module_subscription_actor.rs | 57 +++++++++++++------ .../module_subscription_manager.rs | 4 +- 4 files changed, 46 insertions(+), 23 deletions(-) diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index 9afa5210e69..22c50523359 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -20,10 +20,10 @@ pub struct ClientActorId { impl ClientActorId { #[cfg(test)] - pub fn for_test() -> Self { + pub fn for_test(identity: Option, address: Option
) -> Self { ClientActorId { - identity: Identity::from_byte_array(rand::random()), - address: Address::from_arr(&rand::random()), + identity: identity.unwrap_or(Identity::from_byte_array(rand::random())), + address: address.unwrap_or(Address::from_arr(&rand::random())), name: ClientName(0), } } diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 6cbe6a80b20..315b5f4ceb9 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -41,7 +41,7 @@ pub enum ClientSendError { impl ClientConnectionSender { pub fn dummy_with_channel(id: ClientActorId, protocol: Protocol) -> (Self, mpsc::Receiver) { - let (sendtx, rx) = mpsc::channel(1); + let (sendtx, rx) = mpsc::channel(10); // just make something up, it doesn't need to be attached to a real task let abort_handle = match tokio::runtime::Handle::try_current() { Ok(h) => h.spawn(async {}).abort_handle(), diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 79162831a89..4706e854b57 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -179,11 +179,14 @@ impl ModuleSubscriptions { #[cfg(test)] mod tests { use super::ModuleSubscriptions; + use crate::client::messages::{SerializableMessage, SubscriptionUpdate, TransactionUpdateMessage}; use crate::client::{ClientActorId, ClientConnectionSender, Protocol}; use crate::db::relational_db::tests_utils::TestDB; use crate::energy::EnergyQuanta; use crate::execution_context::ExecutionContext; - use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; + use crate::host::module_host::{ + DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ProtocolDatabaseUpdate, + }; use crate::host::{ArgsTuple, Timestamp}; use spacetimedb_client_api_messages::client_api::Subscribe; use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity}; @@ -207,7 +210,7 @@ mod tests { })?; let id = Identity::ZERO; - let client_id = ClientActorId::for_test(); + let client_id = ClientActorId::for_test(None, None); let sender = Arc::new(ClientConnectionSender::dummy(client_id, Protocol::Binary)); let module_subscriptions = ModuleSubscriptions::new(db.clone(), id); @@ -264,13 +267,14 @@ mod tests { let db = Arc::new(test_db.db.clone()); let table_id = db.create_table_for_test("T", &[("a", AlgebraicType::U8)], &[])?; - let id = ClientActorId::for_test(); + let id = ClientActorId::for_test(None, None); let sender = Arc::new(ClientConnectionSender::dummy(id, Protocol::Binary)); let module_subscriptions = ModuleSubscriptions::new(db.clone(), id.identity); - let client_id0 = ClientActorId::for_test(); - let (client0, mut rx0) = ClientConnectionSender::dummy_with_channel(client_id0.clone(), Protocol::Binary); - let (client1, mut rx1) = ClientConnectionSender::dummy_with_channel(client_id0, Protocol::Binary); + let client_id0 = ClientActorId::for_test(None, None); + let client_id1 = ClientActorId::for_test(Some(client_id0.identity), None); + let (client0, mut rx0) = ClientConnectionSender::dummy_with_channel(client_id0, Protocol::Binary); + let (client1, mut rx1) = ClientConnectionSender::dummy_with_channel(client_id1, Protocol::Binary); // Subscribing to T should return a single row let query_strings = vec!["select * from T where a = 1".into()]; @@ -292,17 +296,17 @@ mod tests { Arc::new(client1), Subscribe { query_strings, - request_id: 0, + request_id: 1, }, Instant::now(), None, ) .unwrap(); - let inserts = Arc::new([product![product!(1_u8), product!(2_u8)]]); + let inserts = Arc::new([product!(1u8), product!(2u8), product!(2u8)]); let table_update = DatabaseTableUpdate { table_id, - table_name: Box::from("Foo"), + table_name: Box::from("T"), inserts, deletes: Arc::new([]), }; @@ -324,15 +328,34 @@ mod tests { timer: None, }); - runtime.spawn_blocking(move || { - let subscriptions = module_subscriptions.subscriptions.read(); - module_subscriptions.blocking_broadcast_event(Some(&sender), &subscriptions, event); - }); - runtime.block_on(async move { - tokio::time::timeout(Duration::from_millis(10), async move { - rx0.recv().await.expect("Expected at least one message"); - rx1.recv().await.expect("Expected at least one message"); + tokio::task::block_in_place(move || { + let subscriptions = module_subscriptions.subscriptions.read(); + module_subscriptions.blocking_broadcast_event(Some(&sender), &subscriptions, event); + }); + tokio::time::sleep(Duration::from_secs(4)).await; + tokio::time::timeout(Duration::from_millis(100), async move { + rx0.recv().await.expect("Expected subscription update message"); + let m0 = rx0.recv().await.expect("Expected transaction update message"); + rx1.recv().await.expect("Expected subscription update message"); + let m1 = rx1.recv().await.expect("Expected transaction update message"); + + // check if the first client got the update with only 1 row and the second client + // got the update with 2 rows + assert!(matches!(m0, + SerializableMessage::ProtocolUpdate( + TransactionUpdateMessage { + database_update: SubscriptionUpdate { + database_update: ProtocolDatabaseUpdate { tables, .. }, + ..}, + ..}) if tables.clone().left().unwrap()[0].table_row_operations.len() == 1)); + assert!(matches!(m1, + SerializableMessage::ProtocolUpdate( + TransactionUpdateMessage { + database_update: SubscriptionUpdate { + database_update: ProtocolDatabaseUpdate { tables, .. }, + ..}, + ..}) if tables.clone().left().unwrap()[0].table_row_operations.len() == 2)); }) .await .unwrap(); diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 0ee4072fc46..88489cabcfc 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -30,11 +30,11 @@ type Client = Arc; #[derive(Debug, Default)] pub struct SubscriptionManager { // Subscriber identities and their client connections. - clients: HashMap, + pub clients: HashMap, // Queries for which there is at least one subscriber. queries: HashMap, // The subscribers for each query. - subscribers: HashMap>, + pub subscribers: HashMap>, // Inverted index from tables to queries that read from them. tables: IntMap>, } From da9b98fbe80c2602c5eb6d7fb8d7d2e40449bd76 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Mon, 22 Apr 2024 18:02:02 +0200 Subject: [PATCH 5/5] Revert changes used for debugging --- crates/core/src/subscription/module_subscription_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 88489cabcfc..0ee4072fc46 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -30,11 +30,11 @@ type Client = Arc; #[derive(Debug, Default)] pub struct SubscriptionManager { // Subscriber identities and their client connections. - pub clients: HashMap, + clients: HashMap, // Queries for which there is at least one subscriber. queries: HashMap, // The subscribers for each query. - pub subscribers: HashMap>, + subscribers: HashMap>, // Inverted index from tables to queries that read from them. tables: IntMap>, }