From de9d6cddf709e686ada3d1e71de7b31d7704b8b5 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 12 Dec 2022 17:16:49 +0100
Subject: Prettier worker list table; remove useless CLI log messages

---
 src/table/gc.rs     | 10 ++++------
 src/table/merkle.rs | 12 +++++-------
 src/table/sync.rs   | 10 ++++------
 3 files changed, 13 insertions(+), 19 deletions(-)

(limited to 'src/table')

diff --git a/src/table/gc.rs b/src/table/gc.rs
index 83e7eeff..cfdc9d2d 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -330,12 +330,10 @@ where
         format!("{} GC", F::TABLE_NAME)
     }

-    fn info(&self) -> Option<String> {
-        let l = self.gc.data.gc_todo_len().unwrap_or(0);
-        if l > 0 {
-            Some(format!("{} items in queue", l))
-        } else {
-            None
+    fn status(&self) -> WorkerStatus {
+        WorkerStatus {
+            queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
+            ..Default::default()
         }
     }

diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index a5c29723..6f8a19b6 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -310,15 +310,13 @@ where
     R: TableReplication + 'static,
 {
     fn name(&self) -> String {
-        format!("{} Merkle tree updater", F::TABLE_NAME)
+        format!("{} Merkle", F::TABLE_NAME)
     }

-    fn info(&self) -> Option<String> {
-        let l = self.0.todo_len().unwrap_or(0);
-        if l > 0 {
-            Some(format!("{} items in queue", l))
-        } else {
-            None
+    fn status(&self) -> WorkerStatus {
+        WorkerStatus {
+            queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
+            ..Default::default()
         }
     }

diff --git a/src/table/sync.rs b/src/table/sync.rs
index 9d79d856..af7aa640 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -570,12 +570,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
         format!("{} sync", F::TABLE_NAME)
     }

-    fn info(&self) -> Option<String> {
-        let l = self.todo.len();
-        if l > 0 {
-            Some(format!("{} partitions remaining", l))
-        } else {
-            None
+    fn status(&self) -> WorkerStatus {
+        WorkerStatus {
+            queue_length: Some(self.todo.len() as u64),
+            ..Default::default()
         }
     }

--
cgit v1.2.3

From 687660b27f904422c689e09d2457293e5313d325 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 13 Dec 2022 14:23:45 +0100
Subject: Implement `block list-errors` and `block info`

---
 src/table/util.rs | 6 ++++++
 1 file changed, 6 insertions(+)

(limited to 'src/table')

diff --git a/src/table/util.rs b/src/table/util.rs
index 20595a94..0b10cf3f 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -49,3 +49,9 @@ impl EnumerationOrder {
         }
     }
 }
+
+impl Default for EnumerationOrder {
+    fn default() -> Self {
+        EnumerationOrder::Forward
+    }
+}
--
cgit v1.2.3

From d6040e32a610a792d1e5365a7643eb99fbb5a217 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 13 Dec 2022 15:43:22 +0100
Subject: cli: prettier table in garage stats

---
 src/table/merkle.rs | 4 ++++
 1 file changed, 4 insertions(+)

(limited to 'src/table')

diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 6f8a19b6..e977bfb5 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -293,6 +293,10 @@ where
         Ok(self.data.merkle_tree.len()?)
     }

+    pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> {
+        Ok(self.data.merkle_tree.fast_len()?)
+    }
+
     pub fn todo_len(&self) -> Result<usize, Error> {
         Ok(self.data.merkle_todo.len()?)
     }
--
cgit v1.2.3
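The first patch above replaces the free-form `info()` string with a structured `status()` report. The `WorkerStatus` type itself lives in `garage_util::background` and is not shown in this series, so the following is only a rough sketch of the pattern, with assumed field names, illustrating how `..Default::default()` lets each worker fill in only the fields it actually knows:

    #[derive(Clone, Debug, Default)]
    pub struct WorkerStatus {
        pub queue_length: Option<u64>,
        pub progress: Option<String>, // hypothetical extra field, for illustration only
    }

    struct ExampleWorker {
        todo: Vec<u64>,
    }

    impl ExampleWorker {
        // Equivalent of the new `status()` hook: report what you know,
        // let the Default impl fill in the rest.
        fn status(&self) -> WorkerStatus {
            WorkerStatus {
                queue_length: Some(self.todo.len() as u64),
                ..Default::default()
            }
        }
    }

    fn main() {
        let w = ExampleWorker { todo: vec![1, 2, 3] };
        println!("{:?}", w.status());
    }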
From 041b60ed1dc48563ad297a6a30230655555c9a20 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 13 Dec 2022 15:54:03 +0100
Subject: Add block.rc_size, table.size and table.merkle_tree_size metrics

---
 src/table/data.rs    |  8 +++++++-
 src/table/metrics.rs | 38 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 44 insertions(+), 2 deletions(-)

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index 3212e82b..93da2110 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -58,7 +58,13 @@ where
             .expect("Unable to open DB tree");
         let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");

-        let metrics = TableMetrics::new(F::TABLE_NAME, merkle_todo.clone(), gc_todo.clone());
+        let metrics = TableMetrics::new(
+            F::TABLE_NAME,
+            store.clone(),
+            merkle_tree.clone(),
+            merkle_todo.clone(),
+            gc_todo.clone(),
+        );

         Arc::new(Self {
             system,
diff --git a/src/table/metrics.rs b/src/table/metrics.rs
index 3a1783e0..8318a84f 100644
--- a/src/table/metrics.rs
+++ b/src/table/metrics.rs
@@ -5,6 +5,8 @@ use garage_db::counted_tree_hack::CountedTree;

 /// TableMetrics reference all counter used for metrics
 pub struct TableMetrics {
+    pub(crate) _table_size: ValueObserver<u64>,
+    pub(crate) _merkle_tree_size: ValueObserver<u64>,
     pub(crate) _merkle_todo_len: ValueObserver<u64>,
     pub(crate) _gc_todo_len: ValueObserver<u64>,

@@ -20,9 +22,43 @@ pub struct TableMetrics {
     pub(crate) sync_items_received: Counter<u64>,
 }
 impl TableMetrics {
-    pub fn new(table_name: &'static str, merkle_todo: db::Tree, gc_todo: CountedTree) -> Self {
+    pub fn new(
+        table_name: &'static str,
+        store: db::Tree,
+        merkle_tree: db::Tree,
+        merkle_todo: db::Tree,
+        gc_todo: CountedTree,
+    ) -> Self {
         let meter = global::meter(table_name);
         TableMetrics {
+            _table_size: meter
+                .u64_value_observer(
+                    "table.size",
+                    move |observer| {
+                        if let Ok(Some(v)) = store.fast_len() {
+                            observer.observe(
+                                v as u64,
+                                &[KeyValue::new("table_name", table_name)],
+                            );
+                        }
+                    },
+                )
+                .with_description("Number of items in table")
+                .init(),
+            _merkle_tree_size: meter
+                .u64_value_observer(
+                    "table.merkle_tree_size",
+                    move |observer| {
+                        if let Ok(Some(v)) = merkle_tree.fast_len() {
+                            observer.observe(
+                                v as u64,
+                                &[KeyValue::new("table_name", table_name)],
+                            );
+                        }
+                    },
+                )
+                .with_description("Number of nodes in table's Merkle tree")
+                .init(),
             _merkle_todo_len: meter
                 .u64_value_observer(
                     "table.merkle_updater_todo_queue_length",
--
cgit v1.2.3
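The metrics patch registers asynchronous gauges by moving a clone of each tree handle into the observer callback, so the value is polled lazily whenever metrics are collected. Below is a minimal sketch of that pattern using the opentelemetry 0.17 API exactly as it appears in the diff; `FakeTree` is a stand-in for `db::Tree` and the exact `ValueObserver` path is an assumption:

    use std::sync::{Arc, Mutex};

    use opentelemetry::{global, metrics::ValueObserver, KeyValue};

    #[derive(Clone)]
    struct FakeTree(Arc<Mutex<Vec<(Vec<u8>, Vec<u8>)>>>); // stand-in for db::Tree

    impl FakeTree {
        fn fast_len(&self) -> Result<Option<usize>, ()> {
            Ok(Some(self.0.lock().unwrap().len()))
        }
    }

    // The returned observer must be kept alive (the patch stores it in
    // TableMetrics as `_table_size`), otherwise the gauge stops reporting.
    fn register_size_gauge(table_name: &'static str, store: FakeTree) -> ValueObserver<u64> {
        let meter = global::meter(table_name);
        meter
            .u64_value_observer("table.size", move |observer| {
                if let Ok(Some(v)) = store.fast_len() {
                    observer.observe(v as u64, &[KeyValue::new("table_name", table_name)]);
                }
            })
            .with_description("Number of items in table")
            .init()
    }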
From f8e528c15de0c9d31c16e5cd8e58f99f4132f103 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 10:48:49 +0100
Subject: Small refactor of tables internals

---
 src/table/data.rs | 40 ++++++++++++++++++++++------------------
 1 file changed, 22 insertions(+), 18 deletions(-)

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index 93da2110..cae18999 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -173,9 +173,8 @@ where
     pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
         let update = self.decode_entry(update_bytes)?;

-        let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-        self.update_entry_with(&tree_key[..], |ent| match ent {
+        self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
             Some(mut ent) => {
                 ent.merge(&update);
                 ent
@@ -187,11 +186,14 @@ where

     pub fn update_entry_with(
         &self,
-        tree_key: &[u8],
+        partition_key: &F::P,
+        sort_key: &F::S,
         f: impl Fn(Option<F::E>) -> F::E,
     ) -> Result<Option<F::E>, Error> {
+        let tree_key = self.tree_key(partition_key, sort_key);
+
         let changed = self.store.db().transaction(|mut tx| {
-            let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
+            let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
                 Some(old_bytes) => {
                     let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
                     let new_entry = f(Some(old_entry.clone()));
@@ -200,23 +202,23 @@ where
                 None => (None, None, f(None)),
             };

-            // Scenario 1: the value changed, so of course there is a change
-            let value_changed = Some(&new_entry) != old_entry.as_ref();
-
+            // Changed can be true in two scenarios
+            // Scenario 1: the actual represented value changed,
+            // so of course the messagepack encoding changed as well
             // Scenario 2: the value didn't change but due to a migration in the
-            // data format, the messagepack encoding changed. In this case
-            // we have to write the migrated value in the table and update
-            // the associated Merkle tree entry.
+            // data format, the messagepack encoding changed. In this case,
+            // we also have to write the migrated value in the table and update
+            // the associated Merkle tree entry.
             let new_bytes = rmp_to_vec_all_named(&new_entry)
                 .map_err(Error::RmpEncode)
                 .map_err(db::TxError::Abort)?;
-            let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+            let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
             drop(old_bytes);

-            if value_changed || encoding_changed {
-                let new_bytes_hash = blake2sum(&new_bytes[..]);
-                tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
-                tx.insert(&self.store, tree_key, new_bytes)?;
+            if changed {
+                let new_bytes_hash = blake2sum(&new_bytes);
+                tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
+                tx.insert(&self.store, &tree_key, new_bytes)?;

                 self.instance
                     .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
@@ -242,7 +244,7 @@ where
             let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
             let nodes = self.replication.write_nodes(&pk_hash);
             if nodes.first() == Some(&self.system.id) {
-                GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
+                GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
             }
         }
@@ -258,10 +260,11 @@ where
             .db()
             .transaction(|mut tx| match tx.get(&self.store, k)? {
                 Some(cur_v) if cur_v == v => {
+                    let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+
                     tx.remove(&self.store, k)?;
                     tx.insert(&self.merkle_todo, k, vec![])?;

-                    let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
                     self.instance.updated(&mut tx, Some(&old_entry), None)?;
                     Ok(true)
                 }
@@ -285,10 +288,11 @@ where
             .db()
             .transaction(|mut tx| match tx.get(&self.store, k)? {
                 Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+                    let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+
                     tx.remove(&self.store, k)?;
                     tx.insert(&self.merkle_todo, k, vec![])?;

-                    let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
                     self.instance.updated(&mut tx, Some(&old_entry), None)?;
                     Ok(true)
                 }
--
cgit v1.2.3

From 83c8467e23c1f531ae233766d5dc7244afe57f08 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 11:58:06 +0100
Subject: Proper queueing for delayed inserts, now backed to disk

---
 src/table/data.rs   | 38 +++++++++++++++++++++++-
 src/table/lib.rs    |  8 +++--
 src/table/merkle.rs |  6 +++-
 src/table/queue.rs  | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/table/table.rs  | 11 +++++++
 5 files changed, 142 insertions(+), 5 deletions(-)
 create mode 100644 src/table/queue.rs

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index cae18999..e3b8c93f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
     pub(crate) merkle_tree: db::Tree,
     pub(crate) merkle_todo: db::Tree,
     pub(crate) merkle_todo_notify: Notify,
+
+    pub(crate) insert_queue: db::Tree,
+    pub(crate) insert_queue_notify: Notify,
+
     pub(crate) gc_todo: CountedTree,

     pub(crate) metrics: TableMetrics,
@@ -53,9 +57,13 @@ where
             .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
             .expect("Unable to open DB Merkle TODO tree");

+        let insert_queue = db
+            .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+            .expect("Unable to open insert queue DB tree");
+
         let gc_todo = db
             .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
-            .expect("Unable to open DB tree");
+            .expect("Unable to open GC DB tree");
         let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");

         let metrics = TableMetrics::new(
@@ -74,6 +82,8 @@ where
             merkle_tree,
             merkle_todo,
             merkle_todo_notify: Notify::new(),
+            insert_queue,
+            insert_queue_notify: Notify::new(),
             gc_todo,
             metrics,
         })
@@ -306,6 +316,32 @@ where
         Ok(removed)
     }

+    // ---- Insert queue functions ----
+
+    pub(crate) fn queue_insert(
+        &self,
+        tx: &mut db::Transaction<'_>,
+        ins: &F::E,
+    ) -> db::TxResult<(), Error> {
+        let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
+
+        let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
+            Some(old_v) => {
+                let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
+                entry.merge(ins);
+                rmp_to_vec_all_named(&entry)
+                    .map_err(Error::RmpEncode)
+                    .map_err(db::TxError::Abort)?
+            }
+            None => rmp_to_vec_all_named(ins)
+                .map_err(Error::RmpEncode)
+                .map_err(db::TxError::Abort)?,
+        };
+        tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+
+        Ok(())
+    }
+
     // ---- Utility functions ----

     pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
diff --git a/src/table/lib.rs b/src/table/lib.rs
index b0153e9a..fdf114a6 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -4,16 +4,18 @@
 #[macro_use]
 extern crate tracing;

-mod metrics;
 pub mod schema;
 pub mod util;

 pub mod data;
+pub mod replication;
+pub mod table;
+
 mod gc;
 mod merkle;
-pub mod replication;
+mod metrics;
+mod queue;
 mod sync;
-pub mod table;

 pub use schema::*;
 pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e977bfb5..bcf9f9d7 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -3,6 +3,7 @@ use std::time::Duration;

 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
+use tokio::select;
 use tokio::sync::watch;

 use garage_db as db;
@@ -343,7 +344,10 @@ where
         if *must_exit.borrow() {
             return WorkerState::Done;
         }
-        tokio::time::sleep(Duration::from_secs(10)).await;
+        select! {
+            _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+            _ = self.0.data.merkle_todo_notify.notified() => (),
+        }
         WorkerState::Busy
     }
 }
diff --git a/src/table/queue.rs b/src/table/queue.rs
new file mode 100644
index 00000000..3671ea7d
--- /dev/null
+++ b/src/table/queue.rs
@@ -0,0 +1,84 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::select;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::Error;
+
+use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
+
+const BATCH_SIZE: usize = 100;
+
+pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
+where
+    F: TableSchema + 'static,
+    R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for InsertQueueWorker<F, R>
+where
+    F: TableSchema + 'static,
+    R: TableReplication + 'static,
+{
+    fn name(&self) -> String {
+        format!("{} queue", F::TABLE_NAME)
+    }
+
+    fn status(&self) -> WorkerStatus {
+        WorkerStatus {
+            queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
+            ..Default::default()
+        }
+    }
+
+    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+        let mut kv_pairs = vec![];
+        let mut values = vec![];
+
+        for entry_kv in self.0.data.insert_queue.iter()? {
+            let (k, v) = entry_kv?;
+
+            values.push(self.0.data.decode_entry(&v)?);
+            kv_pairs.push((k, v));
+
+            if kv_pairs.len() > BATCH_SIZE {
+                break;
+            }
+        }
+
+        if kv_pairs.is_empty() {
+            return Ok(WorkerState::Idle);
+        }
+
+        self.0.insert_many(values).await?;
+
+        self.0.data.insert_queue.db().transaction(|mut tx| {
+            for (k, v) in kv_pairs.iter() {
+                if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
+                    if &v2 == v {
+                        tx.remove(&self.0.data.insert_queue, k)?;
+                    }
+                }
+            }
+            Ok(())
+        })?;
+
+        Ok(WorkerState::Busy)
+    }
+
+    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+        if *must_exit.borrow() {
+            return WorkerState::Done;
+        }
+        select! {
+            _ = tokio::time::sleep(Duration::from_secs(600)) => (),
+            _ = self.0.data.insert_queue_notify.notified() => (),
+        }
+        WorkerState::Busy
+    }
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index 8a66c420..c8e0576e 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -25,6 +25,7 @@ use crate::crdt::Crdt;
 use crate::data::*;
 use crate::gc::*;
 use crate::merkle::*;
+use crate::queue::InsertQueueWorker;
 use crate::replication::*;
 use crate::schema::*;
 use crate::sync::*;
@@ -88,6 +89,11 @@ where
             endpoint,
         });

+        table
+            .system
+            .background
+            .spawn_worker(InsertQueueWorker(table.clone()));
+
         table.endpoint.set_handler(table.clone());

         table
@@ -128,6 +134,11 @@ where
         Ok(())
     }

+    /// Insert item locally
+    pub fn queue_insert(&self, tx: &mut db::Transaction<'_>, e: &F::E) -> db::TxResult<(), Error> {
+        self.data.queue_insert(tx, e)
+    }
+
     pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
     where
         I: IntoIterator<Item = IE> + Send + Sync,
--
cgit v1.2.3
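The new `queue.rs` worker drains the on-disk insert queue in batches and, crucially, only deletes a queued key after the flush if its value has not been overwritten in the meantime. The following is a self-contained sketch of that flush loop, with an in-memory map standing in for the `db::Tree` (the real code does this inside a database transaction):

    use std::collections::BTreeMap;

    const BATCH_SIZE: usize = 100;

    // Stand-in for the on-disk `insert_queue` tree; keys and values are raw bytes.
    type Queue = BTreeMap<Vec<u8>, Vec<u8>>;

    /// One flush iteration, mirroring the shape of `InsertQueueWorker::work`:
    /// read up to BATCH_SIZE entries, push them to the table (here just a closure),
    /// then delete only the entries whose value did not change while flushing.
    fn flush_once(queue: &mut Queue, mut insert_many: impl FnMut(&[Vec<u8>])) -> bool {
        let batch: Vec<(Vec<u8>, Vec<u8>)> = queue
            .iter()
            .take(BATCH_SIZE)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        if batch.is_empty() {
            return false; // nothing to do: the worker would report Idle
        }

        let values: Vec<Vec<u8>> = batch.iter().map(|(_, v)| v.clone()).collect();
        insert_many(&values);

        for (k, v) in &batch {
            // A concurrent `queue_insert` may have merged a newer value under the
            // same key; in that case we keep it for the next iteration.
            if queue.get(k) == Some(v) {
                queue.remove(k);
            }
        }
        true // the worker would report Busy and loop again
    }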
From 2183518edccadef47cdeaf6476033b52d8832d6e Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 12:28:07 +0100
Subject: Spawn all background workers in a separate step

---
 src/table/Cargo.toml |  1 +
 src/table/gc.rs      | 13 ++++++++-----
 src/table/merkle.rs  | 12 ++++++------
 src/table/sync.rs    | 43 +++++++++++++++++++++++------------------
 src/table/table.rs   | 22 ++++++++++++++--------
 5 files changed, 54 insertions(+), 37 deletions(-)

(limited to 'src/table')

diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 38c6b41c..a8d9b5e6 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" }
 opentelemetry = "0.17"

 async-trait = "0.1.7"
+arc-swap = "1.0"
 bytes = "1.0"
 hex = "0.4"
 hexdump = "0.1"
diff --git a/src/table/gc.rs b/src/table/gc.rs
index cfdc9d2d..c83c2050 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -54,24 +54,27 @@ where
     F: TableSchema + 'static,
     R: TableReplication + 'static,
 {
-    pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+    pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
         let endpoint = system
             .netapp
             .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));

         let gc = Arc::new(Self {
-            system: system.clone(),
+            system,
             data,
             endpoint,
         });
-
         gc.endpoint.set_handler(gc.clone());

-        system.background.spawn_worker(GcWorker::new(gc.clone()));
-
         gc
     }

+    pub(crate) fn spawn_workers(self: &Arc<Self>) {
+        self.system
+            .background
+            .spawn_worker(GcWorker::new(self.clone()));
+    }
+
     async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
         let now = now_msec();

diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index bcf9f9d7..0fe7d2cb 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -70,17 +70,17 @@ where
     F: TableSchema + 'static,
     R: TableReplication + 'static,
 {
-    pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
+    pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
         let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);

-        let ret = Arc::new(Self {
+        Arc::new(Self {
             data,
             empty_node_hash,
-        });
-
-        background.spawn_worker(MerkleWorker(ret.clone()));
+        })
+    }

-        ret
+    pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+        background.spawn_worker(MerkleWorker(self.clone()));
     }

     fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index af7aa640..7008a383 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
 use std::sync::Arc;
 use std::time::{Duration, Instant};

+use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
 use futures_util::stream::*;
 use opentelemetry::KeyValue;
@@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch};

 use garage_util::background::*;
 use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::error::{Error, OkOrMessage};

 use garage_rpc::ring::*;
 use garage_rpc::system::System;
@@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
     data: Arc<TableData<F, R>>,
     merkle: Arc<MerkleUpdater<F, R>>,

-    add_full_sync_tx: mpsc::UnboundedSender<()>,
+    add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
     endpoint: Arc<Endpoint<SyncRpc, Self>>,
 }

@@ -65,7 +66,7 @@ where
     F: TableSchema + 'static,
     R: TableReplication + 'static,
 {
-    pub(crate) fn launch(
+    pub(crate) fn new(
         system: Arc<System>,
         data: Arc<TableData<F, R>>,
         merkle: Arc<MerkleUpdater<F, R>>,
@@ -74,34 +75,40 @@ where
             .netapp
             .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));

-        let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
-
         let syncer = Arc::new(Self {
-            system: system.clone(),
+            system,
             data,
             merkle,
-            add_full_sync_tx,
+            add_full_sync_tx: ArcSwapOption::new(None),
             endpoint,
         });
-
         syncer.endpoint.set_handler(syncer.clone());

-        system.background.spawn_worker(SyncWorker {
-            syncer: syncer.clone(),
-            ring_recv: system.ring.clone(),
-            ring: system.ring.borrow().clone(),
+        syncer
+    }
+
+    pub(crate) fn spawn_workers(self: &Arc<Self>) {
+        let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
+        self.add_full_sync_tx
+            .store(Some(Arc::new(add_full_sync_tx)));
+
+        self.system.background.spawn_worker(SyncWorker {
+            syncer: self.clone(),
+            ring_recv: self.system.ring.clone(),
+            ring: self.system.ring.borrow().clone(),
             add_full_sync_rx,
             todo: vec![],
             next_full_sync: Instant::now() + Duration::from_secs(20),
         });
-
-        syncer
     }

-    pub fn add_full_sync(&self) {
-        if self.add_full_sync_tx.send(()).is_err() {
-            error!("({}) Could not add full sync", F::TABLE_NAME);
-        }
+    pub fn add_full_sync(&self) -> Result<(), Error> {
+        let tx = self.add_full_sync_tx.load();
+        let tx = tx
+            .as_ref()
+            .ok_or_message("table sync worker is not running")?;
+        tx.send(()).ok_or_message("send error")?;
+        Ok(())
     }

     // ----
diff --git a/src/table/table.rs b/src/table/table.rs
index c8e0576e..cb200ef2 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -36,6 +36,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
     pub data: Arc<TableData<F, R>>,
     pub merkle_updater: Arc<MerkleUpdater<F, R>>,
     pub syncer: Arc<TableSyncer<F, R>>,
+    gc: Arc<TableGc<F, R>>,

     endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
 }
@@ -76,29 +77,34 @@ where

         let data = TableData::new(system.clone(), instance, replication, db);

-        let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+        let merkle_updater = MerkleUpdater::new(data.clone());

-        let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
-        TableGc::launch(system.clone(), data.clone());
+        let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+        let gc = TableGc::new(system.clone(), data.clone());

         let table = Arc::new(Self {
             system,
             data,
             merkle_updater,
+            gc,
             syncer,
             endpoint,
         });

-        table
-            .system
-            .background
-            .spawn_worker(InsertQueueWorker(table.clone()));
-
         table.endpoint.set_handler(table.clone());

         table
     }

+    pub fn spawn_workers(self: &Arc<Self>) {
+        self.merkle_updater.spawn_workers(&self.system.background);
+        self.syncer.spawn_workers();
+        self.gc.spawn_workers();
+        self.system
+            .background
+            .spawn_worker(InsertQueueWorker(self.clone()));
+    }
+
     pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
         let tracer = opentelemetry::global::tracer("garage_table");
         let span = tracer.start(format!("{} insert", F::TABLE_NAME));
--
cgit v1.2.3
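This patch splits construction (`new()`) from task startup (`spawn_workers()`), and uses an `ArcSwapOption` so that a trigger arriving before the worker is running fails gracefully instead of panicking. A reduced sketch of the same two-phase pattern (names are illustrative, not Garage APIs; assumes a running Tokio runtime):

    use std::sync::Arc;

    use arc_swap::ArcSwapOption;
    use tokio::sync::mpsc;

    struct Service {
        // Not populated at construction time: only `spawn_workers` fills it in.
        trigger_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
    }

    impl Service {
        fn new() -> Arc<Self> {
            // Phase 1: pure construction, no tasks spawned yet.
            Arc::new(Self {
                trigger_tx: ArcSwapOption::new(None),
            })
        }

        fn spawn_workers(self: &Arc<Self>) {
            // Phase 2: create the channel, publish the sender, start the task.
            let (tx, mut rx) = mpsc::unbounded_channel();
            self.trigger_tx.store(Some(Arc::new(tx)));
            tokio::spawn(async move {
                while rx.recv().await.is_some() {
                    // ... do one unit of triggered work ...
                }
            });
        }

        fn trigger(&self) -> Result<(), &'static str> {
            let tx = self.trigger_tx.load();
            let tx = tx.as_ref().ok_or("worker is not running")?;
            tx.send(()).map_err(|_| "send error")
        }
    }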
From d56c472712df7c064387429a5af73d3bc0eb438d Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 12:51:16 +0100
Subject: Refactor background runner and get rid of job worker

---
 src/table/gc.rs    |  6 ++----
 src/table/sync.rs  |  4 ++--
 src/table/table.rs | 19 ++++++++-----------
 3 files changed, 12 insertions(+), 17 deletions(-)

(limited to 'src/table')

diff --git a/src/table/gc.rs b/src/table/gc.rs
index c83c2050..1fc16364 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -69,10 +69,8 @@ where
         gc
     }

-    pub(crate) fn spawn_workers(self: &Arc<Self>) {
-        self.system
-            .background
-            .spawn_worker(GcWorker::new(self.clone()));
+    pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+        bg.spawn_worker(GcWorker::new(self.clone()));
     }

     async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 7008a383..1e7618ca 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -87,12 +87,12 @@ where
         syncer
     }

-    pub(crate) fn spawn_workers(self: &Arc<Self>) {
+    pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
         let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
         self.add_full_sync_tx
             .store(Some(Arc::new(add_full_sync_tx)));

-        self.system.background.spawn_worker(SyncWorker {
+        bg.spawn_worker(SyncWorker {
             syncer: self.clone(),
             ring_recv: self.system.ring.clone(),
             ring: self.system.ring.borrow().clone(),
diff --git a/src/table/table.rs b/src/table/table.rs
index cb200ef2..4d93102e 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,6 +14,7 @@ use opentelemetry::{

 use garage_db as db;

+use garage_util::background::{self, BackgroundRunner};
 use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::metrics::RecordDuration;
@@ -96,13 +97,11 @@ where
         table
     }

-    pub fn spawn_workers(self: &Arc<Self>) {
-        self.merkle_updater.spawn_workers(&self.system.background);
-        self.syncer.spawn_workers();
-        self.gc.spawn_workers();
-        self.system
-            .background
-            .spawn_worker(InsertQueueWorker(self.clone()));
+    pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+        self.merkle_updater.spawn_workers(bg);
+        self.syncer.spawn_workers(bg);
+        self.gc.spawn_workers(bg);
+        bg.spawn_worker(InsertQueueWorker(self.clone()));
     }

     pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
@@ -276,9 +275,7 @@ where
         if not_all_same {
             let self2 = self.clone();
             let ent2 = ret_entry.clone();
-            self.system
-                .background
-                .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+            background::spawn(async move { self2.repair_on_read(&who[..], ent2).await });
         }
     }

@@ -375,7 +372,7 @@ where
             .into_iter()
             .map(|k| ret.get(&k).unwrap().clone())
             .collect::<Vec<_>>();
-        self.system.background.spawn_cancellable(async move {
+        background::spawn(async move {
             for v in to_repair {
                 self2.repair_on_read(&who[..], v).await?;
             }
--
cgit v1.2.3

From d4af27f920ce48a60f2073e98b17bdf963241686 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 13:54:21 +0100
Subject: Add missing notify

---
 src/table/data.rs | 1 +
 1 file changed, 1 insertion(+)

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index e3b8c93f..bf2bf88b 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -338,6 +338,7 @@ where
                 .map_err(db::TxError::Abort)?,
         };
         tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+        self.insert_queue_notify.notify_one();

         Ok(())
     }
--
cgit v1.2.3
From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 15:25:29 +0100
Subject: Simplified and more aggressive worker exit logic

---
 src/table/gc.rs     | 5 +----
 src/table/merkle.rs | 5 +----
 src/table/queue.rs  | 5 +----
 src/table/sync.rs   | 5 +----
 4 files changed, 4 insertions(+), 16 deletions(-)

(limited to 'src/table')

diff --git a/src/table/gc.rs b/src/table/gc.rs
index 1fc16364..90594fba 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -348,10 +348,7 @@ where
         }
     }

-    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
-        if *must_exit.borrow() {
-            return WorkerState::Done;
-        }
+    async fn wait_for_work(&mut self) -> WorkerState {
         tokio::time::sleep(self.wait_delay).await;
         WorkerState::Busy
     }
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 0fe7d2cb..736354fa 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -340,10 +340,7 @@ where
             .unwrap()
     }

-    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
-        if *must_exit.borrow() {
-            return WorkerState::Done;
-        }
+    async fn wait_for_work(&mut self) -> WorkerState {
         select! {
             _ = tokio::time::sleep(Duration::from_secs(60)) => (),
             _ = self.0.data.merkle_todo_notify.notified() => (),
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 3671ea7d..860f20d3 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -71,10 +71,7 @@ where
         Ok(WorkerState::Busy)
     }

-    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
-        if *must_exit.borrow() {
-            return WorkerState::Done;
-        }
+    async fn wait_for_work(&mut self) -> WorkerState {
         select! {
             _ = tokio::time::sleep(Duration::from_secs(600)) => (),
             _ = self.0.data.insert_queue_notify.notified() => (),
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 1e7618ca..d6d272ab 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -593,10 +593,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
         }
     }

-    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
-        if *must_exit.borrow() {
-            return WorkerState::Done;
-        }
+    async fn wait_for_work(&mut self) -> WorkerState {
         select! {
             s = self.add_full_sync_rx.recv() => {
                 if let Some(()) = s {
--
cgit v1.2.3
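With the simplified exit logic, workers no longer poll a `must_exit` flag inside `wait_for_work`; they simply park on a `tokio::select!` between a periodic timer and a `Notify` handle that producers ping with `notify_one()`, as the hunks above show. A minimal runnable model of that wake-up (assumes tokio with the "macros" and "time" features; the timings are illustrative):

    use std::sync::Arc;
    use std::time::Duration;

    use tokio::select;
    use tokio::sync::Notify;

    // A worker parks here until either a periodic tick or an explicit
    // notification arrives, whichever comes first.
    async fn wait_for_work(notify: &Notify) {
        select! {
            _ = tokio::time::sleep(Duration::from_secs(60)) => (),
            _ = notify.notified() => (),
        }
    }

    #[tokio::main]
    async fn main() {
        let notify = Arc::new(Notify::new());
        let n2 = notify.clone();
        tokio::spawn(async move {
            // Producer side: equivalent of `insert_queue_notify.notify_one()`.
            n2.notify_one();
        });
        wait_for_work(&notify).await;
        println!("woken up, doing one batch of work");
    }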
From 510b62010871e9133a98f625b85f07a7e50f6f23 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 16:08:05 +0100
Subject: Get rid of background::spawn

---
 src/table/table.rs | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

(limited to 'src/table')

diff --git a/src/table/table.rs b/src/table/table.rs
index 4d93102e..bbcd5971 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,7 +14,7 @@ use opentelemetry::{

 use garage_db as db;

-use garage_util::background::{self, BackgroundRunner};
+use garage_util::background::BackgroundRunner;
 use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::metrics::RecordDuration;
@@ -275,7 +275,11 @@ where
         if not_all_same {
             let self2 = self.clone();
             let ent2 = ret_entry.clone();
-            background::spawn(async move { self2.repair_on_read(&who[..], ent2).await });
+            tokio::spawn(async move {
+                if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
+                    warn!("Error doing repair on read: {}", e);
+                }
+            });
         }
     }

@@ -372,11 +376,12 @@ where
             .into_iter()
             .map(|k| ret.get(&k).unwrap().clone())
             .collect::<Vec<_>>();
-        background::spawn(async move {
+        tokio::spawn(async move {
             for v in to_repair {
-                self2.repair_on_read(&who[..], v).await?;
+                if let Err(e) = self2.repair_on_read(&who[..], v).await {
+                    warn!("Error doing repair on read: {}", e);
+                }
             }
-            Ok(())
         });
     }

--
cgit v1.2.3
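Replacing `background::spawn` with a bare `tokio::spawn` means the read-repair future is now fully detached, so its error can no longer be propagated with `?`; it has to be logged inside the task, as the diff does. A small sketch of that fire-and-forget shape (the `repair_on_read` stub below is illustrative only, not the Garage function):

    use tracing::warn;

    // Placeholder for the real re-push of an entry to lagging nodes.
    async fn repair_on_read(item: u64) -> Result<(), String> {
        if item % 2 == 0 { Ok(()) } else { Err(format!("node unreachable for {}", item)) }
    }

    #[tokio::main]
    async fn main() {
        let to_repair = vec![1u64, 2, 3];
        // Detached, best-effort task: errors are logged, never returned.
        tokio::spawn(async move {
            for v in to_repair {
                if let Err(e) = repair_on_read(v).await {
                    warn!("Error doing repair on read: {}", e);
                }
            }
        })
        .await
        .unwrap(); // awaited here only so the example finishes deterministically
    }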
From 13c86621267272af5bc89ec037d097739dae9aaf Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 14 Dec 2022 16:16:55 +0100
Subject: factorize

---
 src/table/data.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index bf2bf88b..40856b02 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -330,13 +330,12 @@ where
                 let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
                 entry.merge(ins);
                 rmp_to_vec_all_named(&entry)
-                    .map_err(Error::RmpEncode)
-                    .map_err(db::TxError::Abort)?
             }
-            None => rmp_to_vec_all_named(ins)
-                .map_err(Error::RmpEncode)
-                .map_err(db::TxError::Abort)?,
+            None => rmp_to_vec_all_named(ins),
         };
+        let new_entry = new_entry
+            .map_err(Error::RmpEncode)
+            .map_err(db::TxError::Abort)?;
         tx.insert(&self.insert_queue, &tree_key, new_entry)?;
         self.insert_queue_notify.notify_one();
--
cgit v1.2.3

From 67755695254fb20bcb535d3484d692babb853d33 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Mon, 2 Jan 2023 14:15:33 +0100
Subject: Bump everything to v0.8.1

---
 src/table/Cargo.toml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

(limited to 'src/table')

diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 38c6b41c..861e3843 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "garage_table"
-version = "0.8.0"
+version = "0.8.1"
 authors = ["Alex Auvolat "]
 edition = "2018"
 license = "AGPL-3.0"
@@ -14,9 +14,9 @@ path = "lib.rs"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-garage_db = { version = "0.8.0", path = "../db" }
-garage_rpc = { version = "0.8.0", path = "../rpc" }
-garage_util = { version = "0.8.0", path = "../util" }
+garage_db = { version = "0.8.1", path = "../db" }
+garage_rpc = { version = "0.8.1", path = "../rpc" }
+garage_util = { version = "0.8.1", path = "../util" }

 opentelemetry = "0.17"

--
cgit v1.2.3
From cdb2a591e9d393d24ab5c49bb905b0589b193299 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 3 Jan 2023 14:44:47 +0100
Subject: Refactor how things are migrated

---
 src/table/data.rs   | 30 ++++++++++++++++--------------
 src/table/schema.rs | 23 ++++++++++++-----------
 src/table/sync.rs   |  6 +++---
 src/table/table.rs  |  7 ++++---
 4 files changed, 35 insertions(+), 31 deletions(-)

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index 40856b02..f93ed00d 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree;

 use garage_util::data::*;
 use garage_util::error::*;
+use garage_util::migrate::Migrate;

 use garage_rpc::system::System;

@@ -219,7 +220,8 @@ where
             // data format, the messagepack encoding changed. In this case,
             // we also have to write the migrated value in the table and update
             // the associated Merkle tree entry.
-            let new_bytes = rmp_to_vec_all_named(&new_entry)
+            let new_bytes = new_entry
+                .encode()
                 .map_err(Error::RmpEncode)
                 .map_err(db::TxError::Abort)?;
             let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
@@ -329,9 +331,9 @@ where
             Some(old_v) => {
                 let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
                 entry.merge(ins);
-                rmp_to_vec_all_named(&entry)
+                entry.encode()
             }
-            None => rmp_to_vec_all_named(ins),
+            None => ins.encode(),
         };
         let new_entry = new_entry
             .map_err(Error::RmpEncode)
@@ -351,18 +353,18 @@ where
     }

     pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
-        match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
-            Ok(x) => Ok(x),
-            Err(e) => match F::try_migrate(bytes) {
-                Some(x) => Ok(x),
-                None => {
-                    warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
-                    for line in hexdump::hexdump_iter(bytes) {
-                        debug!("{}", line);
-                    }
-                    Err(e.into())
+        match F::E::decode(bytes) {
+            Some(x) => Ok(x),
+            None => {
+                error!("Unable to decode entry of {}", F::TABLE_NAME);
+                for line in hexdump::hexdump_iter(bytes) {
+                    debug!("{}", line);
                 }
-            },
+                Err(Error::Message(format!(
+                    "Unable to decode entry of {}",
+                    F::TABLE_NAME
+                )))
+            }
         }
     }

diff --git a/src/table/schema.rs b/src/table/schema.rs
index f37e98d8..6538a32f 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};

 use garage_db as db;
 use garage_util::data::*;
+use garage_util::migrate::Migrate;

 use crate::crdt::Crdt;

@@ -46,7 +47,7 @@ impl SortKey for FixedBytes32 {

 /// Trait for an entry in a table. It must be sortable and partitionnable.
 pub trait Entry<P: PartitionKey, S: SortKey>:
-    Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+    Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static
 {
     /// Get the key used to partition
     fn partition_key(&self) -> &P;
@@ -65,23 +66,23 @@ pub trait TableSchema: Send + Sync + 'static {
     const TABLE_NAME: &'static str;

     /// The partition key used in that table
-    type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+    type P: PartitionKey
+        + Clone
+        + PartialEq
+        + Serialize
+        + for<'de> Deserialize<'de>
+        + Send
+        + Sync
+        + 'static;
     /// The sort key used int that table
-    type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+    type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;

     /// They type for an entry in that table
     type E: Entry<Self::P, Self::S>;

     /// The type for a filter that can be applied to select entries
     /// (e.g. filter out deleted entries)
-    type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-
-    // Action to take if not able to decode current version:
-    // try loading from an older version
-    /// Try migrating an entry from an older version
-    fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
-        None
-    }
+    type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;

     /// Actions triggered by data changing in a table. If such actions
     /// include updates to the local database that should be applied
diff --git a/src/table/sync.rs b/src/table/sync.rs
index d6d272ab..abc034f8 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -302,7 +302,7 @@ where
             );
             return Ok(());
         }
-        let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+        let root_ck_hash = hash_of_merkle_node(&root_ck)?;

         // Check if they have the same root checksum
         // If so, do nothing.
@@ -468,7 +468,7 @@ where
         match message {
             SyncRpc::RootCkHash(range, h) => {
                 let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
-                let hash = hash_of::<MerkleNode>(&root_ck)?;
+                let hash = hash_of_merkle_node(&root_ck)?;
                 Ok(SyncRpc::RootCkDifferent(hash != *h))
             }
             SyncRpc::GetNode(k) => {
@@ -622,7 +622,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor

 // ---- UTIL ----

-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
     Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
 }

diff --git a/src/table/table.rs b/src/table/table.rs
index bbcd5971..7f158314 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -18,6 +18,7 @@ use garage_util::background::BackgroundRunner;
 use garage_util::data::*;
 use garage_util::error::Error;
 use garage_util::metrics::RecordDuration;
+use garage_util::migrate::Migrate;

 use garage_rpc::system::System;
 use garage_rpc::*;
@@ -122,7 +123,7 @@ where
         let hash = e.partition_key().hash();
         let who = self.data.replication.write_nodes(&hash);

-        let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+        let e_enc = Arc::new(ByteBuf::from(e.encode()?));
         let rpc = TableRpc::<F>::Update(vec![e_enc]);

         self.system
@@ -173,7 +174,7 @@ where
             let entry = entry.borrow();
             let hash = entry.partition_key().hash();
             let who = self.data.replication.write_nodes(&hash);
-            let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+            let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
             for node in who {
                 call_list.entry(node).or_default().push(e_enc.clone());
             }
@@ -412,7 +413,7 @@ where
     // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============

     async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
-        let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
+        let what_enc = Arc::new(ByteBuf::from(what.encode()?));
         self.system
             .rpc
             .try_call_many(
--
cgit v1.2.3
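The migration refactor moves serialization onto the entry type itself: `E::encode()` produces the current MessagePack encoding and `E::decode()` returns `None` only if the bytes match neither the current nor any previous format. The real `Migrate` trait lives in `garage_util::migrate` and its definition is not part of this series, so the sketch below is an assumed, simplified model of the idea only (names are not the real API):

    use serde::{Deserialize, Serialize};

    // Illustrative stand-in for garage_util::migrate::Migrate.
    trait Migrate: Sized + Serialize + for<'de> Deserialize<'de> {
        // Older on-disk format this type can be upgraded from.
        type Previous: Serialize + for<'de> Deserialize<'de>;

        fn migrate(prev: Self::Previous) -> Self;

        fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
            rmp_serde::to_vec_named(self)
        }

        // Try the current format first, then fall back to the previous one;
        // `None` means the bytes are unreadable (the caller logs a hexdump).
        fn decode(bytes: &[u8]) -> Option<Self> {
            if let Ok(x) = rmp_serde::decode::from_read_ref::<_, Self>(bytes) {
                return Some(x);
            }
            rmp_serde::decode::from_read_ref::<_, Self::Previous>(bytes)
                .ok()
                .map(Self::migrate)
        }
    }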
From 426d8784dac0e39879af52d980887d3692fc907c Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 3 Jan 2023 15:08:37 +0100
Subject: cleanup

---
 src/table/data.rs                   |  6 +-----
 src/table/gc.rs                     | 32 ++++++--------------------------
 src/table/merkle.rs                 | 17 +++--------------
 src/table/queue.rs                  | 10 +++-------
 src/table/replication/parameters.rs |  2 +-
 src/table/schema.rs                 | 17 ++++++-----------
 src/table/sync.rs                   | 20 ++++++--------------
 src/table/table.rs                  | 14 +++-----------
 8 files changed, 29 insertions(+), 89 deletions(-)

(limited to 'src/table')

diff --git a/src/table/data.rs b/src/table/data.rs
index f93ed00d..5c792f1f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -41,11 +41,7 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
     pub(crate) metrics: TableMetrics,
 }

-impl<F, R> TableData<F, R>
-where
-    F: TableSchema,
-    R: TableReplication,
-{
+impl<F: TableSchema, R: TableReplication> TableData<F, R> {
     pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
         let store = db
             .open_tree(&format!("{}:table", F::TABLE_NAME))
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 90594fba..5b9124a7 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024;
 // and the moment the garbage collection actually happens)
 const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);

-pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
     system: Arc<System>,
     data: Arc<TableData<F, R>>,

@@ -49,11 +49,7 @@ impl Rpc for GcRpc {
     type Response = Result<GcRpc, Error>;
 }

-impl<F, R> TableGc<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
     pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
         let endpoint = system
             .netapp
@@ -277,11 +273,7 @@ where
 }

 #[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
     async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
         match message {
             GcRpc::Update(items) => {
@@ -299,20 +291,12 @@ where
     }
 }

-struct GcWorker<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+struct GcWorker<F: TableSchema, R: TableReplication> {
     gc: Arc<TableGc<F, R>>,
     wait_delay: Duration,
 }

-impl<F, R> GcWorker<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
     fn new(gc: Arc<TableGc<F, R>>) -> Self {
         Self {
             gc,
@@ -322,11 +306,7 @@ where
 }

 #[async_trait]
-impl<F, R> Worker for GcWorker<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
     fn name(&self) -> String {
         format!("{} GC", F::TABLE_NAME)
     }
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 736354fa..2d593e6d 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -65,11 +65,7 @@ pub enum MerkleNode {
     Leaf(Vec<u8>, Hash),
 }

-impl<F, R> MerkleUpdater<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
     pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
         let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);

@@ -303,17 +299,10 @@ where
     }
 }

-struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static;
+struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);

 #[async_trait]
-impl<F, R> Worker for MerkleWorker<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
     fn name(&self) -> String {
         format!("{} Merkle", F::TABLE_NAME)
     }
diff --git a/src/table/queue.rs b/src/table/queue.rs
index 860f20d3..0857209b 100644
--- a/src/table/queue.rs
+++ b/src/table/queue.rs
@@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100;

 pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
 where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static;
+    F: TableSchema,
+    R: TableReplication;

 #[async_trait]
-impl<F, R> Worker for InsertQueueWorker<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
     fn name(&self) -> String {
         format!("{} queue", F::TABLE_NAME)
     }
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 3740d947..f00815a2 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -2,7 +2,7 @@ use garage_rpc::ring::*;
 use garage_util::data::*;

 /// Trait to describe how a table shall be replicated
-pub trait TableReplication: Send + Sync {
+pub trait TableReplication: Send + Sync + 'static {
     // See examples in table_sharded.rs and table_fullcopy.rs
     // To understand various replication methods

diff --git a/src/table/schema.rs b/src/table/schema.rs
index 6538a32f..5cbf6c95 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -7,7 +7,9 @@ use garage_util::migrate::Migrate;
 use crate::crdt::Crdt;

 /// Trait for field used to partition data
-pub trait PartitionKey {
+pub trait PartitionKey:
+    Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
+{
     /// Get the key used to partition
     fn hash(&self) -> Hash;
 }
@@ -28,7 +30,7 @@ impl PartitionKey for FixedBytes32 {
 }

 /// Trait for field used to sort data
-pub trait SortKey {
+pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
     /// Get the key used to sort
     fn sort_key(&self) -> &[u8];
 }
@@ -66,16 +68,9 @@ pub trait TableSchema: Send + Sync + 'static {
     const TABLE_NAME: &'static str;

     /// The partition key used in that table
-    type P: PartitionKey
Clone - + PartialEq - + Serialize - + for<'de> Deserialize<'de> - + Send - + Sync - + 'static; + type P: PartitionKey; /// The sort key used int that table - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; + type S: SortKey; /// They type for an entry in that table type E: Entry; diff --git a/src/table/sync.rs b/src/table/sync.rs index abc034f8..29e7aa89 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -28,7 +28,7 @@ use crate::*; // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); -pub struct TableSyncer { +pub struct TableSyncer { system: Arc, data: Arc>, merkle: Arc>, @@ -61,11 +61,7 @@ struct TodoPartition { retain: bool, } -impl TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl TableSyncer { pub(crate) fn new( system: Arc, data: Arc>, @@ -459,11 +455,7 @@ where // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== #[async_trait] -impl EndpointHandler for TableSyncer -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler for TableSyncer { async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { SyncRpc::RootCkHash(range, h) => { @@ -497,7 +489,7 @@ where // -------- Sync Worker --------- -struct SyncWorker { +struct SyncWorker { syncer: Arc>, ring_recv: watch::Receiver>, ring: Arc, @@ -506,7 +498,7 @@ struct SyncWorker { next_full_sync: Instant, } -impl SyncWorker { +impl SyncWorker { fn add_full_sync(&mut self) { let system = &self.syncer.system; let data = &self.syncer.data; @@ -572,7 +564,7 @@ impl SyncWorker { } #[async_trait] -impl Worker for SyncWorker { +impl Worker for SyncWorker { fn name(&self) -> String { format!("{} sync", F::TABLE_NAME) } diff --git a/src/table/table.rs b/src/table/table.rs index 7f158314..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -33,7 +33,7 @@ use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table { +pub struct Table { pub system: Arc, pub data: Arc>, pub merkle_updater: Arc>, @@ -65,11 +65,7 @@ impl Rpc for TableRpc { type Response = Result, Error>; } -impl Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Table { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc, db: &db::Db) -> Arc { @@ -428,11 +424,7 @@ where } #[async_trait] -impl EndpointHandler> for Table -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl EndpointHandler> for Table { async fn handle( self: &Arc, msg: &TableRpc, -- cgit v1.2.3 From 8d5505514f950dc1ca1249a3385c9913b5b5e8e0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:27:36 +0100 Subject: Make it explicit when using nonversioned encoding --- src/table/Cargo.toml | 1 - src/table/merkle.rs | 7 ++++--- src/table/sync.rs | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) (limited to 'src/table') diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e1a74553..3911c945 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -28,7 +28,6 @@ hexdump = "0.1" tracing = "0.1.30" rand = "0.8" -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 2d593e6d..e86d0251 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -10,6 +10,7 @@ use garage_db as db; 
 use garage_util::background::*;
 use garage_util::data::*;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
 use garage_util::error::Error;

 use garage_rpc::ring::*;
@@ -67,7 +68,7 @@ pub enum MerkleNode {

 impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
     pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
-        let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+        let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);

         Arc::new(Self {
             data,
@@ -273,7 +274,7 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
             tx.remove(&self.data.merkle_tree, k.encode())?;
             Ok(self.empty_node_hash)
         } else {
-            let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
+            let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?;
             let rethash = blake2sum(&vby[..]);
             tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
             Ok(rethash)
@@ -364,7 +365,7 @@ impl MerkleNode {
     fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
         match ent {
             None => Ok(MerkleNode::Empty),
-            Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
+            Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?),
         }
     }

diff --git a/src/table/sync.rs b/src/table/sync.rs
index 29e7aa89..c66c863f 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};

 use garage_util::background::*;
 use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
 use garage_util::error::{Error, OkOrMessage};

 use garage_rpc::ring::*;
@@ -615,7 +616,7 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
 // ---- UTIL ----

 fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
-    Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+    Ok(blake2sum(&nonversioned_encode(x)?[..]))
 }

 fn join_ordered<'a, K: Ord + Eq, V1, V2>(
--
cgit v1.2.3

From a54b67740d08e3fabeb1652a1bed14d78fea4b74 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 3 Jan 2023 15:29:29 +0100
Subject: move debug_serialize to garage_util::encode

---
 src/table/sync.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src/table')

diff --git a/src/table/sync.rs b/src/table/sync.rs
index c66c863f..1f23d3a1 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch};

 use garage_util::background::*;
 use garage_util::data::*;
-use garage_util::encode::nonversioned_encode;
+use garage_util::encode::{nonversioned_encode, debug_serialize};
 use garage_util::error::{Error, OkOrMessage};

 use garage_rpc::ring::*;
--
cgit v1.2.3

From d6d571d51216d2077a41216e067b32736fbd745a Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Tue, 3 Jan 2023 15:30:21 +0100
Subject: cargo fmt

---
 src/table/sync.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'src/table')

diff --git a/src/table/sync.rs b/src/table/sync.rs
index 1f23d3a1..92a353c6 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch};

 use garage_util::background::*;
 use garage_util::data::*;
-use garage_util::encode::{nonversioned_encode, debug_serialize};
+use garage_util::encode::{debug_serialize, nonversioned_encode};
 use garage_util::error::{Error, OkOrMessage};

 use garage_rpc::ring::*;
--
cgit v1.2.3
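The last three patches make Merkle-node hashing use an explicitly non-versioned encoding, since those hashes must stay byte-for-byte stable across nodes even as other data structures gain versioned encodings. Under the assumption that garage_util 0.8.1 exposes `blake2sum` (in `garage_util::data`) and `nonversioned_encode` exactly as used above, the helper boils down to this generic form:

    use garage_util::data::{blake2sum, Hash};
    use garage_util::encode::nonversioned_encode;
    use garage_util::error::Error;
    use serde::Serialize;

    // Hash the plain (non-versioned) MessagePack encoding of any serializable
    // value; `hash_of_merkle_node` above is this function specialized to MerkleNode.
    fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
        Ok(blake2sum(&nonversioned_encode(x)?[..]))
    }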