diff --git a/crates/account-abstraction-core/src/domain/mempool.rs b/crates/account-abstraction-core/src/domain/mempool.rs index 701a234..f4f8ace 100644 --- a/crates/account-abstraction-core/src/domain/mempool.rs +++ b/crates/account-abstraction-core/src/domain/mempool.rs @@ -1,15 +1,15 @@ use crate::domain::types::{UserOpHash, WrappedUserOperation}; +use std::fmt::Debug; use std::sync::Arc; - -#[derive(Default)] +#[derive(Default, Debug)] pub struct PoolConfig { pub minimum_max_fee_per_gas: u128, } -pub trait Mempool: Send + Sync { +pub trait Mempool: Send + Sync + Debug { fn add_operation(&mut self, operation: &WrappedUserOperation) -> Result<(), anyhow::Error>; - fn get_top_operations(&self, n: usize) -> impl Iterator>; + fn get_top_operations(&self, n: usize) -> Vec>; fn remove_operation( &mut self, diff --git a/crates/account-abstraction-core/src/factories/kafka_engine.rs b/crates/account-abstraction-core/src/factories/kafka_engine.rs index a7d972b..85dfdd1 100644 --- a/crates/account-abstraction-core/src/factories/kafka_engine.rs +++ b/crates/account-abstraction-core/src/factories/kafka_engine.rs @@ -31,3 +31,22 @@ pub fn create_mempool_engine( Ok(Arc::new(engine)) } + +pub fn create_mempool_engine_with_url( + kafka_url: &str, + topic: &str, + consumer_group_id: &str, + pool_config: Option, +) -> anyhow::Result>> { + let mut client_config = ClientConfig::new(); + client_config.set("bootstrap.servers", kafka_url); + client_config.set("group.id", consumer_group_id); + client_config.set("enable.auto.commit", "true"); + + let consumer: StreamConsumer = client_config.create()?; + consumer.subscribe(&[topic])?; + + let event_source = Arc::new(KafkaEventSource::new(Arc::new(consumer))); + let mempool = Arc::new(RwLock::new(InMemoryMempool::new(pool_config.unwrap_or_default()))); + Ok(Arc::new(MempoolEngine::::new(mempool, event_source))) +} \ No newline at end of file diff --git a/crates/account-abstraction-core/src/infrastructure/in_memory/mempool.rs b/crates/account-abstraction-core/src/infrastructure/in_memory/mempool.rs index 5aa7166..caa73f1 100644 --- a/crates/account-abstraction-core/src/infrastructure/in_memory/mempool.rs +++ b/crates/account-abstraction-core/src/infrastructure/in_memory/mempool.rs @@ -82,6 +82,7 @@ impl Ord for ByNonce { } } +#[derive(Debug)] pub struct InMemoryMempool { config: PoolConfig, best: BTreeSet, @@ -101,7 +102,7 @@ impl Mempool for InMemoryMempool { Ok(()) } - fn get_top_operations(&self, n: usize) -> impl Iterator> { + fn get_top_operations(&self, n: usize) -> Vec> { self.best .iter() .filter_map(|op_by_fee| { @@ -127,6 +128,7 @@ impl Mempool for InMemoryMempool { } }) .take(n) + .collect() } fn remove_operation( @@ -310,7 +312,7 @@ mod tests { mempool.add_operation(&operation).unwrap(); - let best_before: Vec<_> = mempool.get_top_operations(10).collect(); + let best_before: Vec<_> = mempool.get_top_operations(10); assert_eq!(best_before.len(), 1); assert_eq!(best_before[0].hash, hash); @@ -318,7 +320,7 @@ mod tests { assert!(result.is_ok()); assert!(result.unwrap().is_some()); - let best_after: Vec<_> = mempool.get_top_operations(10).collect(); + let best_after: Vec<_> = mempool.get_top_operations(10); assert_eq!(best_after.len(), 0); } @@ -338,7 +340,7 @@ mod tests { let operation3 = create_wrapped_operation(1500, hash3); mempool.add_operation(&operation3).unwrap(); - let best: Vec<_> = mempool.get_top_operations(10).collect(); + let best: Vec<_> = mempool.get_top_operations(10); assert_eq!(best.len(), 3); assert_eq!(best[0].operation.max_fee_per_gas(), Uint::from(3000)); assert_eq!(best[1].operation.max_fee_per_gas(), Uint::from(2000)); @@ -361,7 +363,7 @@ mod tests { let operation3 = create_wrapped_operation(1500, hash3); mempool.add_operation(&operation3).unwrap(); - let best: Vec<_> = mempool.get_top_operations(2).collect(); + let best: Vec<_> = mempool.get_top_operations(2); assert_eq!(best.len(), 2); assert_eq!(best[0].operation.max_fee_per_gas(), Uint::from(3000)); assert_eq!(best[1].operation.max_fee_per_gas(), Uint::from(2000)); @@ -379,7 +381,7 @@ mod tests { let operation2 = create_wrapped_operation(2000, hash2); mempool.add_operation(&operation2).unwrap(); - let best: Vec<_> = mempool.get_top_operations(2).collect(); + let best: Vec<_> = mempool.get_top_operations(2); assert_eq!(best.len(), 2); assert_eq!(best[0].hash, hash1); assert_eq!(best[1].hash, hash2); @@ -417,7 +419,7 @@ mod tests { }; mempool.add_operation(&operation2).unwrap(); - let best: Vec<_> = mempool.get_top_operations(2).collect(); + let best: Vec<_> = mempool.get_top_operations(2); assert_eq!(best.len(), 1); assert_eq!(best[0].operation.nonce(), Uint::from(0)); } diff --git a/crates/account-abstraction-core/src/lib.rs b/crates/account-abstraction-core/src/lib.rs index 0b88b37..9d957a3 100644 --- a/crates/account-abstraction-core/src/lib.rs +++ b/crates/account-abstraction-core/src/lib.rs @@ -21,3 +21,4 @@ pub use services::{ }; pub use factories::kafka_engine::create_mempool_engine; +pub use factories::kafka_engine::create_mempool_engine_with_url; diff --git a/crates/account-abstraction-core/src/services/mempool_engine.rs b/crates/account-abstraction-core/src/services/mempool_engine.rs index a4a8f66..2208dfb 100644 --- a/crates/account-abstraction-core/src/services/mempool_engine.rs +++ b/crates/account-abstraction-core/src/services/mempool_engine.rs @@ -128,7 +128,7 @@ mod tests { let engine = MempoolEngine::new(mempool.clone(), mock_source); engine.process_next().await.unwrap(); - let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); + let items: Vec<_> = mempool.read().await.get_top_operations(10); assert_eq!(items.len(), 1); assert_eq!(items[0].hash, FixedBytes::from(op_hash)); } @@ -149,11 +149,11 @@ mod tests { let engine = MempoolEngine::new(mempool.clone(), mock_source); engine.process_next().await.unwrap(); - let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); + let items: Vec<_> = mempool.read().await.get_top_operations(10); assert_eq!(items.len(), 1); assert_eq!(items[0].hash, FixedBytes::from(op_hash)); engine.process_next().await.unwrap(); - let items: Vec<_> = mempool.read().await.get_top_operations(10).collect(); + let items: Vec<_> = mempool.read().await.get_top_operations(10); assert_eq!(items.len(), 0); } }