aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs67
-rw-r--r--src/table/gc.rs212
-rw-r--r--src/table/lib.rs1
-rw-r--r--src/table/merkle.rs5
-rw-r--r--src/table/schema.rs2
-rw-r--r--src/table/sync.rs18
-rw-r--r--src/table/table.rs8
7 files changed, 279 insertions, 34 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 5e7314d2..91607f7a 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -1,3 +1,4 @@
+use core::borrow::Borrow;
use std::sync::Arc;
use log::warn;
@@ -17,6 +18,7 @@ pub struct TableData<F: TableSchema> {
pub instance: F,
pub store: sled::Tree,
+ pub gc_todo: sled::Tree,
pub merkle_updater: Arc<MerkleUpdater>,
}
@@ -41,6 +43,10 @@ where
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
+ let gc_todo = db
+ .open_tree(&format!("{}:gc_todo", name))
+ .expect("Unable to open DB tree");
+
let merkle_updater = MerkleUpdater::launch(
name.clone(),
background,
@@ -52,6 +58,7 @@ where
name,
instance,
store,
+ gc_todo,
merkle_updater,
})
}
@@ -103,10 +110,17 @@ where
}
// Mutation functions
-
- pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
+ // When changing this code, take care of propagating modifications correctly:
+ // - When an entry is modified or deleted, call the updated() function
+ // on the table instance
+ // - When an entry is modified or deleted, add it to the merkle updater's todo list.
+ // This has to be done atomically with the modification for the merkle updater
+ // to maintain consistency. The merkle updater must then be notified with todo_notify.
+ // - When an entry is updated to be a tombstone, add it to the gc_todo tree
+
+ pub(crate) fn update_many<T: Borrow<ByteBuf>>(&self, entries: &[T]) -> Result<(), Error> {
for update_bytes in entries.iter() {
- self.update_entry(update_bytes.as_slice())?;
+ self.update_entry(update_bytes.borrow().as_slice())?;
}
Ok(())
}
@@ -115,8 +129,8 @@ where
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
+ let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
Some(prev_bytes) => {
let old_entry = self
.decode_entry(&prev_bytes)
@@ -132,27 +146,32 @@ where
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
- db.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry)))
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
- if let Some((old_entry, new_entry)) = changed {
+ if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
+ let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
self.merkle_updater.todo_notify.notify();
+ if is_tombstone {
+ self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+ }
}
Ok(())
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
- if let Some(cur_v) = txn.get(k)? {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
if cur_v == v {
- txn.remove(k)?;
+ store.remove(k)?;
mkl_todo.insert(k, vec![])?;
return Ok(true);
}
@@ -168,6 +187,30 @@ where
Ok(removed)
}
+ pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(Some(cur_v));
+ }
+ }
+ Ok(None)
+ })?;
+
+ if let Some(old_v) = removed {
+ let old_entry = self.decode_entry(&old_v[..])?;
+ self.instance.updated(Some(old_entry), None);
+ self.merkle_updater.todo_notify.notify();
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+
+ // ---- Utility functions ----
+
pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());
diff --git a/src/table/gc.rs b/src/table/gc.rs
new file mode 100644
index 00000000..afc8a473
--- /dev/null
+++ b/src/table/gc.rs
@@ -0,0 +1,212 @@
+use std::sync::Arc;
+use std::time::Duration;
+use std::collections::HashMap;
+
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+
+use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
+use tokio::sync::watch;
+
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use garage_rpc::rpc_client::*;
+use garage_rpc::rpc_server::*;
+
+use crate::data::*;
+use crate::table::*;
+use crate::schema::*;
+use crate::replication::*;
+
+const TABLE_GC_BATCH_SIZE: usize = 1024;
+const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub struct TableGC<F: TableSchema, R: TableReplication> {
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<R>>,
+
+ rpc_client: Arc<RpcClient<GcRPC>>,
+}
+
+#[derive(Serialize, Deserialize)]
+enum GcRPC {
+ Update(Vec<ByteBuf>),
+ DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
+ Ok,
+}
+
+impl RpcMessage for GcRPC {}
+
+impl<F, R> TableGC<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub(crate) fn launch(
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<R>>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let rpc_path = format!("table_{}/gc", data.name);
+ let rpc_client = aux.system.rpc_client::<GcRPC>(&rpc_path);
+
+ let gc = Arc::new(Self {
+ data: data.clone(),
+ aux: aux.clone(),
+ rpc_client,
+ });
+
+ gc.register_handler(rpc_server, rpc_path);
+
+ let gc1 = gc.clone();
+ aux.system.background.spawn_worker(
+ format!("GC loop for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
+ );
+
+ gc
+ }
+
+ async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ match self.gc_loop_iter().await {
+ Ok(true) => {
+ // Stuff was done, loop imediately
+ }
+ Ok(false) => {
+ select! {
+ _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
+ }
+ Err(e) => {
+ warn!("({}) Error doing GC: {}", self.data.name, e);
+ }
+ }
+ }
+ Ok(())
+ }
+
+ async fn gc_loop_iter(&self) -> Result<bool, Error> {
+ let mut entries = vec![];
+ let mut excluded = vec![];
+
+ for item in self.data.gc_todo.iter() {
+ let (k, vhash) = item?;
+
+ let vhash = Hash::try_from(&vhash[..]).unwrap();
+
+ let v_opt = self.data.store.get(&k[..])?
+ .filter(|v| blake2sum(&v[..]) == vhash);
+
+ if let Some(v) = v_opt {
+ entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+ if entries.len() >= TABLE_GC_BATCH_SIZE {
+ break;
+ }
+ } else {
+ excluded.push((k, vhash));
+ }
+ }
+
+ for (k, vhash) in excluded {
+ let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ }
+
+ if entries.len() == 0 {
+ // Nothing to do in this iteration
+ return Ok(false);
+ }
+
+ debug!("({}) GC: doing {} items", self.data.name, entries.len());
+
+ let mut partitions = HashMap::new();
+ for (k, vhash, v) in entries {
+ let pkh = Hash::try_from(&k[..32]).unwrap();
+ let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system);
+ nodes.retain(|x| *x != self.aux.system.id);
+ nodes.sort();
+
+ if !partitions.contains_key(&nodes) {
+ partitions.insert(nodes.clone(), vec![]);
+ }
+ partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
+ }
+
+ let resps = join_all(partitions.into_iter()
+ .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await;
+ for resp in resps {
+ if let Err(e) = resp {
+ warn!("({}) Unable to send and delete for GC: {}", self.data.name, e);
+ }
+ }
+
+ Ok(true)
+ }
+
+ async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> {
+ let n_items = items.len();
+
+ let mut updates = vec![];
+ let mut deletes = vec![];
+ for (k, vhash, v) in items {
+ updates.push(v);
+ deletes.push((k, vhash));
+ }
+
+ self.rpc_client.try_call_many(
+ &nodes[..],
+ GcRPC::Update(updates),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+
+ info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items);
+
+ self.rpc_client.try_call_many(
+ &nodes[..],
+ GcRPC::DeleteIfEqualHash(deletes.clone()),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+
+ for (k, vhash) in deletes {
+ self.data.delete_if_equal_hash(&k[..], vhash)?;
+ let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ }
+
+ Ok(())
+ }
+
+ // ---- RPC HANDLER ----
+
+ fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.aux.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle_rpc(&msg).await }
+ });
+ }
+
+ async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
+ match message {
+ GcRPC::Update(items) => {
+ self.data.update_many(items)?;
+ Ok(GcRPC::Ok)
+ }
+ GcRPC::DeleteIfEqualHash(items) => {
+ for (key, vhash) in items.iter() {
+ self.data.delete_if_equal_hash(&key[..], *vhash)?;
+ }
+ Ok(GcRPC::Ok)
+ }
+ _ => Err(Error::Message(format!("Unexpected GC RPC"))),
+ }
+ }
+}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 18c29c35..8a64ff0b 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -11,6 +11,7 @@ pub mod data;
pub mod merkle;
pub mod replication;
pub mod sync;
+pub mod gc;
pub mod table;
pub use schema::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index b04a2a88..7a0adba1 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -139,10 +139,7 @@ impl MerkleUpdater {
let new_vhash = if vhash_by.len() == 0 {
None
} else {
- let vhash_by: [u8; 32] = vhash_by
- .try_into()
- .map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?;
- Some(Hash::from(vhash_by))
+ Some(Hash::try_from(&vhash_by[..]).unwrap())
};
let key = MerkleNodeKey {
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 7fbb7b25..5112ea15 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -41,6 +41,8 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
+
+ fn is_tombstone(&self) -> bool { false }
}
pub trait TableSchema: Send + Sync {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4be8cd10..aae65852 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::future::join_all;
use futures::{pin_mut, select};
use futures_util::future::*;
use futures_util::stream::*;
@@ -347,16 +346,11 @@ where
nodes: &[UUID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(SyncRPC::Items(values));
-
- for res in join_all(nodes.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
- }))
- .await
- {
- res?;
- }
+
+ self.rpc_client.try_call_many(
+ &nodes[..],
+ SyncRPC::Items(values),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?;
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
@@ -577,7 +571,7 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+ async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message {
SyncRPC::RootCkHash(range, h) => {
let root_ck = self.get_root_ck(*range)?;
diff --git a/src/table/table.rs b/src/table/table.rs
index dd3394bd..7b0d9a24 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -18,6 +18,7 @@ use crate::data::*;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
+use crate::gc::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@@ -44,8 +45,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>),
-
- SyncRPC(SyncRPC),
}
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
@@ -76,6 +75,7 @@ where
});
let syncer = TableSyncer::launch(data.clone(), aux.clone(), rpc_server);
+ TableGC::launch(data.clone(), aux.clone(), rpc_server);
let table = Arc::new(Self {
data,
@@ -308,10 +308,6 @@ where
self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
- TableRPC::SyncRPC(rpc) => {
- let response = self.syncer.handle_rpc(rpc).await?;
- Ok(TableRPC::SyncRPC(response))
- }
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
}
}