From d6fa31a3483502c16fab6b3ba265f519e499f97a Mon Sep 17 00:00:00 2001 From: Anna Carroll Date: Wed, 15 Jan 2025 16:06:47 -0600 Subject: [PATCH 1/2] feat: collect metrics on submitBlock transaction revert/success --- bin/builder.rs | 9 ++++++ src/tasks/mod.rs | 1 + src/tasks/receipts.rs | 74 +++++++++++++++++++++++++++++++++++++++++++ src/tasks/submit.rs | 9 +++++- 4 files changed, 92 insertions(+), 1 deletion(-) create mode 100644 src/tasks/receipts.rs diff --git a/bin/builder.rs b/bin/builder.rs index 95dad79..29925de 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -4,6 +4,7 @@ use builder::config::BuilderConfig; use builder::service::serve_builder_with_span; use builder::tasks::block::BlockBuilder; use builder::tasks::oauth::Authenticator; +use builder::tasks::receipts::ReceiptTask; use builder::tasks::submit::SubmitTask; use metrics_exporter_prometheus::PrometheusBuilder; @@ -27,6 +28,10 @@ async fn main() -> eyre::Result<()> { let zenith = config.connect_zenith(host_provider.clone()); let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider); + + let receipts = ReceiptTask { host_provider: host_provider.clone() }; + let (tx_channel, receipts_jh) = receipts.spawn(); + let submit = SubmitTask { authenticator: authenticator.clone(), host_provider, @@ -34,6 +39,7 @@ async fn main() -> eyre::Result<()> { client: reqwest::Client::new(), sequencer_signer, config: config.clone(), + outbound_tx_channel: tx_channel, }; let authenticator_jh = authenticator.spawn(); @@ -47,6 +53,9 @@ async fn main() -> eyre::Result<()> { _ = submit_jh => { tracing::info!("submit finished"); }, + _ = receipts_jh => { + tracing::info!("receipts finished"); + }, _ = build_jh => { tracing::info!("build finished"); } diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 3692d13..3bf3540 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -1,5 +1,6 @@ pub mod block; pub mod bundler; pub mod oauth; +pub mod receipts; pub mod submit; pub mod tx_poller; diff --git a/src/tasks/receipts.rs b/src/tasks/receipts.rs new file mode 100644 index 0000000..99130a6 --- /dev/null +++ b/src/tasks/receipts.rs @@ -0,0 +1,74 @@ +use crate::config::Provider; +use alloy::{primitives::TxHash, providers::Provider as _}; +use metrics::{counter, histogram}; +use std::time::Instant; +use tokio::{sync::mpsc, task::JoinHandle}; +use tracing::{debug, error}; + +/// Submits sidecars in ethereum txns to mainnet ethereum +#[derive(Debug, Clone)] +pub struct ReceiptTask { + /// Ethereum Provider + pub host_provider: Provider, +} + +impl ReceiptTask { + pub async fn log_tx(&self, pending_tx_hash: TxHash) { + // start timer when tx hash is received + let start: Instant = Instant::now(); + + // wait for the tx to mine, get its receipt + let receipt = self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await; + + match receipt { + Ok(receipt) => { + match receipt { + Some(receipt) => { + // record how long it took to mine the transaction + // potential improvement: use the block timestamp to calculate the time elapsed + histogram!("receipts.tx_mine_time") + .record(start.elapsed().as_millis() as f64); + + // log whether the transaction reverted + if receipt.status() { + counter!("receipts.tx_reverted").increment(1); + debug!(tx_hash = %pending_tx_hash, "tx reverted"); + } else { + counter!("receipts.tx_succeeded").increment(1); + debug!(tx_hash = %pending_tx_hash, "tx succeeded"); + } + } + None => { + counter!("receipts.no_receipt").increment(1); + error!("no receipt found for tx hash"); + } + } + } + Err(e) => { + counter!("receipts.rpc_error").increment(1); + error!(error = ?e, "rpc error"); + } + } + } + + /// Spawns the task which collects metrics on pending transactions + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, mut inbound) = mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { + loop { + if let Some(pending_tx_hash) = inbound.recv().await { + let this = self.clone(); + tokio::spawn(async move { + let that = this.clone(); + that.log_tx(pending_tx_hash).await; + }); + } else { + tracing::debug!("upstream task gone"); + break; + } + } + }); + + (sender, handle) + } +} diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 29c1877..b63abce 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -8,7 +8,7 @@ use alloy::{ consensus::{constants::GWEI_TO_WEI, SimpleCoder}, eips::BlockNumberOrTag, network::{TransactionBuilder, TransactionBuilder4844}, - primitives::{FixedBytes, U256}, + primitives::{FixedBytes, TxHash, U256}, providers::{Provider as _, SendableTx, WalletProvider}, rpc::types::eth::TransactionRequest, signers::Signer, @@ -59,6 +59,8 @@ pub struct SubmitTask { pub config: crate::config::BuilderConfig, /// Authenticator pub authenticator: Authenticator, + // Channel over which to send pending transactions + pub outbound_tx_channel: mpsc::UnboundedSender, } impl SubmitTask { @@ -192,6 +194,11 @@ impl SubmitTask { spawn_provider_send!(&host_provider, &tx); } + // send the in-progress transaction over the outbound_tx_channel + if self.outbound_tx_channel.send(*tx.tx_hash()).is_err() { + tracing::error!("receipts task gone"); + } + // question mark unwraps join error, which would be an internal panic // then if let checks for rpc error if let Err(e) = fut.await? { From 7ebd8d219556a9fa66d2c3c697b445c254d6029c Mon Sep 17 00:00:00 2001 From: Anna Carroll Date: Thu, 16 Jan 2025 14:56:44 -0600 Subject: [PATCH 2/2] address PR comments --- bin/builder.rs | 10 ++++----- src/tasks/{receipts.rs => metrics.rs} | 29 ++++++++++++++++----------- src/tasks/mod.rs | 2 +- 3 files changed, 23 insertions(+), 18 deletions(-) rename src/tasks/{receipts.rs => metrics.rs} (71%) diff --git a/bin/builder.rs b/bin/builder.rs index 29925de..2b24056 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -3,8 +3,8 @@ use builder::config::BuilderConfig; use builder::service::serve_builder_with_span; use builder::tasks::block::BlockBuilder; +use builder::tasks::metrics::MetricsTask; use builder::tasks::oauth::Authenticator; -use builder::tasks::receipts::ReceiptTask; use builder::tasks::submit::SubmitTask; use metrics_exporter_prometheus::PrometheusBuilder; @@ -29,8 +29,8 @@ async fn main() -> eyre::Result<()> { let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider); - let receipts = ReceiptTask { host_provider: host_provider.clone() }; - let (tx_channel, receipts_jh) = receipts.spawn(); + let metrics = MetricsTask { host_provider: host_provider.clone() }; + let (tx_channel, metrics_jh) = metrics.spawn(); let submit = SubmitTask { authenticator: authenticator.clone(), @@ -53,8 +53,8 @@ async fn main() -> eyre::Result<()> { _ = submit_jh => { tracing::info!("submit finished"); }, - _ = receipts_jh => { - tracing::info!("receipts finished"); + _ = metrics_jh => { + tracing::info!("metrics finished"); }, _ = build_jh => { tracing::info!("build finished"); diff --git a/src/tasks/receipts.rs b/src/tasks/metrics.rs similarity index 71% rename from src/tasks/receipts.rs rename to src/tasks/metrics.rs index 99130a6..0799000 100644 --- a/src/tasks/receipts.rs +++ b/src/tasks/metrics.rs @@ -5,47 +5,49 @@ use std::time::Instant; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{debug, error}; -/// Submits sidecars in ethereum txns to mainnet ethereum +/// Collects metrics on transactions sent by the Builder #[derive(Debug, Clone)] -pub struct ReceiptTask { +pub struct MetricsTask { /// Ethereum Provider pub host_provider: Provider, } -impl ReceiptTask { +impl MetricsTask { + /// Given a transaction hash, record metrics on the result of the transaction mining pub async fn log_tx(&self, pending_tx_hash: TxHash) { // start timer when tx hash is received let start: Instant = Instant::now(); // wait for the tx to mine, get its receipt - let receipt = self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await; + let receipt_result = + self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await; - match receipt { - Ok(receipt) => { - match receipt { + match receipt_result { + Ok(maybe_receipt) => { + match maybe_receipt { Some(receipt) => { // record how long it took to mine the transaction // potential improvement: use the block timestamp to calculate the time elapsed - histogram!("receipts.tx_mine_time") + histogram!("metrics.tx_mine_time") .record(start.elapsed().as_millis() as f64); // log whether the transaction reverted if receipt.status() { - counter!("receipts.tx_reverted").increment(1); + counter!("metrics.tx_reverted").increment(1); debug!(tx_hash = %pending_tx_hash, "tx reverted"); } else { - counter!("receipts.tx_succeeded").increment(1); + counter!("metrics.tx_succeeded").increment(1); debug!(tx_hash = %pending_tx_hash, "tx succeeded"); } } None => { - counter!("receipts.no_receipt").increment(1); + counter!("metrics.no_receipt").increment(1); error!("no receipt found for tx hash"); } } } Err(e) => { - counter!("receipts.rpc_error").increment(1); + counter!("metrics.rpc_error").increment(1); error!(error = ?e, "rpc error"); } } @@ -55,12 +57,15 @@ impl ReceiptTask { pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { let (sender, mut inbound) = mpsc::unbounded_channel(); let handle = tokio::spawn(async move { + debug!("metrics task spawned"); loop { if let Some(pending_tx_hash) = inbound.recv().await { let this = self.clone(); tokio::spawn(async move { + debug!("received tx hash"); let that = this.clone(); that.log_tx(pending_tx_hash).await; + debug!("logged tx metrics"); }); } else { tracing::debug!("upstream task gone"); diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index 3bf3540..3149dec 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -1,6 +1,6 @@ pub mod block; pub mod bundler; +pub mod metrics; pub mod oauth; -pub mod receipts; pub mod submit; pub mod tx_poller;