diff --git a/bin/builder.rs b/bin/builder.rs index 95dad79..2b24056 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -3,6 +3,7 @@ use builder::config::BuilderConfig; use builder::service::serve_builder_with_span; use builder::tasks::block::BlockBuilder; +use builder::tasks::metrics::MetricsTask; use builder::tasks::oauth::Authenticator; use builder::tasks::submit::SubmitTask; use metrics_exporter_prometheus::PrometheusBuilder; @@ -27,6 +28,10 @@ async fn main() -> eyre::Result<()> { let zenith = config.connect_zenith(host_provider.clone()); let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider); + + let metrics = MetricsTask { host_provider: host_provider.clone() }; + let (tx_channel, metrics_jh) = metrics.spawn(); + let submit = SubmitTask { authenticator: authenticator.clone(), host_provider, @@ -34,6 +39,7 @@ async fn main() -> eyre::Result<()> { client: reqwest::Client::new(), sequencer_signer, config: config.clone(), + outbound_tx_channel: tx_channel, }; let authenticator_jh = authenticator.spawn(); @@ -47,6 +53,9 @@ async fn main() -> eyre::Result<()> { _ = submit_jh => { tracing::info!("submit finished"); }, + _ = metrics_jh => { + tracing::info!("metrics finished"); + }, _ = build_jh => { tracing::info!("build finished"); } diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs new file mode 100644 index 0000000..0799000 --- /dev/null +++ b/src/tasks/metrics.rs @@ -0,0 +1,79 @@ +use crate::config::Provider; +use alloy::{primitives::TxHash, providers::Provider as _}; +use metrics::{counter, histogram}; +use std::time::Instant; +use tokio::{sync::mpsc, task::JoinHandle}; +use tracing::{debug, error}; + +/// Collects metrics on transactions sent by the Builder +#[derive(Debug, Clone)] +pub struct MetricsTask { + /// Ethereum Provider + pub host_provider: Provider, +} + +impl MetricsTask { + /// Given a transaction hash, record metrics on the result of the transaction mining + pub async fn log_tx(&self, pending_tx_hash: TxHash) { + // start timer when tx hash is received + let start: Instant = Instant::now(); + + // wait for the tx to mine, get its receipt + let receipt_result = + self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await; + + match receipt_result { + Ok(maybe_receipt) => { + match maybe_receipt { + Some(receipt) => { + // record how long it took to mine the transaction + // potential improvement: use the block timestamp to calculate the time elapsed + histogram!("metrics.tx_mine_time") + .record(start.elapsed().as_millis() as f64); + + // log whether the transaction reverted + if receipt.status() { + counter!("metrics.tx_reverted").increment(1); + debug!(tx_hash = %pending_tx_hash, "tx reverted"); + } else { + counter!("metrics.tx_succeeded").increment(1); + debug!(tx_hash = %pending_tx_hash, "tx succeeded"); + } + } + None => { + counter!("metrics.no_receipt").increment(1); + error!("no receipt found for tx hash"); + } + } + } + Err(e) => { + counter!("metrics.rpc_error").increment(1); + error!(error = ?e, "rpc error"); + } + } + } + + /// Spawns the task which collects metrics on pending transactions + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, mut inbound) = mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { + debug!("metrics task spawned"); + loop { + if let Some(pending_tx_hash) = inbound.recv().await { + let this = self.clone(); + tokio::spawn(async move { + debug!("received tx hash"); + let that = this.clone(); + that.log_tx(pending_tx_hash).await; + debug!("logged tx metrics"); + }); + } else { + tracing::debug!("upstream task gone"); + break; + } + } + }); + + (sender, handle) + } +} diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 3692d13..3149dec 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -1,5 +1,6 @@ pub mod block; pub mod bundler; +pub mod metrics; pub mod oauth; pub mod submit; pub mod tx_poller; diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 29c1877..b63abce 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -8,7 +8,7 @@ use alloy::{ consensus::{constants::GWEI_TO_WEI, SimpleCoder}, eips::BlockNumberOrTag, network::{TransactionBuilder, TransactionBuilder4844}, - primitives::{FixedBytes, U256}, + primitives::{FixedBytes, TxHash, U256}, providers::{Provider as _, SendableTx, WalletProvider}, rpc::types::eth::TransactionRequest, signers::Signer, @@ -59,6 +59,8 @@ pub struct SubmitTask { pub config: crate::config::BuilderConfig, /// Authenticator pub authenticator: Authenticator, + // Channel over which to send pending transactions + pub outbound_tx_channel: mpsc::UnboundedSender, } impl SubmitTask { @@ -192,6 +194,11 @@ impl SubmitTask { spawn_provider_send!(&host_provider, &tx); } + // send the in-progress transaction over the outbound_tx_channel + if self.outbound_tx_channel.send(*tx.tx_hash()).is_err() { + tracing::error!("receipts task gone"); + } + // question mark unwraps join error, which would be an internal panic // then if let checks for rpc error if let Err(e) = fut.await? {