Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aws/blobs_batch_sender/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ owen = { workspace = true, features = ["aws-integration"] }
serde = { workspace = true }
serde_json = { workspace = true }
aws-sdk-s3 = "1.98.0"
alloy = { version = "1.0.32", features = ["full"] }
123 changes: 123 additions & 0 deletions aws/blobs_batch_sender/src/contract.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use crate::contract::sEOA::SubmitNewBlobInput;
use alloy::{
consensus::BlobTransactionSidecar,
network::EthereumWallet,
primitives::{Address, Bytes, FixedBytes},
providers::{
Identity, ProviderBuilder,
fillers::{
BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
WalletFiller,
},
},
sol,
};
use blobs_batch_sender::BlobsBatchSenderConfig;
use lambda_runtime::Error;
use owen::blobs_queue::BlobsQueueS3JsonFile;

sol!(
#[allow(missing_docs)]
#[sol(rpc)]
sEOA,
"../../submodules/account-abstraction/artifacts/contracts/sEOA.sol/sEOA.json"
);

type HardlyTypedProvider = FillProvider<
JoinFill<
JoinFill<
Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
>,
WalletFiller<EthereumWallet>,
>,
alloy::providers::RootProvider,
>;

struct SendBatchTxInput {
tx_params: Vec<SubmitNewBlobInput>,
sidecar: BlobTransactionSidecar,
}

pub struct SmartEoaManager {
ddex_sequencer_address: Address,
s_eoa: sEOA::sEOAInstance<HardlyTypedProvider>,
}

impl SmartEoaManager {
pub fn build(config: &BlobsBatchSenderConfig, wallet: EthereumWallet) -> Result<Self, Error> {
let provider = ProviderBuilder::new()
.wallet(wallet)
.connect_http(config.rpc_url.parse()?);

let s_eoa = sEOA::new(config.s_eoa_address, provider);

Ok(Self {
ddex_sequencer_address: config.ddex_sequencer_address,
s_eoa,
})
}

fn parse_batch_input(
blob_tx_data_vec: Vec<BlobsQueueS3JsonFile>,
) -> Result<SendBatchTxInput, Error> {
let tx_params: Vec<SubmitNewBlobInput> = blob_tx_data_vec
.iter()
.map(|blob_json_file| SubmitNewBlobInput {
imageId: blob_json_file.image_id,
commitment: Bytes::from(blob_json_file.tx_data.kzg_commitment.to_vec()),
blobSha2: FixedBytes::<32>::from(blob_json_file.tx_data.blob_sha2),
})
.collect();

let mut sidecar = BlobTransactionSidecar {
blobs: Vec::new(),
commitments: Vec::new(),
proofs: Vec::new(),
};

for blob_json_file in blob_tx_data_vec {
sidecar.blobs.push(
blob_json_file
.tx_data
.blob_sidecar
.blobs
.first()
.expect("Blob missing in input")
.clone(),
);
sidecar.commitments.push(
blob_json_file
.tx_data
.blob_sidecar
.commitments
.first()
.expect("commitment missing in input")
.clone(),
);
sidecar.proofs.push(
blob_json_file
.tx_data
.blob_sidecar
.proofs
.first()
.expect("proof missing in input")
.clone(),
);
}
Ok(SendBatchTxInput { tx_params, sidecar })
}

pub async fn send_batch(
&self,
blob_tx_data_vec: Vec<BlobsQueueS3JsonFile>,
) -> Result<(), Error> {
let batch_input = Self::parse_batch_input(blob_tx_data_vec)?;
let mut _tx_builder = self
.s_eoa
.submitNewBlobBatch(batch_input.tx_params, self.ddex_sequencer_address)
.sidecar(batch_input.sidecar)
.max_fee_per_blob_gas(1000000001);
Ok(())
}
}
28 changes: 22 additions & 6 deletions aws/blobs_batch_sender/src/event_handler.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,29 @@
use aws_lambda_events::event::sqs::SqsEvent;
use blobs_batch_sender::BlobsBatchSenderConfig;
use lambda_runtime::{Error, LambdaEvent};
use owen::blobs_queue::BlobsQueueMessageBody;
use owen::{
blobs_queue::BlobsQueueMessageBody,
wallet::{OwenWallet, OwenWalletConfig},
};

use crate::s3;
use crate::{contract::SmartEoaManager, s3::BlobsStorage};

pub(crate) async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let config = BlobsBatchSenderConfig::build()?;
let blobs_storage = BlobsStorage::build(&config).await?;
let owen_wallet_config = OwenWalletConfig::from(&config)?;
let owen_wallet = OwenWallet::build(&owen_wallet_config).await?;
let smart_eoa_manager = SmartEoaManager::build(&config, owen_wallet.wallet)?;

let blobhashes = extract_blobhashes(event)?;
let blob_tx_data_vec = blobs_storage.read(blobhashes).await?;

smart_eoa_manager.send_batch(blob_tx_data_vec).await?;

Ok(())
}

fn extract_blobhashes(event: LambdaEvent<SqsEvent>) -> Result<Vec<String>, Error> {
let messages: Vec<BlobsQueueMessageBody> = event
.payload
.records
Expand All @@ -24,8 +43,5 @@ pub(crate) async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(),
.iter()
.map(|message| message.blobhash.clone())
.collect();

s3::read_blobs(blobhashes).await?;

Ok(())
Ok(blobhashes)
}
75 changes: 75 additions & 0 deletions aws/blobs_batch_sender/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use alloy::primitives::Address;
use lambda_runtime::Error;
use owen::constants;
use owen::wallet::HasOwenWalletFields;
use std::env;
use std::str::FromStr;

