diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 1 | ||||
-rw-r--r-- | src/table/data.rs | 78 | ||||
-rw-r--r-- | src/table/gc.rs | 16 | ||||
-rw-r--r-- | src/table/lib.rs | 8 | ||||
-rw-r--r-- | src/table/merkle.rs | 21 | ||||
-rw-r--r-- | src/table/queue.rs | 81 | ||||
-rw-r--r-- | src/table/sync.rs | 48 | ||||
-rw-r--r-- | src/table/table.rs | 37 |
8 files changed, 218 insertions, 72 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 861e3843..e1a74553 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ garage_util = { version = "0.8.1", path = "../util" } opentelemetry = "0.17" async-trait = "0.1.7" +arc-swap = "1.0" bytes = "1.0" hex = "0.4" hexdump = "0.1" diff --git a/src/table/data.rs b/src/table/data.rs index 93da2110..40856b02 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_tree: db::Tree, pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, + + pub(crate) insert_queue: db::Tree, + pub(crate) insert_queue_notify: Notify, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, @@ -53,9 +57,13 @@ where .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); + let insert_queue = db + .open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) + .expect("Unable to open insert queue DB tree"); + let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) - .expect("Unable to open DB tree"); + .expect("Unable to open GC DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new( @@ -74,6 +82,8 @@ where merkle_tree, merkle_todo, merkle_todo_notify: Notify::new(), + insert_queue, + insert_queue_notify: Notify::new(), gc_todo, metrics, }) @@ -173,9 +183,8 @@ where pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - self.update_entry_with(&tree_key[..], |ent| match ent { + self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent { Some(mut ent) => { ent.merge(&update); ent @@ -187,11 +196,14 @@ where pub fn update_entry_with( &self, - tree_key: &[u8], + partition_key: &F::P, + sort_key: &F::S, f: impl Fn(Option<F::E>) -> F::E, ) -> Result<Option<F::E>, Error> { + let tree_key = self.tree_key(partition_key, sort_key); + let changed = self.store.db().transaction(|mut tx| { - let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? { + let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? { Some(old_bytes) => { let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?; let new_entry = f(Some(old_entry.clone())); @@ -200,23 +212,23 @@ where None => (None, None, f(None)), }; - // Scenario 1: the value changed, so of course there is a change - let value_changed = Some(&new_entry) != old_entry.as_ref(); - + // Changed can be true in two scenarios + // Scenario 1: the actual represented value changed, + // so of course the messagepack encoding changed as well // Scenario 2: the value didn't change but due to a migration in the - // data format, the messagepack encoding changed. In this case - // we have to write the migrated value in the table and update - // the associated Merkle tree entry. + // data format, the messagepack encoding changed. In this case, + // we also have to write the migrated value in the table and update + // the associated Merkle tree entry. let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; - let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]); + let changed = Some(&new_bytes[..]) != old_bytes.as_deref(); drop(old_bytes); - if value_changed || encoding_changed { - let new_bytes_hash = blake2sum(&new_bytes[..]); - tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?; - tx.insert(&self.store, tree_key, new_bytes)?; + if changed { + let new_bytes_hash = blake2sum(&new_bytes); + tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?; + tx.insert(&self.store, &tree_key, new_bytes)?; self.instance .updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?; @@ -242,7 +254,7 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?; + GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } } @@ -258,10 +270,11 @@ where .db() .transaction(|mut tx| match tx.get(&self.store, k)? { Some(cur_v) if cur_v == v => { + let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; + tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; self.instance.updated(&mut tx, Some(&old_entry), None)?; Ok(true) } @@ -285,10 +298,11 @@ where .db() .transaction(|mut tx| match tx.get(&self.store, k)? { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { + let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; + tx.remove(&self.store, k)?; tx.insert(&self.merkle_todo, k, vec![])?; - let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; self.instance.updated(&mut tx, Some(&old_entry), None)?; Ok(true) } @@ -302,6 +316,32 @@ where Ok(removed) } + // ---- Insert queue functions ---- + + pub(crate) fn queue_insert( + &self, + tx: &mut db::Transaction, + ins: &F::E, + ) -> db::TxResult<(), Error> { + let tree_key = self.tree_key(ins.partition_key(), ins.sort_key()); + + let new_entry = match tx.get(&self.insert_queue, &tree_key)? { + Some(old_v) => { + let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; + entry.merge(ins); + rmp_to_vec_all_named(&entry) + } + None => rmp_to_vec_all_named(ins), + }; + let new_entry = new_entry + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; + tx.insert(&self.insert_queue, &tree_key, new_entry)?; + self.insert_queue_notify.notify_one(); + + Ok(()) + } + // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { diff --git a/src/table/gc.rs b/src/table/gc.rs index cfdc9d2d..90594fba 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -54,24 +54,25 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { + pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME)); let gc = Arc::new(Self { - system: system.clone(), + system, data, endpoint, }); - gc.endpoint.set_handler(gc.clone()); - system.background.spawn_worker(GcWorker::new(gc.clone())); - gc } + pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + bg.spawn_worker(GcWorker::new(self.clone())); + } + async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); @@ -347,10 +348,7 @@ where } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { tokio::time::sleep(self.wait_delay).await; WorkerState::Busy } diff --git a/src/table/lib.rs b/src/table/lib.rs index b0153e9a..fdf114a6 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -4,16 +4,18 @@ #[macro_use] extern crate tracing; -mod metrics; pub mod schema; pub mod util; pub mod data; +pub mod replication; +pub mod table; + mod gc; mod merkle; -pub mod replication; +mod metrics; +mod queue; mod sync; -pub mod table; pub use schema::*; pub use table::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e977bfb5..736354fa 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use tokio::select; use tokio::sync::watch; use garage_db as db; @@ -69,17 +70,17 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> { + pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - let ret = Arc::new(Self { + Arc::new(Self { data, empty_node_hash, - }); - - background.spawn_worker(MerkleWorker(ret.clone())); + }) + } - ret + pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) { + background.spawn_worker(MerkleWorker(self.clone())); } fn updater_loop_iter(&self) -> Result<WorkerState, Error> { @@ -339,11 +340,11 @@ where .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; + async fn wait_for_work(&mut self) -> WorkerState { + select! { + _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = self.0.data.merkle_todo_notify.notified() => (), } - tokio::time::sleep(Duration::from_secs(10)).await; WorkerState::Busy } } diff --git a/src/table/queue.rs b/src/table/queue.rs new file mode 100644 index 00000000..860f20d3 --- /dev/null +++ b/src/table/queue.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::select; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::Error; + +use crate::replication::*; +use crate::schema::*; +use crate::table::*; + +const BATCH_SIZE: usize = 100; + +pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl<F, R> Worker for InsertQueueWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} queue", F::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let mut kv_pairs = vec![]; + let mut values = vec![]; + + for entry_kv in self.0.data.insert_queue.iter()? { + let (k, v) = entry_kv?; + + values.push(self.0.data.decode_entry(&v)?); + kv_pairs.push((k, v)); + + if kv_pairs.len() > BATCH_SIZE { + break; + } + } + + if kv_pairs.is_empty() { + return Ok(WorkerState::Idle); + } + + self.0.insert_many(values).await?; + + self.0.data.insert_queue.db().transaction(|mut tx| { + for (k, v) in kv_pairs.iter() { + if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { + if &v2 == v { + tx.remove(&self.0.data.insert_queue, k)?; + } + } + } + Ok(()) + })?; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + select! { + _ = tokio::time::sleep(Duration::from_secs(600)) => (), + _ = self.0.data.insert_queue_notify.notified() => (), + } + WorkerState::Busy + } +} diff --git a/src/table/sync.rs b/src/table/sync.rs index af7aa640..d6d272ab 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; @@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; use garage_rpc::system::System; @@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - add_full_sync_tx: mpsc::UnboundedSender<()>, + add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -65,7 +66,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( + pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -74,34 +75,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); - let syncer = Arc::new(Self { - system: system.clone(), + system, data, merkle, - add_full_sync_tx, + add_full_sync_tx: ArcSwapOption::new(None), endpoint, }); - syncer.endpoint.set_handler(syncer.clone()); - system.background.spawn_worker(SyncWorker { - syncer: syncer.clone(), - ring_recv: system.ring.clone(), - ring: system.ring.borrow().clone(), + syncer + } + + pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); + self.add_full_sync_tx + .store(Some(Arc::new(add_full_sync_tx))); + + bg.spawn_worker(SyncWorker { + syncer: self.clone(), + ring_recv: self.system.ring.clone(), + ring: self.system.ring.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), }); - - syncer } - pub fn add_full_sync(&self) { - if self.add_full_sync_tx.send(()).is_err() { - error!("({}) Could not add full sync", F::TABLE_NAME); - } + pub fn add_full_sync(&self) -> Result<(), Error> { + let tx = self.add_full_sync_tx.load(); + let tx = tx + .as_ref() + .ok_or_message("table sync worker is not running")?; + tx.send(()).ok_or_message("send error")?; + Ok(()) } // ---- @@ -586,10 +593,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { s = self.add_full_sync_rx.recv() => { if let Some(()) = s { diff --git a/src/table/table.rs b/src/table/table.rs index 8a66c420..bbcd5971 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,6 +14,7 @@ use opentelemetry::{ use garage_db as db; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -25,6 +26,7 @@ use crate::crdt::Crdt; use crate::data::*; use crate::gc::*; use crate::merkle::*; +use crate::queue::InsertQueueWorker; use crate::replication::*; use crate::schema::*; use crate::sync::*; @@ -35,6 +37,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, + gc: Arc<TableGc<F, R>>, endpoint: Arc<Endpoint<TableRpc<F>, Self>>, } @@ -75,15 +78,16 @@ where let data = TableData::new(system.clone(), instance, replication, db); - let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); + let merkle_updater = MerkleUpdater::new(data.clone()); - let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); - TableGc::launch(system.clone(), data.clone()); + let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); + let gc = TableGc::new(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, + gc, syncer, endpoint, }); @@ -93,6 +97,13 @@ where table } + pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + self.merkle_updater.spawn_workers(bg); + self.syncer.spawn_workers(bg); + self.gc.spawn_workers(bg); + bg.spawn_worker(InsertQueueWorker(self.clone())); + } + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert", F::TABLE_NAME)); @@ -128,6 +139,11 @@ where Ok(()) } + /// Insert item locally + pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> { + self.data.queue_insert(tx, e) + } + pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> where I: IntoIterator<Item = IE> + Send + Sync, @@ -259,9 +275,11 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + tokio::spawn(async move { + if let Err(e) = self2.repair_on_read(&who[..], ent2).await { + warn!("Error doing repair on read: {}", e); + } + }); } } @@ -358,11 +376,12 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::<Vec<_>>(); - self.system.background.spawn_cancellable(async move { + tokio::spawn(async move { for v in to_repair { - self2.repair_on_read(&who[..], v).await?; + if let Err(e) = self2.repair_on_read(&who[..], v).await { + warn!("Error doing repair on read: {}", e); + } } - Ok(()) }); } |