Skip to content

feat: adds bundles to in progress block creation #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ path = "bin/submit_transaction.rs"
[dependencies]
zenith-types = "0.13"

alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev"] }
alloy = { version = "0.7.3", features = ["full", "json-rpc", "signer-aws", "rpc-types-mev", "rlp"] }
alloy-rlp = { version = "0.3.4" }

aws-config = "1.1.7"
Expand All @@ -50,4 +50,4 @@ tracing-subscriber = "0.3.18"
async-trait = "0.1.80"
oauth2 = "4.4.2"
metrics = "0.24.1"
metrics-exporter-prometheus = "0.16.0"
metrics-exporter-prometheus = "0.16.0"
3 changes: 1 addition & 2 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,10 @@ async fn main() -> eyre::Result<()> {
let sequencer_signer = config.connect_sequencer_signer().await?;
let zenith = config.connect_zenith(host_provider.clone());

let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider);

let metrics = MetricsTask { host_provider: host_provider.clone() };
let (tx_channel, metrics_jh) = metrics.spawn();

let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider.clone());
let submit = SubmitTask {
authenticator: authenticator.clone(),
host_provider,
Expand Down
22 changes: 15 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::signer::{LocalOrAws, SignerError};
use alloy::network::{Ethereum, EthereumWallet};
use alloy::primitives::Address;
use alloy::providers::fillers::BlobGasFiller;
use alloy::providers::{
fillers::{ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller, WalletFiller},
Identity, ProviderBuilder, RootProvider,
use alloy::{
network::{Ethereum, EthereumWallet},
primitives::Address,
providers::{
fillers::{
BlobGasFiller, ChainIdFiller, FillProvider, GasFiller, JoinFill, NonceFiller,
WalletFiller,
},
Identity, ProviderBuilder, RootProvider,
},
transports::BoxTransport,
};
use alloy::transports::BoxTransport;
use std::{borrow::Cow, env, num, str::FromStr};
use zenith_types::Zenith;

Expand All @@ -17,6 +21,7 @@ const HOST_RPC_URL: &str = "HOST_RPC_URL";
const ROLLUP_RPC_URL: &str = "ROLLUP_RPC_URL";
const TX_BROADCAST_URLS: &str = "TX_BROADCAST_URLS";
const ZENITH_ADDRESS: &str = "ZENITH_ADDRESS";
const BUILDER_HELPER_ADDRESS: &str = "BUILDER_HELPER_ADDRESS";
const QUINCEY_URL: &str = "QUINCEY_URL";
const BUILDER_PORT: &str = "BUILDER_PORT";
const SEQUENCER_KEY: &str = "SEQUENCER_KEY"; // empty (to use Quincey) OR AWS key ID (to use AWS signer) OR raw private key (to use local signer)
Expand Down Expand Up @@ -50,6 +55,8 @@ pub struct BuilderConfig {
pub tx_broadcast_urls: Vec<Cow<'static, str>>,
/// address of the Zenith contract on Host.
pub zenith_address: Address,
/// address of the Builder Helper contract on Host.
pub builder_helper_address: Address,
/// URL for remote Quincey Sequencer server to sign blocks.
/// Disregarded if a sequencer_signer is configured.
pub quincey_url: Cow<'static, str>,
Expand Down Expand Up @@ -157,6 +164,7 @@ impl BuilderConfig {
.map(Into::into)
.collect(),
zenith_address: load_address(ZENITH_ADDRESS)?,
builder_helper_address: load_address(BUILDER_HELPER_ADDRESS)?,
quincey_url: load_url(QUINCEY_URL)?,
builder_port: load_u16(BUILDER_PORT)?,
sequencer_key: load_string_option(SEQUENCER_KEY),
Expand Down
128 changes: 103 additions & 25 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::{BuilderConfig, WalletlessProvider};
use alloy::primitives::{keccak256, Bytes, B256};
use alloy::providers::Provider;
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
eips::eip2718::Decodable2718,
primitives::{keccak256, Bytes, B256},
providers::Provider as _,
rlp::Buf,
};
use alloy_rlp::Buf;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::OnceLock, time::Duration};
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::Instrument;
use zenith_types::{encode_txns, Alloy2718Coder};
use tracing::{debug, error, info, trace, Instrument};
use zenith_types::{encode_txns, Alloy2718Coder, ZenithEthBundle};

/// Ethereum's slot time in seconds.
pub const ETHEREUM_SLOT_TIME: u64 = 12;
Expand Down Expand Up @@ -56,22 +56,22 @@ impl InProgressBlock {

/// Ingest a transaction into the in-progress block. Fails
pub fn ingest_tx(&mut self, tx: &TxEnvelope) {
tracing::trace!(hash = %tx.tx_hash(), "ingesting tx");
trace!(hash = %tx.tx_hash(), "ingesting tx");
self.unseal();
self.transactions.push(tx.clone());
}

/// Remove a transaction from the in-progress block.
pub fn remove_tx(&mut self, tx: &TxEnvelope) {
tracing::trace!(hash = %tx.tx_hash(), "removing tx");
trace!(hash = %tx.tx_hash(), "removing tx");
self.unseal();
self.transactions.retain(|t| t.tx_hash() != tx.tx_hash());
}

/// Ingest a bundle into the in-progress block.
/// Ignores Signed Orders for now.
pub fn ingest_bundle(&mut self, bundle: Bundle) {
tracing::trace!(bundle = %bundle.id, "ingesting bundle");
trace!(bundle = %bundle.id, "ingesting bundle");

let txs = bundle
.bundle
Expand All @@ -87,7 +87,7 @@ impl InProgressBlock {
// As this builder does not provide bundles landing "top of block", its fine to just extend.
self.transactions.extend(txs);
} else {
tracing::error!("failed to decode bundle. dropping");
error!("failed to decode bundle. dropping");
}
}

Expand Down Expand Up @@ -139,39 +139,51 @@ impl BlockBuilder {
}
}

/// Fetches transactions from the cache and ingests them into the in progress block
async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) {
tracing::trace!("query transactions from cache");
trace!("query transactions from cache");
let txns = self.tx_poller.check_tx_cache().await;
match txns {
Ok(txns) => {
tracing::trace!("got transactions response");
trace!("got transactions response");
for txn in txns.into_iter() {
in_progress.ingest_tx(&txn);
}
}
Err(e) => {
tracing::error!(error = %e, "error polling transactions");
error!(error = %e, "error polling transactions");
}
}
}

async fn _get_bundles(&mut self, in_progress: &mut InProgressBlock) {
tracing::trace!("query bundles from cache");
/// Fetches bundles from the cache and ingests them into the in progress block
async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) {
trace!("query bundles from cache");
let bundles = self.bundle_poller.check_bundle_cache().await;
match bundles {
Ok(bundles) => {
tracing::trace!("got bundles response");
for bundle in bundles {
in_progress.ingest_bundle(bundle);
match self.simulate_bundle(&bundle.bundle).await {
Ok(()) => in_progress.ingest_bundle(bundle.clone()),
Err(e) => error!(error = %e, id = ?bundle.id, "bundle simulation failed"),
}
}
}
Err(e) => {
tracing::error!(error = %e, "error polling bundles");
error!(error = %e, "error polling bundles");
}
}
self.bundle_poller.evict();
}

/// Simulates a Zenith bundle against the rollup state
async fn simulate_bundle(&mut self, bundle: &ZenithEthBundle) -> eyre::Result<()> {
// TODO: Simulate bundles with the Simulation Engine
// [ENG-672](https://linear.app/initiates/issue/ENG-672/add-support-for-bundles)
debug!(hash = ?bundle.bundle.bundle_hash(), block_number = ?bundle.block_number(), "bundle simulations is not implemented yet - skipping simulation");
Ok(())
}

async fn filter_transactions(&self, in_progress: &mut InProgressBlock) {
// query the rollup node to see which transaction(s) have been included
let mut confirmed_transactions = Vec::new();
Expand All @@ -185,7 +197,7 @@ impl BlockBuilder {
confirmed_transactions.push(transaction.clone());
}
}
tracing::trace!(confirmed = confirmed_transactions.len(), "found confirmed transactions");
trace!(confirmed = confirmed_transactions.len(), "found confirmed transactions");

// remove already-confirmed transactions
for transaction in confirmed_transactions {
Expand Down Expand Up @@ -213,32 +225,98 @@ impl BlockBuilder {
loop {
// sleep the buffer time
tokio::time::sleep(Duration::from_secs(self.secs_to_next_target())).await;
tracing::info!("beginning block build cycle");
info!("beginning block build cycle");

// Build a block
let mut in_progress = InProgressBlock::default();
self.get_transactions(&mut in_progress).await;

// TODO: Implement bundle ingestion #later
// self.get_bundles(&mut in_progress).await;
self.get_bundles(&mut in_progress).await;

// Filter confirmed transactions from the block
self.filter_transactions(&mut in_progress).await;

// submit the block if it has transactions
if !in_progress.is_empty() {
tracing::debug!(txns = in_progress.len(), "sending block to submit task");
debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);
if outbound.send(in_progress_block).is_err() {
tracing::error!("downstream task gone");
error!("downstream task gone");
break;
}
} else {
tracing::debug!("no transactions, skipping block submission");
debug!("no transactions, skipping block submission");
}
}
}
.in_current_span(),
)
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy::primitives::Address;
use alloy::{
eips::eip2718::Encodable2718,
network::{EthereumWallet, TransactionBuilder},
rpc::types::{mev::EthSendBundle, TransactionRequest},
signers::local::PrivateKeySigner,
};
use zenith_types::ZenithEthBundle;

