aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/router_macros.rs1
-rw-r--r--src/model/bucket_alias_table.rs6
-rw-r--r--src/model/bucket_table.rs7
-rw-r--r--src/model/index_counter.rs27
-rw-r--r--src/model/s3/object_table.rs3
-rw-r--r--src/table/data.rs6
-rw-r--r--src/table/gc.rs32
-rw-r--r--src/table/merkle.rs17
-rw-r--r--src/table/queue.rs10
-rw-r--r--src/table/replication/parameters.rs2
-rw-r--r--src/table/schema.rs17
-rw-r--r--src/table/sync.rs20
-rw-r--r--src/table/table.rs14
13 files changed, 54 insertions, 108 deletions
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
index 959e69a3..07b5570c 100644
--- a/src/api/router_macros.rs
+++ b/src/api/router_macros.rs
@@ -145,6 +145,7 @@ macro_rules! generateQueryParameters {
) => {
#[derive(Debug)]
#[allow(non_camel_case_types)]
+ #[allow(clippy::upper_case_acronyms)]
enum Keyword {
EMPTY,
$( $kw_name, )*
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs
index d07394f6..54d7fbad 100644
--- a/src/model/bucket_alias_table.rs
+++ b/src/model/bucket_alias_table.rs
@@ -1,12 +1,12 @@
-use serde::{Deserialize, Serialize};
-
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::*;
mod v08 {
- use super::*;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
/// The bucket alias table holds the names given to buckets
/// in the global namespace.
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 38ed88ee..ac163736 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,5 +1,3 @@
-use serde::{Deserialize, Serialize};
-
use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
@@ -8,7 +6,10 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm;
mod v08 {
- use super::*;
+ use crate::permission::BucketKeyPerm;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
/// A bucket is a collection of objects
///
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index c3ed29c7..3cd3083a 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -31,7 +31,12 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
}
mod v08 {
- use super::*;
+ use super::CountedItem;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ // ---- Global part (the table everyone queries) ----
/// A counter entry in the global table
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
@@ -48,6 +53,17 @@ mod v08 {
}
impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
+
+ // ---- Local part (the counter we maintain transactionnaly on each node) ----
+
+ #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+ pub(super) struct LocalCounterEntry<T: CountedItem> {
+ pub(super) pk: T::CP,
+ pub(super) sk: T::CS,
+ pub(super) values: BTreeMap<String, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
}
pub use v08::*;
@@ -358,15 +374,6 @@ impl<T: CountedItem> IndexCounter<T> {
// ----
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry<T: CountedItem> {
- pk: T::CP,
- sk: T::CS,
- values: BTreeMap<String, (u64, i64)>,
-}
-
-impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
-
impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 616e0d35..518acc95 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -380,6 +380,3 @@ impl CountedItem for Object {
]
}
}
-
-// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
-// (we just want to change bucket into bucket_id by hashing it)
diff --git a/src/table/data.rs b/src/table/data.rs
index f93ed00d..5c792f1f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -41,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) metrics: TableMetrics,
}
-impl<F, R> TableData<F, R>
-where
- F: TableSchema,
- R: TableReplication,
-{
+impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 90594fba..5b9124a7 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024;
// and the moment the garbage collection actually happens)
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
-pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -49,11 +49,7 @@ impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>;
}
-impl<F, R> TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
@@ -277,11 +273,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
@@ -299,20 +291,12 @@ where
}
}
-struct GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+struct GcWorker<F: TableSchema, R: TableReplication> {
gc: Arc<TableGc<F, R>>,
wait_delay: Duration,
}
-impl<F, R> GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self {
gc,
@@ -322,11 +306,7 @@ where
}
#[async_trait]
-impl<F, R> Worker for GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn name(&self) -> String {
format!("{} GC", F::TABLE_NAME)
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 736354fa..2d593e6d 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -65,11 +65,7 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl<F, R> MerkleUpdater<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
@@ -303,17 +299,10 @@ where
}
}
-struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
#[async_trait]
-impl<F, R> Worker for MerkleWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn name(&self) -> String {
format!("{} Merkle", F::TABLE_NAME)
}
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 860f20d3..0857209b 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100;
pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+ F: TableSchema,
+ R: TableReplication;
#[async_trait]
-impl<F, R> Worker for InsertQueueWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
fn name(&self) -> String {
format!("{} queue", F::TABLE_NAME)
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 3740d947..f00815a2 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -2,7 +2,7 @@ use garage_rpc::ring::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
-pub trait TableReplication: Send + Sync {
+pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 6538a32f..5cbf6c95 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -7,7 +7,9 @@ use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
/// Trait for field used to partition data
-pub trait PartitionKey {
+pub trait PartitionKey:
+ Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
+{
/// Get the key used to partition
fn hash(&self) -> Hash;
}
@@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 {
}
/// Trait for field used to sort data
-pub trait SortKey {
+pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str;
/// The partition key used in that table
- type P: PartitionKey
- + Clone
- + PartialEq
- + Serialize
- + for<'de> Deserialize<'de>
- + Send
- + Sync
- + 'static;
+ type P: PartitionKey;
/// The sort key used int that table
- type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
+ type S: SortKey;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>;
diff --git a/src/table/sync.rs b/src/table/sync.rs
index abc034f8..29e7aa89 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -28,7 +28,7 @@ use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -61,11 +61,7 @@ struct TodoPartition {
retain: bool,
}
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -459,11 +455,7 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
@@ -497,7 +489,7 @@ where
// -------- Sync Worker ---------
-struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
@@ -506,7 +498,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant,
}
-impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
@@ -572,7 +564,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
}
#[async_trait]
-impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME)
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 7f158314..7ad79677 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -33,7 +33,7 @@ use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
@@ -65,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
type Response = Result<TableRpc<F>, Error>;
}
-impl<F, R> Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
@@ -428,11 +424,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle(
self: &Arc<Self>,
msg: &TableRpc<F>,