diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 38 | ||||
-rw-r--r-- | src/table/lib.rs | 8 | ||||
-rw-r--r-- | src/table/merkle.rs | 6 | ||||
-rw-r--r-- | src/table/queue.rs | 84 | ||||
-rw-r--r-- | src/table/table.rs | 11 |
5 files changed, 142 insertions, 5 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index cae18999..e3b8c93f 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> { pub(crate) merkle_tree: db::Tree, pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo_notify: Notify, + + pub(crate) insert_queue: db::Tree, + pub(crate) insert_queue_notify: Notify, + pub(crate) gc_todo: CountedTree, pub(crate) metrics: TableMetrics, @@ -53,9 +57,13 @@ where .open_tree(&format!("{}:merkle_todo", F::TABLE_NAME)) .expect("Unable to open DB Merkle TODO tree"); + let insert_queue = db + .open_tree(&format!("{}:insert_queue", F::TABLE_NAME)) + .expect("Unable to open insert queue DB tree"); + let gc_todo = db .open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME)) - .expect("Unable to open DB tree"); + .expect("Unable to open GC DB tree"); let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2"); let metrics = TableMetrics::new( @@ -74,6 +82,8 @@ where merkle_tree, merkle_todo, merkle_todo_notify: Notify::new(), + insert_queue, + insert_queue_notify: Notify::new(), gc_todo, metrics, }) @@ -306,6 +316,32 @@ where Ok(removed) } + // ---- Insert queue functions ---- + + pub(crate) fn queue_insert( + &self, + tx: &mut db::Transaction, + ins: &F::E, + ) -> db::TxResult<(), Error> { + let tree_key = self.tree_key(ins.partition_key(), ins.sort_key()); + + let new_entry = match tx.get(&self.insert_queue, &tree_key)? { + Some(old_v) => { + let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; + entry.merge(ins); + rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)? + } + None => rmp_to_vec_all_named(ins) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?, + }; + tx.insert(&self.insert_queue, &tree_key, new_entry)?; + + Ok(()) + } + // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { diff --git a/src/table/lib.rs b/src/table/lib.rs index b0153e9a..fdf114a6 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -4,16 +4,18 @@ #[macro_use] extern crate tracing; -mod metrics; pub mod schema; pub mod util; pub mod data; +pub mod replication; +pub mod table; + mod gc; mod merkle; -pub mod replication; +mod metrics; +mod queue; mod sync; -pub mod table; pub use schema::*; pub use table::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e977bfb5..bcf9f9d7 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use tokio::select; use tokio::sync::watch; use garage_db as db; @@ -343,7 +344,10 @@ where if *must_exit.borrow() { return WorkerState::Done; } - tokio::time::sleep(Duration::from_secs(10)).await; + select! { + _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = self.0.data.merkle_todo_notify.notified() => (), + } WorkerState::Busy } } diff --git a/src/table/queue.rs b/src/table/queue.rs new file mode 100644 index 00000000..3671ea7d --- /dev/null +++ b/src/table/queue.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::select; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::Error; + +use crate::replication::*; +use crate::schema::*; +use crate::table::*; + +const BATCH_SIZE: usize = 100; + +pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl<F, R> Worker for InsertQueueWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} queue", F::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let mut kv_pairs = vec![]; + let mut values = vec![]; + + for entry_kv in self.0.data.insert_queue.iter()? { + let (k, v) = entry_kv?; + + values.push(self.0.data.decode_entry(&v)?); + kv_pairs.push((k, v)); + + if kv_pairs.len() > BATCH_SIZE { + break; + } + } + + if kv_pairs.is_empty() { + return Ok(WorkerState::Idle); + } + + self.0.insert_many(values).await?; + + self.0.data.insert_queue.db().transaction(|mut tx| { + for (k, v) in kv_pairs.iter() { + if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { + if &v2 == v { + tx.remove(&self.0.data.insert_queue, k)?; + } + } + } + Ok(()) + })?; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + select! { + _ = tokio::time::sleep(Duration::from_secs(600)) => (), + _ = self.0.data.insert_queue_notify.notified() => (), + } + WorkerState::Busy + } +} diff --git a/src/table/table.rs b/src/table/table.rs index 8a66c420..c8e0576e 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -25,6 +25,7 @@ use crate::crdt::Crdt; use crate::data::*; use crate::gc::*; use crate::merkle::*; +use crate::queue::InsertQueueWorker; use crate::replication::*; use crate::schema::*; use crate::sync::*; @@ -88,6 +89,11 @@ where endpoint, }); + table + .system + .background + .spawn_worker(InsertQueueWorker(table.clone())); + table.endpoint.set_handler(table.clone()); table @@ -128,6 +134,11 @@ where Ok(()) } + /// Insert item locally + pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> { + self.data.queue_insert(tx, e) + } + pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> where I: IntoIterator<Item = IE> + Send + Sync, |