diff --git a/bin/builder.rs b/bin/builder.rs index ccea2bf..8a419ac 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -2,9 +2,9 @@ use builder::config::BuilderConfig; use builder::service::serve_builder_with_span; -use builder::tasks::bundler::BundlePoller; +use builder::tasks::block::BlockBuilder; use builder::tasks::oauth::Authenticator; -use builder::tasks::tx_poller::TxPoller; +use builder::tasks::submit::SubmitTask; use tokio::select; @@ -22,12 +22,8 @@ async fn main() -> eyre::Result<()> { let sequencer_signer = config.connect_sequencer_signer().await?; let zenith = config.connect_zenith(provider.clone()); - let port = config.builder_port; - let tx_poller = TxPoller::new(&config); - let bundle_poller = BundlePoller::new(&config, authenticator.clone()).await; - let builder = builder::tasks::block::BlockBuilder::new(&config); - - let submit = builder::tasks::submit::SubmitTask { + let builder = BlockBuilder::new(&config, authenticator.clone()); + let submit = SubmitTask { authenticator: authenticator.clone(), provider, zenith, @@ -38,10 +34,9 @@ async fn main() -> eyre::Result<()> { let authenticator_jh = authenticator.spawn(); let (submit_channel, submit_jh) = submit.spawn(); - let (tx_channel, bundle_channel, build_jh) = builder.spawn(submit_channel); - let tx_poller_jh = tx_poller.spawn(tx_channel.clone()); - let bundle_poller_jh = bundle_poller.spawn(bundle_channel); + let build_jh = builder.spawn(submit_channel); + let port = config.builder_port; let server = serve_builder_with_span(([0, 0, 0, 0], port), span); select! { @@ -54,12 +49,6 @@ async fn main() -> eyre::Result<()> { _ = server => { tracing::info!("server finished"); } - _ = tx_poller_jh => { - tracing::info!("tx_poller finished"); - } - _ = bundle_poller_jh => { - tracing::info!("bundle_poller finished"); - } _ = authenticator_jh => { tracing::info!("authenticator finished"); } diff --git a/src/config.rs b/src/config.rs index 976e564..be110d6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -25,7 +25,6 @@ const BLOCK_CONFIRMATION_BUFFER: &str = "BLOCK_CONFIRMATION_BUFFER"; const BUILDER_REWARDS_ADDRESS: &str = "BUILDER_REWARDS_ADDRESS"; const ROLLUP_BLOCK_GAS_LIMIT: &str = "ROLLUP_BLOCK_GAS_LIMIT"; const TX_POOL_URL: &str = "TX_POOL_URL"; -const TX_POOL_POLL_INTERVAL: &str = "TX_POOL_POLL_INTERVAL"; const AUTH_TOKEN_REFRESH_INTERVAL: &str = "AUTH_TOKEN_REFRESH_INTERVAL"; const TX_POOL_CACHE_DURATION: &str = "TX_POOL_CACHE_DURATION"; const OAUTH_CLIENT_ID: &str = "OAUTH_CLIENT_ID"; @@ -69,8 +68,6 @@ pub struct BuilderConfig { pub rollup_block_gas_limit: u64, /// URL of the tx pool to poll for incoming transactions. pub tx_pool_url: Cow<'static, str>, - //// Interval in seconds to poll the tx-pool for new transactions. - pub tx_pool_poll_interval: u64, /// Duration in seconds transactions can live in the tx-pool cache. pub tx_pool_cache_duration: u64, /// OAuth client ID for the builder. @@ -155,7 +152,6 @@ impl BuilderConfig { builder_rewards_address: load_address(BUILDER_REWARDS_ADDRESS)?, rollup_block_gas_limit: load_u64(ROLLUP_BLOCK_GAS_LIMIT)?, tx_pool_url: load_url(TX_POOL_URL)?, - tx_pool_poll_interval: load_u64(TX_POOL_POLL_INTERVAL)?, tx_pool_cache_duration: load_u64(TX_POOL_CACHE_DURATION)?, oauth_client_id: load_string(OAUTH_CLIENT_ID)?, oauth_client_secret: load_string(OAUTH_CLIENT_SECRET)?, diff --git a/src/tasks/block.rs b/src/tasks/block.rs index c674cdc..2a25c3d 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -5,14 +5,15 @@ use alloy::{ use alloy_primitives::{keccak256, Bytes, B256}; use alloy_rlp::Buf; use std::{sync::OnceLock, time::Duration}; -use tokio::{select, sync::mpsc, task::JoinHandle}; +use tokio::{sync::mpsc, task::JoinHandle}; use tracing::Instrument; use zenith_types::{encode_txns, Alloy2718Coder}; +use super::bundler::{Bundle, BundlePoller}; +use super::oauth::Authenticator; +use super::tx_poller::TxPoller; use crate::config::BuilderConfig; -use super::bundler::Bundle; - #[derive(Debug, Default, Clone)] /// A block in progress. pub struct InProgressBlock { @@ -109,75 +110,78 @@ impl InProgressBlock { pub struct BlockBuilder { pub incoming_transactions_buffer: u64, pub config: BuilderConfig, + pub tx_poller: TxPoller, + pub bundle_poller: BundlePoller, } impl BlockBuilder { // create a new block builder with the given config. - pub fn new(config: &BuilderConfig) -> Self { + pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { Self { config: config.clone(), incoming_transactions_buffer: config.incoming_transactions_buffer, + tx_poller: TxPoller::new(config), + bundle_poller: BundlePoller::new(config, authenticator), } } - /// Spawn the block builder task, returning the inbound channel to it, and - /// a handle to the running task. - pub fn spawn( - self, - outbound: mpsc::UnboundedSender, - ) -> (mpsc::UnboundedSender, mpsc::UnboundedSender, JoinHandle<()>) { - let mut in_progress = InProgressBlock::default(); - - let (tx_sender, mut tx_inbound) = mpsc::unbounded_channel(); - let (bundle_sender, mut bundle_inbound) = mpsc::unbounded_channel(); + async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) { + let txns = self.tx_poller.check_tx_cache().await; + match txns { + Ok(txns) => { + for txn in txns.into_iter() { + in_progress.ingest_tx(&txn); + } + } + Err(e) => { + tracing::error!(error = %e, "error polling transactions"); + } + } + self.tx_poller.evict(); + } - let mut sleep = - Box::pin(tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer))); + async fn get_bundles(&mut self, in_progress: &mut InProgressBlock) { + let bundles = self.bundle_poller.check_bundle_cache().await; + match bundles { + Ok(bundles) => { + for bundle in bundles { + in_progress.ingest_bundle(bundle); + } + } + Err(e) => { + tracing::error!(error = %e, "error polling bundles"); + } + } + self.bundle_poller.evict(); + } - let handle = tokio::spawn( + /// Spawn the block builder task, returning the inbound channel to it, and + /// a handle to the running task. + pub fn spawn(mut self, outbound: mpsc::UnboundedSender) -> JoinHandle<()> { + tokio::spawn( async move { loop { - - select! { - biased; - _ = &mut sleep => { - if !in_progress.is_empty() { - tracing::debug!(txns = in_progress.len(), "sending block to submit task"); - let in_progress_block = std::mem::take(&mut in_progress); - if outbound.send(in_progress_block).is_err() { - tracing::debug!("downstream task gone"); - break - } - } - - // Reset the sleep timer, as we want to do so when (and only when) our sleep future has elapsed, - // irrespective of whether we have any blocks to build. - sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(self.incoming_transactions_buffer)); - } - tx_resp = tx_inbound.recv() => { - match tx_resp { - Some(tx) => in_progress.ingest_tx(&tx), - None => { - tracing::debug!("upstream task gone"); - break - } - } - } - bundle_resp = bundle_inbound.recv() => { - match bundle_resp { - Some(bundle) => in_progress.ingest_bundle(bundle), - None => { - tracing::debug!("upstream task gone"); - break - } - } + // sleep the buffer time + tokio::time::sleep(Duration::from_secs(self.incoming_transactions_buffer)) + .await; + + // Build a block + let mut in_progress = InProgressBlock::default(); + self.get_transactions(&mut in_progress).await; + self.get_bundles(&mut in_progress).await; + + // submit the block if it has transactions + if !in_progress.is_empty() { + tracing::debug!(txns = in_progress.len(), "sending block to submit task"); + let in_progress_block = std::mem::take(&mut in_progress); + if outbound.send(in_progress_block).is_err() { + tracing::debug!("downstream task gone"); + break; } } } } .in_current_span(), - ); - - (tx_sender, bundle_sender, handle) + ) } } diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index d51f057..d9ee73f 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -6,8 +6,6 @@ use alloy_primitives::map::HashMap; use reqwest::Url; use serde::{Deserialize, Serialize}; use signet_types::SignetEthBundle; -use tokio::{sync::mpsc, task::JoinHandle}; -use tracing::debug; use oauth2::TokenResponse; @@ -34,7 +32,7 @@ pub struct BundlePoller { /// Implements a poller for the block builder to pull bundles from the tx cache. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. - pub async fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { + pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { Self { config: config.clone(), authenticator, seen_uuids: HashMap::new() } } @@ -73,7 +71,7 @@ impl BundlePoller { } /// Evicts expired bundles from the cache. - fn evict(&mut self) { + pub fn evict(&mut self) { let expired_keys: Vec = self .seen_uuids .iter() @@ -92,34 +90,4 @@ impl BundlePoller { self.seen_uuids.remove(&key); } } - - pub fn spawn(mut self, bundle_channel: mpsc::UnboundedSender) -> JoinHandle<()> { - let handle: JoinHandle<()> = tokio::spawn(async move { - loop { - let bundle_channel = bundle_channel.clone(); - let bundles = self.check_bundle_cache().await; - - match bundles { - Ok(bundles) => { - for bundle in bundles { - let result = bundle_channel.send(bundle); - if result.is_err() { - tracing::debug!("bundle_channel failed to send bundle"); - } - } - } - Err(err) => { - debug!(?err, "error fetching bundles from tx-pool"); - } - } - - // evict expired bundles once every loop - self.evict(); - - tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await; - } - }); - - handle - } } diff --git a/src/tasks/oauth.rs b/src/tasks/oauth.rs index c2a6e1a..76b49a4 100644 --- a/src/tasks/oauth.rs +++ b/src/tasks/oauth.rs @@ -114,7 +114,7 @@ impl Authenticator { } mod tests { - use crate::{config::BuilderConfig, tasks::block::BlockBuilder}; + use crate::config::BuilderConfig; use alloy_primitives::Address; use eyre::Result; @@ -124,7 +124,7 @@ mod tests { use super::*; use oauth2::TokenResponse; - let config = setup_test_builder()?.1; + let config = setup_test_config()?; let auth = Authenticator::new(&config); let token = auth.fetch_oauth_token().await?; dbg!(&token); @@ -135,7 +135,7 @@ mod tests { } #[allow(dead_code)] - pub fn setup_test_builder() -> Result<(BlockBuilder, BuilderConfig)> { + pub fn setup_test_config() -> Result { let config = BuilderConfig { host_chain_id: 17000, ru_chain_id: 17001, @@ -151,7 +151,6 @@ mod tests { rollup_block_gas_limit: 100_000, tx_pool_url: "http://localhost:9000/".into(), tx_pool_cache_duration: 5, - tx_pool_poll_interval: 5, oauth_client_id: "some_client_id".into(), oauth_client_secret: "some_client_secret".into(), oauth_authenticate_url: "http://localhost:9000".into(), @@ -160,6 +159,6 @@ mod tests { tx_broadcast_urls: vec!["http://localhost:9000".into()], oauth_token_refresh_interval: 300, // 5 minutes }; - Ok((BlockBuilder::new(&config), config)) + Ok(config) } } diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index bf727c6..e56510c 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -8,8 +8,6 @@ use eyre::Error; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use serde_json::from_slice; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; pub use crate::config::BuilderConfig; @@ -62,7 +60,7 @@ impl TxPoller { } /// removes entries from seen_txns that have lived past expiry - fn evict(&mut self) { + pub fn evict(&mut self) { let expired_keys: Vec = self .seen_txns .iter() @@ -81,37 +79,4 @@ impl TxPoller { self.seen_txns.remove(&key); } } - - /// spawns a task that polls the tx-pool for unique transactions and ingests them into the tx_channel. - pub fn spawn(mut self, tx_channel: mpsc::UnboundedSender) -> JoinHandle<()> { - let handle: JoinHandle<()> = tokio::spawn(async move { - loop { - let channel = tx_channel.clone(); - let txns = self.check_tx_cache().await; - - // send recently discovered transactions to the builder pipeline - match txns { - Ok(txns) => { - for txn in txns.into_iter() { - let result = channel.send(txn); - if result.is_err() { - tracing::debug!("tx_poller failed to send tx"); - continue; - } - } - } - Err(e) => { - println!("Error polling transactions: {}", e); - } - } - - // evict expired txns once every loop - self.evict(); - - tokio::time::sleep(Duration::from_secs(self.config.tx_pool_poll_interval)).await; - } - }); - - handle - } } diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 8c7769d..9b3d701 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -1,17 +1,14 @@ mod tests { use alloy_primitives::Address; - use builder::{ - config::BuilderConfig, - tasks::{block::BlockBuilder, oauth::Authenticator}, - }; + use builder::{config::BuilderConfig, tasks::oauth::Authenticator}; use eyre::Result; #[ignore = "integration test"] #[tokio::test] async fn test_bundle_poller_roundtrip() -> Result<()> { - let (_, config) = setup_test_builder().await.unwrap(); + let config = setup_test_config().await.unwrap(); let auth = Authenticator::new(&config); - let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth).await; + let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth); let got = bundle_poller.check_bundle_cache().await?; dbg!(got); @@ -19,7 +16,7 @@ mod tests { Ok(()) } - async fn setup_test_builder() -> Result<(BlockBuilder, BuilderConfig)> { + async fn setup_test_config() -> Result { let config = BuilderConfig { host_chain_id: 17000, ru_chain_id: 17001, @@ -36,7 +33,6 @@ mod tests { tx_pool_url: "http://localhost:9000/".into(), // tx_pool_url: "https://transactions.holesky.signet.sh".into(), tx_pool_cache_duration: 5, - tx_pool_poll_interval: 5, oauth_client_id: "some_client_id".into(), oauth_client_secret: "some_client_secret".into(), oauth_authenticate_url: "http://localhost:8080".into(), @@ -45,6 +41,6 @@ mod tests { tx_broadcast_urls: vec!["http://localhost:9000".into()], oauth_token_refresh_interval: 300, // 5 minutes }; - Ok((BlockBuilder::new(&config), config)) + Ok(config) } } diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index 2bc6a2c..8a1ad20 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -5,14 +5,14 @@ mod tests { use alloy::signers::{local::PrivateKeySigner, SignerSync}; use alloy_primitives::{bytes, Address, TxKind, U256}; use builder::config::BuilderConfig; - use builder::tasks::{block::BlockBuilder, tx_poller}; + use builder::tasks::tx_poller; use eyre::{Ok, Result}; #[ignore = "integration test"] #[tokio::test] async fn test_tx_roundtrip() -> Result<()> { // Create a new test environment - let (_, config) = setup_test_builder().await?; + let config = setup_test_config().await?; // Post a transaction to the cache post_tx(&config).await?; @@ -63,7 +63,7 @@ mod tests { } // Sets up a block builder with test values - pub async fn setup_test_builder() -> Result<(BlockBuilder, BuilderConfig)> { + pub async fn setup_test_config() -> Result { let config = BuilderConfig { host_chain_id: 17000, ru_chain_id: 17001, @@ -80,7 +80,6 @@ mod tests { rollup_block_gas_limit: 100_000, tx_pool_url: "http://localhost:9000/".into(), tx_pool_cache_duration: 5, - tx_pool_poll_interval: 5, oauth_client_id: "some_client_id".into(), oauth_client_secret: "some_client_secret".into(), oauth_authenticate_url: "http://localhost:8080".into(), @@ -88,6 +87,6 @@ mod tests { oauth_audience: "https://transactions.holesky.signet.sh".into(), oauth_token_refresh_interval: 300, // 5 minutes }; - Ok((BlockBuilder::new(&config), config)) + Ok(config) } }