diff --git a/Cargo.lock b/Cargo.lock index d9d2b72..7162620 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2308,6 +2308,7 @@ dependencies = [ name = "blobs_batch_sender" version = "0.1.0" dependencies = [ + "alloy", "aws-config", "aws-sdk-s3", "aws_lambda_events", diff --git a/aws/blobs_batch_sender/Cargo.toml b/aws/blobs_batch_sender/Cargo.toml index d597557..6faa8ad 100644 --- a/aws/blobs_batch_sender/Cargo.toml +++ b/aws/blobs_batch_sender/Cargo.toml @@ -14,3 +14,4 @@ owen = { workspace = true, features = ["aws-integration"] } serde = { workspace = true } serde_json = { workspace = true } aws-sdk-s3 = "1.98.0" +alloy = { version = "1.0.32", features = ["full"] } diff --git a/aws/blobs_batch_sender/src/contract.rs b/aws/blobs_batch_sender/src/contract.rs new file mode 100644 index 0000000..3d830e6 --- /dev/null +++ b/aws/blobs_batch_sender/src/contract.rs @@ -0,0 +1,123 @@ +use crate::contract::sEOA::SubmitNewBlobInput; +use alloy::{ + consensus::BlobTransactionSidecar, + network::EthereumWallet, + primitives::{Address, Bytes, FixedBytes}, + providers::{ + Identity, ProviderBuilder, + fillers::{ + BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, + WalletFiller, + }, + }, + sol, +}; +use blobs_batch_sender::BlobsBatchSenderConfig; +use lambda_runtime::Error; +use owen::blobs_queue::BlobsQueueS3JsonFile; + +sol!( + #[allow(missing_docs)] + #[sol(rpc)] + sEOA, + "../../submodules/account-abstraction/artifacts/contracts/sEOA.sol/sEOA.json" +); + +type HardlyTypedProvider = FillProvider< + JoinFill< + JoinFill< + Identity, + JoinFill>>, + >, + WalletFiller, + >, + alloy::providers::RootProvider, +>; + +struct SendBatchTxInput { + tx_params: Vec, + sidecar: BlobTransactionSidecar, +} + +pub struct SmartEoaManager { + ddex_sequencer_address: Address, + s_eoa: sEOA::sEOAInstance, +} + +impl SmartEoaManager { + pub fn build(config: &BlobsBatchSenderConfig, wallet: EthereumWallet) -> Result { + let provider = ProviderBuilder::new() + .wallet(wallet) + .connect_http(config.rpc_url.parse()?); + + let s_eoa = sEOA::new(config.s_eoa_address, provider); + + Ok(Self { + ddex_sequencer_address: config.ddex_sequencer_address, + s_eoa, + }) + } + + fn parse_batch_input( + blob_tx_data_vec: Vec, + ) -> Result { + let tx_params: Vec = blob_tx_data_vec + .iter() + .map(|blob_json_file| SubmitNewBlobInput { + imageId: blob_json_file.image_id, + commitment: Bytes::from(blob_json_file.tx_data.kzg_commitment.to_vec()), + blobSha2: FixedBytes::<32>::from(blob_json_file.tx_data.blob_sha2), + }) + .collect(); + + let mut sidecar = BlobTransactionSidecar { + blobs: Vec::new(), + commitments: Vec::new(), + proofs: Vec::new(), + }; + + for blob_json_file in blob_tx_data_vec { + sidecar.blobs.push( + blob_json_file + .tx_data + .blob_sidecar + .blobs + .first() + .expect("Blob missing in input") + .clone(), + ); + sidecar.commitments.push( + blob_json_file + .tx_data + .blob_sidecar + .commitments + .first() + .expect("commitment missing in input") + .clone(), + ); + sidecar.proofs.push( + blob_json_file + .tx_data + .blob_sidecar + .proofs + .first() + .expect("proof missing in input") + .clone(), + ); + } + Ok(SendBatchTxInput { tx_params, sidecar }) + } + + pub async fn send_batch( + &self, + blob_tx_data_vec: Vec, + ) -> Result<(), Error> { + let batch_input = Self::parse_batch_input(blob_tx_data_vec)?; + let mut _tx_builder = self + .s_eoa + .submitNewBlobBatch(batch_input.tx_params, self.ddex_sequencer_address) + .sidecar(batch_input.sidecar) + .max_fee_per_blob_gas(1000000001); + Ok(()) + } +} diff --git a/aws/blobs_batch_sender/src/event_handler.rs b/aws/blobs_batch_sender/src/event_handler.rs index eee73e7..2560e36 100644 --- a/aws/blobs_batch_sender/src/event_handler.rs +++ b/aws/blobs_batch_sender/src/event_handler.rs @@ -1,10 +1,29 @@ use aws_lambda_events::event::sqs::SqsEvent; +use blobs_batch_sender::BlobsBatchSenderConfig; use lambda_runtime::{Error, LambdaEvent}; -use owen::blobs_queue::BlobsQueueMessageBody; +use owen::{ + blobs_queue::BlobsQueueMessageBody, + wallet::{OwenWallet, OwenWalletConfig}, +}; -use crate::s3; +use crate::{contract::SmartEoaManager, s3::BlobsStorage}; pub(crate) async fn function_handler(event: LambdaEvent) -> Result<(), Error> { + let config = BlobsBatchSenderConfig::build()?; + let blobs_storage = BlobsStorage::build(&config).await?; + let owen_wallet_config = OwenWalletConfig::from(&config)?; + let owen_wallet = OwenWallet::build(&owen_wallet_config).await?; + let smart_eoa_manager = SmartEoaManager::build(&config, owen_wallet.wallet)?; + + let blobhashes = extract_blobhashes(event)?; + let blob_tx_data_vec = blobs_storage.read(blobhashes).await?; + + smart_eoa_manager.send_batch(blob_tx_data_vec).await?; + + Ok(()) +} + +fn extract_blobhashes(event: LambdaEvent) -> Result, Error> { let messages: Vec = event .payload .records @@ -24,8 +43,5 @@ pub(crate) async fn function_handler(event: LambdaEvent) -> Result<(), .iter() .map(|message| message.blobhash.clone()) .collect(); - - s3::read_blobs(blobhashes).await?; - - Ok(()) + Ok(blobhashes) } diff --git a/aws/blobs_batch_sender/src/lib.rs b/aws/blobs_batch_sender/src/lib.rs new file mode 100644 index 0000000..d342a4c --- /dev/null +++ b/aws/blobs_batch_sender/src/lib.rs @@ -0,0 +1,75 @@ +use alloy::primitives::Address; +use lambda_runtime::Error; +use owen::constants; +use owen::wallet::HasOwenWalletFields; +use std::env; +use std::str::FromStr; + +impl HasOwenWalletFields for BlobsBatchSenderConfig { + fn use_kms(&self) -> bool { + self.use_kms + } + fn rpc_url(&self) -> String { + self.rpc_url.clone() + } + fn private_key(&self) -> Option { + self.private_key.clone() + } + fn signer_kms_id(&self) -> Option { + self.signer_kms_id.clone() + } +} + +pub struct BlobsBatchSenderConfig { + pub ddex_sequencer_address: Address, + pub s_eoa_address: Address, + pub rpc_url: String, + pub blobs_temp_storage_bucket_name: String, + pub use_kms: bool, + pub private_key: Option, + pub signer_kms_id: Option, +} + +impl BlobsBatchSenderConfig { + fn get_env_var(key: &str) -> String { + env::var(key).expect(format!("Missing env variable: {key}").as_str()) + } + pub fn build() -> Result { + let rpc_url = Self::get_env_var("RPC_URL"); + let ddex_sequencer_address = Address::from_str( + std::env::var("DDEX_SEQUENCER_ADDRESS") + .unwrap_or_else(|_| constants::DDEX_SEQUENCER_ADDRESS.to_string()) + .as_str(), + ) + .expect("Could not parse ddex sequencer address"); + + let s_eoa_address = Address::from_str(Self::get_env_var("S_EOA_ADDRESS").as_str())?; + + let blobs_temp_storage_bucket_name = Self::get_env_var("BLOBS_TEMP_STORAGE_BUCKET_NAME"); + + let mut signer_kms_id = None; + let mut private_key = None; + let use_kms = matches!( + std::env::var("USE_KMS") + .unwrap_or_else(|_| "false".to_string()) + .as_str(), + "1" | "true" + ); + + if use_kms { + signer_kms_id = Some(Self::get_env_var("SIGNER_KMS_ID")); + } else { + private_key = Some(Self::get_env_var("PRIVATE_KEY")); + } + + Ok(BlobsBatchSenderConfig { + ddex_sequencer_address, + rpc_url, + s_eoa_address, + blobs_temp_storage_bucket_name, + private_key, + use_kms, + signer_kms_id, + }) + } +} diff --git a/aws/blobs_batch_sender/src/main.rs b/aws/blobs_batch_sender/src/main.rs index 38572a6..181443c 100644 --- a/aws/blobs_batch_sender/src/main.rs +++ b/aws/blobs_batch_sender/src/main.rs @@ -1,3 +1,4 @@ +mod contract; mod event_handler; mod s3; use event_handler::function_handler; diff --git a/aws/blobs_batch_sender/src/s3.rs b/aws/blobs_batch_sender/src/s3.rs index c3fcdfb..9c411d2 100644 --- a/aws/blobs_batch_sender/src/s3.rs +++ b/aws/blobs_batch_sender/src/s3.rs @@ -1,37 +1,48 @@ -use std::env; - use aws_config::meta::region::RegionProviderChain; use aws_sdk_s3::Client; +use blobs_batch_sender::BlobsBatchSenderConfig; use lambda_runtime::Error; -use owen::blob::BlobTransactionData; +use owen::blobs_queue::BlobsQueueS3JsonFile; use tokio::io::AsyncReadExt; -pub async fn read_blobs(blobhashes: Vec) -> Result<(), Error> { - let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); - let config = aws_config::from_env().region(region_provider).load().await; - let client = Client::new(&config); - - let bucket = env::var("BLOBS_TEMP_STORAGE_BUCKET_NAME") - .expect(format!("Missing env variable: BLOBS_TEMP_STORAGE_BUCKET_NAME").as_str()); - - let mut blobs: Vec = Vec::new(); +pub struct BlobsStorage { + client: Client, + bucket_name: String, +} - for blobhash in blobhashes { - let key = format!("blobs/{}.json", blobhash); - let resp = client.get_object().bucket(&bucket).key(key).send().await?; +impl BlobsStorage { + pub async fn build(config: &BlobsBatchSenderConfig) -> Result { + let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); + let aws_config = aws_config::from_env().region(region_provider).load().await; + let client: Client = Client::new(&aws_config); - let mut body = resp.body.into_async_read(); - let mut contents = String::new(); - body.read_to_string(&mut contents).await?; - let blob_transaction_data: BlobTransactionData = serde_json::from_str(&contents).unwrap(); - blobs.push(blob_transaction_data); + Ok(Self { + client, + bucket_name: config.blobs_temp_storage_bucket_name.clone(), + }) } - let commitments: Vec> = blobs - .iter() - .map(|blob| blob.kzg_commitment.clone()) - .collect(); - println!("{commitments:?}"); - - Ok(()) + pub async fn read(&self, blobhashes: Vec) -> Result, Error> { + let mut blobs: Vec = Vec::new(); + + for blobhash in blobhashes { + let key = format!("blobs/{}.json", blobhash); + let resp = self + .client + .get_object() + .bucket(self.bucket_name.clone()) + .key(key) + .send() + .await?; + + let mut body = resp.body.into_async_read(); + let mut contents = String::new(); + body.read_to_string(&mut contents).await?; + let blob_transaction_data: BlobsQueueS3JsonFile = + serde_json::from_str(&contents).unwrap(); + blobs.push(blob_transaction_data); + } + + Ok(blobs) + } } diff --git a/owen/src/blob.rs b/owen/src/blob.rs index d18f7ea..4a895b4 100644 --- a/owen/src/blob.rs +++ b/owen/src/blob.rs @@ -4,7 +4,7 @@ use c_kzg::{ethereum_kzg_settings, Blob}; use log_macros::{format_error, log_info}; use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] pub struct BlobTransactionData { pub kzg_commitment: Vec, pub blob_sidecar: BlobTransactionSidecar, diff --git a/owen/src/blobs_queue.rs b/owen/src/blobs_queue.rs index e79bdcc..755985f 100644 --- a/owen/src/blobs_queue.rs +++ b/owen/src/blobs_queue.rs @@ -11,10 +11,18 @@ use std::env; #[derive(Deserialize, Serialize)] pub struct BlobsQueueMessageBody { pub blobhash: String, + pub owen_instance: String, +} + +#[derive(Deserialize, Serialize)] +pub struct BlobsQueueS3JsonFile { + pub tx_data: BlobTransactionData, + pub image_id: FixedBytes<32>, } pub struct BlobsQueueProducer { queue_url: String, + owen_instance: String, blobs_temp_storage_bucket_name: String, s3_client: aws_sdk_s3::Client, sqs_client: aws_sdk_sqs::Client, @@ -24,6 +32,7 @@ impl BlobsQueueProducer { pub async fn build() -> anyhow::Result { let queue_url = Self::get_env_var("OWEN_BLOBS_QUEUE_URL"); let blobs_temp_storage_bucket_name = Self::get_env_var("BLOBS_TEMP_STORAGE_BUCKET_NAME"); + let owen_instance = Self::get_env_var("USERNAME"); let region_provider = RegionProviderChain::default_provider().or_else("us-east-1"); let aws_main_config = aws_config::defaults(BehaviorVersion::latest()) @@ -35,6 +44,7 @@ impl BlobsQueueProducer { Ok(Self { queue_url, + owen_instance, s3_client, sqs_client, blobs_temp_storage_bucket_name, @@ -43,11 +53,16 @@ impl BlobsQueueProducer { fn get_env_var(key: &str) -> String { env::var(key).expect(format!("Missing env variable: {key}").as_str()) } - pub async fn enqueue_blob(&self, transaction_data: BlobTransactionData) -> anyhow::Result<()> { + pub async fn enqueue_blob( + &self, + transaction_data: BlobTransactionData, + image_id: FixedBytes<32>, + ) -> anyhow::Result<()> { let kzg_commitment = Bytes::from(transaction_data.kzg_commitment.to_vec()); let blobhash: FixedBytes<32> = commitment_to_blobhash(&kzg_commitment); - self.send_to_s3(&transaction_data, &blobhash).await?; + self.send_to_s3(&transaction_data, image_id, &blobhash) + .await?; self.send_to_sqs(&blobhash).await?; Ok(()) } @@ -55,13 +70,18 @@ impl BlobsQueueProducer { async fn send_to_s3( &self, transaction_data: &BlobTransactionData, + image_id: FixedBytes<32>, blobhash: &FixedBytes<32>, ) -> anyhow::Result<()> { log_info!( "Sending transaction data to S3 for: {}", blobhash.to_string() ); - let json_string = serde_json::to_string_pretty(&transaction_data)?; + let blobs_queue_s3_json_file = BlobsQueueS3JsonFile { + tx_data: transaction_data.clone(), + image_id, + }; + let json_string = serde_json::to_string_pretty(&blobs_queue_s3_json_file)?; let put_object_output = self .s3_client @@ -81,6 +101,7 @@ impl BlobsQueueProducer { log_info!("Enqueue: {}", blobhash.to_string()); let blobs_queue_message_body = BlobsQueueMessageBody { blobhash: blobhash.to_string(), + owen_instance: self.owen_instance.clone(), }; let json_string = serde_json::to_string_pretty(&blobs_queue_message_body)?; let send_message_output = self diff --git a/owen/src/lib.rs b/owen/src/lib.rs index 2e5621f..a2f1146 100644 --- a/owen/src/lib.rs +++ b/owen/src/lib.rs @@ -8,13 +8,13 @@ mod image_processor; mod ipfs; pub mod logger; pub mod output_generator; -mod wallet; +pub mod wallet; use alloy::primitives::Address; use blob::BlobTransactionData; use contracts::ContractsManager; use ddex_parser::ParserError; pub use log; -use log_macros::{format_error, log_error}; +use log_macros::log_error; use sentry::User; use serde_json::json; use std::env; @@ -22,7 +22,7 @@ use std::str::FromStr; use crate::ipfs::IpfsManager; use crate::output_generator::{DdexMessage, OutputFilesGenerator}; -use crate::wallet::OwenWallet; +use crate::wallet::{OwenWallet, OwenWalletConfig}; #[cfg(any(feature = "aws-integration", feature = "local-s3"))] pub mod s3_message_storage; @@ -154,34 +154,11 @@ impl Config { Ok(config) } - - fn try_private_key(&self) -> anyhow::Result<&String> { - if self.use_kms == false { - self.private_key - .as_ref() - .ok_or_else(|| format_error!("Missing private_key")) - } else { - return Err(format_error!( - "private_key not available with USE_KMS=true flag" - )); - } - } - - fn try_signer_kms_id(&self) -> anyhow::Result<&String> { - if self.use_kms == true { - self.signer_kms_id - .as_ref() - .ok_or_else(|| format_error!("Missing signer_kms_id")) - } else { - return Err(format_error!( - "signer_kms_id not available without USE_KMS=true flag" - )); - } - } } pub async fn run(config: &Config) -> anyhow::Result> { - let owen_wallet = OwenWallet::build(&config).await?; + let owen_wallet_config = OwenWalletConfig::from(config)?; + let owen_wallet = OwenWallet::build(&owen_wallet_config).await?; let contracts_manager = ContractsManager::build(&config, &owen_wallet).await?; contracts_manager.check_image_compatibility().await?; @@ -193,11 +170,12 @@ pub async fn run(config: &Config) -> anyhow::Result> { if config.use_batch_sender == true { if cfg!(feature = "aws-integration") { + let image_id = contracts_manager.image_id; #[cfg(feature = "aws-integration")] let blobs_queue_producer = blobs_queue::BlobsQueueProducer::build().await?; #[cfg(feature = "aws-integration")] blobs_queue_producer - .enqueue_blob(blob_transaction_data) + .enqueue_blob(blob_transaction_data, image_id) .await?; } else { panic!( diff --git a/owen/src/output_generator.rs b/owen/src/output_generator.rs index ac80c34..22bb735 100644 --- a/owen/src/output_generator.rs +++ b/owen/src/output_generator.rs @@ -306,7 +306,7 @@ impl<'a, 'b> OutputFilesGenerator<'a, 'b> { #[cfg(test)] mod tests { use super::*; - use crate::wallet::OwenWallet; + use crate::wallet::{OwenWallet, OwenWalletConfig}; use alloy::primitives::Address; use std::str::FromStr; @@ -335,7 +335,8 @@ mod tests { signer_kms_id: None, use_batch_sender: false, }; - let owen_wallet = OwenWallet::build(&config).await?; + let owen_wallet_config = OwenWalletConfig::from(&config)?; + let owen_wallet = OwenWallet::build(&owen_wallet_config).await?; let ipfs_manager = IpfsManager::build(&config, &owen_wallet).await?; let output_files_generator = OutputFilesGenerator::build(&config, &ipfs_manager)?; let ddex_messages = output_files_generator.generate_files().await?; @@ -374,7 +375,8 @@ mod tests { use_batch_sender: false, }; fs::create_dir_all(&config.input_files_dir).unwrap(); - let owen_wallet = OwenWallet::build(&config).await.unwrap(); + let owen_wallet_config = OwenWalletConfig::from(&config).unwrap(); + let owen_wallet = OwenWallet::build(&owen_wallet_config).await.unwrap(); let ipfs_manager = IpfsManager::build(&config, &owen_wallet).await.unwrap(); let output_files_generator = OutputFilesGenerator::build(&config, &ipfs_manager).unwrap(); let _ = output_files_generator.generate_files().await.unwrap(); diff --git a/owen/src/wallet.rs b/owen/src/wallet.rs index 621d008..389e397 100644 --- a/owen/src/wallet.rs +++ b/owen/src/wallet.rs @@ -11,14 +11,102 @@ use aws_config::BehaviorVersion; use log_macros::{format_error, log_info}; pub struct OwenWallet { - use_kms: bool, + pub use_kms: bool, aws_signer: Option, private_key_signer: Option, pub wallet: EthereumWallet, } +pub trait HasOwenWalletFields { + fn use_kms(&self) -> bool; + fn rpc_url(&self) -> String; + fn private_key(&self) -> Option; + fn signer_kms_id(&self) -> Option; +} + +impl HasOwenWalletFields for Config { + fn use_kms(&self) -> bool { + self.use_kms + } + fn rpc_url(&self) -> String { + self.rpc_url.clone() + } + fn private_key(&self) -> Option { + self.private_key.clone() + } + fn signer_kms_id(&self) -> Option { + self.signer_kms_id.clone() + } +} + +pub struct OwenWalletConfig { + pub use_kms: bool, + pub rpc_url: String, + pub private_key: Option, + pub signer_kms_id: Option, +} + +impl OwenWalletConfig { + pub fn build() -> anyhow::Result { + let rpc_url = Config::get_env_var("RPC_URL"); + let mut signer_kms_id = None; + let mut private_key = None; + let use_kms = matches!( + std::env::var("USE_KMS") + .unwrap_or_else(|_| "false".to_string()) + .as_str(), + "1" | "true" + ); + + if use_kms { + signer_kms_id = Some(Config::get_env_var("SIGNER_KMS_ID")); + } else { + private_key = Some(Config::get_env_var("PRIVATE_KEY")); + } + + Ok(Self { + use_kms, + rpc_url, + private_key, + signer_kms_id, + }) + } + + pub fn from(config_source: &C) -> anyhow::Result { + Ok(Self { + use_kms: config_source.use_kms(), + rpc_url: config_source.rpc_url(), + private_key: config_source.private_key(), + signer_kms_id: config_source.signer_kms_id(), + }) + } + fn try_private_key(&self) -> anyhow::Result<&String> { + if self.use_kms == false { + self.private_key + .as_ref() + .ok_or_else(|| format_error!("Missing private_key")) + } else { + return Err(format_error!( + "private_key not available with USE_KMS=true flag" + )); + } + } + + fn try_signer_kms_id(&self) -> anyhow::Result<&String> { + if self.use_kms == true { + self.signer_kms_id + .as_ref() + .ok_or_else(|| format_error!("Missing signer_kms_id")) + } else { + return Err(format_error!( + "signer_kms_id not available without USE_KMS=true flag" + )); + } + } +} + impl OwenWallet { - pub async fn build(config: &Config) -> anyhow::Result { + pub async fn build(config: &OwenWalletConfig) -> anyhow::Result { let wallet: EthereumWallet; let mut aws_signer = None; let mut private_key_signer: Option = None; diff --git a/submodules/account-abstraction b/submodules/account-abstraction index 2636980..dc2187f 160000 --- a/submodules/account-abstraction +++ b/submodules/account-abstraction @@ -1 +1 @@ -Subproject commit 2636980b07a0cec779219965d4f421625cea99f6 +Subproject commit dc2187fc612c631d8b0b282ee0f4082c56de8b62