diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 91 | ||||
-rw-r--r-- | src/table/gc.rs | 69 | ||||
-rw-r--r-- | src/table/lib.rs | 2 | ||||
-rw-r--r-- | src/table/schema.rs | 4 | ||||
-rw-r--r-- | src/table/sync.rs | 11 | ||||
-rw-r--r-- | src/table/table.rs | 2 |
6 files changed, 110 insertions, 69 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index 91607f7a..a491f877 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -129,31 +129,32 @@ where let update = self.decode_entry(update_bytes)?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - let (old_entry, new_entry) = match store.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) + let changed = + (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + let (old_entry, new_entry) = match store.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) + let new_bytes_hash = blake2sum(&new_bytes[..]); + mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; + store.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry, new_bytes_hash))) + } else { + Ok(None) } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let new_bytes_hash = blake2sum(&new_bytes[..]); - mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?; - store.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry, new_bytes_hash))) - } else { - Ok(None) - } - })?; + })?; if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); @@ -168,16 +169,17 @@ where } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if cur_v == v { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); + let removed = + (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if cur_v == v { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); + } } - } - Ok(false) - })?; + Ok(false) + })?; if removed { let old_entry = self.decode_entry(v)?; @@ -187,17 +189,22 @@ where Ok(removed) } - pub(crate) fn delete_if_equal_hash(self: &Arc<Self>, k: &[u8], vhash: Hash) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { - if let Some(cur_v) = store.get(k)? { - if blake2sum(&cur_v[..]) == vhash { - store.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(Some(cur_v)); + pub(crate) fn delete_if_equal_hash( + self: &Arc<Self>, + k: &[u8], + vhash: Hash, + ) -> Result<bool, Error> { + let removed = + (&self.store, &self.merkle_updater.todo).transaction(|(store, mkl_todo)| { + if let Some(cur_v) = store.get(k)? { + if blake2sum(&cur_v[..]) == vhash { + store.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(Some(cur_v)); + } } - } - Ok(None) - })?; + Ok(None) + })?; if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; diff --git a/src/table/gc.rs b/src/table/gc.rs index afc8a473..594044b8 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -1,6 +1,6 @@ +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use std::collections::HashMap; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -17,9 +17,9 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::data::*; -use crate::table::*; -use crate::schema::*; use crate::replication::*; +use crate::schema::*; +use crate::table::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); @@ -99,7 +99,10 @@ where let vhash = Hash::try_from(&vhash[..]).unwrap(); - let v_opt = self.data.store.get(&k[..])? + let v_opt = self + .data + .store + .get(&k[..])? .filter(|v| blake2sum(&v[..]) == vhash); if let Some(v) = v_opt { @@ -113,7 +116,10 @@ where } for (k, vhash) in excluded { - let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; } if entries.len() == 0 { @@ -136,18 +142,29 @@ where partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); } - let resps = join_all(partitions.into_iter() - .map(|(nodes, items)| self.try_send_and_delete(nodes, items))).await; + let resps = join_all( + partitions + .into_iter() + .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), + ) + .await; for resp in resps { if let Err(e) = resp { - warn!("({}) Unable to send and delete for GC: {}", self.data.name, e); + warn!( + "({}) Unable to send and delete for GC: {}", + self.data.name, e + ); } } Ok(true) } - async fn try_send_and_delete(&self, nodes: Vec<UUID>, items: Vec<(ByteBuf, Hash, ByteBuf)>) -> Result<(), Error> { + async fn try_send_and_delete( + &self, + nodes: Vec<UUID>, + items: Vec<(ByteBuf, Hash, ByteBuf)>, + ) -> Result<(), Error> { let n_items = items.len(); let mut updates = vec![]; @@ -157,21 +174,33 @@ where deletes.push((k, vhash)); } - self.rpc_client.try_call_many( - &nodes[..], - GcRPC::Update(updates), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::Update(updates), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; + + info!( + "({}) GC: {} items successfully pushed, will try to delete.", + self.data.name, n_items + ); - info!("({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items); + self.rpc_client + .try_call_many( + &nodes[..], + GcRPC::DeleteIfEqualHash(deletes.clone()), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT), + ) + .await?; - self.rpc_client.try_call_many( - &nodes[..], - GcRPC::DeleteIfEqualHash(deletes.clone()), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT)).await?; - for (k, vhash) in deletes { self.data.delete_if_equal_hash(&k[..], vhash)?; - let _ = self.data.gc_todo.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; + let _ = self + .data + .gc_todo + .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash), None)?; } Ok(()) diff --git a/src/table/lib.rs b/src/table/lib.rs index 8a64ff0b..3b73163b 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -8,10 +8,10 @@ pub mod schema; pub mod util; pub mod data; +pub mod gc; pub mod merkle; pub mod replication; pub mod sync; -pub mod gc; pub mod table; pub use schema::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index 5112ea15..4d754664 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -42,7 +42,9 @@ pub trait Entry<P: PartitionKey, S: SortKey>: fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - fn is_tombstone(&self) -> bool { false } + fn is_tombstone(&self) -> bool { + false + } } pub trait TableSchema: Send + Sync { diff --git a/src/table/sync.rs b/src/table/sync.rs index aae65852..6c8792d2 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -347,10 +347,13 @@ where ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); - self.rpc_client.try_call_many( - &nodes[..], - SyncRPC::Items(values), - RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT)).await?; + self.rpc_client + .try_call_many( + &nodes[..], + SyncRPC::Items(values), + RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), + ) + .await?; // All remote nodes have written those items, now we can delete them locally let mut not_removed = 0; diff --git a/src/table/table.rs b/src/table/table.rs index 7b0d9a24..2d3c5fe9 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,10 +15,10 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; +use crate::gc::*; use crate::replication::*; use crate::schema::*; use crate::sync::*; -use crate::gc::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); |