diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 1 | ||||
-rw-r--r-- | src/table/data.rs | 36 | ||||
-rw-r--r-- | src/table/gc.rs | 32 | ||||
-rw-r--r-- | src/table/merkle.rs | 24 | ||||
-rw-r--r-- | src/table/queue.rs | 10 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 2 | ||||
-rw-r--r-- | src/table/schema.rs | 22 | ||||
-rw-r--r-- | src/table/sync.rs | 29 | ||||
-rw-r--r-- | src/table/table.rs | 21 |
9 files changed, 61 insertions, 116 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e1a74553..3911c945 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -28,7 +28,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/data.rs b/src/table/data.rs index 40856b02..5c792f1f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree; use garage_util::data::*; use garage_util::error::*; +use garage_util::migrate::Migrate; use garage_rpc::system::System; @@ -40,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) metrics: TableMetrics, } -impl<F, R> TableData<F, R> -where - F: TableSchema, - R: TableReplication, -{ +impl<F: TableSchema, R: TableReplication> TableData<F, R> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { let store = db .open_tree(&format!("{}:table", F::TABLE_NAME)) @@ -219,7 +216,8 @@ where // data format, the messagepack encoding changed. In this case, // we also have to write the migrated value in the table and update // the associated Merkle tree entry. - let new_bytes = rmp_to_vec_all_named(&new_entry) + let new_bytes = new_entry + .encode() .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; let changed = Some(&new_bytes[..]) != old_bytes.as_deref(); @@ -329,9 +327,9 @@ where Some(old_v) => { let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; entry.merge(ins); - rmp_to_vec_all_named(&entry) + entry.encode() } - None => rmp_to_vec_all_named(ins), + None => ins.encode(), }; let new_entry = new_entry .map_err(Error::RmpEncode) @@ -351,18 +349,18 @@ where } pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) + match F::E::decode(bytes) { + Some(x) => Ok(x), + None => { + error!("Unable to decode entry of {}", F::TABLE_NAME); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); } - }, + Err(Error::Message(format!( + "Unable to decode entry of {}", + F::TABLE_NAME + ))) + } } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 90594fba..5b9124a7 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024; // and the moment the garbage collection actually happens) const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); -pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { +pub(crate) struct TableGc<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, @@ -49,11 +49,7 @@ impl Rpc for GcRpc { type Response = Result<GcRpc, Error>; } -impl<F, R> TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableGc<F, R> { pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp @@ -277,11 +273,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { match message { GcRpc::Update(items) => { @@ -299,20 +291,12 @@ where } } -struct GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +struct GcWorker<F: TableSchema, R: TableReplication> { gc: Arc<TableGc<F, R>>, wait_delay: Duration, } -impl<F, R> GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> GcWorker<F, R> { fn new(gc: Arc<TableGc<F, R>>) -> Self { Self { gc, @@ -322,11 +306,7 @@ where } #[async_trait] -impl<F, R> Worker for GcWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> { fn name(&self) -> String { format!("{} GC", F::TABLE_NAME) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 736354fa..e86d0251 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -10,6 +10,7 @@ use garage_db as db; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::Error; use garage_rpc::ring::*; @@ -65,13 +66,9 @@ pub enum MerkleNode { Leaf(Vec<u8>, Hash), } -impl<F, R> MerkleUpdater<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { - let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); + let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]); Arc::new(Self { data, @@ -277,7 +274,7 @@ where tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; + let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) @@ -303,17 +300,10 @@ where } } -struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) -where - F: TableSchema + 'static, - R: TableReplication + 'static; +struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>); #[async_trait] -impl<F, R> Worker for MerkleWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> { fn name(&self) -> String { format!("{} Merkle", F::TABLE_NAME) } @@ -375,7 +365,7 @@ impl MerkleNode { fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), + Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?), } } diff --git a/src/table/queue.rs b/src/table/queue.rs index 860f20d3..0857209b 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100; pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) where - F: TableSchema + 'static, - R: TableReplication + 'static; + F: TableSchema, + R: TableReplication; #[async_trait] -impl<F, R> Worker for InsertQueueWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> { fn name(&self) -> String { format!("{} queue", F::TABLE_NAME) } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 3740d947..f00815a2 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -2,7 +2,7 @@ use garage_rpc::ring::*; use garage_util::data::*; /// Trait to describe how a table shall be replicated -pub trait TableReplication: Send + Sync { +pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods diff --git a/src/table/schema.rs b/src/table/schema.rs index f37e98d8..5cbf6c95 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -2,11 +2,14 @@ use serde::{Deserialize, Serialize}; use garage_db as db; use garage_util::data::*; +use garage_util::migrate::Migrate; use crate::crdt::Crdt; /// Trait for field used to partition data -pub trait PartitionKey { +pub trait PartitionKey: + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static +{ /// Get the key used to partition fn hash(&self) -> Hash; } @@ -27,7 +30,7 @@ impl PartitionKey for FixedBytes32 { } /// Trait for field used to sort data -pub trait SortKey { +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort fn sort_key(&self) -> &[u8]; } @@ -46,7 +49,7 @@ impl SortKey for FixedBytes32 { /// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry<P: PartitionKey, S: SortKey>: - Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static { /// Get the key used to partition fn partition_key(&self) -> &P; @@ -65,23 +68,16 @@ pub trait TableSchema: Send + Sync + 'static { const TABLE_NAME: &'static str; /// The partition key used in that table - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type P: PartitionKey; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type S: SortKey; /// They type for an entry in that table type E: Entry<Self::P, Self::S>; /// The type for a filter that can be applied to select entries /// (e.g. filter out deleted entries) - type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - - // Action to take if not able to decode current version: - // try loading from an older version - /// Try migrating an entry from an older version - fn try_migrate(_bytes: &[u8]) -> Option<Self::E> { - None - } + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; /// Actions triggered by data changing in a table. If such actions /// include updates to the local database that should be applied diff --git a/src/table/sync.rs b/src/table/sync.rs index d6d272ab..92a353c6 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{debug_serialize, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; @@ -28,7 +29,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct TableSyncer<F: TableSchema, R: TableReplication> { system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -61,11 +62,7 @@ struct TodoPartition { retain: bool, } -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, @@ -302,7 +299,7 @@ where ); return Ok(()); } - let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?; + let root_ck_hash = hash_of_merkle_node(&root_ck)?; // Check if they have the same root checksum // If so, do nothing. @@ -459,16 +456,12 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { match message { SyncRpc::RootCkHash(range, h) => { let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; - let hash = hash_of::<MerkleNode>(&root_ck)?; + let hash = hash_of_merkle_node(&root_ck)?; Ok(SyncRpc::RootCkDifferent(hash != *h)) } SyncRpc::GetNode(k) => { @@ -497,7 +490,7 @@ where // -------- Sync Worker --------- -struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { +struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, ring_recv: watch::Receiver<Arc<Ring>>, ring: Arc<Ring>, @@ -506,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { next_full_sync: Instant, } -impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +565,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { } #[async_trait] -impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { +impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } @@ -622,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor // ---- UTIL ---- -fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { - Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) +fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> { + Ok(blake2sum(&nonversioned_encode(x)?[..])) } fn join_ordered<'a, K: Ord + Eq, V1, V2>( diff --git a/src/table/table.rs b/src/table/table.rs index bbcd5971..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -18,6 +18,7 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; +use garage_util::migrate::Migrate; use garage_rpc::system::System; use garage_rpc::*; @@ -32,7 +33,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct Table<F: TableSchema, R: TableReplication> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, @@ -64,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> { type Response = Result<TableRpc<F>, Error>; } -impl<F, R> Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Table<F, R> { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { @@ -122,7 +119,7 @@ where let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::<F>::Update(vec![e_enc]); self.system @@ -173,7 +170,7 @@ where let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); for node in who { call_list.entry(node).or_default().push(e_enc.clone()); } @@ -412,7 +409,7 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { - let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); + let what_enc = Arc::new(ByteBuf::from(what.encode()?)); self.system .rpc .try_call_many( @@ -427,11 +424,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { async fn handle( self: &Arc<Self>, msg: &TableRpc<F>, |