/// Create a mock bundle for testing with a single transaction
async fn create_mock_bundle(wallet: &EthereumWallet) -> Bundle {
let tx = TransactionRequest::default()
.to(Address::ZERO)
.from(wallet.default_signer().address())
.nonce(1)
.max_fee_per_gas(2)
.max_priority_fee_per_gas(3)
.gas_limit(4)
.build(wallet)
.await
.unwrap()
.encoded_2718();

let eth_bundle = EthSendBundle {
txs: vec![tx.into()],
block_number: 1,
min_timestamp: Some(u64::MIN),
max_timestamp: Some(u64::MAX),
reverting_tx_hashes: vec![],
replacement_uuid: Some("replacement_uuid".to_owned()),
};

let zenith_bundle = ZenithEthBundle { bundle: eth_bundle, host_fills: None };

Bundle { id: "mock_bundle".to_owned(), bundle: zenith_bundle }
}

#[tokio::test]
async fn test_ingest_bundle() {
// Setup random creds
let signer = PrivateKeySigner::random();
let wallet = EthereumWallet::from(signer);

// Create an empty InProgressBlock and bundle
let mut in_progress_block = InProgressBlock::new();
let bundle = create_mock_bundle(&wallet).await;

// Save previous hash for comparison
let prev_hash = in_progress_block.contents_hash();

// Ingest the bundle
in_progress_block.ingest_bundle(bundle);

// Assert hash is changed after ingest
assert_ne!(prev_hash, in_progress_block.contents_hash(), "Bundle should change block hash");

// Assert that the transaction was persisted into block
assert_eq!(in_progress_block.len(), 1, "Bundle should be persisted");

// Assert that the block is properly sealed
let raw_encoding = in_progress_block.encode_raw();
assert!(!raw_encoding.is_empty(), "Raw encoding should not be empty");
}
}
1 change: 1 addition & 0 deletions src/tasks/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ mod tests {
oauth_token_url: "http://localhost:9000".into(),
tx_broadcast_urls: vec!["http://localhost:9000".into()],
oauth_token_refresh_interval: 300, // 5 minutes
builder_helper_address: Address::default(),
};
Ok(config)
}
Expand Down
22 changes: 16 additions & 6 deletions src/tasks/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ use oauth2::TokenResponse;
use std::time::Instant;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, error, instrument, trace};
use zenith_types::{SignRequest, SignResponse, Zenith, Zenith::IncorrectHostBlock};
use zenith_types::{
BundleHelper::{self, FillPermit2},
SignRequest, SignResponse,
Zenith::IncorrectHostBlock,
};

