aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs38
-rw-r--r--src/table/lib.rs8
-rw-r--r--src/table/merkle.rs6
-rw-r--r--src/table/queue.rs84
-rw-r--r--src/table/table.rs11
5 files changed, 142 insertions, 5 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index cae18999..e3b8c93f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -31,6 +31,10 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
+
+ pub(crate) insert_queue: db::Tree,
+ pub(crate) insert_queue_notify: Notify,
+
pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
@@ -53,9 +57,13 @@ where
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
+ let insert_queue = db
+ .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .expect("Unable to open insert queue DB tree");
+
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
- .expect("Unable to open DB tree");
+ .expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(
@@ -74,6 +82,8 @@ where
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
+ insert_queue,
+ insert_queue_notify: Notify::new(),
gc_todo,
metrics,
})
@@ -306,6 +316,32 @@ where
Ok(removed)
}
+ // ---- Insert queue functions ----
+
+ pub(crate) fn queue_insert(
+ &self,
+ tx: &mut db::Transaction,
+ ins: &F::E,
+ ) -> db::TxResult<(), Error> {
+ let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
+
+ let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
+ Some(old_v) => {
+ let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
+ entry.merge(ins);
+ rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?
+ }
+ None => rmp_to_vec_all_named(ins)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?,
+ };
+ tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+
+ Ok(())
+ }
+
// ---- Utility functions ----
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
diff --git a/src/table/lib.rs b/src/table/lib.rs
index b0153e9a..fdf114a6 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -4,16 +4,18 @@
#[macro_use]
extern crate tracing;
-mod metrics;
pub mod schema;
pub mod util;
pub mod data;
+pub mod replication;
+pub mod table;
+
mod gc;
mod merkle;
-pub mod replication;
+mod metrics;
+mod queue;
mod sync;
-pub mod table;
pub use schema::*;
pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e977bfb5..bcf9f9d7 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use tokio::select;
use tokio::sync::watch;
use garage_db as db;
@@ -343,7 +344,10 @@ where
if *must_exit.borrow() {
return WorkerState::Done;
}
- tokio::time::sleep(Duration::from_secs(10)).await;
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+ _ = self.0.data.merkle_todo_notify.notified() => (),
+ }
WorkerState::Busy
}
}
diff --git a/src/table/queue.rs b/src/table/queue.rs
new file mode 100644
index 00000000..3671ea7d
--- /dev/null
+++ b/src/table/queue.rs
@@ -0,0 +1,84 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::select;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::Error;
+
+use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
+
+const BATCH_SIZE: usize = 100;
+
+pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for InsertQueueWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} queue", F::TABLE_NAME)
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let mut kv_pairs = vec![];
+ let mut values = vec![];
+
+ for entry_kv in self.0.data.insert_queue.iter()? {
+ let (k, v) = entry_kv?;
+
+ values.push(self.0.data.decode_entry(&v)?);
+ kv_pairs.push((k, v));
+
+ if kv_pairs.len() > BATCH_SIZE {
+ break;
+ }
+ }
+
+ if kv_pairs.is_empty() {
+ return Ok(WorkerState::Idle);
+ }
+
+ self.0.insert_many(values).await?;
+
+ self.0.data.insert_queue.db().transaction(|mut tx| {
+ for (k, v) in kv_pairs.iter() {
+ if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
+ if &v2 == v {
+ tx.remove(&self.0.data.insert_queue, k)?;
+ }
+ }
+ }
+ Ok(())
+ })?;
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(600)) => (),
+ _ = self.0.data.insert_queue_notify.notified() => (),
+ }
+ WorkerState::Busy
+ }
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index 8a66c420..c8e0576e 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -25,6 +25,7 @@ use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
+use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
@@ -88,6 +89,11 @@ where
endpoint,
});
+ table
+ .system
+ .background
+ .spawn_worker(InsertQueueWorker(table.clone()));
+
table.endpoint.set_handler(table.clone());
table
@@ -128,6 +134,11 @@ where
Ok(())
}
+ /// Insert item locally
+ pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
+ self.data.queue_insert(tx, e)
+ }
+
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,