Skip to content

feat: filter confirmed transactions from the block #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions bin/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ async fn main() -> eyre::Result<()> {
let span = tracing::info_span!("zenith-builder");

let config = BuilderConfig::load_from_env()?.clone();
let provider = config.connect_provider().await?;
let host_provider = config.connect_host_provider().await?;
let ru_provider = config.connect_ru_provider().await?;
let authenticator = Authenticator::new(&config);

tracing::debug!(rpc_url = config.host_rpc_url.as_ref(), "instantiated provider");

let sequencer_signer = config.connect_sequencer_signer().await?;
let zenith = config.connect_zenith(provider.clone());
let zenith = config.connect_zenith(host_provider.clone());

let builder = BlockBuilder::new(&config, authenticator.clone());
let builder = BlockBuilder::new(&config, authenticator.clone(), ru_provider);
let submit = SubmitTask {
authenticator: authenticator.clone(),
provider,
host_provider,
zenith,
client: reqwest::Client::new(),
sequencer_signer,
Expand Down
30 changes: 27 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use zenith_types::Zenith;
const HOST_CHAIN_ID: &str = "HOST_CHAIN_ID";
const RU_CHAIN_ID: &str = "RU_CHAIN_ID";
const HOST_RPC_URL: &str = "HOST_RPC_URL";
const ROLLUP_RPC_URL: &str = "ROLLUP_RPC_URL";
const TX_BROADCAST_URLS: &str = "TX_BROADCAST_URLS";
const ZENITH_ADDRESS: &str = "ZENITH_ADDRESS";
const QUINCEY_URL: &str = "QUINCEY_URL";
Expand Down Expand Up @@ -44,6 +45,8 @@ pub struct BuilderConfig {
pub ru_chain_id: u64,
/// URL for Host RPC node.
pub host_rpc_url: Cow<'static, str>,
/// URL for the Rollup RPC node.
pub ru_rpc_url: Cow<'static, str>,
/// Additional RPC URLs to which to broadcast transactions.
pub tx_broadcast_urls: Vec<Cow<'static, str>>,
/// address of the Zenith contract on Host.
Expand Down Expand Up @@ -116,7 +119,7 @@ impl ConfigError {
}
}

/// Provider type used by this transaction
/// Provider type used to read & write.
pub type Provider = FillProvider<
JoinFill<
JoinFill<
Expand All @@ -130,6 +133,17 @@ pub type Provider = FillProvider<
Ethereum,
>;

/// Provider type used to read-only.
pub type WalletlessProvider = FillProvider<
JoinFill<
Identity,
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
>,
RootProvider<BoxTransport>,
BoxTransport,
Ethereum,
>;

pub type ZenithInstance = Zenith::ZenithInstance<BoxTransport, Provider>;

impl BuilderConfig {
Expand All @@ -139,6 +153,7 @@ impl BuilderConfig {
host_chain_id: load_u64(HOST_CHAIN_ID)?,
ru_chain_id: load_u64(RU_CHAIN_ID)?,
host_rpc_url: load_url(HOST_RPC_URL)?,
ru_rpc_url: load_url(ROLLUP_RPC_URL)?,
tx_broadcast_urls: env::var(TX_BROADCAST_URLS)
.unwrap_or_default()
.split(',')
Expand Down Expand Up @@ -180,8 +195,17 @@ impl BuilderConfig {
}
}

/// Connect to the provider using the configuration.
pub async fn connect_provider(&self) -> Result<Provider, ConfigError> {
/// Connect to the Rollup rpc provider.
pub async fn connect_ru_provider(&self) -> Result<WalletlessProvider, ConfigError> {
ProviderBuilder::new()
.with_recommended_fillers()
.on_builtin(&self.ru_rpc_url)
.await
.map_err(Into::into)
}

/// Connect to the Host rpc provider.
pub async fn connect_host_provider(&self) -> Result<Provider, ConfigError> {
let builder_signer = self.connect_builder_signer().await?;
ProviderBuilder::new()
.with_recommended_fillers()
Expand Down
72 changes: 54 additions & 18 deletions src/tasks/block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::{BuilderConfig, WalletlessProvider};
use alloy::providers::Provider;
use alloy::{
consensus::{SidecarBuilder, SidecarCoder, TxEnvelope},
eips::eip2718::Decodable2718,
Expand All @@ -10,19 +15,13 @@ use tokio::{sync::mpsc, task::JoinHandle};
use tracing::Instrument;
use zenith_types::{encode_txns, Alloy2718Coder};

use super::bundler::{Bundle, BundlePoller};
use super::oauth::Authenticator;
use super::tx_poller::TxPoller;
use crate::config::BuilderConfig;

/// Ethereum's slot time in seconds.
pub const ETHEREUM_SLOT_TIME: u64 = 12;

#[derive(Debug, Default, Clone)]
/// A block in progress.
pub struct InProgressBlock {
transactions: Vec<TxEnvelope>,

raw_encoding: OnceLock<Bytes>,
hash: OnceLock<B256>,
}
Expand Down Expand Up @@ -57,15 +56,22 @@ impl InProgressBlock {

/// Ingest a transaction into the in-progress block. Fails
pub fn ingest_tx(&mut self, tx: &TxEnvelope) {
tracing::info!(hash = %tx.tx_hash(), "ingesting tx");
tracing::trace!(hash = %tx.tx_hash(), "ingesting tx");
self.unseal();
self.transactions.push(tx.clone());
}

/// Remove a transaction from the in-progress block.
pub fn remove_tx(&mut self, tx: &TxEnvelope) {
tracing::trace!(hash = %tx.tx_hash(), "removing tx");
self.unseal();
self.transactions.retain(|t| t.tx_hash() != tx.tx_hash());
}

/// Ingest a bundle into the in-progress block.
/// Ignores Signed Orders for now.
pub fn ingest_bundle(&mut self, bundle: Bundle) {
tracing::info!(bundle = %bundle.id, "ingesting bundle");
tracing::trace!(bundle = %bundle.id, "ingesting bundle");

let txs = bundle
.bundle
Expand Down Expand Up @@ -113,26 +119,32 @@ impl InProgressBlock {
/// BlockBuilder is a task that periodically builds a block then sends it for signing and submission.
pub struct BlockBuilder {
pub config: BuilderConfig,
pub ru_provider: WalletlessProvider,
pub tx_poller: TxPoller,
pub bundle_poller: BundlePoller,
}

impl BlockBuilder {
// create a new block builder with the given config.
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
pub fn new(
config: &BuilderConfig,
authenticator: Authenticator,
ru_provider: WalletlessProvider,
) -> Self {
Self {
config: config.clone(),
ru_provider,
tx_poller: TxPoller::new(config),
bundle_poller: BundlePoller::new(config, authenticator),
}
}

async fn get_transactions(&mut self, in_progress: &mut InProgressBlock) {
tracing::info!("query transactions from cache");
tracing::trace!("query transactions from cache");
let txns = self.tx_poller.check_tx_cache().await;
match txns {
Ok(txns) => {
tracing::info!("got transactions response");
tracing::trace!("got transactions response");
for txn in txns.into_iter() {
in_progress.ingest_tx(&txn);
}
Expand All @@ -145,11 +157,11 @@ impl BlockBuilder {
}

async fn _get_bundles(&mut self, in_progress: &mut InProgressBlock) {
tracing::info!("query bundles from cache");
tracing::trace!("query bundles from cache");
let bundles = self.bundle_poller.check_bundle_cache().await;
match bundles {
Ok(bundles) => {
tracing::info!("got bundles response");
tracing::trace!("got bundles response");
for bundle in bundles {
in_progress.ingest_bundle(bundle);
}
Expand All @@ -161,15 +173,36 @@ impl BlockBuilder {
self.bundle_poller.evict();
}

async fn filter_transactions(&self, in_progress: &mut InProgressBlock) {
// query the rollup node to see which transaction(s) have been included
let mut confirmed_transactions = Vec::new();
for transaction in in_progress.transactions.iter() {
let tx = self
.ru_provider
.get_transaction_by_hash(*transaction.tx_hash())
.await
.expect("failed to get receipt");
if tx.is_some() {
confirmed_transactions.push(transaction.clone());
}
}
tracing::trace!(confirmed = confirmed_transactions.len(), "found confirmed transactions");

// remove already-confirmed transactions
for transaction in confirmed_transactions {
in_progress.remove_tx(&transaction);
}
}

// calculate the duration in seconds until the beginning of the next block slot.
fn secs_to_next_slot(&mut self) -> u64 {
fn secs_to_next_slot(&self) -> u64 {
let curr_timestamp: u64 = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
let current_slot_time = (curr_timestamp - self.config.chain_offset) % ETHEREUM_SLOT_TIME;
(ETHEREUM_SLOT_TIME - current_slot_time) % ETHEREUM_SLOT_TIME
}

// add a buffer to the beginning of the block slot.
fn secs_to_next_target(&mut self) -> u64 {
fn secs_to_next_target(&self) -> u64 {
self.secs_to_next_slot() + self.config.target_slot_time
}

Expand All @@ -190,16 +223,19 @@ impl BlockBuilder {
// TODO: Implement bundle ingestion #later
// self.get_bundles(&mut in_progress).await;

// Filter confirmed transactions from the block
self.filter_transactions(&mut in_progress).await;

// submit the block if it has transactions
if !in_progress.is_empty() {
tracing::info!(txns = in_progress.len(), "sending block to submit task");
tracing::debug!(txns = in_progress.len(), "sending block to submit task");
let in_progress_block = std::mem::take(&mut in_progress);
if outbound.send(in_progress_block).is_err() {
tracing::debug!("downstream task gone");
tracing::error!("downstream task gone");
break;
}
} else {
tracing::info!("no transactions, skipping block submission");
tracing::debug!("no transactions, skipping block submission");
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/oauth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ mod tests {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
host_rpc_url: "http://rpc.holesky.signet.sh".into(),
host_rpc_url: "host-rpc.example.com".into(),
ru_rpc_url: "ru-rpc.example.com".into(),
zenith_address: Address::default(),
quincey_url: "http://localhost:8080".into(),
builder_port: 8080,
Expand Down
20 changes: 10 additions & 10 deletions src/tasks/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum ControlFlow {
/// Submits sidecars in ethereum txns to mainnet ethereum
pub struct SubmitTask {
/// Ethereum Provider
pub provider: Provider,
pub host_provider: Provider,
/// Zenith
pub zenith: ZenithInstance,
/// Reqwest
Expand Down Expand Up @@ -125,7 +125,7 @@ impl SubmitTask {
}

async fn next_host_block_height(&self) -> eyre::Result<u64> {
let result = self.provider.get_block_number().await?;
let result = self.host_provider.get_block_number().await?;
let next = result.checked_add(1).ok_or_else(|| eyre!("next host block height overflow"))?;
Ok(next)
}
Expand All @@ -150,12 +150,12 @@ impl SubmitTask {

let tx = self
.build_blob_tx(header, v, r, s, in_progress)?
.with_from(self.provider.default_signer_address())
.with_from(self.host_provider.default_signer_address())
.with_to(self.config.zenith_address)
.with_gas_limit(1_000_000);

if let Err(TransportError::ErrorResp(e)) =
self.provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await
self.host_provider.call(&tx).block(BlockNumberOrTag::Pending.into()).await
{
error!(
code = e.code,
Expand Down Expand Up @@ -185,16 +185,16 @@ impl SubmitTask {
"sending transaction to network"
);

let SendableTx::Envelope(tx) = self.provider.fill(tx).await? else {
let SendableTx::Envelope(tx) = self.host_provider.fill(tx).await? else {
bail!("failed to fill transaction")
};

// Send the tx via the primary provider
let fut = spawn_provider_send!(&self.provider, &tx);
// Send the tx via the primary host_provider
let fut = spawn_provider_send!(&self.host_provider, &tx);

// Spawn send_tx futures for all additional broadcast providers
for provider in self.config.connect_additional_broadcast().await? {
spawn_provider_send!(&provider, &tx);
// Spawn send_tx futures for all additional broadcast host_providers
for host_provider in self.config.connect_additional_broadcast().await? {
spawn_provider_send!(&host_provider, &tx);
}

// question mark unwraps join error, which would be an internal panic
Expand Down
3 changes: 2 additions & 1 deletion tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ mod tests {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
host_rpc_url: "http://rpc.holesky.signet.sh".into(),
host_rpc_url: "host-rpc.example.com".into(),
ru_rpc_url: "ru-rpc.example.com".into(),
zenith_address: Address::default(),
quincey_url: "http://localhost:8080".into(),
builder_port: 8080,
Expand Down
3 changes: 2 additions & 1 deletion tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ mod tests {
let config = BuilderConfig {
host_chain_id: 17000,
ru_chain_id: 17001,
host_rpc_url: "http://rpc.holesky.signet.sh".into(),
host_rpc_url: "host-rpc.example.com".into(),
ru_rpc_url: "ru-rpc.example.com".into(),
tx_broadcast_urls: vec!["http://localhost:9000".into()],
zenith_address: Address::default(),
quincey_url: "http://localhost:8080".into(),
Expand Down
Loading