feat: Add mempool drop checks to the provider heartbeat #2689

Open

wants to merge 1 commit into base: main
88 changes: 82 additions & 6 deletions crates/provider/src/heart.rs
@@ -8,6 +8,7 @@ use alloy_primitives::{
map::{B256HashMap, B256HashSet},
TxHash, B256,
};
use alloy_rpc_client::WeakClient;
use alloy_transport::{utils::Spawnable, TransportError};
use futures::{stream::StreamExt, FutureExt, Stream};
use std::{
@@ -353,6 +354,9 @@ pub enum WatchTxError {
/// Transaction was not confirmed after configured timeout.
#[error("transaction was not confirmed within the timeout")]
Timeout,
/// Transaction was dropped from the mempool.
#[error("transaction was dropped from the mempool")]
Dropped,
Comment on lines +357 to +359
Contributor Author

This change is technically breaking, since the enum is not marked with #[non_exhaustive], but I'm not sure what the project policy is in that case.
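
For illustration only, a minimal sketch of what the `#[non_exhaustive]` route could look like; the attribute is not part of this diff, and `thiserror` is assumed for the `#[error]` attribute, matching the surrounding code:

```rust
use thiserror::Error;

/// Sketch only: with `#[non_exhaustive]`, downstream crates must keep a
/// wildcard arm when matching, so adding a variant such as `Dropped` later
/// would not be a semver-breaking change.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WatchTxError {
    /// Transaction was not confirmed after configured timeout.
    #[error("transaction was not confirmed within the timeout")]
    Timeout,
    /// Transaction was dropped from the mempool.
    #[error("transaction was dropped from the mempool")]
    Dropped,
}

// In a downstream crate, an exhaustive match then needs a catch-all arm:
//
//     match err {
//         WatchTxError::Timeout => { /* retry or resubmit */ }
//         WatchTxError::Dropped => { /* resubmit */ }
//         _ => { /* future variants */ }
//     }
```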

}

/// The type sent by the [`HeartbeatHandle`] to the [`Heartbeat`] background task.
@@ -362,6 +366,9 @@ struct TxWatcher {
/// The block at which the transaction was received. To be filled once known.
/// Invariant: any confirmed transaction in `Heart` has this value set.
received_at_block: Option<u64>,
/// The time at which the transaction was added to the watcher.
/// Used to schedule mempool checks.
checked_at: Instant,
tx: oneshot::Sender<Result<(), WatchTxError>>,
}

@@ -439,7 +446,9 @@ impl HeartbeatHandle {
) -> Result<PendingTransaction, PendingTransactionConfig> {
let (tx, rx) = oneshot::channel();
let tx_hash = config.tx_hash;
match self.tx.send(TxWatcher { config, received_at_block, tx }).await {
let added_at = Instant::now();
match self.tx.send(TxWatcher { config, received_at_block, checked_at: added_at, tx }).await
{
Ok(()) => Ok(PendingTransaction { tx_hash, rx }),
Err(e) => Err(e.0.config),
}
@@ -451,6 +460,12 @@ pub(crate) struct Heartbeat<N, S> {
/// The stream of incoming blocks to watch.
stream: futures::stream::Fuse<S>,

/// Client is needed to confirm the existence of transactions in the mempool.
client: WeakClient,

/// Interval between mempool checks.
mempool_check_interval: Option<Duration>,

/// Lookbehind blocks in form of mapping block number -> vector of transaction hashes.
past_blocks: VecDeque<(u64, B256HashSet)>,

@@ -468,9 +483,15 @@ pub(crate) struct Heartbeat<N, S> {

impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat<N, S> {
/// Create a new heartbeat task.
pub(crate) fn new(stream: S) -> Self {
pub(crate) fn new(
stream: S,
client: WeakClient,
mempool_check_interval: Option<Duration>,
) -> Self {
Self {
stream: stream.fuse(),
client,
mempool_check_interval,
past_blocks: Default::default(),
unconfirmed: Default::default(),
waiting_confs: Default::default(),
@@ -491,10 +512,10 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
/// Get the next time to reap a transaction. If no reaps, this is a very
/// long time from now (i.e. will not be woken).
fn next_reap(&self) -> Instant {
self.reap_at
.first_key_value()
.map(|(k, _)| *k)
.unwrap_or_else(|| Instant::now() + Duration::from_secs(60_000))
self.reap_at.first_key_value().map(|(k, _)| *k).unwrap_or_else(|| {
let offset = self.mempool_check_interval.unwrap_or(Duration::from_secs(60_000));
Instant::now() + offset
})
}

/// Reap any timeout
@@ -511,6 +532,52 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
}
}

/// Reap any transactions that have been wiped from the mempool.
async fn reap_mempool_wipes(&mut self) {
let Some(check_interval) = self.mempool_check_interval else {
return;
};

let Some(client) = self.client.upgrade() else {
// No client available, nothing to do.
return;
};

// Collect the list of transactions that disappeared from the mempool.
let mut to_reap = Vec::new();
for (hash, tx) in
self.unconfirmed.iter_mut().filter(|(_, tx)| tx.checked_at.elapsed() > check_interval)
{
match client
.request::<_, Option<N::TransactionResponse>>("eth_getTransactionByHash", (*hash,))
.await
{
Ok(Some(_)) => {
// Transaction exists, reschedule the check.
tx.checked_at = Instant::now();
}
Err(err) => {
// Some transport error; we can retry on the next iteration.
debug!(tx=%hash, %err, "failed to fetch transaction from mempool");
break;
}
Ok(None) => {
// Transaction is missing, must be removed.
to_reap.push(*hash);
}
}
}
for tx_hash in to_reap {
if let Some(watcher) = self.unconfirmed.remove(&tx_hash) {
debug!(tx=%tx_hash, "reaped");
watcher.notify(Err(WatchTxError::Dropped));
// We don't remove the transaction from `reap_at`, since for that we need to do
// a full scan. The cost is one spurious wake up per transaction removed here,
// but mempool drops are rare, so it shouldn't have any noticeable impact.
}
}
}

/// Reap transactions overridden by the reorg.
/// Accepts new chain height as an argument, and drops any subscriptions
/// that were received in blocks affected by the reorg (e.g. >= new_height).
@@ -661,6 +728,7 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat
}

async fn into_future(mut self, mut ixns: mpsc::Receiver<TxWatcher>) {
let mut last_mempool_check = Instant::now();
'shutdown: loop {
{
let next_reap = self.next_reap();
@@ -690,6 +758,14 @@ impl<N: Network, S: Stream<Item = N::BlockResponse> + Unpin + 'static> Heartbeat

// Always reap timeouts
self.reap_timeouts();

// Only do mempool checks if it's enabled and enough time has passed.
if let Some(check_interval) = self.mempool_check_interval {
if last_mempool_check.elapsed() >= check_interval {
self.reap_mempool_wipes().await;
last_mempool_check = Instant::now();
}
}
Comment on lines +763 to +768
Contributor

There's a potential timing issue in the mempool check logic. Currently, last_mempool_check is updated after the async reap_mempool_wipes() completes, but the elapsed time check happens before starting this operation. If the mempool check takes significant time to complete, subsequent iterations might incorrectly skip checks because the elapsed time would include the previous check's duration.

Consider updating last_mempool_check before starting the async operation or adjusting the logic to account for the time spent in the async operation. This would ensure consistent check intervals regardless of how long each check takes to complete.

Suggested change
if let Some(check_interval) = self.mempool_check_interval {
if last_mempool_check.elapsed() >= check_interval {
self.reap_mempool_wipes().await;
last_mempool_check = Instant::now();
}
}
if let Some(check_interval) = self.mempool_check_interval {
if last_mempool_check.elapsed() >= check_interval {
last_mempool_check = Instant::now();
self.reap_mempool_wipes().await;
}
}

Spotted by Diamond


Contributor Author

It's probably better to run these checks less often, in case e.g. some network request takes too long.

}
}
}
50 changes: 47 additions & 3 deletions crates/provider/src/provider/root.rs
@@ -10,6 +10,7 @@ use std::{
fmt,
marker::PhantomData,
sync::{Arc, OnceLock},
time::Duration,
};

#[cfg(feature = "pubsub")]
@@ -78,6 +79,28 @@ impl<N: Network> RootProvider<N> {
pub async fn connect_boxed<C: TransportConnect>(conn: C) -> Result<Self, TransportError> {
Self::connect_with(conn).await
}

/// Sets the mempool check configuration.
///
/// Represents the configuration for checking the presence of a transaction in the mempool.
/// Under certain circumstances, a transaction can be evicted from the mempool, meaning that
/// an already submitted transaction will never get a receipt and will never be confirmed.
///
/// This configuration sets the interval at which transactions should be checked.
/// If set to `None`, no mempool checks will be performed.
///
/// The mempool check is enabled by default; disabling it is only recommended if the caller
/// has other means of detecting hanging transactions.
/// By default, the interval is chosen by the heartbeat task based on the network polling
/// interval.
///
/// Returns `false` if the mempool check cannot be set (e.g. it was set previously or the
/// heartbeat task is already running).
pub fn set_mempool_check(&self, interval: Option<Duration>) -> bool {
if self.inner.mempool_check_interval.set(interval).is_err() {
return false;
}
true
}
}
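
For context, a hedged usage sketch of the new setter from a caller's point of view; the constructors mirror the test added in `trait.rs` below, while the 30-second interval, the helper name, and the exact import paths are illustrative assumptions:

```rust
use std::time::Duration;

use alloy_network::Ethereum;
use alloy_provider::RootProvider;
use alloy_rpc_client::RpcClient;
use url::Url;

// Sketch only: tune (or disable) the mempool drop check before the heartbeat
// task is spawned by the first watched transaction.
fn provider_with_mempool_check(endpoint: Url) -> RootProvider<Ethereum> {
    let client = RpcClient::builder().http(endpoint);
    let provider = RootProvider::<Ethereum>::new(client);

    // Check pending transactions every 30 seconds (illustrative value).
    assert!(provider.set_mempool_check(Some(Duration::from_secs(30))));

    // A second call returns `false`, since the interval was already set.
    assert!(!provider.set_mempool_check(None));

    provider
}
```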

impl<N: Network> RootProvider<N> {
@@ -114,9 +137,19 @@ impl<N: Network> RootProvider<N> {
#[inline]
pub(crate) fn get_heart(&self) -> &HeartbeatHandle {
self.inner.heart.get_or_init(|| {
let mempool_check_interval = self
.inner
.mempool_check_interval
.get_or_init(|| Some(self.inner.client.poll_interval() * 10));

let new_blocks = NewBlocks::<N>::new(self.inner.weak_client());
let stream = new_blocks.into_stream();
Heartbeat::<N, _>::new(Box::pin(stream)).spawn()
Heartbeat::<N, _>::new(
Box::pin(stream),
self.inner.weak_client(),
*mempool_check_interval,
)
.spawn()
})
}
}
@@ -125,19 +158,30 @@ impl<N: Network> RootProvider<N> {
/// base of every provider stack.
pub(crate) struct RootProviderInner<N: Network = Ethereum> {
client: RpcClient,
mempool_check_interval: OnceLock<Option<Duration>>,
heart: OnceLock<HeartbeatHandle>,
_network: PhantomData<N>,
}

impl<N: Network> Clone for RootProviderInner<N> {
fn clone(&self) -> Self {
Self { client: self.client.clone(), heart: self.heart.clone(), _network: PhantomData }
Self {
client: self.client.clone(),
mempool_check_interval: self.mempool_check_interval.clone(),
heart: self.heart.clone(),
_network: PhantomData,
}
}
}

impl<N: Network> RootProviderInner<N> {
pub(crate) fn new(client: RpcClient) -> Self {
Self { client, heart: Default::default(), _network: PhantomData }
Self {
client,
mempool_check_interval: Default::default(),
heart: Default::default(),
_network: PhantomData,
}
}

pub(crate) fn weak_client(&self) -> WeakClient {
25 changes: 25 additions & 0 deletions crates/provider/src/provider/trait.rs
@@ -1622,6 +1622,31 @@ mod tests {
assert_eq!(0, num);
}

#[tokio::test]
async fn test_tx_dropped() {
let anvil = Anvil::new().spawn();
let client = RpcClient::builder()
.http(anvil.endpoint_url())
.with_poll_interval(Duration::from_millis(10));

let provider = RootProvider::<Ethereum>::new(client);
// Try getting some non-existent transaction.
let pending_tx = PendingTransactionBuilder::new(
provider,
b256!("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"),
)
.get_receipt();

let receipt_err = tokio::time::timeout(Duration::from_secs(1), pending_tx)
.await
.expect("Waiting for receipt error timed out")
.expect_err("Expected error for non-existent transaction receipt");
assert!(
matches!(receipt_err, PendingTransactionError::TxWatcher(crate::WatchTxError::Dropped)),
"Expected Dropped error, got: {receipt_err:?}"
);
}

#[tokio::test]
async fn test_send_tx() {
let provider = ProviderBuilder::new().connect_anvil_with_wallet();