impl HasOwenWalletFields for BlobsBatchSenderConfig {
fn use_kms(&self) -> bool {
self.use_kms
}
fn rpc_url(&self) -> String {
self.rpc_url.clone()
}
fn private_key(&self) -> Option<String> {
self.private_key.clone()
}
fn signer_kms_id(&self) -> Option<String> {
self.signer_kms_id.clone()
}
}

pub struct BlobsBatchSenderConfig {
pub ddex_sequencer_address: Address,
pub s_eoa_address: Address,
pub rpc_url: String,
pub blobs_temp_storage_bucket_name: String,
pub use_kms: bool,
pub private_key: Option<String>,
pub signer_kms_id: Option<String>,
}

impl BlobsBatchSenderConfig {
fn get_env_var(key: &str) -> String {
env::var(key).expect(format!("Missing env variable: {key}").as_str())
}
pub fn build() -> Result<BlobsBatchSenderConfig, Error> {
let rpc_url = Self::get_env_var("RPC_URL");
let ddex_sequencer_address = Address::from_str(
std::env::var("DDEX_SEQUENCER_ADDRESS")
.unwrap_or_else(|_| constants::DDEX_SEQUENCER_ADDRESS.to_string())
.as_str(),
)
.expect("Could not parse ddex sequencer address");

let s_eoa_address = Address::from_str(Self::get_env_var("S_EOA_ADDRESS").as_str())?;

let blobs_temp_storage_bucket_name = Self::get_env_var("BLOBS_TEMP_STORAGE_BUCKET_NAME");

let mut signer_kms_id = None;
let mut private_key = None;
let use_kms = matches!(
std::env::var("USE_KMS")
.unwrap_or_else(|_| "false".to_string())
.as_str(),
"1" | "true"
);

if use_kms {
signer_kms_id = Some(Self::get_env_var("SIGNER_KMS_ID"));
} else {
private_key = Some(Self::get_env_var("PRIVATE_KEY"));
}

Ok(BlobsBatchSenderConfig {
ddex_sequencer_address,
rpc_url,
s_eoa_address,
blobs_temp_storage_bucket_name,
private_key,
use_kms,
signer_kms_id,
})
}
}
1 change: 1 addition & 0 deletions aws/blobs_batch_sender/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod contract;
mod event_handler;
mod s3;
use event_handler::function_handler;
Expand Down
65 changes: 38 additions & 27 deletions aws/blobs_batch_sender/src/s3.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,48 @@
use std::env;

use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::Client;
use blobs_batch_sender::BlobsBatchSenderConfig;
use lambda_runtime::Error;
use owen::blob::BlobTransactionData;
use owen::blobs_queue::BlobsQueueS3JsonFile;
use tokio::io::AsyncReadExt;

pub async fn read_blobs(blobhashes: Vec<String>) -> Result<(), Error> {
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let config = aws_config::from_env().region(region_provider).load().await;
let client = Client::new(&config);

let bucket = env::var("BLOBS_TEMP_STORAGE_BUCKET_NAME")
.expect(format!("Missing env variable: BLOBS_TEMP_STORAGE_BUCKET_NAME").as_str());

let mut blobs: Vec<BlobTransactionData> = Vec::new();
pub struct BlobsStorage {
client: Client,
bucket_name: String,
}

for blobhash in blobhashes {
let key = format!("blobs/{}.json", blobhash);
let resp = client.get_object().bucket(&bucket).key(key).send().await?;
impl BlobsStorage {
pub async fn build(config: &BlobsBatchSenderConfig) -> Result<Self, Error> {
let region_provider = RegionProviderChain::default_provider().or_else("us-east-1");
let aws_config = aws_config::from_env().region(region_provider).load().await;
let client: Client = Client::new(&aws_config);

let mut body = resp.body.into_async_read();
let mut contents = String::new();
body.read_to_string(&mut contents).await?;
let blob_transaction_data: BlobTransactionData = serde_json::from_str(&contents).unwrap();
blobs.push(blob_transaction_data);
Ok(Self {
client,
bucket_name: config.blobs_temp_storage_bucket_name.clone(),
})
}

let commitments: Vec<Vec<u8>> = blobs
.iter()
.map(|blob| blob.kzg_commitment.clone())
.collect();
println!("{commitments:?}");

Ok(())
pub async fn read(&self, blobhashes: Vec<String>) -> Result<Vec<BlobsQueueS3JsonFile>, Error> {
let mut blobs: Vec<BlobsQueueS3JsonFile> = Vec::new();

for blobhash in blobhashes {
let key = format!("blobs/{}.json", blobhash);
let resp = self
.client
.get_object()
.bucket(self.bucket_name.clone())
.key(key)
.send()
.await?;

let mut body = resp.body.into_async_read();
let mut contents = String::new();
body.read_to_string(&mut contents).await?;
let blob_transaction_data: BlobsQueueS3JsonFile =
serde_json::from_str(&contents).unwrap();
blobs.push(blob_transaction_data);
}

Ok(blobs)
}
}
2 changes: 1 addition & 1 deletion owen/src/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use c_kzg::{ethereum_kzg_settings, Blob};
use log_macros::{format_error, log_info};
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
#[derive(Deserialize, Serialize, Clone)]
pub struct BlobTransactionData {
pub kzg_commitment: Vec<u8>,
pub blob_sidecar: BlobTransactionSidecar,
Expand Down
Loading