aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs91
-rw-r--r--src/table/gc.rs69
-rw-r--r--src/table/lib.rs2
-rw-r--r--src/table/schema.rs4
-rw-r--r--src/table/sync.rs11
-rw-r--r--src/table/table.rs2
6 files changed, 110 insertions, 69 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 91607f7a..a491f877 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -129,31 +129,32 @@ where
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- let (old_entry, new_entry) = match store.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
+ let changed =
+ (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ let (old_entry, new_entry) = match store.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
+ let new_bytes_hash = blake2sum(&new_bytes[..]);
+ mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
+ store.insert(tree_key.clone(), new_bytes)?;
+ Ok(Some((old_entry, new_entry, new_bytes_hash)))
+ } else {
+ Ok(None)
}
- None => (None, update.clone()),
- };
-
- if Some(&new_entry) != old_entry.as_ref() {
- let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let new_bytes_hash = blake2sum(&new_bytes[..]);
- mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
- store.insert(tree_key.clone(), new_bytes)?;
- Ok(Some((old_entry, new_entry, new_bytes_hash)))
- } else {
- Ok(None)
- }
- })?;
+ })?;
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone();
@@ -168,16 +169,17 @@ where
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if cur_v == v {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(true);
+ let removed =
+ (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if cur_v == v {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(true);
+ }
}
- }
- Ok(false)
- })?;
+ Ok(false)
+ })?;
if removed {
let old_entry = self.decode_entry(v)?;
@@ -187,17 +189,22 @@ where
Ok(removed)
}
- pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> {
- let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
- if let Some(cur_v) = store.get(k)? {
- if blake2sum(&cur_v[..]) == vhash {
- store.remove(k)?;
- mkl_todo.insert(k, vec![])?;
- return Ok(Some(cur_v));
+ pub(crate) fn delete_if_equal_hash(
+ self: &Arc<Self>,
+ k: &[u8],
+ vhash: Hash,
+ ) -> Result<bool, Error> {
+ let removed =
+ (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| {
+ if let Some(cur_v) = store.get(k)? {
+ if blake2sum(&cur_v[..]) == vhash {
+ store.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
+ return Ok(Some(cur_v));
+ }
}
- }
- Ok(None)
- })?;
+ Ok(None)
+ })?;
if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?;
diff --git a/src/table/gc.rs b/src/table/gc.rs
index afc8a473..594044b8 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -1,6 +1,6 @@
+use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
-use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -17,9 +17,9 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::data::*;
-use crate::table::*;
-use crate::schema::*;
use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
@@ -99,7 +99,10 @@ where
let vhash = Hash::try_from(&vhash[..]).unwrap();
- let v_opt = self.data.store.get(&k[..])?
+ let v_opt = self
+ .data
+ .store
+ .get(&k[..])?
.filter(|v| blake2sum(&v[..]) == vhash);
if let Some(v) = v_opt {
@@ -113,7 +116,10 @@ where
}
for (k, vhash) in excluded {
- let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ let _ = self
+ .data
+ .gc_todo
+ .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
}
if entries.len() == 0 {
@@ -136,18 +142,29 @@ where
partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
}
- let resps = join_all(partitions.into_iter()
- .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await;
+ let resps = join_all(
+ partitions
+ .into_iter()
+ .map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
+ )
+ .await;
for resp in resps {
if let Err(e) = resp {
- warn!("({}) Unable to send and delete for GC: {}", self.data.name, e);
+ warn!(
+ "({}) Unable to send and delete for GC: {}",
+ self.data.name, e
+ );
}
}
Ok(true)
}
- async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> {
+ async fn try_send_and_delete(
+ &self,
+ nodes: Vec<UUID>,
+ items: Vec<(ByteBuf, Hash, ByteBuf)>,
+ ) -> Result<(), Error> {
let n_items = items.len();
let mut updates = vec![];
@@ -157,21 +174,33 @@ where
deletes.push((k, vhash));
}
- self.rpc_client.try_call_many(
- &nodes[..],
- GcRPC::Update(updates),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::Update(updates),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
+
+ info!(
+ "({}) GC: {} items successfully pushed, will try to delete.",
+ self.data.name, n_items
+ );
- info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items);
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ GcRPC::DeleteIfEqualHash(deletes.clone()),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ )
+ .await?;
- self.rpc_client.try_call_many(
- &nodes[..],
- GcRPC::DeleteIfEqualHash(deletes.clone()),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?;
-
for (k, vhash) in deletes {
self.data.delete_if_equal_hash(&k[..], vhash)?;
- let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
+ let _ = self
+ .data
+ .gc_todo
+ .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?;
}
Ok(())
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 8a64ff0b..3b73163b 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -8,10 +8,10 @@ pub mod schema;
pub mod util;
pub mod data;
+pub mod gc;
pub mod merkle;
pub mod replication;
pub mod sync;
-pub mod gc;
pub mod table;
pub use schema::*;
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 5112ea15..4d754664 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -42,7 +42,9 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
- fn is_tombstone(&self) -> bool { false }
+ fn is_tombstone(&self) -> bool {
+ false
+ }
}
pub trait TableSchema: Send + Sync {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index aae65852..6c8792d2 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -347,10 +347,13 @@ where
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- self.rpc_client.try_call_many(
- &nodes[..],
- SyncRPC::Items(values),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?;
+ self.rpc_client
+ .try_call_many(
+ &nodes[..],
+ SyncRPC::Items(values),
+ RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
+ .await?;
// All remote nodes have written those items, now we can delete them locally
let mut not_removed = 0;
diff --git a/src/table/table.rs b/src/table/table.rs
index 7b0d9a24..2d3c5fe9 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -15,10 +15,10 @@ use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::data::*;
+use crate::gc::*;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
-use crate::gc::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);