Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/node/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ where
Box::pin(async move {
let zone_tokens = self.zone_tokens().await?;
zone_rpc::filter::scope_filter_addresses(&mut filter, &zone_tokens)?;
zone_rpc::filter::scope_filter(&mut filter);
zone_rpc::filter::scope_filter_for_caller(&mut filter, &auth.caller)?;
let logs = EthFilterApiServer::logs(&self.eth.filter, filter)
.await
.map_err(internal)?;
Expand All @@ -697,7 +697,7 @@ where
Box::pin(async move {
let zone_tokens = self.zone_tokens().await?;
zone_rpc::filter::scope_filter_addresses(&mut filter, &zone_tokens)?;
zone_rpc::filter::scope_filter(&mut filter);
zone_rpc::filter::scope_filter_for_caller(&mut filter, &auth.caller)?;
let id = EthFilterApiServer::new_filter(&self.eth.filter, filter)
.await
.map_err(internal)?;
Expand Down Expand Up @@ -830,7 +830,7 @@ where

let zone_tokens = self.zone_tokens().await?;
zone_rpc::filter::scope_filter_addresses(&mut filter, &zone_tokens)?;
zone_rpc::filter::scope_filter(&mut filter);
zone_rpc::filter::scope_filter_for_caller(&mut filter, &caller)?;

let stream = provider
.canonical_state_stream()
Expand Down
30 changes: 29 additions & 1 deletion crates/node/tests/it/private_rpc_e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ fn corrupt_token_hex(token: &str) -> String {
hex::encode(bytes)
}

fn address_topic(address: Address) -> String {
format!("{:#x}", B256::left_padding_from(address.as_slice()))
}

fn assert_filter_not_found_error(response: &serde_json::Value) {
let error = response
.get("error")
Expand Down Expand Up @@ -884,9 +888,33 @@ async fn test_ws_logs_subscription_is_sender_scoped() -> eyre::Result<()> {
let owner_token = ctx.user_token(&owner_signer);
let mut owner_ws = connect_private_rpc_ws(&ctx.private_rpc_url, &owner_token).await?;

owner_ws
.send(Message::Text(
jsonrpc_with_params(
"eth_subscribe",
json!(["logs", {"address": format!("{PATH_USD_ADDRESS:#x}")}]),
1,
)
.into(),
))
.await?;
let broad_subscription = ws_next_json(&mut owner_ws).await?;
assert_eq!(broad_subscription["id"], 1);
assert_eq!(
broad_subscription["error"]["code"].as_i64().unwrap(),
-32602,
"broad private log subscriptions should be rejected"
);

let owner_subscription = ws_subscribe(
&mut owner_ws,
json!(["logs", {"address": format!("{PATH_USD_ADDRESS:#x}")}]),
json!([
"logs",
{
"address": format!("{PATH_USD_ADDRESS:#x}"),
"topics": [null, address_topic(owner_signer.address())],
}
]),
)
.await?;

Expand Down
110 changes: 110 additions & 0 deletions crates/rpc/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ pub const WHITELISTED_TOPICS: [B256; 5] = [
BURN_TOPIC,
];

const TWO_PARTY_TOPICS: [B256; 3] = [TRANSFER_TOPIC, APPROVAL_TOPIC, TRANSFER_WITH_MEMO_TOPIC];
const CALLER_SCOPED_FILTER_ERROR: &str =
"private log filter must include authenticated caller in topic1 or topic2";

/// Returns `true` if `caller` appears in an eligible indexed-topic position
/// for the log's event type.
///
Expand Down Expand Up @@ -148,6 +152,37 @@ pub fn scope_filter(filter: &mut Filter) {
}
}

/// Scopes a user-supplied filter to whitelisted event topics and requires the
/// authenticated caller to appear in an eligible indexed topic before backend
/// log retrieval.
pub fn scope_filter_for_caller(filter: &mut Filter, caller: &Address) -> Result<(), JsonRpcError> {
scope_filter(filter);
if filter.topics[0].len() == 1 && filter.topics[0].contains(&B256::ZERO) {
return Ok(());
}
Comment on lines +160 to +162

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think this can just be:

if filter.topics[0] == FilterSet::from(B256::ZERO) {
    return Ok(());
}


let caller_word = B256::left_padding_from(caller.as_slice());
if filter.topics[1].contains(&caller_word) {
filter.topics[1] = FilterSet::from(caller_word);
return Ok(());
}

if filter.topics[2].contains(&caller_word) {
let topic0 = filter.topics[0]
.iter()
.copied()
.filter(|topic| TWO_PARTY_TOPICS.contains(topic))
.collect::<Vec<_>>();
if !topic0.is_empty() {
filter.topics[0] = FilterSet::from(topic0);
filter.topics[2] = FilterSet::from(caller_word);
return Ok(());
}
}

Err(JsonRpcError::invalid_params(CALLER_SCOPED_FILTER_ERROR))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -515,6 +550,81 @@ mod tests {
assert_eq!(filter.topics[0], FilterSet::from(B256::ZERO));
}

#[test]
fn scope_filter_for_caller_rejects_broad_filter() {
let caller = address!("0x0000000000000000000000000000000000000001");
let mut filter = Filter::default();

let err = scope_filter_for_caller(&mut filter, &caller).unwrap_err();

assert_eq!(err.code, JsonRpcError::invalid_params("").code);
assert_eq!(err.message, CALLER_SCOPED_FILTER_ERROR);
}

#[test]
fn scope_filter_for_caller_scopes_topic1_caller() {
let caller = address!("0x0000000000000000000000000000000000000001");
let other = address!("0x0000000000000000000000000000000000000002");
let caller_topic = caller_word(&caller);
let other_topic = caller_word(&other);
let mut filter = Filter::default();
filter.topics[1] = FilterSet::from(vec![caller_topic, other_topic]);
filter.topics[2] = FilterSet::from(other_topic);

scope_filter_for_caller(&mut filter, &caller).unwrap();

assert_eq!(filter.topics[0].len(), WHITELISTED_TOPICS.len());
assert_eq!(filter.topics[1], FilterSet::from(caller_topic));
assert_eq!(filter.topics[2], FilterSet::from(other_topic));
}

#[test]
fn scope_filter_for_caller_scopes_topic2_caller_for_two_party_events() {
let caller = address!("0x0000000000000000000000000000000000000001");
let other = address!("0x0000000000000000000000000000000000000002");
let caller_topic = caller_word(&caller);
let other_topic = caller_word(&other);
let mut filter = Filter::default();
filter.topics[0] = FilterSet::from(vec![TRANSFER_TOPIC, MINT_TOPIC]);
filter.topics[1] = FilterSet::from(other_topic);
filter.topics[2] = FilterSet::from(vec![caller_topic, caller_word(&other)]);

scope_filter_for_caller(&mut filter, &caller).unwrap();

assert_eq!(filter.topics[0], FilterSet::from(TRANSFER_TOPIC));
assert_eq!(filter.topics[1], FilterSet::from(other_topic));
assert_eq!(filter.topics[2], FilterSet::from(caller_topic));
}

#[test]
fn scope_filter_for_caller_rejects_wrong_caller() {
let caller = address!("0x0000000000000000000000000000000000000001");
let a = address!("0x0000000000000000000000000000000000000002");
let b = address!("0x0000000000000000000000000000000000000003");
let mut filter = Filter::default();
filter.topics[0] = FilterSet::from(TRANSFER_TOPIC);
filter.topics[1] = FilterSet::from(caller_word(&a));
filter.topics[2] = FilterSet::from(caller_word(&b));

let err = scope_filter_for_caller(&mut filter, &caller).unwrap_err();

assert_eq!(err.code, JsonRpcError::invalid_params("").code);
assert_eq!(err.message, CALLER_SCOPED_FILTER_ERROR);
}

#[test]
fn scope_filter_for_caller_rejects_topic2_only_for_one_party_events() {
let caller = address!("0x0000000000000000000000000000000000000001");
let mut filter = Filter::default();
filter.topics[0] = FilterSet::from(MINT_TOPIC);
filter.topics[2] = FilterSet::from(caller_word(&caller));

let err = scope_filter_for_caller(&mut filter, &caller).unwrap_err();

assert_eq!(err.code, JsonRpcError::invalid_params("").code);
assert_eq!(err.message, CALLER_SCOPED_FILTER_ERROR);
}

#[test]
fn scope_filter_addresses_scopes_omitted_address() {
let token_a = address!("0x00000000000000000000000000000000000000aa");
Expand Down
Loading