diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 6 | ||||
-rw-r--r-- | src/table/gc.rs | 32 | ||||
-rw-r--r-- | src/table/merkle.rs | 17 | ||||
-rw-r--r-- | src/table/queue.rs | 10 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 2 | ||||
-rw-r--r-- | src/table/schema.rs | 17 | ||||
-rw-r--r-- | src/table/sync.rs | 20 | ||||
-rw-r--r-- | src/table/table.rs | 14 |
8 files changed, 29 insertions, 89 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index f93ed00d..5c792f1f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -41,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) metrics: TableMetrics, } -impl<F, R> TableData<F, R> -where - F: TableSchema, - R: TableReplication, -{ +impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) diff --git a/src/table/gc.rs b/src/table/gc.rs index 90594fba..5b9124a7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; // and the moment the garbage collection actually happens) const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); -pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { +pub(crate) struct TableGc<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, @@ -49,11 +49,7 @@ impl Rpc for GcRpc { type Response = Result<GcRpc, Error>; } -impl<F, R> TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableGc<F, R> { pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp @@ -277,11 +273,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { match message { GcRpc::Update(items) => { @@ -299,20 +291,12 @@ where } } -struct GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +struct GcWorker<F: TableSchema, R: TableReplication> { gc: Arc<TableGc<F, R>>, wait_delay: Duration, } -impl<F, R> GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> GcWorker<F, R> { fn new(gc: Arc<TableGc<F, R>>) -> Self { Self { gc, @@ -322,11 +306,7 @@ where } #[async_trait] -impl<F, R> Worker for GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> { fn name(&self) -> String { format!("{} GC", F::TABLE_NAME) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 736354fa..2d593e6d 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -65,11 +65,7 @@ pub enum MerkleNode { Leaf(Vec<u8>, Hash), } -impl<F, R> MerkleUpdater<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); @@ -303,17 +299,10 @@ where } } -struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) -where - F: TableSchema + 'static, - R: TableReplication + 'static; +struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>); #[async_trait] -impl<F, R> Worker for MerkleWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> { fn name(&self) -> String { format!("{} Merkle", F::TABLE_NAME) } diff --git a/src/table/queue.rs b/src/table/queue.rs index 860f20d3..0857209b 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100; pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) where - F: TableSchema + 'static, - R: TableReplication + 'static; + F: TableSchema, + R: TableReplication; #[async_trait] -impl<F, R> Worker for InsertQueueWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> { fn name(&self) -> String { format!("{} queue", F::TABLE_NAME) } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 3740d947..f00815a2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,7 +2,7 @@ use garage_rpc::ring::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated -pub trait TableReplication: Send + Sync { +pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/schema.rs b/src/table/schema.rs index 6538a32f..5cbf6c95 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -7,7 +7,9 @@ use garage_util::migrate::Migrate; use crate::crdt::Crdt; /// Trait for field used to partition data -pub trait PartitionKey { +pub trait PartitionKey: + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ /// Get the key used to partition fn hash(&self) -> Hash; } @@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 { } /// Trait for field used to sort data -pub trait SortKey { +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static { const TABLE_NAME: &'static str; /// The partition key used in that table - type P: PartitionKey - + Clone - + PartialEq - + Serialize - + for<'de> Deserialize<'de> - + Send - + Sync - + 'static; + type P: PartitionKey; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + type S: SortKey; /// They type for an entry in that table type E: Entry<Self::P, Self::S>; diff --git a/src/table/sync.rs b/src/table/sync.rs index abc034f8..29e7aa89 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -28,7 +28,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct TableSyncer<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -61,11 +61,7 @@ struct TodoPartition { retain: bool, } -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, @@ -459,11 +455,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { @@ -497,7 +489,7 @@ where // -------- Sync Worker --------- -struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { +struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, ring_recv: watch::Receiver<Arc<Ring>>, ring: Arc<Ring>, @@ -506,7 +498,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { next_full_sync: Instant, } -impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +564,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { } #[async_trait] -impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } diff --git a/src/table/table.rs b/src/table/table.rs index 7f158314..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -33,7 +33,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct Table<F: TableSchema, R: TableReplication> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, @@ -65,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> { type Response = Result<TableRpc<F>, Error>; } -impl<F, R> Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Table<F, R> { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { @@ -428,11 +424,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { async fn handle( self: &Arc<Self>, msg: &TableRpc<F>, |