macro_rules! spawn_provider_send {
($provider:expr, $tx:expr) => {
Expand Down Expand Up @@ -110,20 +114,23 @@ impl SubmitTask {
/// Builds blob transaction from the provided header and signature values
fn build_blob_tx(
&self,
header: Zenith::BlockHeader,
fills: Vec<FillPermit2>,
header: BundleHelper::BlockHeader,
v: u8,
r: FixedBytes<32>,
s: FixedBytes<32>,
in_progress: &InProgressBlock,
) -> eyre::Result<TransactionRequest> {
let data = Zenith::submitBlockCall { header, v, r, s, _4: Default::default() }.abi_encode();
let data = zenith_types::BundleHelper::submitCall { fills, header, v, r, s }.abi_encode();

let sidecar = in_progress.encode_blob::<SimpleCoder>().build()?;
Ok(TransactionRequest::default()
.with_blob_sidecar(sidecar)
.with_input(data)
.with_max_priority_fee_per_gas((GWEI_TO_WEI * 16) as u128))
}

/// Returns the next host block height
async fn next_host_block_height(&self) -> eyre::Result<u64> {
let result = self.host_provider.get_block_number().await?;
let next = result.checked_add(1).ok_or_else(|| eyre!("next host block height overflow"))?;
Expand All @@ -138,18 +145,19 @@ impl SubmitTask {
) -> eyre::Result<ControlFlow> {
let (v, r, s) = extract_signature_components(&resp.sig);

let header = Zenith::BlockHeader {
let header = zenith_types::BundleHelper::BlockHeader {
hostBlockNumber: resp.req.host_block_number,
rollupChainId: U256::from(self.config.ru_chain_id),
gasLimit: resp.req.gas_limit,
rewardAddress: resp.req.ru_reward_address,
blockDataHash: in_progress.contents_hash(),
};

let fills = vec![]; // NB: ignored until fills are implemented
let tx = self
.build_blob_tx(header, v, r, s, in_progress)?
.build_blob_tx(fills, header, v, r, s, in_progress)?
.with_from(self.host_provider.default_signer_address())
.with_to(self.config.zenith_address)
.with_to(self.config.builder_helper_address)
.with_gas_limit(1_000_000);

if let Err(TransportError::ErrorResp(e)) =
Expand All @@ -168,6 +176,8 @@ impl SubmitTask {

return Ok(ControlFlow::Skip);
}

// All validation checks have passed, send the transaction
self.send_transaction(resp, tx).await
}

Expand Down
1 change: 1 addition & 0 deletions tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod tests {
oauth_token_url: "http://localhost:8080".into(),
tx_broadcast_urls: vec!["http://localhost:9000".into()],
oauth_token_refresh_interval: 300, // 5 minutes
builder_helper_address: Address::default(),
};
Ok(config)
}
Expand Down
Loading
Loading