diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index b0eed44e9..f8eab1525 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -29,6 +29,22 @@ jobs: - name: Run tests (Bitcoin mode, REST+Electrum) run: RUST_LOG=debug cargo test + test-bitcoin-28: + runs-on: ubuntu-22.04 + steps: + - run: sudo apt-get update && sudo apt-get install libfuse2 + - uses: actions/checkout@v4 + - uses: dtolnay/rust-toolchain@1.75.0 + - uses: Swatinem/rust-cache@v2 + - name: Download bitcoind 28.0 + run: | + curl -sSL https://bitcoincore.org/bin/bitcoin-core-28.0/bitcoin-28.0-x86_64-linux-gnu.tar.gz | tar -xz + chmod +x bitcoin-28.0/bin/bitcoind + - name: Run tests (Bitcoin 28.0, REST+Electrum) + run: RUST_LOG=debug cargo test --features bitcoind_28_0 + env: + BITCOIND_EXE: ${{ github.workspace }}/bitcoin-28.0/bin/bitcoind + test-electrum-raw: runs-on: ubuntu-22.04 steps: diff --git a/Cargo.lock b/Cargo.lock index 7d625d04f..ad6152360 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -919,6 +919,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "page_size", "prometheus", + "rand 0.9.1", "rayon", "rocksdb", "rust-crypto", @@ -1038,7 +1039,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1639,7 +1640,7 @@ checksum = "e19b23d53f35ce9f56aebc7d1bb4e6ac1e9c0db7ac85c8d1760c04379edced37" dependencies = [ "hermit-abi 0.4.0", "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1753,7 +1754,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -2363,10 +2364,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", + "rand_chacha 0.3.1", "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -2377,6 +2388,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -2401,6 +2422,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.1", +] + [[package]] name = "rayon" version = "1.10.0" @@ -2620,7 +2650,7 @@ dependencies = [ "errno 0.3.10", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3170,7 +3200,7 @@ dependencies = [ "getrandom 0.3.1", "once_cell", "rustix", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3927,7 +3957,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a0ed15da7..c02915686 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,10 +6,9 @@ version = "0.4.1" authors = ["Roman Zeyde "] description = "An efficient re-implementation of Electrum Server in Rust" license = "MIT" -homepage = "https://github.com/romanz/electrs" -repository = "https://github.com/romanz/electrs" +homepage = "https://github.com/blockstream/electrs" +repository = "https://github.com/blockstream/electrs" keywords = ["bitcoin", "electrum", "server", "index", "database"] -documentation = "https://docs.rs/electrs/" readme = "README.md" edition = "2018" default-run = "electrs" @@ -27,13 +26,14 @@ otlp-tracing = [ "opentelemetry-semantic-conventions", "electrs_macros/otlp-tracing" ] +bitcoind_28_0 = [] [dependencies] arraydeque = "0.5.1" arrayref = "0.3.6" base64 = "0.22" bincode = "1.3.1" -bitcoin = { version = "0.32", features = ["serde"] } +bitcoin = { version = "0.32.4", features = ["serde"] } clap = "2.33.3" crossbeam-channel = "0.5.0" dirs = "5.0.1" @@ -70,6 +70,7 @@ opentelemetry-otlp = { version = "0.13.0", default-features = false, features = tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt"], optional = true } opentelemetry-semantic-conventions = { version = "0.12.0", optional = true } tracing = { version = "0.1.40", default-features = false, features = ["attributes"], optional = true } +rand = "0.9.1" # optional dependencies for electrum-discovery electrum-client = { version = "0.8", optional = true } diff --git a/doc/usage.md b/doc/usage.md index 2bc36b23e..5d4dba54c 100644 --- a/doc/usage.md +++ b/doc/usage.md @@ -7,7 +7,7 @@ and [latest Electrum wallet](https://electrum.org/#download) (3.2+). 
Also, install the following packages (on Debian): ```bash $ sudo apt update -$ sudo apt install clang cmake # for building 'rust-rocksdb' +$ sudo apt install clang cmake build-essential # for building 'rust-rocksdb' ``` ## Build diff --git a/electrs_macros/src/lib.rs b/electrs_macros/src/lib.rs index c8cb8447e..abe44fcd0 100644 --- a/electrs_macros/src/lib.rs +++ b/electrs_macros/src/lib.rs @@ -1,6 +1,5 @@ use proc_macro::TokenStream; - #[proc_macro_attribute] #[cfg(feature = "otlp-tracing")] pub fn trace(attr: TokenStream, item: TokenStream) -> TokenStream { @@ -32,4 +31,4 @@ pub fn trace(attr: TokenStream, item: TokenStream) -> TokenStream { #[cfg(not(feature = "otlp-tracing"))] pub fn trace(_attr: TokenStream, item: TokenStream) -> TokenStream { item -} \ No newline at end of file +} diff --git a/flake.lock b/flake.lock index 6d1fa8e96..253916520 100644 --- a/flake.lock +++ b/flake.lock @@ -1,17 +1,12 @@ { "nodes": { "crane": { - "inputs": { - "nixpkgs": [ - "nixpkgs" - ] - }, "locked": { - "lastModified": 1711586303, - "narHash": "sha256-iZDHWTqQj6z6ccqTSEOPOxQ8KMFAemInUObN2R9vHSs=", + "lastModified": 1754269165, + "narHash": "sha256-0tcS8FHd4QjbCVoxN9jI+PjHgA4vc/IjkUSp+N3zy0U=", "owner": "ipetkov", "repo": "crane", - "rev": "a329cd00398379c62e76fc3b8d4ec2934260d636", + "rev": "444e81206df3f7d92780680e45858e31d2f07a08", "type": "github" }, "original": { @@ -25,11 +20,11 @@ "systems": "systems" }, "locked": { - "lastModified": 1710146030, - "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=", + "lastModified": 1731533236, + "narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", "owner": "numtide", "repo": "flake-utils", - "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", "type": "github" }, "original": { @@ -40,11 +35,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1711523803, - "narHash": "sha256-UKcYiHWHQynzj6CN/vTcix4yd1eCu1uFdsuarupdCQQ=", + "lastModified": 1744463964, + "narHash": "sha256-LWqduOgLHCFxiTNYi3Uj5Lgz0SR+Xhw3kr/3Xd0GPTM=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "2726f127c15a4cc9810843b96cad73c7eb39e443", + "rev": "2631b0b7abcea6e640ce31cd78ea58910d31e650", "type": "github" }, "original": { @@ -64,19 +59,16 @@ }, "rust-overlay": { "inputs": { - "flake-utils": [ - "flake-utils" - ], "nixpkgs": [ "nixpkgs" ] }, "locked": { - "lastModified": 1711592024, - "narHash": "sha256-oD4OJ3TRmVrbAuKZWxElRCyCagNCDuhfw2exBmNOy48=", + "lastModified": 1754966322, + "narHash": "sha256-7f/LH60DnjjQVKbXAsHIniGaU7ixVM7eWU3hyjT24YI=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "aa858717377db2ed8ffd2d44147d907baee656e5", + "rev": "7c13cec2e3828d964b9980d0ffd680bd8d4dce90", "type": "github" }, "original": { diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index 42f5cf024..59f957ae5 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -6,10 +6,11 @@ extern crate electrs; use crossbeam_channel::{self as channel}; use error_chain::ChainedError; -use std::process; +use std::{env, process, thread}; use std::sync::{Arc, RwLock}; use std::time::Duration; - +use bitcoin::hex::DisplayHex; +use rand::{rng, RngCore}; use electrs::{ config::Config, daemon::Daemon, @@ -28,6 +29,9 @@ use electrs::otlp_trace; use electrs::elements::AssetRegistry; use electrs::metrics::MetricOpts; +/// Default salt rotation interval in seconds (24 hours) +const DEFAULT_SALT_ROTATION_INTERVAL_SECS: u64 = 24 * 3600; + fn fetch_from(config: &Config, store: &Store) -> FetchFrom { let mut jsonrpc_import = 
config.jsonrpc_import; if !jsonrpc_import { @@ -44,7 +48,7 @@ fn fetch_from(config: &Config, store: &Store) -> FetchFrom { } } -fn run_server(config: Arc<Config>) -> Result<()> { +fn run_server(config: Arc<Config>, salt_rwlock: Arc<RwLock<String>>) -> Result<()> { let (block_hash_notify, block_hash_receive) = channel::bounded(1); let signal = Waiter::start(block_hash_receive); let metrics = Metrics::new(config.monitoring_addr); @@ -64,7 +68,7 @@ fn run_server(config: Arc<Config>) -> Result<()> { signal.clone(), &metrics, )?); - let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config)); + let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); let mut indexer = Indexer::open( Arc::clone(&store), fetch_from(&config, &store), @@ -116,7 +120,12 @@ fn run_server(config: Arc<Config>) -> Result<()> { // TODO: configuration for which servers to start let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query)); - let electrum_server = ElectrumRPC::start(Arc::clone(&config), Arc::clone(&query), &metrics); + let electrum_server = ElectrumRPC::start( + Arc::clone(&config), + Arc::clone(&query), + &metrics, + Arc::clone(&salt_rwlock), + ); let main_loop_count = metrics.gauge(MetricOpts::new( "electrs_main_loop_count", @@ -151,9 +160,50 @@ fn run_server(config: Arc<Config>) -> Result<()> { Ok(()) } +fn generate_salt() -> String { + let mut random_bytes = [0u8; 32]; + rng().fill_bytes(&mut random_bytes); + random_bytes.to_lower_hex_string() +} + +fn rotate_salt(salt: &mut String) { + *salt = generate_salt(); +} + +fn get_salt_rotation_interval() -> Duration { + let var_name = "SALT_ROTATION_INTERVAL_SECS"; + let secs = env::var(var_name) + .ok() + .and_then(|val| val.parse::<u64>().ok()) + .unwrap_or(DEFAULT_SALT_ROTATION_INTERVAL_SECS); + + Duration::from_secs(secs) +} + +fn spawn_salt_rotation_thread() -> Arc<RwLock<String>> { + let salt = generate_salt(); + let salt_rwlock = Arc::new(RwLock::new(salt)); + let writer_arc = Arc::clone(&salt_rwlock); + let interval = get_salt_rotation_interval(); + + thread::spawn(move || { + loop { + thread::sleep(interval); // default: 24 hours, configurable via SALT_ROTATION_INTERVAL_SECS + { + let mut guard = writer_arc.write().unwrap(); + rotate_salt(&mut *guard); + info!("Salt rotated"); + } + } + }); + salt_rwlock +} + fn main_() { + let salt_rwlock = spawn_salt_rotation_thread(); + let config = Arc::new(Config::from_args()); - if let Err(e) = run_server(config) { + if let Err(e) = run_server(config, Arc::clone(&salt_rwlock)) { error!("server failed: {}", e.display_chain()); process::exit(1); } diff --git a/src/bin/popular-scripts.rs b/src/bin/popular-scripts.rs index ef550fce5..a7b245817 100644 --- a/src/bin/popular-scripts.rs +++ b/src/bin/popular-scripts.rs @@ -2,14 +2,13 @@ extern crate electrs; use bitcoin::hex::DisplayHex; use electrs::{ - config::Config, - new_index::{Store, TxHistoryKey}, - util::bincode, + config::Config, metrics::Metrics, new_index::{Store, TxHistoryKey}, util::bincode }; fn main() { let config = Config::from_args(); - let store = Store::open(&config.db_path.join("newindex"), &config); + let metrics = Metrics::new(config.monitoring_addr); + let store = Store::open(&config.db_path.join("newindex"), &config, &metrics); let mut iter = store.history_db().raw_iterator(); iter.seek(b"H"); diff --git a/src/bin/tx-fingerprint-stats.rs b/src/bin/tx-fingerprint-stats.rs index 9073faec8..83b3f213a 100644 --- a/src/bin/tx-fingerprint-stats.rs +++ b/src/bin/tx-fingerprint-stats.rs @@ -23,7 +23,8 @@ fn main() { let signal = Waiter::start(crossbeam_channel::never()); let config = Config::from_args(); - let store = 
Arc::new(Store::open(&config.db_path.join("newindex"), &config)); + let metrics = Metrics::new(config.monitoring_addr); + let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics)); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); diff --git a/src/chain.rs b/src/chain.rs index 16ebe75da..5930f8149 100644 --- a/src/chain.rs +++ b/src/chain.rs @@ -29,6 +29,8 @@ pub enum Network { #[cfg(not(feature = "liquid"))] Testnet, #[cfg(not(feature = "liquid"))] + Testnet4, + #[cfg(not(feature = "liquid"))] Regtest, #[cfg(not(feature = "liquid"))] Signet, @@ -97,6 +99,7 @@ impl Network { return vec![ "mainnet".to_string(), "testnet".to_string(), + "testnet4".to_string(), "regtest".to_string(), "signet".to_string(), ]; @@ -123,6 +126,8 @@ pub fn bitcoin_genesis_hash(network: BNetwork) -> bitcoin::BlockHash { genesis_block(BNetwork::Bitcoin).block_hash(); static ref TESTNET_GENESIS: bitcoin::BlockHash = genesis_block(BNetwork::Testnet).block_hash(); + static ref TESTNET4_GENESIS: bitcoin::BlockHash = + genesis_block(BNetwork::Testnet4).block_hash(); static ref REGTEST_GENESIS: bitcoin::BlockHash = genesis_block(BNetwork::Regtest).block_hash(); static ref SIGNET_GENESIS: bitcoin::BlockHash = @@ -131,6 +136,7 @@ pub fn bitcoin_genesis_hash(network: BNetwork) -> bitcoin::BlockHash { match network { BNetwork::Bitcoin => *BITCOIN_GENESIS, BNetwork::Testnet => *TESTNET_GENESIS, + BNetwork::Testnet4 => *TESTNET4_GENESIS, BNetwork::Regtest => *REGTEST_GENESIS, BNetwork::Signet => *SIGNET_GENESIS, _ => panic!("unknown network {:?}", network), @@ -165,6 +171,8 @@ impl From<&str> for Network { #[cfg(not(feature = "liquid"))] "testnet" => Network::Testnet, #[cfg(not(feature = "liquid"))] + "testnet4" => Network::Testnet4, + #[cfg(not(feature = "liquid"))] "regtest" => Network::Regtest, #[cfg(not(feature = "liquid"))] "signet" => Network::Signet, @@ -187,6 +195,7 @@ impl From<Network> for BNetwork { match network { Network::Bitcoin => BNetwork::Bitcoin, Network::Testnet => BNetwork::Testnet, + Network::Testnet4 => BNetwork::Testnet4, Network::Regtest => BNetwork::Regtest, Network::Signet => BNetwork::Signet, } @@ -199,6 +208,7 @@ impl From<BNetwork> for Network { match network { BNetwork::Bitcoin => Network::Bitcoin, BNetwork::Testnet => Network::Testnet, + BNetwork::Testnet4 => Network::Testnet4, BNetwork::Regtest => Network::Regtest, BNetwork::Signet => Network::Signet, _ => panic!("unknown network {:?}", network), diff --git a/src/config.rs b/src/config.rs index d083e988f..d23128e91 100644 --- a/src/config.rs +++ b/src/config.rs @@ -40,7 +40,7 @@ pub struct Config { pub utxos_limit: usize, pub electrum_txs_limit: usize, pub electrum_banner: String, - pub electrum_rpc_logging: Option<RpcLogging>, + pub rpc_logging: RpcLogging, pub zmq_addr: Option, /// Enable compaction during initial sync @@ -49,6 +49,24 @@ /// however, this requires much more disk space. 
pub initial_sync_compaction: bool, + /// RocksDB block cache size in MB (per database) + /// Caches decompressed blocks in memory to avoid repeated decompression (CPU intensive) + /// Total memory usage = cache_size * 3_databases (txstore, history, cache) + /// Recommendation: Start with 1024MB for production + /// Higher values reduce CPU load from cache misses but use more RAM + pub db_block_cache_mb: usize, + + /// RocksDB parallelism level (background compaction and flush threads) + /// Recommendation: Set to number of CPU cores for optimal performance + /// This configures max_background_jobs and thread pools automatically + pub db_parallelism: usize, + + /// RocksDB write buffer size in MB (per database) + /// Each database uses this much RAM for in-memory writes before flushing to disk + /// Total RAM usage = write_buffer_size * max_write_buffer_number * 3_databases + /// Larger buffers = fewer flushes (less CPU) but more RAM usage + pub db_write_buffer_size_mb: usize, + #[cfg(feature = "liquid")] pub parent_network: BNetwork, #[cfg(feature = "liquid")] @@ -74,10 +92,6 @@ fn str_to_socketaddr(address: &str, what: &str) -> SocketAddr { impl Config { pub fn from_args() -> Config { let network_help = format!("Select network type ({})", Network::names().join(", ")); - let rpc_logging_help = format!( - "Select RPC logging option ({})", - RpcLogging::options().join(", ") - ); let args = App::new("Electrum Rust Server") .version(crate_version!()) @@ -125,19 +139,19 @@ impl Config { .arg( Arg::with_name("electrum_rpc_addr") .long("electrum-rpc-addr") - .help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet and '127.0.0.1:60401' for regtest)") + .help("Electrum server JSONRPC 'addr:port' to listen on (default: '127.0.0.1:50001' for mainnet, '127.0.0.1:60001' for testnet3, '127.0.0.1:40001' for testnet4 and '127.0.0.1:60401' for regtest)") .takes_value(true), ) .arg( Arg::with_name("http_addr") .long("http-addr") - .help("HTTP server 'addr:port' to listen on (default: '127.0.0.1:3000' for mainnet, '127.0.0.1:3001' for testnet and '127.0.0.1:3002' for regtest)") + .help("HTTP server 'addr:port' to listen on (default: '127.0.0.1:3000' for mainnet, '127.0.0.1:3001' for testnet3, '127.0.0.1:3004' for testnet4 and '127.0.0.1:3002' for regtest)") .takes_value(true), ) .arg( Arg::with_name("daemon_rpc_addr") .long("daemon-rpc-addr") - .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet and 127.0.0.1:18443 for regtest)") + .help("Bitcoin daemon JSONRPC 'addr:port' to connect (default: 127.0.0.1:8332 for mainnet, 127.0.0.1:18332 for testnet3, 127.0.0.1:48332 for testnet4 and 127.0.0.1:18443 for regtest)") .takes_value(true), ) .arg( @@ -149,7 +163,7 @@ impl Config { .arg( Arg::with_name("monitoring_addr") .long("monitoring-addr") - .help("Prometheus monitoring 'addr:port' to listen on (default: 127.0.0.1:4224 for mainnet, 127.0.0.1:14224 for testnet and 127.0.0.1:24224 for regtest)") + .help("Prometheus monitoring 'addr:port' to listen on (default: 127.0.0.1:4224 for mainnet, 127.0.0.1:14224 for testnet3, 127.0.0.1:44224 for testnet4 and 127.0.0.1:24224 for regtest)") .takes_value(true), ) .arg( @@ -201,15 +215,43 @@ impl Config { .help("Welcome banner for the Electrum server, shown in the console to clients.") .takes_value(true) ).arg( - Arg::with_name("electrum_rpc_logging") - .long("electrum-rpc-logging") - .help(&rpc_logging_help) - .takes_value(true), 
+ Arg::with_name("enable_json_rpc_logging") + .long("enable-json-rpc-logging") + .help("turns on rpc logging") + .takes_value(false) + ).arg( + Arg::with_name("hide_json_rpc_logging_parameters") + .long("hide-json-rpc-logging-parameters") + .help("disables parameter printing in rpc logs") + .takes_value(false) + ).arg( + Arg::with_name("anonymize_json_rpc_logging_source_ip") + .long("anonymize-json-rpc-logging-source-ip") + .help("enables ip anonymization in rpc logs") + .takes_value(false) ).arg( Arg::with_name("initial_sync_compaction") .long("initial-sync-compaction") .help("Perform compaction during initial sync (slower but less disk space required)") ).arg( + Arg::with_name("db_block_cache_mb") + .long("db-block-cache-mb") + .help("RocksDB block cache size in MB per database") + .takes_value(true) + .default_value("8") + ).arg( + Arg::with_name("db_parallelism") + .long("db-parallelism") + .help("RocksDB parallelism level. Set to number of CPU cores for optimal performance") + .takes_value(true) + .default_value("2") + ).arg( + Arg::with_name("db_write_buffer_size_mb") + .long("db-write-buffer-size-mb") + .help("RocksDB write buffer size in MB per database. RAM usage = size * max_write_buffers(2) * 3_databases") + .takes_value(true) + .default_value("256") + ).arg( Arg::with_name("zmq_addr") .long("zmq-addr") .help("Optional zmq socket address of the bitcoind daemon") @@ -282,6 +324,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Testnet => 18332, #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 48332, + #[cfg(not(feature = "liquid"))] Network::Regtest => 18443, #[cfg(not(feature = "liquid"))] Network::Signet => 38332, @@ -297,6 +341,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Testnet => 60001, #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 40001, + #[cfg(not(feature = "liquid"))] Network::Regtest => 60401, #[cfg(not(feature = "liquid"))] Network::Signet => 60601, @@ -314,6 +360,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Testnet => 3001, #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 3004, + #[cfg(not(feature = "liquid"))] Network::Regtest => 3002, #[cfg(not(feature = "liquid"))] Network::Signet => 3003, @@ -331,6 +379,8 @@ impl Config { #[cfg(not(feature = "liquid"))] Network::Testnet => 14224, #[cfg(not(feature = "liquid"))] + Network::Testnet4 => 44224, + #[cfg(not(feature = "liquid"))] Network::Regtest => 24224, #[cfg(not(feature = "liquid"))] Network::Signet => 54224, @@ -419,9 +469,15 @@ impl Config { electrum_rpc_addr, electrum_txs_limit: value_t_or_exit!(m, "electrum_txs_limit", usize), electrum_banner, - electrum_rpc_logging: m - .value_of("electrum_rpc_logging") - .map(|option| RpcLogging::from(option)), + rpc_logging: { + let params = RpcLogging { + enabled: m.is_present("enable_json_rpc_logging"), + hide_params: m.is_present("hide_json_rpc_logging_parameters"), + anonymize_ip: m.is_present("anonymize_json_rpc_logging_source_ip"), + }; + params.validate(); + params + }, http_addr, http_socket_file, monitoring_addr, @@ -432,6 +488,9 @@ impl Config { cors: m.value_of("cors").map(|s| s.to_string()), precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()), initial_sync_compaction: m.is_present("initial_sync_compaction"), + db_block_cache_mb: value_t_or_exit!(m, "db_block_cache_mb", usize), + db_parallelism: value_t_or_exit!(m, "db_parallelism", usize), + db_write_buffer_size_mb: value_t_or_exit!(m, "db_write_buffer_size_mb", usize), zmq_addr, #[cfg(feature = "liquid")] @@ -463,25 +522,17 @@ impl 
Config { } } -#[derive(Debug, Clone)] -pub enum RpcLogging { - Full, - NoParams, +#[derive(Debug, Default, Clone)] +pub struct RpcLogging { + pub enabled: bool, + pub hide_params: bool, + pub anonymize_ip: bool, } impl RpcLogging { - pub fn options() -> Vec<String> { - return vec!["full".to_string(), "no-params".to_string()]; - } -} - -impl From<&str> for RpcLogging { - fn from(option: &str) -> Self { - match option { - "full" => RpcLogging::Full, - "no-params" => RpcLogging::NoParams, - - _ => panic!("unsupported RPC logging option: {:?}", option), + pub fn validate(&self) { + if !self.enabled && (self.hide_params || self.anonymize_ip) { + panic!("Flags '--hide-json-rpc-logging-parameters' or '--anonymize-json-rpc-logging-source-ip' require '--enable-json-rpc-logging'"); } } } @@ -493,6 +544,8 @@ pub fn get_network_subdir(network: Network) -> Option<&'static str> { #[cfg(not(feature = "liquid"))] Network::Testnet => Some("testnet3"), #[cfg(not(feature = "liquid"))] + Network::Testnet4 => Some("testnet4"), + #[cfg(not(feature = "liquid"))] Network::Regtest => Some("regtest"), #[cfg(not(feature = "liquid"))] Network::Signet => Some("signet"), diff --git a/src/daemon.rs b/src/daemon.rs index 84b93ac03..1038d3ebc 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -141,6 +141,34 @@ struct NetworkInfo { relayfee: f64, // in BTC/kB } +#[derive(Serialize, Deserialize, Debug)] +struct MempoolFeesSubmitPackage { + base: f64, + #[serde(rename = "effective-feerate")] + effective_feerate: Option<f64>, + #[serde(rename = "effective-includes")] + effective_includes: Option<Vec<String>>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct SubmitPackageResult { + package_msg: String, + #[serde(rename = "tx-results")] + tx_results: HashMap<String, TxResult>, + #[serde(rename = "replaced-transactions")] + replaced_transactions: Option<Vec<String>>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct TxResult { + txid: String, + #[serde(rename = "other-wtxid")] + other_wtxid: Option<String>, + vsize: Option<u32>, + fees: Option<MempoolFeesSubmitPackage>, + error: Option<String>, +} + pub trait CookieGetter: Send + Sync { fn get(&self) -> Result<Vec<u8>>; } @@ -366,7 +394,14 @@ impl Daemon { loop { let info = daemon.getblockchaininfo()?; - if !info.initialblockdownload.unwrap_or(false) && info.blocks == info.headers { + // NOTE: initial_block_download is always true on regtest, so we check + // verificationprogress instead + let initial_block_download = match network { + Network::Regtest => info.verificationprogress < 1.0, + _ => info.initialblockdownload.unwrap_or(false), + }; + + if !initial_block_download && info.blocks == info.headers { break; } @@ -611,7 +646,7 @@ impl Daemon { /// Fetch the given transactions in parallel over multiple threads and RPC connections, /// ignoring any missing ones and returning whatever is available. 
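+ /// Requested txids that are no longer in the mempool (e.g. evicted or replaced while fetching) are simply absent from the returned map.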
#[trace] - pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<Vec<(Txid, Transaction)>> { + pub fn gettransactions_available(&self, txids: &[&Txid]) -> Result<HashMap<Txid, Transaction>> { const RPC_INVALID_ADDRESS_OR_KEY: i64 = -5; let params_list: Vec<Value> = txids @@ -671,6 +706,25 @@ impl Daemon { ) } + pub fn submit_package( + &self, + txhex: Vec<String>, + maxfeerate: Option<f64>, + maxburnamount: Option<f64>, + ) -> Result<SubmitPackageResult> { + let params = match (maxfeerate, maxburnamount) { + (Some(rate), Some(burn)) => { + json!([txhex, format!("{:.8}", rate), format!("{:.8}", burn)]) + } + (Some(rate), None) => json!([txhex, format!("{:.8}", rate)]), + (None, Some(burn)) => json!([txhex, null, format!("{:.8}", burn)]), + (None, None) => json!([txhex]), + }; + let result = self.request("submitpackage", params)?; + serde_json::from_value::<SubmitPackageResult>(result) + .chain_err(|| "invalid submitpackage reply") + } + // Get estimated feerates for the provided confirmation targets using a batch RPC request // Missing estimates are logged but do not cause a failure, whatever is available is returned #[allow(clippy::float_cmp)] diff --git a/src/electrum/server.rs b/src/electrum/server.rs index f3ae3db39..ea5579699 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::sync::mpsc::{self, Receiver, Sender, SyncSender, TrySendError}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread; use std::time::Instant; @@ -19,7 +19,6 @@ use electrs_macros::trace; use bitcoin::consensus::encode::serialize_hex; #[cfg(feature = "liquid")] use elements::encode::serialize_hex; - use crate::chain::Txid; use crate::config::{Config, RpcLogging}; use crate::electrum::{get_electrum_height, ProtocolVersion}; @@ -95,7 +94,7 @@ fn get_status_hash(txs: Vec<(Txid, Option<BlockId>)>, query: &Query) -> Option<FullHash> { - if $self.rpc_logging.is_some() { + if $self.rpc_logging.enabled { $self.log_rpc_event($event); } }; @@ -112,7 +111,8 @@ struct Connection { txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>, - rpc_logging: Option<RpcLogging>, + rpc_logging: RpcLogging, + salt: String, } impl Connection { @@ -124,7 +124,8 @@ stats: Arc<Stats>, txs_limit: usize, #[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>, - rpc_logging: Option<RpcLogging>, + rpc_logging: RpcLogging, + salt: String, ) -> Connection { Connection { query, @@ -138,6 +139,7 @@ #[cfg(feature = "electrum-discovery")] discovery, rpc_logging, + salt, } } @@ -525,11 +527,25 @@ impl Connection { Ok(result) } + fn hash_ip_with_salt(&self, ip: &str) -> String { + let mut hasher = Sha256::new(); + hasher.input(self.salt.as_bytes()); + hasher.input(ip.as_bytes()); + hasher.result_str() + } + fn log_rpc_event(&self, mut log: Value) { + let real_ip = self.addr.ip().to_string(); + let ip_to_log = if self.rpc_logging.anonymize_ip { + self.hash_ip_with_salt(&real_ip) + } else { + real_ip + }; + log.as_object_mut().unwrap().insert( "source".into(), json!({ - "ip": self.addr.ip().to_string(), + "ip": ip_to_log, "port": self.addr.port(), }), ); @@ -594,28 +610,20 @@ impl Connection { cmd.get("id"), ) { (Some(&Value::String(ref method)), &Value::Array(ref params), Some(ref id)) => { - conditionally_log_rpc_event!( - self, - json!({ - "event": "rpc request", - "id": id, - "method": method, - "params": if let Some(RpcLogging::Full) = self.rpc_logging { - json!(params) - } else { - Value::Null - } - }) - ); - let reply = 
self.handle_command(method, params, id)?; conditionally_log_rpc_event!( self, json!({ - "event": "rpc response", + "event": "rpc_response", "method": method, - "payload_size": reply.to_string().as_bytes().len(), + "params": if self.rpc_logging.hide_params { + Value::Null + } else { + json!(params) + }, + "request_size": serde_json::to_vec(&cmd).map(|v| v.len()).unwrap_or(0), + "response_size": reply.to_string().as_bytes().len(), "duration_micros": start_time.elapsed().as_micros(), "id": id, }) @@ -666,7 +674,7 @@ impl Connection { pub fn run(mut self, receiver: Receiver<Message>) { self.stats.clients.inc(); - conditionally_log_rpc_event!(self, json!({ "event": "connection established" })); + conditionally_log_rpc_event!(self, json!({ "event": "connection_established" })); let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); let sender = self.sender.clone(); @@ -684,7 +692,7 @@ .sub(self.status_hashes.len() as i64); debug!("[{}] shutting down connection", self.addr); - conditionally_log_rpc_event!(self, json!({ "event": "connection closed" })); + conditionally_log_rpc_event!(self, json!({ "event": "connection_closed" })); let _ = self.stream.shutdown(Shutdown::Both); if let Err(err) = child.join().expect("receiver panicked") { @@ -787,7 +795,12 @@ impl RPC { chan } - pub fn start(config: Arc<Config>, query: Arc<Query>, metrics: &Metrics) -> RPC { + pub fn start( + config: Arc<Config>, + query: Arc<Query>, + metrics: &Metrics, + salt_rwlock: Arc<RwLock<String>> + ) -> RPC { let stats = Arc::new(Stats { latency: metrics.histogram_vec( HistogramOpts::new("electrum_rpc", "Electrum RPC latency (seconds)"), @@ -847,13 +860,15 @@ impl RPC { let query = Arc::clone(&query); let stats = Arc::clone(&stats); let garbage_sender = garbage_sender.clone(); - let rpc_logging = config.electrum_rpc_logging.clone(); + let rpc_logging = config.rpc_logging.clone(); #[cfg(feature = "electrum-discovery")] let discovery = discovery.clone(); let (sender, receiver) = mpsc::sync_channel(10); senders.lock().unwrap().push(sender.clone()); + let salt = salt_rwlock.read().unwrap().clone(); + let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); let conn = Connection::new( @@ -866,6 +881,7 @@ #[cfg(feature = "electrum-discovery")] discovery, rpc_logging, + salt, ); conn.run(receiver); info!("[{}] disconnected peer", addr); diff --git a/src/new_index/db.rs b/src/new_index/db.rs index 20db12f4c..e889aad63 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -1,9 +1,15 @@ +use prometheus::GaugeVec; use rocksdb; +use std::convert::TryInto; use std::path::Path; +use std::sync::Arc; +use std::thread; +use std::time::Duration; use crate::config::Config; -use crate::util::{bincode, Bytes}; +use crate::new_index::db_metrics::RocksDbMetrics; +use crate::util::{bincode, spawn_thread, Bytes}; static DB_VERSION: u32 = 1; @@ -71,7 +77,7 @@ impl<'a> Iterator for ReverseScanIterator<'a> { #[derive(Debug)] pub struct DB { - db: rocksdb::DB, + db: Arc<rocksdb::DB>, } #[derive(Copy, Clone, Debug)] @@ -89,18 +95,29 @@ impl DB { db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); db_opts.set_target_file_size_base(1_073_741_824); - db_opts.set_write_buffer_size(256 << 20); db_opts.set_disable_auto_compactions(!config.initial_sync_compaction); // for initial bulk load + + let parallelism: i32 = config.db_parallelism.try_into() + .expect("db_parallelism value too large for i32"); + + // Configure parallelism (background jobs and 
thread pools) + db_opts.increase_parallelism(parallelism); + + // Configure write buffer size (not set by increase_parallelism) + db_opts.set_write_buffer_size(config.db_write_buffer_size_mb * 1024 * 1024); + // db_opts.set_advise_random_on_open(???); db_opts.set_compaction_readahead_size(1 << 20); - db_opts.increase_parallelism(2); - // let mut block_opts = rocksdb::BlockBasedOptions::default(); - // block_opts.set_block_size(???); + // Configure block cache + let mut block_opts = rocksdb::BlockBasedOptions::default(); + let cache_size_bytes = config.db_block_cache_mb * 1024 * 1024; + block_opts.set_block_cache(&rocksdb::Cache::new_lru_cache(cache_size_bytes)); + db_opts.set_block_based_table_factory(&block_opts); let db = DB { - db: rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB"), + db: Arc::new(rocksdb::DB::open(&db_opts, path).expect("failed to open RocksDB")) }; db.verify_compatibility(config); db @@ -220,4 +237,54 @@ impl DB { Some(_) => (), } } + + pub fn start_stats_exporter(&self, db_metrics: Arc<RocksDbMetrics>, db_name: &str) { + let db_arc = Arc::clone(&self.db); + let label = db_name.to_string(); + + let update_gauge = move |gauge: &GaugeVec, property: &str| { + if let Ok(Some(value)) = db_arc.property_value(property) { + if let Ok(v) = value.parse::<f64>() { + gauge.with_label_values(&[&label]).set(v); + } + } + }; + + spawn_thread("db_stats_exporter", move || loop { + update_gauge(&db_metrics.num_immutable_mem_table, "rocksdb.num-immutable-mem-table"); + update_gauge(&db_metrics.mem_table_flush_pending, "rocksdb.mem-table-flush-pending"); + update_gauge(&db_metrics.compaction_pending, "rocksdb.compaction-pending"); + update_gauge(&db_metrics.background_errors, "rocksdb.background-errors"); + update_gauge(&db_metrics.cur_size_active_mem_table, "rocksdb.cur-size-active-mem-table"); + update_gauge(&db_metrics.cur_size_all_mem_tables, "rocksdb.cur-size-all-mem-tables"); + update_gauge(&db_metrics.size_all_mem_tables, "rocksdb.size-all-mem-tables"); + update_gauge(&db_metrics.num_entries_active_mem_table, "rocksdb.num-entries-active-mem-table"); + update_gauge(&db_metrics.num_entries_imm_mem_tables, "rocksdb.num-entries-imm-mem-tables"); + update_gauge(&db_metrics.num_deletes_active_mem_table, "rocksdb.num-deletes-active-mem-table"); + update_gauge(&db_metrics.num_deletes_imm_mem_tables, "rocksdb.num-deletes-imm-mem-tables"); + update_gauge(&db_metrics.estimate_num_keys, "rocksdb.estimate-num-keys"); + update_gauge(&db_metrics.estimate_table_readers_mem, "rocksdb.estimate-table-readers-mem"); + update_gauge(&db_metrics.is_file_deletions_enabled, "rocksdb.is-file-deletions-enabled"); + update_gauge(&db_metrics.num_snapshots, "rocksdb.num-snapshots"); + update_gauge(&db_metrics.oldest_snapshot_time, "rocksdb.oldest-snapshot-time"); + update_gauge(&db_metrics.num_live_versions, "rocksdb.num-live-versions"); + update_gauge(&db_metrics.current_super_version_number, "rocksdb.current-super-version-number"); + update_gauge(&db_metrics.estimate_live_data_size, "rocksdb.estimate-live-data-size"); + update_gauge(&db_metrics.min_log_number_to_keep, "rocksdb.min-log-number-to-keep"); + update_gauge(&db_metrics.min_obsolete_sst_number_to_keep, "rocksdb.min-obsolete-sst-number-to-keep"); + update_gauge(&db_metrics.total_sst_files_size, "rocksdb.total-sst-files-size"); + update_gauge(&db_metrics.live_sst_files_size, "rocksdb.live-sst-files-size"); + update_gauge(&db_metrics.base_level, "rocksdb.base-level"); + update_gauge(&db_metrics.estimate_pending_compaction_bytes, 
"rocksdb.estimate-pending-compaction-bytes"); + update_gauge(&db_metrics.num_running_compactions, "rocksdb.num-running-compactions"); + update_gauge(&db_metrics.num_running_flushes, "rocksdb.num-running-flushes"); + update_gauge(&db_metrics.actual_delayed_write_rate, "rocksdb.actual-delayed-write-rate"); + update_gauge(&db_metrics.is_write_stopped, "rocksdb.is-write-stopped"); + update_gauge(&db_metrics.estimate_oldest_key_time, "rocksdb.estimate-oldest-key-time"); + update_gauge(&db_metrics.block_cache_capacity, "rocksdb.block-cache-capacity"); + update_gauge(&db_metrics.block_cache_usage, "rocksdb.block-cache-usage"); + update_gauge(&db_metrics.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage"); + thread::sleep(Duration::from_secs(5)); + }); + } } diff --git a/src/new_index/db_metrics.rs b/src/new_index/db_metrics.rs new file mode 100644 index 000000000..e8df0db43 --- /dev/null +++ b/src/new_index/db_metrics.rs @@ -0,0 +1,233 @@ +use crate::metrics::{GaugeVec, MetricOpts, Metrics}; + +#[derive(Debug)] +pub struct RocksDbMetrics { + // Memory table metrics + pub num_immutable_mem_table: GaugeVec, + pub mem_table_flush_pending: GaugeVec, + pub cur_size_active_mem_table: GaugeVec, + pub cur_size_all_mem_tables: GaugeVec, + pub size_all_mem_tables: GaugeVec, + pub num_entries_active_mem_table: GaugeVec, + pub num_entries_imm_mem_tables: GaugeVec, + pub num_deletes_active_mem_table: GaugeVec, + pub num_deletes_imm_mem_tables: GaugeVec, + + // Compaction metrics + pub compaction_pending: GaugeVec, + pub estimate_pending_compaction_bytes: GaugeVec, + pub num_running_compactions: GaugeVec, + pub num_running_flushes: GaugeVec, + + // Error metrics + pub background_errors: GaugeVec, + + // Key and data size estimates + pub estimate_num_keys: GaugeVec, + pub estimate_live_data_size: GaugeVec, + pub estimate_oldest_key_time: GaugeVec, + + // Table reader memory + pub estimate_table_readers_mem: GaugeVec, + + // File and SST metrics + pub is_file_deletions_enabled: GaugeVec, + pub total_sst_files_size: GaugeVec, + pub live_sst_files_size: GaugeVec, + pub min_obsolete_sst_number_to_keep: GaugeVec, + + // Snapshot metrics + pub num_snapshots: GaugeVec, + pub oldest_snapshot_time: GaugeVec, + + // Version metrics + pub num_live_versions: GaugeVec, + pub current_super_version_number: GaugeVec, + + // Log metrics + pub min_log_number_to_keep: GaugeVec, + + // Level metrics + pub base_level: GaugeVec, + + // Write metrics + pub actual_delayed_write_rate: GaugeVec, + pub is_write_stopped: GaugeVec, + + // Block cache metrics + pub block_cache_capacity: GaugeVec, + pub block_cache_usage: GaugeVec, + pub block_cache_pinned_usage: GaugeVec, +} + +impl RocksDbMetrics { + pub fn new(metrics: &Metrics) -> Self { + let labels = &["db"]; + + Self { + // Memory table metrics + num_immutable_mem_table: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_immutable_mem_table", + "Number of immutable memtables that have not yet been flushed." + ), labels), + mem_table_flush_pending: metrics.gauge_vec(MetricOpts::new( + "rocksdb_mem_table_flush_pending", + "1 if a memtable flush is pending and 0 otherwise." + ), labels), + cur_size_active_mem_table: metrics.gauge_vec(MetricOpts::new( + "rocksdb_cur_size_active_mem_table_bytes", + "Approximate size of active memtable in bytes." + ), labels), + cur_size_all_mem_tables: metrics.gauge_vec(MetricOpts::new( + "rocksdb_cur_size_all_mem_tables_bytes", + "Approximate size of active and unflushed immutable memtables in bytes." 
+ ), labels), + size_all_mem_tables: metrics.gauge_vec(MetricOpts::new( + "rocksdb_size_all_mem_tables_bytes", + "Approximate size of active, unflushed immutable, and pinned immutable memtables in bytes." + ), labels), + num_entries_active_mem_table: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_entries_active_mem_table", + "Total number of entries in the active memtable." + ), labels), + num_entries_imm_mem_tables: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_entries_imm_mem_tables", + "Total number of entries in the unflushed immutable memtables." + ), labels), + num_deletes_active_mem_table: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_deletes_active_mem_table", + "Total number of delete entries in the active memtable." + ), labels), + num_deletes_imm_mem_tables: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_deletes_imm_mem_tables", + "Total number of delete entries in the unflushed immutable memtables." + ), labels), + + // Compaction metrics + compaction_pending: metrics.gauge_vec(MetricOpts::new( + "rocksdb_compaction_pending", + "1 if at least one compaction is pending; otherwise, 0." + ), labels), + + estimate_pending_compaction_bytes: metrics.gauge_vec(MetricOpts::new( + "rocksdb_estimate_pending_compaction_bytes", + "Estimated total number of bytes compaction needs to rewrite." + ), labels), + + num_running_compactions: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_running_compactions", + "Number of currently running compactions." + ), labels), + + num_running_flushes: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_running_flushes", + "Number of currently running flushes." + ), labels), + + // Error metrics + background_errors: metrics.gauge_vec(MetricOpts::new( + "rocksdb_background_errors_total", + "Accumulated number of background errors." + ), labels), + + // Key and data size estimates + estimate_num_keys: metrics.gauge_vec(MetricOpts::new( + "rocksdb_estimate_num_keys", + "Estimated number of total keys in the active and unflushed immutable memtables and storage." + ), labels), + + estimate_live_data_size: metrics.gauge_vec(MetricOpts::new( + "rocksdb_estimate_live_data_size_bytes", + "Estimated live data size in bytes." + ), labels), + + estimate_oldest_key_time: metrics.gauge_vec(MetricOpts::new( + "rocksdb_estimate_oldest_key_time_seconds", + "Estimated oldest key timestamp." + ), labels), + + // Table reader memory + estimate_table_readers_mem: metrics.gauge_vec(MetricOpts::new( + "rocksdb_estimate_table_readers_mem_bytes", + "Estimated memory used for reading SST tables, excluding memory used in block cache." + ), labels), + + // File and SST metrics + is_file_deletions_enabled: metrics.gauge_vec(MetricOpts::new( + "rocksdb_is_file_deletions_enabled", + "0 if deletion of obsolete files is enabled; otherwise, non-zero." + ), labels), + + total_sst_files_size: metrics.gauge_vec(MetricOpts::new( + "rocksdb_total_sst_files_size_bytes", + "Total size of all SST files in bytes." + ), labels), + + live_sst_files_size: metrics.gauge_vec(MetricOpts::new( + "rocksdb_live_sst_files_size_bytes", + "Total size (bytes) of all SST files belonging to any of the CF's versions." + ), labels), + + min_obsolete_sst_number_to_keep: metrics.gauge_vec(MetricOpts::new( + "rocksdb_min_obsolete_sst_number_to_keep", + "Minimum file number for an obsolete SST to be kept, or maximum uint64_t value if obsolete files can be deleted." 
+ ), labels), + + // Snapshot metrics + num_snapshots: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_snapshots", + "Number of unreleased snapshots of the database." + ), labels), + oldest_snapshot_time: metrics.gauge_vec(MetricOpts::new( + "rocksdb_oldest_snapshot_time_seconds", + "Unix timestamp of oldest unreleased snapshot." + ), labels), + + // Version metrics + num_live_versions: metrics.gauge_vec(MetricOpts::new( + "rocksdb_num_live_versions", + "Number of live versions." + ), labels), + current_super_version_number: metrics.gauge_vec(MetricOpts::new( + "rocksdb_current_super_version_number", + "Number of current LSM version. Incremented after any change to LSM tree." + ), labels), + + // Log metrics + min_log_number_to_keep: metrics.gauge_vec(MetricOpts::new( + "rocksdb_min_log_number_to_keep", + "Minimum log number to keep." + ), labels), + + // Level metrics + base_level: metrics.gauge_vec(MetricOpts::new( + "rocksdb_base_level", + "Base level for compaction." + ), labels), + + // Write metrics + actual_delayed_write_rate: metrics.gauge_vec(MetricOpts::new( + "rocksdb_actual_delayed_write_rate", + "The current actual delayed write rate. 0 means no delay." + ), labels), + is_write_stopped: metrics.gauge_vec(MetricOpts::new( + "rocksdb_is_write_stopped", + "1 if write has been stopped." + ), labels), + + // Block cache metrics + block_cache_capacity: metrics.gauge_vec(MetricOpts::new( + "rocksdb_block_cache_capacity_bytes", + "The block cache capacity in bytes." + ), labels), + block_cache_usage: metrics.gauge_vec(MetricOpts::new( + "rocksdb_block_cache_usage_bytes", + "The memory size for the entries residing in block cache." + ), labels), + block_cache_pinned_usage: metrics.gauge_vec(MetricOpts::new( + "rocksdb_block_cache_pinned_usage_bytes", + "The memory size for the entries being pinned." 
+ ), labels), } } } diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 90ad2b67b..c5788e683 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -3,6 +3,7 @@ use itertools::{Either, Itertools}; #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode::serialize; +use electrs_macros::trace; #[cfg(feature = "liquid")] use elements::{encode::serialize, AssetId}; @@ -11,7 +12,6 @@ use std::iter::FromIterator; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; -use electrs_macros::trace; use crate::chain::{deserialize, BlockHash, Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; use crate::daemon::Daemon; @@ -516,79 +516,112 @@ impl Mempool { daemon: &Daemon, tip: &BlockHash, ) -> Result<bool> { - let _timer = mempool.read().unwrap().latency.with_label_values(&["update"]).start_timer(); - - // Continuously attempt to fetch mempool transactions until we're able to get them in full - let mut fetched_txs = HashMap::<Txid, Transaction>::new(); - let mut indexed_txids = mempool.read().unwrap().txids_set(); - loop { - // Get bitcoind's current list of mempool txids - let all_txids = daemon - .getmempooltxids() - .chain_err(|| "failed to update mempool from daemon")?; - - // Remove evicted mempool transactions - mempool - .write() - .unwrap() - .remove(indexed_txids.difference(&all_txids).collect()); - - indexed_txids.retain(|txid| all_txids.contains(txid)); - fetched_txs.retain(|txid, _| all_txids.contains(txid)); - - // Fetch missing transactions from bitcoind - let new_txids = all_txids - .iter() - .filter(|&txid| !fetched_txs.contains_key(txid) && !indexed_txids.contains(txid)) - .collect::<Vec<_>>(); - if new_txids.is_empty() { - break; - } - debug!( - "mempool with total {} txs, {} fetched, {} missing", - all_txids.len(), - indexed_txids.len() + fetched_txs.len(), - new_txids.len() - ); + let (_timer, count) = { + let mempool = mempool.read().unwrap(); + let timer = mempool.latency.with_label_values(&["update"]).start_timer(); + (timer, mempool.count.clone()) + }; - { - let mempool = mempool.read().unwrap(); + // Get bitcoind's current list of mempool txids + let bitcoind_txids = daemon + .getmempooltxids() + .chain_err(|| "failed to update mempool from daemon")?; + + // Get the list of mempool txids in the local mempool view + let indexed_txids = mempool.read().unwrap().txids_set(); + + // Remove evicted mempool transactions from the local mempool view + let evicted_txids = indexed_txids + .difference(&bitcoind_txids) + .collect::<HashSet<_>>(); + if !evicted_txids.is_empty() { + mempool.write().unwrap().remove(evicted_txids); + } // avoids acquiring a lock when there are no evictions + + // Find transactions available in bitcoind's mempool but not indexed locally + let new_txids = bitcoind_txids + .difference(&indexed_txids) + .collect::<Vec<_>>(); + + debug!( + "mempool with total {} txs, {} indexed locally, {} to fetch", + bitcoind_txids.len(), + indexed_txids.len(), + new_txids.len() + ); + count + .with_label_values(&["all_txs"]) + .set(bitcoind_txids.len() as f64); + count + .with_label_values(&["indexed_txs"]) + .set(indexed_txids.len() as f64); + count + .with_label_values(&["missing_txs"]) + .set(new_txids.len() as f64); + + if new_txids.is_empty() { + return Ok(true); + } - mempool.count.with_label_values(&["all_txs"]).set(all_txids.len() as f64); - mempool.count.with_label_values(&["fetched_txs"]).set((indexed_txids.len() + fetched_txs.len()) as f64); - mempool.count.with_label_values(&["missing_txs"]).set(new_txids.len() as f64); - } + // Fetch 
missing transactions from bitcoind + let mut fetched_txs = daemon.gettransactions_available(&new_txids)?; - let new_txs = daemon.gettransactions_available(&new_txids)?; + // Abort if the chain tip moved while fetching transactions + if daemon.getbestblockhash()? != *tip { + warn!("chain tip moved while updating mempool"); + return Ok(false); + } - // Abort if the chain tip moved while fetching transactions - if daemon.getbestblockhash()? != *tip { - warn!("chain tip moved while updating mempool"); - return Ok(false); - } + // Find which transactions were requested but are no longer available in bitcoind's mempool, + // typically due to Replace-By-Fee (or mempool eviction for some other reason) taking place + // between querying for the mempool txids and querying for the transactions themselves. + let mut replaced_txids: HashSet<_> = new_txids + .into_iter() + .filter(|txid| !fetched_txs.contains_key(*txid)) + .cloned() + .collect(); - let fetched_count = new_txs.len(); - fetched_txs.extend(new_txs); + if replaced_txids.is_empty() { + trace!("fetched complete mempool snapshot"); + } else { + warn!( + "could not fetch {} replaced/evicted mempool transactions: {:?}", + replaced_txids.len(), + replaced_txids.iter().take(10).collect::<Vec<_>>() + ); + } - // Retry if any transactions were evicted form the mempool before we managed to get them - if fetched_count != new_txids.len() { - warn!( - "failed to fetch {} mempool txs, retrying...", - new_txids.len() - fetched_count - ); - } else { - break; - } + // If we were unable to get a complete consistent snapshot of the bitcoind mempool, + // detect and remove any transactions that spend from the missing (replaced) transactions + // or any of their descendants. This is necessary because it could be possible to fetch the + // child tx successfully before the parent is replaced, but miss the replaced parent tx. 
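+ // Illustration: if replaced parent A was missed while children B (spends A) and C (spends B) + // were fetched, the first pass removes B, the next pass removes C, and the loop ends once + // no remaining fetched tx spends a removed txid.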
+ while !replaced_txids.is_empty() { + let mut descendants_txids = HashSet::new(); + fetched_txs.retain(|txid, tx| { + let parent_was_replaced = tx + .input + .iter() + .any(|txin| replaced_txids.contains(&txin.previous_output.txid)); + if parent_was_replaced { + descendants_txids.insert(*txid); + } + !parent_was_replaced + }); + trace!( + "detected {} replaced mempool descendants", + descendants_txids.len() + ); + replaced_txids = descendants_txids; } // Add fetched transactions to our view of the mempool - { + trace!("indexing {} new mempool transactions", fetched_txs.len()); + if !fetched_txs.is_empty() { let mut mempool = mempool.write().unwrap(); mempool.add(fetched_txs)?; - mempool - .count + count .with_label_values(&["txs"]) .set(mempool.txstore.len() as f64); diff --git a/src/new_index/mod.rs b/src/new_index/mod.rs index 09730b104..f82291e55 100644 --- a/src/new_index/mod.rs +++ b/src/new_index/mod.rs @@ -1,4 +1,5 @@ pub mod db; +pub mod db_metrics; mod fetch; mod mempool; pub mod precache; diff --git a/src/new_index/query.rs b/src/new_index/query.rs index df258bea9..03c5d201f 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; use crate::chain::{Network, OutPoint, Transaction, TxOut, Txid}; use crate::config::Config; -use crate::daemon::Daemon; +use crate::daemon::{Daemon, SubmitPackageResult}; use crate::errors::*; use crate::new_index::{ChainQuery, Mempool, ScriptStats, SpendingInput, Utxo}; use crate::util::{is_spendable, BlockId, Bytes, TransactionStatus}; @@ -82,6 +82,16 @@ impl Query { Ok(txid) } + #[trace] + pub fn submit_package( + &self, + txhex: Vec<String>, + maxfeerate: Option<f64>, + maxburnamount: Option<f64>, + ) -> Result<SubmitPackageResult> { + self.daemon.submit_package(txhex, maxfeerate, maxburnamount) + } + #[trace] pub fn utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> { let mut utxos = self.chain.utxo(scripthash, self.config.utxos_limit)?; diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 578999aff..7fb74e825 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -21,9 +21,9 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, RwLock}; -use crate::chain::{ +use crate::{chain::{ BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, -}; +}, new_index::db_metrics::RocksDbMetrics}; use crate::config::Config; use crate::daemon::Daemon; use crate::errors::*; @@ -58,7 +58,7 @@ pub struct Store { } impl Store { - pub fn open(path: &Path, config: &Config) -> Self { + pub fn open(path: &Path, config: &Config, metrics: &Metrics) -> Self { let txstore_db = DB::open(&path.join("txstore"), config); let added_blockhashes = load_blockhashes(&txstore_db, &BlockRow::done_filter()); debug!("{} blocks were added", added_blockhashes.len()); @@ -69,6 +69,11 @@ let cache_db = DB::open(&path.join("cache"), config); + let db_metrics = Arc::new(RocksDbMetrics::new(&metrics)); + txstore_db.start_stats_exporter(Arc::clone(&db_metrics), "txstore_db"); + history_db.start_stats_exporter(Arc::clone(&db_metrics), "history_db"); + cache_db.start_stats_exporter(Arc::clone(&db_metrics), "cache_db"); + let headers = if let Some(tip_hash) = txstore_db.get(b"t") { let tip_hash = deserialize(&tip_hash).expect("invalid chain tip in `t`"); let headers_map = load_blockheaders(&txstore_db); @@ -287,6 +292,7 @@ impl Indexer { ); start_fetcher(self.from, &daemon, to_index)?.map(|blocks| self.index(&blocks)); self.start_auto_compactions(&self.store.history_db); + 
self.start_auto_compactions(&self.store.cache_db); if let DBFlush::Disable = self.flush { debug!("flushing to disk"); diff --git a/src/rest.rs b/src/rest.rs index 7d5b501b5..cefc49b7c 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -1,4 +1,3 @@ - use crate::chain::{ address, BlockHash, Network, OutPoint, Script, Sequence, Transaction, TxIn, TxMerkleNode, TxOut, Txid, @@ -6,18 +5,18 @@ use crate::chain::{ use crate::config::Config; use crate::errors; use crate::new_index::{compute_script_hash, Query, SpendingInput, Utxo}; +#[cfg(feature = "liquid")] +use crate::util::optional_value_for_newer_blocks; use crate::util::{ create_socket, electrum_merkle, extract_tx_prevouts, get_innerscripts, get_tx_fee, has_prevout, is_coinbase, BlockHeaderMeta, BlockId, FullHash, ScriptToAddr, ScriptToAsm, TransactionStatus, DEFAULT_BLOCKHASH, }; -#[cfg(feature = "liquid")] -use crate::util::optional_value_for_newer_blocks; #[cfg(not(feature = "liquid"))] use bitcoin::consensus::encode; use bitcoin::hashes::FromSliceError as HashError; -use bitcoin::hex::{self, DisplayHex, FromHex}; +use bitcoin::hex::{self, DisplayHex, FromHex, HexToBytesIter}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Response, Server, StatusCode}; use hyperlocal::UnixServerExt; @@ -186,14 +185,18 @@ impl TransactionValue { status: Some(TransactionStatus::from(blockid)), #[cfg(feature = "liquid")] - discount_vsize: optional_value_for_newer_blocks(blockid, - START_OF_LIQUID_DISCOUNT_CT_POLICY, - tx.discount_vsize()), + discount_vsize: optional_value_for_newer_blocks( + blockid, + START_OF_LIQUID_DISCOUNT_CT_POLICY, + tx.discount_vsize(), + ), #[cfg(feature = "liquid")] - discount_weight: optional_value_for_newer_blocks(blockid, - START_OF_LIQUID_DISCOUNT_CT_POLICY, - tx.discount_weight()), + discount_weight: optional_value_for_newer_blocks( + blockid, + START_OF_LIQUID_DISCOUNT_CT_POLICY, + tx.discount_weight(), + ), } } } @@ -1015,6 +1018,63 @@ fn handle_request( let txid = query.broadcast_raw(&txhex)?; http_message(StatusCode::OK, txid.to_string(), 0) } + (&Method::POST, Some(&"txs"), Some(&"package"), None, None, None) => { + let txhexes: Vec<String> = + serde_json::from_str(String::from_utf8(body.to_vec())?.as_str())?; + + if txhexes.len() > 25 { + Result::Err(HttpError::from( + "Exceeded maximum of 25 transactions".to_string(), + ))? + } + + let maxfeerate = query_params + .get("maxfeerate") + .map(|s| { + s.parse::<f64>() + .map_err(|_| HttpError::from("Invalid maxfeerate".to_string())) + }) + .transpose()?; + + let maxburnamount = query_params + .get("maxburnamount") + .map(|s| { + s.parse::<f64>() + .map_err(|_| HttpError::from("Invalid maxburnamount".to_string())) + }) + .transpose()?; + + // pre-checks + txhexes.iter().enumerate().try_for_each(|(index, txhex)| { + // each transaction must be of reasonable size + // (more than 60 bytes, within 400kWU standardness limit) + if !(120..800_000).contains(&txhex.len()) { + Result::Err(HttpError::from(format!( + "Invalid transaction size for item {}", + index + ))) + } else { + // must be a valid hex string + HexToBytesIter::new(txhex) + .map_err(|_| { + HttpError::from(format!("Invalid transaction hex for item {}", index)) + })? 
diff --git a/src/util/transaction.rs b/src/util/transaction.rs
index 8f43d2bbf..c63e070e0 100644
--- a/src/util/transaction.rs
+++ b/src/util/transaction.rs
@@ -45,11 +45,12 @@ impl From<Option<BlockId>> for TransactionStatus {
     }
 }

-
 #[cfg(feature = "liquid")]
-pub fn optional_value_for_newer_blocks(block_id: Option<BlockId>,
-    check_time: u32,
-    value: usize) -> Option<usize> {
+pub fn optional_value_for_newer_blocks(
+    block_id: Option<BlockId>,
+    check_time: u32,
+    value: usize,
+) -> Option<usize> {
     match block_id {
         // use the provided value only if it was after the "activation" time
         Some(b) if b.time >= check_time => Some(value),
@@ -132,13 +133,12 @@ where
     s.end()
 }

-
 #[cfg(all(test, feature = "liquid"))]
 mod test {
+    use super::optional_value_for_newer_blocks;
+    use crate::util::BlockId;
     use bitcoin::hashes::Hash;
     use elements::BlockHash;
-    use crate::util::BlockId;
-    use super::optional_value_for_newer_blocks;

     #[test]
     fn opt_value_newer_block() {
@@ -149,20 +149,58 @@ mod test {

         // unconfirmed block should include the value
         let block_id = None;
-        assert_eq!(optional_value_for_newer_blocks(block_id, check_time, value), Some(value));
+        assert_eq!(
+            optional_value_for_newer_blocks(block_id, check_time, value),
+            Some(value)
+        );

         // block time before check_time should NOT include the value
-        let block_id = Some(BlockId{ height, hash, time: 0 });
-        assert_eq!(optional_value_for_newer_blocks(block_id, check_time, value), None);
-        let block_id = Some(BlockId{ height, hash, time: 31 });
-        assert_eq!(optional_value_for_newer_blocks(block_id, check_time, value), None);
+        let block_id = Some(BlockId {
+            height,
+            hash,
+            time: 0,
+        });
+        assert_eq!(
+            optional_value_for_newer_blocks(block_id, check_time, value),
+            None
+        );
+        let block_id = Some(BlockId {
+            height,
+            hash,
+            time: 31,
+        });
+        assert_eq!(
+            optional_value_for_newer_blocks(block_id, check_time, value),
+            None
+        );

         // block time on or after check_time should include the value
-        let block_id = Some(BlockId{ height, hash, time: 32 });
-        assert_eq!(optional_value_for_newer_blocks(block_id, check_time, value), Some(value));
-        let block_id = Some(BlockId{ height, hash, time: 33 });
-        assert_eq!(optional_value_for_newer_blocks(block_id, check_time, value), Some(value));
-        let block_id = Some(BlockId{ height, hash, time: 333 });
-        assert_eq!(optional_value_for_newer_blocks(block_id, check_time, value), Some(value));
+        let block_id = Some(BlockId {
+            height,
+            hash,
+            time: 32,
+        });
+        assert_eq!(
+            optional_value_for_newer_blocks(block_id, check_time, value),
+            Some(value)
+        );
+        let block_id = Some(BlockId {
+            height,
+            hash,
+            time: 33,
+        });
+        assert_eq!(
+            optional_value_for_newer_blocks(block_id, check_time, value),
+            Some(value)
+        );
+        let block_id = Some(BlockId {
+            height,
+            hash,
+            time: 333,
+        });
+        assert_eq!(
+            optional_value_for_newer_blocks(block_id, check_time, value),
+            Some(value)
+        );
     }
 }
diff --git a/tests/common.rs b/tests/common.rs
index e4a7e8015..5fb995d2d 100644
--- a/tests/common.rs
+++ b/tests/common.rs
@@ -27,6 +27,7 @@ use electrs::{
     rest,
     signal::Waiter,
 };
+use electrs::config::RpcLogging;

 pub struct TestRunner {
     config: Arc<Config>,
@@ -38,6 +39,7 @@ pub struct TestRunner {
     daemon: Arc<Daemon>,
     mempool: Arc<RwLock<Mempool>>,
     metrics: Metrics,
+    salt_rwlock: Arc<RwLock<String>>,
 }

 impl TestRunner {
@@ -108,7 +110,7 @@ impl TestRunner {
             utxos_limit: 100,
             electrum_txs_limit: 100,
             electrum_banner: "".into(),
-            electrum_rpc_logging: None,
+            rpc_logging: RpcLogging::default(),
             zmq_addr: None,

             #[cfg(feature = "liquid")]
@@ -116,6 +118,9 @@ impl TestRunner {
             #[cfg(feature = "liquid")]
             parent_network: bitcoin::Network::Regtest,
             initial_sync_compaction: false,
+            db_block_cache_mb: 8,
+            db_parallelism: 2,
+            db_write_buffer_size_mb: 256,
             //#[cfg(feature = "electrum-discovery")]
             //electrum_public_hosts: Option,
             //#[cfg(feature = "electrum-discovery")]
@@ -139,7 +144,7 @@ impl TestRunner {
             &metrics,
         )?);

-        let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config));
+        let store = Arc::new(Store::open(&config.db_path.join("newindex"), &config, &metrics));

         let fetch_from = if !env::var("JSONRPC_IMPORT").is_ok() && !cfg!(feature = "liquid") {
             // run the initial indexing from the blk files then switch to using the jsonrpc,
@@ -179,6 +184,8 @@ impl TestRunner {
             None, // TODO
         ));

+        let salt_rwlock = Arc::new(RwLock::new(String::from("foobar")));
+
         Ok(TestRunner {
             config,
             node,
@@ -188,6 +195,7 @@ impl TestRunner {
             daemon,
             mempool,
             metrics,
+            salt_rwlock,
         })
     }
@@ -280,6 +288,7 @@ pub fn init_electrum_tester() -> Result<(ElectrumRPC, net::SocketAddr, TestRunner)> {
         Arc::clone(&tester.config),
         Arc::clone(&tester.query),
         &tester.metrics,
+        Arc::clone(&tester.salt_rwlock),
     );
     log::info!(
         "Electrum server running on {}",
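The three `db_*` fields added to the test config above are new RocksDB tuning knobs (block cache size, background parallelism, write buffer size). The diff does not show how `DB::open` actually consumes them, so the following is only a plausible sketch of their intent against a recent rust-rocksdb API, not the patch's real wiring.

use rocksdb::{BlockBasedOptions, Cache, Options};

// Hypothetical mapping of the config knobs onto rocksdb::Options.
fn tuned_db_options(block_cache_mb: usize, parallelism: i32, write_buffer_mb: usize) -> Options {
    let mut opts = Options::default();
    opts.increase_parallelism(parallelism); // background compaction/flush threads
    opts.set_write_buffer_size(write_buffer_mb * 1024 * 1024); // per-memtable budget
    let mut block_opts = BlockBasedOptions::default();
    let cache = Cache::new_lru_cache(block_cache_mb * 1024 * 1024);
    block_opts.set_block_cache(&cache); // shared LRU cache for data blocks
    opts.set_block_based_table_factory(&block_opts);
    opts
}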
diff --git a/tests/rest.rs b/tests/rest.rs
index f72b9c475..382ad16fd 100644
--- a/tests/rest.rs
+++ b/tests/rest.rs
@@ -185,11 +185,166 @@ fn test_rest() -> Result<()> {
     let broadcast2_res = ureq::post(&format!("http://{}/tx", rest_addr)).send_string(&tx_hex);
     let broadcast2_resp = broadcast2_res.unwrap_err().into_response().unwrap();
     assert_eq!(broadcast2_resp.status(), 400);
+
+    // Test POST /txs/package - simple validation test
+    // Test with invalid JSON first to verify the endpoint exists
+    let invalid_package_result = ureq::post(&format!("http://{}/txs/package", rest_addr))
+        .set("Content-Type", "application/json")
+        .send_string("invalid json");
+    let invalid_package_resp = invalid_package_result.unwrap_err().into_response().unwrap();
+    let status = invalid_package_resp.status();
+    // Should be 400 for bad JSON, not 404 for missing endpoint
     assert_eq!(
-        broadcast2_resp.into_string()?,
-        "sendrawtransaction RPC error -27: Transaction already in block chain"
+        status, 400,
+        "Endpoint should exist and return 400 for invalid JSON"
     );

+    // Now test with a valid but empty package, which should fail
+    let empty_package_result = ureq::post(&format!("http://{}/txs/package", rest_addr))
+        .set("Content-Type", "application/json")
+        .send_string("[]");
+    let empty_package_resp = empty_package_result.unwrap_err().into_response().unwrap();
+    let status = empty_package_resp.status();
+    assert_eq!(status, 400);
+
+    // Bitcoin Core 28.0-only tests - submitpackage
+    #[cfg(all(not(feature = "liquid"), feature = "bitcoind_28_0"))]
+    {
+        // Test with a real transaction package - create parent-child transactions.
+        // submitpackage requires between 2 and 25 transactions with proper dependencies.
+        let package_addr1 = tester.newaddress()?;
+        let package_addr2 = tester.newaddress()?;
+
+        // Create parent transaction
+        let tx1_result = tester.node_client().call::<serde_json::Value>(
+            "createrawtransaction",
+            &[
+                serde_json::json!([]),
+                serde_json::json!({package_addr1.to_string(): 0.5}),
+            ],
+        )?;
+        let tx1_unsigned_hex = tx1_result.as_str().expect("raw tx hex").to_string();
+
+        let tx1_fund_result = tester.node_client().call::<serde_json::Value>(
+            "fundrawtransaction",
+            &[serde_json::json!(tx1_unsigned_hex)],
+        )?;
+        let tx1_funded_hex = tx1_fund_result["hex"]
+            .as_str()
+            .expect("funded tx hex")
+            .to_string();
+
+        let tx1_sign_result = tester.node_client().call::<serde_json::Value>(
+            "signrawtransactionwithwallet",
+            &[serde_json::json!(tx1_funded_hex)],
+        )?;
+        let tx1_signed_hex = tx1_sign_result["hex"]
+            .as_str()
+            .expect("signed tx hex")
+            .to_string();
+
+        // Decode parent transaction to get its txid and find the output to spend
+        let tx1_decoded = tester.node_client().call::<serde_json::Value>(
+            "decoderawtransaction",
+            &[serde_json::json!(tx1_signed_hex)],
+        )?;
+        let tx1_txid = tx1_decoded["txid"].as_str().expect("parent txid");
+
+        // Find the output going to package_addr1 (the one we want to spend)
+        let tx1_vouts = tx1_decoded["vout"].as_array().expect("parent vouts");
+        let mut spend_vout_index = None;
+        let mut spend_vout_value = 0u64;
+
+        for (i, vout) in tx1_vouts.iter().enumerate() {
+            if let Some(script_pub_key) = vout.get("scriptPubKey") {
+                if let Some(address) = script_pub_key.get("address") {
+                    if address.as_str() == Some(&package_addr1.to_string()) {
+                        spend_vout_index = Some(i);
+                        // Convert from BTC to satoshis
+                        spend_vout_value =
+                            (vout["value"].as_f64().expect("vout value") * 100_000_000.0) as u64;
+                        break;
+                    }
+                }
+            }
+        }
+
+        let spend_vout_index = spend_vout_index.expect("Could not find output to spend");
+
+        // Create child transaction that spends from parent.
+        // Leave some satoshis for fee (e.g., 1000 sats).
+        let child_output_value = spend_vout_value - 1000;
+        let child_output_btc = child_output_value as f64 / 100_000_000.0;
+
+        let tx2_result = tester.node_client().call::<serde_json::Value>(
+            "createrawtransaction",
+            &[
+                serde_json::json!([{
+                    "txid": tx1_txid,
+                    "vout": spend_vout_index
+                }]),
+                serde_json::json!({package_addr2.to_string(): child_output_btc}),
+            ],
+        )?;
+        let tx2_unsigned_hex = tx2_result.as_str().expect("raw tx hex").to_string();
+
+        // Sign the child transaction.
+        // We need to provide the parent transaction's output details for signing.
+        let tx2_sign_result = tester.node_client().call::<serde_json::Value>(
+            "signrawtransactionwithwallet",
+            &[
+                serde_json::json!(tx2_unsigned_hex),
+                serde_json::json!([{
+                    "txid": tx1_txid,
+                    "vout": spend_vout_index,
+                    "scriptPubKey": tx1_vouts[spend_vout_index]["scriptPubKey"]["hex"].as_str().unwrap(),
+                    "amount": spend_vout_value as f64 / 100_000_000.0
+                }])
+            ],
+        )?;
+        let tx2_signed_hex = tx2_sign_result["hex"]
+            .as_str()
+            .expect("signed tx hex")
+            .to_string();
+
+        // Debug: try calling submitpackage directly to see the result
+        eprintln!("Trying submitpackage directly with parent-child transactions...");
+        let direct_result = tester.node_client().call::<serde_json::Value>(
+            "submitpackage",
+            &[serde_json::json!([
+                tx1_signed_hex.clone(),
+                tx2_signed_hex.clone()
+            ])],
+        );
+        match direct_result {
+            Ok(result) => {
+                eprintln!("Direct submitpackage succeeded: {:#?}", result);
+            }
+            Err(e) => {
+                eprintln!("Direct submitpackage failed: {:?}", e);
+            }
+        }
+
+        // Now submit this transaction package via the package endpoint
+        let package_json =
+            serde_json::json!([tx1_signed_hex.clone(), tx2_signed_hex.clone()]).to_string();
+        let package_result = ureq::post(&format!("http://{}/txs/package", rest_addr))
+            .set("Content-Type", "application/json")
+            .send_string(&package_json);
+
+        let package_resp = package_result.unwrap();
+        assert_eq!(package_resp.status(), 200);
+        let package_result = package_resp.into_json::<serde_json::Value>()?;
+
+        // Verify the response structure
+        assert!(package_result["tx-results"].is_object());
+        assert!(package_result["package_msg"].is_string());
+
+        let tx_results = package_result["tx-results"].as_object().unwrap();
+        assert_eq!(tx_results.len(), 2);
+
+        // The transactions should be processed (whether accepted or rejected)
+        assert!(!tx_results.is_empty());
+    }
+
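The assertions above pin down only two fields of the endpoint's JSON: a `tx-results` object with one entry per submitted transaction and a `package_msg` string. A typed client-side deserialization might look like the sketch below; the per-transaction fields are assumptions based on Bitcoin Core's documented `submitpackage` output (entries keyed by wtxid, carrying a `txid` and, on rejection, an `error` message), not a type taken from this patch.

use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct PackageResponse {
    // e.g. "success", or a package-level rejection reason
    package_msg: String,
    #[serde(rename = "tx-results")]
    tx_results: HashMap<String, TxResult>, // keyed by wtxid (assumed)
}

#[derive(Deserialize, Debug)]
struct TxResult {
    txid: String,
    // present when the transaction was rejected (assumed field)
    error: Option<String>,
}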
     // Elements-only tests
     #[cfg(feature = "liquid")]
     {