diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/router_macros.rs | 1 | ||||
-rw-r--r-- | src/model/bucket_alias_table.rs | 6 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 7 | ||||
-rw-r--r-- | src/model/index_counter.rs | 27 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 3 | ||||
-rw-r--r-- | src/table/data.rs | 6 | ||||
-rw-r--r-- | src/table/gc.rs | 32 | ||||
-rw-r--r-- | src/table/merkle.rs | 17 | ||||
-rw-r--r-- | src/table/queue.rs | 10 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 2 | ||||
-rw-r--r-- | src/table/schema.rs | 17 | ||||
-rw-r--r-- | src/table/sync.rs | 20 | ||||
-rw-r--r-- | src/table/table.rs | 14 |
13 files changed, 54 insertions, 108 deletions
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs index 959e69a3..07b5570c 100644 --- a/src/api/router_macros.rs +++ b/src/api/router_macros.rs @@ -145,6 +145,7 @@ macro_rules! generateQueryParameters { ) => { #[derive(Debug)] #[allow(non_camel_case_types)] + #[allow(clippy::upper_case_acronyms)] enum Keyword { EMPTY, $( $kw_name, )* diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index d07394f6..54d7fbad 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -1,12 +1,12 @@ -use serde::{Deserialize, Serialize}; - use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; mod v08 { - use super::*; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; /// The bucket alias table holds the names given to buckets /// in the global namespace. diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 38ed88ee..ac163736 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,5 +1,3 @@ -use serde::{Deserialize, Serialize}; - use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; @@ -8,7 +6,10 @@ use garage_util::time::*; use crate::permission::BucketKeyPerm; mod v08 { - use super::*; + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; /// A bucket is a collection of objects /// diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index c3ed29c7..3cd3083a 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -31,7 +31,12 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { } mod v08 { - use super::*; + use super::CountedItem; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + // ---- Global part (the table everyone queries) ---- /// A counter entry in the global table #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] @@ -48,6 +53,17 @@ mod v08 { } impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {} + + // ---- Local part (the counter we maintain transactionnaly on each node) ---- + + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] + pub(super) struct LocalCounterEntry<T: CountedItem> { + pub(super) pk: T::CP, + pub(super) sk: T::CS, + pub(super) values: BTreeMap<String, (u64, i64)>, + } + + impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {} } pub use v08::*; @@ -358,15 +374,6 @@ impl<T: CountedItem> IndexCounter<T> { // ---- -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry<T: CountedItem> { - pk: T::CP, - sk: T::CS, - values: BTreeMap<String, (u64, i64)>, -} - -impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {} - impl<T: CountedItem> LocalCounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { CounterEntry { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 616e0d35..518acc95 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -380,6 +380,3 @@ impl CountedItem for Object { ] } } - -// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv -// (we just want to change bucket into bucket_id by hashing it) diff --git a/src/table/data.rs b/src/table/data.rs index f93ed00d..5c792f1f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -41,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) metrics: TableMetrics, } -impl<F, R> TableData<F, R> -where - F: TableSchema, - R: TableReplication, -{ +impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) diff --git a/src/table/gc.rs b/src/table/gc.rs index 90594fba..5b9124a7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; // and the moment the garbage collection actually happens) const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); -pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { +pub(crate) struct TableGc<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, @@ -49,11 +49,7 @@ impl Rpc for GcRpc { type Response = Result<GcRpc, Error>; } -impl<F, R> TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableGc<F, R> { pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp @@ -277,11 +273,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { match message { GcRpc::Update(items) => { @@ -299,20 +291,12 @@ where } } -struct GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +struct GcWorker<F: TableSchema, R: TableReplication> { gc: Arc<TableGc<F, R>>, wait_delay: Duration, } -impl<F, R> GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> GcWorker<F, R> { fn new(gc: Arc<TableGc<F, R>>) -> Self { Self { gc, @@ -322,11 +306,7 @@ where } #[async_trait] -impl<F, R> Worker for GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> { fn name(&self) -> String { format!("{} GC", F::TABLE_NAME) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 736354fa..2d593e6d 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -65,11 +65,7 @@ pub enum MerkleNode { Leaf(Vec<u8>, Hash), } -impl<F, R> MerkleUpdater<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); @@ -303,17 +299,10 @@ where } } -struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) -where - F: TableSchema + 'static, - R: TableReplication + 'static; +struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>); #[async_trait] -impl<F, R> Worker for MerkleWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> { fn name(&self) -> String { format!("{} Merkle", F::TABLE_NAME) } diff --git a/src/table/queue.rs b/src/table/queue.rs index 860f20d3..0857209b 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100; pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) where - F: TableSchema + 'static, - R: TableReplication + 'static; + F: TableSchema, + R: TableReplication; #[async_trait] -impl<F, R> Worker for InsertQueueWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> { fn name(&self) -> String { format!("{} queue", F::TABLE_NAME) } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 3740d947..f00815a2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,7 +2,7 @@ use garage_rpc::ring::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated -pub trait TableReplication: Send + Sync { +pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/schema.rs b/src/table/schema.rs index 6538a32f..5cbf6c95 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -7,7 +7,9 @@ use garage_util::migrate::Migrate; use crate::crdt::Crdt; /// Trait for field used to partition data -pub trait PartitionKey { +pub trait PartitionKey: + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ /// Get the key used to partition fn hash(&self) -> Hash; } @@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 { } /// Trait for field used to sort data -pub trait SortKey { +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static { const TABLE_NAME: &'static str; /// The partition key used in that table - type P: PartitionKey - + Clone - + PartialEq - + Serialize - + for<'de> Deserialize<'de> - + Send - + Sync - + 'static; + type P: PartitionKey; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + type S: SortKey; /// They type for an entry in that table type E: Entry<Self::P, Self::S>; diff --git a/src/table/sync.rs b/src/table/sync.rs index abc034f8..29e7aa89 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -28,7 +28,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct TableSyncer<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -61,11 +61,7 @@ struct TodoPartition { retain: bool, } -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, @@ -459,11 +455,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { @@ -497,7 +489,7 @@ where // -------- Sync Worker --------- -struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { +struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, ring_recv: watch::Receiver<Arc<Ring>>, ring: Arc<Ring>, @@ -506,7 +498,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { next_full_sync: Instant, } -impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +564,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { } #[async_trait] -impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } diff --git a/src/table/table.rs b/src/table/table.rs index 7f158314..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -33,7 +33,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct Table<F: TableSchema, R: TableReplication> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, @@ -65,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> { type Response = Result<TableRpc<F>, Error>; } -impl<F, R> Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Table<F, R> { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { @@ -428,11 +424,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { async fn handle( self: &Arc<Self>, msg: &TableRpc<F>, |