Skip to content

Commit ce91617

Browse files
committed
feat: adds bundles to in progress block creation
- checks the cache for bundles - ingests any new bundles that it finds into the in progress block - adds a unit test for bundle ingestion
1 parent f1ea58e commit ce91617

File tree

2 files changed

+89
-15
lines changed

2 files changed

+89
-15
lines changed

src/tasks/block.rs

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use alloy_rlp::Buf;
1212
use std::time::{SystemTime, UNIX_EPOCH};
1313
use std::{sync::OnceLock, time::Duration};
1414
use tokio::{sync::mpsc, task::JoinHandle};
15-
use tracing::Instrument;
15+
use tracing::{debug, error, Instrument};
1616
use zenith_types::{encode_txns, Alloy2718Coder};
1717

1818
/// Ethereum's slot time in seconds.
@@ -71,7 +71,7 @@ impl InProgressBlock {
7171
/// Ingest a bundle into the in-progress block.
7272
/// Ignores Signed Orders for now.
7373
pub fn ingest_bundle(&mut self, bundle: Bundle) {
74-
tracing::trace!(bundle = %bundle.id, "ingesting bundle");
74+
debug!(bundle = %bundle.id, "ingesting bundle");
7575

7676
let txs = bundle
7777
.bundle
@@ -83,11 +83,9 @@ impl InProgressBlock {
8383

8484
if let Ok(txs) = txs {
8585
self.unseal();
86-
// extend the transactions with the decoded transactions.
87-
// As this builder does not provide bundles landing "top of block", its fine to just extend.
8886
self.transactions.extend(txs);
8987
} else {
90-
tracing::error!("failed to decode bundle. dropping");
88+
error!("failed to decode bundle. dropping");
9189
}
9290
}
9391

@@ -155,12 +153,12 @@ impl BlockBuilder {
155153
}
156154
}
157155

158-
async fn _get_bundles(&mut self, in_progress: &mut InProgressBlock) {
156+
async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) {
159157
tracing::trace!("query bundles from cache");
160158
let bundles = self.bundle_poller.check_bundle_cache().await;
159+
// OPTIMIZE: Sort bundles received from cache
161160
match bundles {
162161
Ok(bundles) => {
163-
tracing::trace!("got bundles response");
164162
for bundle in bundles {
165163
in_progress.ingest_bundle(bundle);
166164
}
@@ -218,9 +216,7 @@ impl BlockBuilder {
218216
// Build a block
219217
let mut in_progress = InProgressBlock::default();
220218
self.get_transactions(&mut in_progress).await;
221-
222-
// TODO: Implement bundle ingestion #later
223-
// self.get_bundles(&mut in_progress).await;
219+
self.get_bundles(&mut in_progress).await;
224220

225221
// Filter confirmed transactions from the block
226222
self.filter_transactions(&mut in_progress).await;
@@ -242,3 +238,71 @@ impl BlockBuilder {
242238
)
243239
}
244240
}
241+
242+
#[cfg(test)]
243+
mod tests {
244+
use super::*;
245+
use alloy::primitives::Address;
246+
use alloy::{
247+
eips::eip2718::Encodable2718,
248+
network::{EthereumWallet, TransactionBuilder},
249+
rpc::types::{mev::EthSendBundle, TransactionRequest},
250+
signers::local::PrivateKeySigner,
251+
};
252+
use zenith_types::ZenithEthBundle;
253+
254+
/// Create a mock bundle for testing with a single transaction
255+
async fn create_mock_bundle(wallet: &EthereumWallet) -> Bundle {
256+
let tx = TransactionRequest::default()
257+
.to(Address::ZERO)
258+
.from(wallet.default_signer().address())
259+
.nonce(1)
260+
.max_fee_per_gas(2)
261+
.max_priority_fee_per_gas(3)
262+
.gas_limit(4)
263+
.build(wallet)
264+
.await
265+
.unwrap()
266+
.encoded_2718();
267+
268+
let eth_bundle = EthSendBundle {
269+
txs: vec![tx.into()],
270+
block_number: 1,
271+
min_timestamp: Some(u64::MIN),
272+
max_timestamp: Some(u64::MAX),
273+
reverting_tx_hashes: vec![],
274+
replacement_uuid: Some("replacement_uuid".to_owned()),
275+
};
276+
277+
let zenith_bundle = ZenithEthBundle { bundle: eth_bundle, host_fills: None };
278+
279+
Bundle { id: "mock_bundle".to_owned(), bundle: zenith_bundle }
280+
}
281+
282+
#[tokio::test]
283+
async fn test_ingest_bundle() {
284+
// Setup random creds
285+
let signer = PrivateKeySigner::random();
286+
let wallet = EthereumWallet::from(signer);
287+
288+
// Create an empty InProgressBlock and bundle
289+
let mut in_progress_block = InProgressBlock::new();
290+
let bundle = create_mock_bundle(&wallet).await;
291+
292+
// Save previous hash for comparison
293+
let prev_hash = in_progress_block.contents_hash();
294+
295+
// Ingest the bundle
296+
in_progress_block.ingest_bundle(bundle);
297+
298+
// Assert hash is changed after ingest
299+
assert_ne!(prev_hash, in_progress_block.contents_hash(), "Bundle should change block hash");
300+
301+
// Assert that the transaction was persisted into block
302+
assert_eq!(in_progress_block.len(), 1, "Bundle should be persisted");
303+
304+
// Assert that the block is properly sealed
305+
let raw_encoding = in_progress_block.encode_raw();
306+
assert!(!raw_encoding.is_empty(), "Raw encoding should not be empty");
307+
}
308+
}

src/tasks/submit.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ use oauth2::TokenResponse;
2121
use std::time::Instant;
2222
use tokio::{sync::mpsc, task::JoinHandle};
2323
use tracing::{debug, error, instrument, trace};
24-
use zenith_types::{SignRequest, SignResponse, Zenith, Zenith::IncorrectHostBlock};
24+
use zenith_types::{
25+
BundleHelper::{self, FillPermit2},
26+
SignRequest, SignResponse,
27+
Zenith::IncorrectHostBlock,
28+
};
2529

2630
macro_rules! spawn_provider_send {
2731
($provider:expr, $tx:expr) => {
@@ -108,20 +112,23 @@ impl SubmitTask {
108112
/// Builds blob transaction from the provided header and signature values
109113
fn build_blob_tx(
110114
&self,
111-
header: Zenith::BlockHeader,
115+
fills: Vec<FillPermit2>,
116+
header: BundleHelper::BlockHeader,
112117
v: u8,
113118
r: FixedBytes<32>,
114119
s: FixedBytes<32>,
115120
in_progress: &InProgressBlock,
116121
) -> eyre::Result<TransactionRequest> {
117-
let data = Zenith::submitBlockCall { header, v, r, s, _4: Default::default() }.abi_encode();
122+
let data = zenith_types::BundleHelper::submitCall { fills, header, v, r, s }.abi_encode();
123+
118124
let sidecar = in_progress.encode_blob::<SimpleCoder>().build()?;
119125
Ok(TransactionRequest::default()
120126
.with_blob_sidecar(sidecar)
121127
.with_input(data)
122128
.with_max_priority_fee_per_gas((GWEI_TO_WEI * 16) as u128))
123129
}
124130

131+
/// Returns the next host block height
125132
async fn next_host_block_height(&self) -> eyre::Result<u64> {
126133
let result = self.host_provider.get_block_number().await?;
127134
let next = result.checked_add(1).ok_or_else(|| eyre!("next host block height overflow"))?;
@@ -138,16 +145,17 @@ impl SubmitTask {
138145
let r: FixedBytes<32> = resp.sig.r().into();
139146
let s: FixedBytes<32> = resp.sig.s().into();
140147

141-
let header = Zenith::BlockHeader {
148+
let header = zenith_types::BundleHelper::BlockHeader {
142149
hostBlockNumber: resp.req.host_block_number,
143150
rollupChainId: U256::from(self.config.ru_chain_id),
144151
gasLimit: resp.req.gas_limit,
145152
rewardAddress: resp.req.ru_reward_address,
146153
blockDataHash: in_progress.contents_hash(),
147154
};
148155

156+
let fills = vec![]; // NB: ignored until fills are implemented
149157
let tx = self
150-
.build_blob_tx(header, v, r, s, in_progress)?
158+
.build_blob_tx(fills, header, v, r, s, in_progress)?
151159
.with_from(self.host_provider.default_signer_address())
152160
.with_to(self.config.zenith_address)
153161
.with_gas_limit(1_000_000);
@@ -168,6 +176,8 @@ impl SubmitTask {
168176

169177
return Ok(ControlFlow::Skip);
170178
}
179+
180+
// All validation checks have passed, send the transaction
171181
self.send_transaction(resp, tx).await
172182
}
173183

0 commit comments

Comments
 (0)