2 changes: 1 addition & 1 deletion external/photon
Submodule photon updated 1 file
+2 −0 src/openapi/mod.rs
10 changes: 10 additions & 0 deletions forester/CHANGELOG.md
@@ -2,6 +2,16 @@

## [Unreleased]

### Added

- **Graceful shutdown signaling** via `watch::channel`. Shutdown requests are now race-free regardless of when the run loop subscribes.
- **Panic isolation for `process_epoch`.** A panicking epoch no longer kills the run loop; the panic message is logged and processing continues.
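
A minimal sketch (not part of this diff) of the two mechanisms above; `run_loop` is an illustrative placeholder and the real forester wiring differs:

```rust
use tokio::sync::watch;

// Illustrative only: a watch receiver always sees the latest value, so a
// shutdown requested before the loop subscribes is still observed (no race).
async fn run_loop(shutdown: watch::Receiver<bool>) {
    while !*shutdown.borrow() {
        // Run the epoch in its own task: a panic surfaces as a JoinError
        // instead of unwinding through (and killing) the run loop.
        match tokio::spawn(process_epoch()).await {
            Ok(()) => {}
            Err(e) if e.is_panic() => eprintln!("process_epoch panicked: {e}"),
            Err(e) => eprintln!("process_epoch task failed: {e}"),
        }
    }
}

async fn process_epoch() { /* placeholder */ }

// Caller side: keep the sender, hand receivers to tasks, signal once:
// let (shutdown_tx, shutdown_rx) = watch::channel(false);
// shutdown_tx.send(true).ok();
```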

### Fixed

- **`bigint_to_u8_32` now rejects negative `BigInt` inputs** (`light-prover-client`). Previously, negative inputs were silently converted to `[u8; 32]` using only the magnitude bytes, producing wrong-sign output that would cause silent proof-input corruption.
- **`pathIndex` widened from `u32` to `u64`** on both the Rust client and the Go prover server. The Gnark circuit was already constrained by tree height (up to 40 bits for v2 address trees); only the JSON marshalling and runtime struct types were artificially narrow. This prevents proof generation failures once a v2 address tree exceeds ~4.3 billion entries.
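
A sketch of the sign check described in the `bigint_to_u8_32` entry above (illustrative; the actual `light-prover-client` signature and error type may differ):

```rust
use num_bigint::{BigInt, Sign};

// Illustrative only: reject negative inputs instead of silently encoding
// just the magnitude bytes.
fn bigint_to_u8_32(value: &BigInt) -> Result<[u8; 32], String> {
    if value.sign() == Sign::Minus {
        return Err("negative BigInt cannot be encoded as [u8; 32]".into());
    }
    let (_, bytes) = value.to_bytes_be(); // big-endian magnitude
    if bytes.len() > 32 {
        return Err("BigInt does not fit in 32 bytes".into());
    }
    let mut out = [0u8; 32];
    out[32 - bytes.len()..].copy_from_slice(&bytes);
    Ok(out)
}
```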

### Breaking Changes

- **Removed `--photon-api-key` CLI arg and `PHOTON_API_KEY` env var.** The API key should now be included in `--indexer-url` as a query parameter:
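  For example (illustrative only; the exact query-parameter name depends on your Photon deployment): `--indexer-url "https://your-photon-host/?api-key=<API_KEY>"`.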
12 changes: 11 additions & 1 deletion forester/src/cli.rs
@@ -306,7 +306,7 @@ pub struct StartArgs {
#[arg(
long,
env = "WORK_ITEM_BATCH_SIZE",
value_parser = clap::value_parser!(usize).range(1..),
value_parser = parse_nonzero_usize,
help = "Number of queue items to process per batch cycle. Smaller values reduce blockhash expiry risk, larger values reduce per-batch overhead."
)]
pub work_item_batch_size: Option<usize>,
@@ -392,6 +392,16 @@ impl StartArgs {
}
}

fn parse_nonzero_usize(value: &str) -> Result<usize, String> {
let parsed = value
.parse::<usize>()
.map_err(|err| format!("invalid positive integer: {err}"))?;
if parsed == 0 {
return Err("value must be at least 1".to_string());
}
Ok(parsed)
}

impl StatusArgs {
pub fn enable_metrics(&self) -> bool {
self.push_gateway_url.is_some()
10 changes: 5 additions & 5 deletions forester/src/compressible/ctoken/compressor.rs
@@ -30,16 +30,16 @@ use crate::{
pub struct CTokenCompressor<R: Rpc + Indexer> {
rpc_pool: Arc<SolanaRpcPool<R>>,
tracker: Arc<CTokenAccountTracker>,
payer_keypair: Keypair,
payer_keypair: Arc<Keypair>,
transaction_policy: TransactionPolicy,
}

impl<R: Rpc + Indexer> Clone for CTokenCompressor<R> {
fn clone(&self) -> Self {
Self {
rpc_pool: Arc::clone(&self.rpc_pool),
tracker: Arc::clone(&self.tracker),
payer_keypair: self.payer_keypair.insecure_clone(),
rpc_pool: self.rpc_pool.clone(),
tracker: self.tracker.clone(),
payer_keypair: self.payer_keypair.clone(),
transaction_policy: self.transaction_policy,
}
}
@@ -49,7 +49,7 @@ impl<R: Rpc + Indexer> CTokenCompressor<R> {
pub fn new(
rpc_pool: Arc<SolanaRpcPool<R>>,
tracker: Arc<CTokenAccountTracker>,
payer_keypair: Keypair,
payer_keypair: Arc<Keypair>,
transaction_policy: TransactionPolicy,
) -> Self {
Self {
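
The `Keypair` → `Arc<Keypair>` change above recurs in all three compressors. A rough sketch of the pattern with an illustrative struct (not the forester types): sharing one keypair by reference count makes `Clone` a cheap pointer copy instead of duplicating secret-key material with `insecure_clone()`.

```rust
use std::sync::Arc;

use solana_sdk::{pubkey::Pubkey, signature::Keypair, signer::Signer};

// Illustrative only: a worker sharing one payer keypair across clones.
struct Worker {
    payer_keypair: Arc<Keypair>,
}

impl Clone for Worker {
    fn clone(&self) -> Self {
        // Pointer copy; no secret-key bytes are duplicated.
        Self {
            payer_keypair: self.payer_keypair.clone(),
        }
    }
}

impl Worker {
    fn payer_pubkey(&self) -> Pubkey {
        self.payer_keypair.pubkey()
    }

    fn signers(&self) -> [&Keypair; 1] {
        // `&Keypair` implements `Signer`, matching the `as_ref()` usage later in this diff.
        [self.payer_keypair.as_ref()]
    }
}
```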
10 changes: 7 additions & 3 deletions forester/src/compressible/ctoken/state.rs
@@ -91,9 +91,13 @@ impl CTokenAccountTracker {
/// Returns all tracked token accounts (not mints), ignoring compressible_slot.
/// Use `get_ready_to_compress(current_slot)` to get only accounts ready for compression.
pub fn get_all_token_accounts(&self) -> Vec<CTokenAccountState> {
self.get_ready_to_compress(u64::MAX)
.into_iter()
.filter(|state| state.account.is_token_account())
let pending = self.pending();
self.accounts()
.iter()
.filter(|entry| {
entry.value().account.is_token_account() && !pending.contains(entry.key())
})
.map(|entry| entry.value().clone())
.collect()
}

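
A hypothetical usage sketch of the two accessors mentioned in the doc comment above (the method names are from this diff; everything else is assumed):

```rust
// Illustrative only: reporting uses the slot-agnostic accessor, while a
// compression cycle asks for accounts whose compressible_slot has passed.
fn log_tracker_snapshot(tracker: &CTokenAccountTracker, current_slot: u64) {
    let all = tracker.get_all_token_accounts();
    let ready = tracker.get_ready_to_compress(current_slot);
    println!(
        "tracking {} token accounts, {} ready to compress at slot {}",
        all.len(),
        ready.len(),
        current_slot
    );
}
```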
63 changes: 40 additions & 23 deletions forester/src/compressible/mint/compressor.rs
@@ -30,16 +30,16 @@ use crate::{
pub struct MintCompressor<R: Rpc + Indexer> {
rpc_pool: Arc<SolanaRpcPool<R>>,
tracker: Arc<MintAccountTracker>,
payer_keypair: Keypair,
payer_keypair: Arc<Keypair>,
transaction_policy: TransactionPolicy,
}

impl<R: Rpc + Indexer> Clone for MintCompressor<R> {
fn clone(&self) -> Self {
Self {
rpc_pool: Arc::clone(&self.rpc_pool),
tracker: Arc::clone(&self.tracker),
payer_keypair: self.payer_keypair.insecure_clone(),
rpc_pool: self.rpc_pool.clone(),
tracker: self.tracker.clone(),
payer_keypair: self.payer_keypair.clone(),
transaction_policy: self.transaction_policy,
}
}
@@ -49,7 +49,7 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
pub fn new(
rpc_pool: Arc<SolanaRpcPool<R>>,
tracker: Arc<MintAccountTracker>,
payer_keypair: Keypair,
payer_keypair: Arc<Keypair>,
transaction_policy: TransactionPolicy,
) -> Self {
Self {
@@ -133,21 +133,20 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
/// Use this when you need fine-grained control over individual compressions.
pub async fn compress_batch_concurrent(
&self,
mint_states: &[MintAccountState],
pubkeys: &[Pubkey],
max_concurrent: usize,
cancelled: Arc<AtomicBool>,
) -> CompressionOutcomes<MintAccountState> {
if mint_states.is_empty() {
) -> CompressionOutcomes {
if pubkeys.is_empty() {
return Vec::new();
}

// Guard against max_concurrent == 0 to avoid buffer_unordered panic
if max_concurrent == 0 {
return mint_states
return pubkeys
.iter()
.cloned()
.map(|mint_state| CompressionOutcome::Failed {
state: mint_state,
.map(|&pubkey| CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"max_concurrent must be > 0"
)),
@@ -156,30 +155,48 @@
}

// Mark all as pending upfront
let all_pubkeys: Vec<Pubkey> = mint_states.iter().map(|s| s.pubkey).collect();
self.tracker.mark_pending(&all_pubkeys);
self.tracker.mark_pending(pubkeys);

// Create futures for each mint
let compression_futures = mint_states.iter().cloned().map(|mint_state| {
let compression_futures = pubkeys.iter().copied().map(|pubkey| {
let compressor = self.clone();
let cancelled = cancelled.clone();
async move {
// Check cancellation before processing
if cancelled.load(Ordering::Relaxed) {
compressor.tracker.unmark_pending(&[mint_state.pubkey]);
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
state: mint_state,
pubkey,
error: CompressionTaskError::Cancelled,
};
}

let mint_state = match compressor
.tracker
.accounts()
.get(&pubkey)
.map(|r| r.clone())
{
Some(state) => state,
None => {
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"mint {} removed from tracker before compression",
pubkey
)),
};
}
};

match compressor.compress(&mint_state).await {
Ok(sig) => CompressionOutcome::Compressed {
signature: sig,
state: mint_state,
pubkey,
},
Err(e) => CompressionOutcome::Failed {
state: mint_state,
pubkey,
error: e.into(),
},
}
@@ -195,11 +212,11 @@ impl<R: Rpc + Indexer> MintCompressor<R> {
// Remove successfully compressed mints; unmark failed ones
for result in &results {
match result {
CompressionOutcome::Compressed { state, .. } => {
self.tracker.remove_compressed(&state.pubkey);
CompressionOutcome::Compressed { pubkey, .. } => {
self.tracker.remove_compressed(pubkey);
}
CompressionOutcome::Failed { state, .. } => {
self.tracker.unmark_pending(&[state.pubkey]);
CompressionOutcome::Failed { pubkey, .. } => {
self.tracker.unmark_pending(&[*pubkey]);
}
}
}
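
Both the mint and PDA batch paths now follow the same shape: mark the pubkeys pending, resolve each state from the tracker at execution time (it may have been removed in the meantime), and bound concurrency with `buffer_unordered`. A simplified, self-contained sketch of that shape under generic names (not the forester types):

```rust
use futures::{stream, StreamExt};

// Illustrative only: run one future per item with at most `max_concurrent`
// in flight, guarding the zero case that would make buffer_unordered panic.
async fn process_bounded<T, F, Fut, R>(items: Vec<T>, max_concurrent: usize, f: F) -> Vec<R>
where
    F: Fn(T) -> Fut,
    Fut: std::future::Future<Output = R>,
{
    if items.is_empty() || max_concurrent == 0 {
        return Vec::new();
    }
    stream::iter(items.into_iter().map(f))
        .buffer_unordered(max_concurrent)
        .collect()
        .await
}
```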
60 changes: 39 additions & 21 deletions forester/src/compressible/pda/compressor.rs
@@ -55,16 +55,16 @@ pub struct CachedProgramConfig {
pub struct PdaCompressor<R: Rpc + Indexer> {
rpc_pool: Arc<SolanaRpcPool<R>>,
tracker: Arc<PdaAccountTracker>,
payer_keypair: Keypair,
payer_keypair: Arc<Keypair>,
transaction_policy: TransactionPolicy,
}

impl<R: Rpc + Indexer> Clone for PdaCompressor<R> {
fn clone(&self) -> Self {
Self {
rpc_pool: Arc::clone(&self.rpc_pool),
tracker: Arc::clone(&self.tracker),
payer_keypair: self.payer_keypair.insecure_clone(),
rpc_pool: self.rpc_pool.clone(),
tracker: self.tracker.clone(),
payer_keypair: self.payer_keypair.clone(),
transaction_policy: self.transaction_policy,
}
}
@@ -74,7 +74,7 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
pub fn new(
rpc_pool: Arc<SolanaRpcPool<R>>,
tracker: Arc<PdaAccountTracker>,
payer_keypair: Keypair,
payer_keypair: Arc<Keypair>,
transaction_policy: TransactionPolicy,
) -> Self {
Self {
Expand Down Expand Up @@ -156,22 +156,21 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
/// Successfully compressed accounts are removed from the tracker.
pub async fn compress_batch_concurrent(
&self,
account_states: &[PdaAccountState],
pubkeys: &[Pubkey],
program_config: &PdaProgramConfig,
cached_config: &CachedProgramConfig,
max_concurrent: usize,
cancelled: Arc<AtomicBool>,
) -> CompressionOutcomes<PdaAccountState> {
if account_states.is_empty() {
) -> CompressionOutcomes {
if pubkeys.is_empty() {
return Vec::new();
}

// Mark all accounts as pending upfront so concurrent cycles skip them
let all_pubkeys: Vec<Pubkey> = account_states.iter().map(|s| s.pubkey).collect();
self.tracker.mark_pending(&all_pubkeys);
self.tracker.mark_pending(pubkeys);

// Create futures for each account
let compression_futures = account_states.iter().cloned().map(|account_state| {
let compression_futures = pubkeys.iter().copied().map(|pubkey| {
let compressor = self.clone();
let program_config = program_config.clone();
let cached_config = cached_config.clone();
@@ -180,24 +179,43 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
async move {
// Check cancellation before processing
if cancelled.load(Ordering::Relaxed) {
// Unmark since we won't process this account
compressor.tracker.unmark_pending(&[account_state.pubkey]);
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
state: account_state,
pubkey,
error: CompressionTaskError::Cancelled,
};
}

// Look up account state from tracker; it may have been removed
let account_state = match compressor
.tracker
.accounts()
.get(&pubkey)
.map(|r| r.clone())
{
Some(state) => state,
None => {
compressor.tracker.unmark_pending(&[pubkey]);
return CompressionOutcome::Failed {
pubkey,
error: CompressionTaskError::Failed(anyhow::anyhow!(
"account {} removed from tracker before compression",
pubkey
)),
};
}
};

match compressor
.compress(&account_state, &program_config, &cached_config)
.await
{
Ok(sig) => CompressionOutcome::Compressed {
signature: sig,
state: account_state,
pubkey,
},
Err(e) => CompressionOutcome::Failed {
state: account_state,
pubkey,
error: e.into(),
},
}
@@ -213,11 +231,11 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
// Remove successfully compressed PDAs; unmark failed ones
for result in &results {
match result {
CompressionOutcome::Compressed { state, .. } => {
self.tracker.remove_compressed(&state.pubkey);
CompressionOutcome::Compressed { pubkey, .. } => {
self.tracker.remove_compressed(pubkey);
}
CompressionOutcome::Failed { state, .. } => {
self.tracker.unmark_pending(&[state.pubkey]);
CompressionOutcome::Failed { pubkey, .. } => {
self.tracker.unmark_pending(&[*pubkey]);
}
}
}
@@ -396,7 +414,7 @@ impl<R: Rpc + Indexer> PdaCompressor<R> {
);

let payer_pubkey = self.payer_keypair.pubkey();
let signers = [&self.payer_keypair];
let signers = [self.payer_keypair.as_ref()];
let instructions = vec![ix];
let priority_fee_accounts = collect_priority_fee_accounts(payer_pubkey, &instructions);
let signature = send_transaction_with_policy(