diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 1 | ||||
-rw-r--r-- | src/table/data.rs | 189 | ||||
-rw-r--r-- | src/table/lib.rs | 4 | ||||
-rw-r--r-- | src/table/merkle.rs | 39 | ||||
-rw-r--r-- | src/table/replication/fullcopy.rs (renamed from src/table/table_fullcopy.rs) | 2 | ||||
-rw-r--r-- | src/table/replication/mod.rs | 6 | ||||
-rw-r--r-- | src/table/replication/parameters.rs | 22 | ||||
-rw-r--r-- | src/table/replication/sharded.rs (renamed from src/table/table_sharded.rs) | 2 | ||||
-rw-r--r-- | src/table/table.rs | 265 | ||||
-rw-r--r-- | src/table/table_sync.rs | 129 |
10 files changed, 368 insertions, 291 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6485a542..6b3aaceb 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -19,7 +19,6 @@ garage_rpc = { version = "0.1.1", path = "../rpc" } bytes = "0.4" rand = "0.7" hex = "0.3" -arc-swap = "0.4" log = "0.4" hexdump = "0.1" diff --git a/src/table/data.rs b/src/table/data.rs new file mode 100644 index 00000000..fa89fc27 --- /dev/null +++ b/src/table/data.rs @@ -0,0 +1,189 @@ +use std::sync::Arc; + +use log::warn; +use sled::Transactional; +use serde_bytes::ByteBuf; + +use garage_util::data::*; +use garage_util::error::*; +use garage_util::background::BackgroundRunner; + +use crate::schema::*; +use crate::merkle::*; +use crate::crdt::CRDT; + +pub struct TableData<F: TableSchema> { + pub name: String, + pub instance: F, + + pub store: sled::Tree, + pub(crate) merkle_updater: Arc<MerkleUpdater>, +} + +impl<F> TableData<F> where F: TableSchema { + pub fn new( + name: String, + instance: F, + db: &sled::Db, + background: Arc<BackgroundRunner>, + ) -> Arc<Self> { + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); + + let merkle_updater = MerkleUpdater::launch( + name.clone(), + background, + merkle_todo_store, + merkle_tree_store, + ); + + Arc::new(Self{ + name, + instance, + store, + merkle_updater, + }) + } + + // Read functions + + pub fn read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { + let tree_key = self.tree_key(p, s); + if let Some(bytes) = self.store.get(&tree_key)? { + Ok(Some(ByteBuf::from(bytes.to_vec()))) + } else { + Ok(None) + } + } + + pub fn read_range( + &self, + p: &F::P, + s: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = p.hash(); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = self.decode_entry(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + + // Mutation functions + + pub(crate) fn update_many(&self, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { + for update_bytes in entries.iter() { + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + + pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; + db.insert(tree_key.clone(), new_bytes)?; + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) + } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + //self.syncer.load_full().unwrap().invalidate(&tree_key[..]); + } + + Ok(()) + } + + pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { + if let Some(cur_v) = txn.get(k)? { + if cur_v == v { + txn.remove(k)?; + mkl_todo.insert(k, vec![])?; + return Ok(true); + } + } + Ok(false) + })?; + + if removed { + let old_entry = self.decode_entry(v)?; + self.instance.updated(Some(old_entry), None); + //self.syncer.load_full().unwrap().invalidate(k); + } + Ok(removed) + } + + pub(crate) fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } + + pub(crate) fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { + match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { + Ok(x) => Ok(x), + Err(e) => match F::try_migrate(bytes) { + Some(x) => Ok(x), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(e.into()) + } + }, + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 62fd30c5..bb249a56 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -8,9 +8,9 @@ pub mod schema; pub mod util; pub mod merkle; +pub mod replication; +pub mod data; pub mod table; -pub mod table_fullcopy; -pub mod table_sharded; pub mod table_sync; pub use schema::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 50cb90d5..ef197dc8 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -61,7 +61,7 @@ pub enum MerkleNode { } impl MerkleUpdater { - pub(crate) fn new( + pub(crate) fn launch( table_name: String, background: Arc<BackgroundRunner>, todo: sled::Tree, @@ -69,22 +69,22 @@ impl MerkleUpdater { ) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - Arc::new(Self { + let ret = Arc::new(Self { table_name, background, todo, todo_notify: Notify::new(), merkle_tree, empty_node_hash, - }) - } + }); - pub(crate) fn launch(self: &Arc<Self>) { - let self2 = self.clone(); - self.background.spawn_worker( - format!("Merkle tree updater for {}", self.table_name), - |must_exit: watch::Receiver<bool>| self2.updater_loop(must_exit), + let ret2 = ret.clone(); + ret.background.spawn_worker( + format!("Merkle tree updater for {}", ret.table_name), + |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), ); + + ret } async fn updater_loop( @@ -156,28 +156,37 @@ impl MerkleUpdater { new_vhash: Option<Hash>, ) -> ConflictableTransactionResult<Option<Hash>, Error> { let i = key.prefix.len(); + + // Read node at current position (defined by the prefix stored in key) + // Calculate an update to apply to this node + // This update is an Option<_>, so that it is None if the update is a no-op + // and we can thus skip recalculating and re-storing everything let mutate = match self.read_node_txn(tx, &key)? { MerkleNode::Empty => { if let Some(vhv) = new_vhash { Some(MerkleNode::Leaf(k.to_vec(), vhv)) } else { + // Nothing to do, keep empty node None } } MerkleNode::Intermediate(mut children) => { let key2 = key.next_key(khash); if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? { + // Subtree changed, update this node as well if subhash == self.empty_node_hash { intermediate_rm_child(&mut children, key2.prefix[i]); } else { intermediate_set_child(&mut children, key2.prefix[i], subhash); } + if children.len() == 0 { // should not happen warn!("Replacing intermediate node with empty node, should not happen."); Some(MerkleNode::Empty) } else if children.len() == 1 { - // move node down to this level + // We now have a single node (case when the update deleted one of only two + // children). Move that single child to this level of the tree. let key_sub = key.add_byte(children[0].0); let subnode = self.read_node_txn(tx, &key_sub)?; tx.remove(key_sub.encode())?; @@ -186,19 +195,23 @@ impl MerkleUpdater { Some(MerkleNode::Intermediate(children)) } } else { + // Subtree not changed, nothing to do None } } MerkleNode::Leaf(exlf_key, exlf_hash) => { if exlf_key == k { + // This leaf is for the same key that the one we are updating match new_vhash { Some(vhv) if vhv == exlf_hash => None, Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), None => Some(MerkleNode::Empty), } } else { + // This is an only leaf for another key if let Some(vhv) = new_vhash { - // Create two sub-nodes and replace by intermediary node + // Move that other key to a subnode, create another subnode for our + // insertion and replace current node by an intermediary node let (pos1, h1) = { let key2 = key.next_key(blake2sum(&exlf_key[..])); let subhash = @@ -216,6 +229,9 @@ impl MerkleUpdater { intermediate_set_child(&mut int, pos2, h2); Some(MerkleNode::Intermediate(int)) } else { + // Nothing to do, we don't want to insert this value because it is None, + // and we don't want to change the other value because it's for something + // else None } } @@ -263,6 +279,7 @@ impl MerkleUpdater { } } + // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node( &self, k: &MerkleNodeKey, diff --git a/src/table/table_fullcopy.rs b/src/table/replication/fullcopy.rs index c55879d8..a62a6c3c 100644 --- a/src/table/table_fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -4,7 +4,7 @@ use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; -use crate::*; +use crate::replication::*; #[derive(Clone)] pub struct TableFullReplication { diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs new file mode 100644 index 00000000..d43d7f19 --- /dev/null +++ b/src/table/replication/mod.rs @@ -0,0 +1,6 @@ +mod parameters; + +pub mod fullcopy; +pub mod sharded; + +pub use parameters::*; diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs new file mode 100644 index 00000000..4607b050 --- /dev/null +++ b/src/table/replication/parameters.rs @@ -0,0 +1,22 @@ +use garage_rpc::membership::System; +use garage_rpc::ring::Ring; + +use garage_util::data::*; + +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn write_quorum(&self, system: &System) -> usize; + fn max_write_errors(&self) -> usize; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; + fn split_points(&self, ring: &Ring) -> Vec<Hash>; +} diff --git a/src/table/table_sharded.rs b/src/table/replication/sharded.rs index 47bdfeaf..42a742cd 100644 --- a/src/table/table_sharded.rs +++ b/src/table/replication/sharded.rs @@ -2,7 +2,7 @@ use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; -use crate::*; +use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { diff --git a/src/table/table.rs b/src/table/table.rs index 0e75754c..a4cb4b24 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,40 +2,35 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; -use log::warn; - -use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; -use crate::merkle::*; +use crate::data::*; use crate::schema::*; use crate::table_sync::*; +use crate::replication::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); -pub struct Table<F: TableSchema, R: TableReplication> { - pub instance: F, +pub struct TableAux<F: TableSchema, R: TableReplication> { + pub system: Arc<System>, pub replication: R, - - pub name: String, pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>, +} - pub system: Arc<System>, - pub store: sled::Tree, - pub syncer: ArcSwapOption<TableSyncer<F, R>>, - merkle_updater: Arc<MerkleUpdater>, +pub struct Table<F: TableSchema, R: TableReplication> { + pub data: Arc<TableData<F>>, + pub aux: Arc<TableAux<F, R>>, + pub syncer: Arc<TableSyncer<F, R>>, } #[derive(Serialize, Deserialize)] @@ -55,23 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> { impl<F: TableSchema> RpcMessage for TableRPC<F> {} -pub trait TableReplication: Send + Sync { - // See examples in table_sharded.rs and table_fullcopy.rs - // To understand various replication methods - - // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; - fn read_quorum(&self) -> usize; - - // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; - fn write_quorum(&self, system: &System) -> usize; - fn max_write_errors(&self) -> usize; - - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; - fn split_points(&self, ring: &Ring) -> Vec<Hash>; -} impl<F, R> Table<F, R> where @@ -88,60 +66,51 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc<Self> { - let store = db - .open_tree(&format!("{}:table", name)) - .expect("Unable to open DB tree"); - - let merkle_todo_store = db - .open_tree(&format!("{}:merkle_todo", name)) - .expect("Unable to open DB Merkle TODO tree"); - let merkle_tree_store = db - .open_tree(&format!("{}:merkle_tree", name)) - .expect("Unable to open DB Merkle tree tree"); - let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); - let merkle_updater = MerkleUpdater::new( - name.clone(), + let data = TableData::new( + name, + instance, + db, system.background.clone(), - merkle_todo_store, - merkle_tree_store, ); - let table = Arc::new(Self { - instance, + let aux = Arc::new(TableAux{ + system, replication, - name, rpc_client, - system, - store, - syncer: ArcSwapOption::from(None), - merkle_updater, }); - table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()); - table.syncer.swap(Some(syncer)); + let syncer = TableSyncer::launch( + data.clone(), + aux.clone(), + ); - table.merkle_updater.launch(); + let table = Arc::new(Self { + data, + aux, + syncer, + }); + + table.clone().register_handler(rpc_server, rpc_path); table } pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::<F>::Update(vec![e_enc]); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -153,7 +122,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.aux.replication.write_nodes(&hash, &self.aux.system); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -166,7 +135,7 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -177,7 +146,7 @@ where errors.push(e); } } - if errors.len() > self.replication.max_write_errors() { + if errors.len() > self.aux.replication.max_write_errors() { Err(Error::Message("Too many errors".into())) } else { Ok(()) @@ -190,16 +159,17 @@ where sort_key: &F::S, ) -> Result<Option<F::E>, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); //eprintln!("get who: {:?}", who); let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self + .aux .rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -210,7 +180,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = self.decode_entry(v_bytes.as_slice())?; + let v = self.data.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -230,7 +200,7 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system + self.aux.system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -246,16 +216,16 @@ where limit: usize, ) -> Result<Vec<F::E>, Error> { let hash = partition_key.hash(); - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.aux.replication.read_nodes(&hash, &self.aux.system); let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .rpc_client + .aux.rpc_client .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.read_quorum()) + RequestStrategy::with_quorum(self.aux.replication.read_quorum()) .with_timeout(TABLE_RPC_TIMEOUT) .interrupt_after_quorum(true), ) @@ -266,8 +236,8 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = self.decode_entry(entry_bytes.as_slice())?; - let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + let entry = self.data.decode_entry(entry_bytes.as_slice())?; + let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { ret.insert(entry_key, Some(entry)); @@ -287,7 +257,7 @@ where } if !to_repair.is_empty() { let self2 = self.clone(); - self.system.background.spawn_cancellable(async move { + self.aux.system.background.spawn_cancellable(async move { for (_, v) in to_repair.iter_mut() { self2.repair_on_read(&who[..], v.take().unwrap()).await?; } @@ -306,7 +276,7 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.rpc_client + self.aux.rpc_client .try_call_many( &who[..], TableRPC::<F>::Update(vec![what_enc]), @@ -326,8 +296,8 @@ where }); let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { + self.aux.rpc_client + .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } }); @@ -336,157 +306,24 @@ where async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { TableRPC::ReadEntry(key, sort_key) => { - let value = self.handle_read_entry(key, sort_key)?; + let value = self.data.read_entry(key, sort_key)?; Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { - let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + let values = self.data.read_range(key, begin_sort_key, filter, *limit)?; Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs)?; + self.data.update_many(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.load_full().unwrap(); - let response = syncer - .handle_rpc(rpc, self.system.background.stop_signal.clone()) + let response = self.syncer + .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) .await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), } } - - fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { - let tree_key = self.tree_key(p, s); - if let Some(bytes) = self.store.get(&tree_key)? { - Ok(Some(ByteBuf::from(bytes.to_vec()))) - } else { - Ok(None) - } - } - - fn handle_read_range( - &self, - p: &F::P, - s: &Option<F::S>, - filter: &Option<F::Filter>, - limit: usize, - ) -> Result<Vec<Arc<ByteBuf>>, Error> { - let partition_hash = p.hash(); - let first_key = match s { - None => partition_hash.to_vec(), - Some(sk) => self.tree_key(p, sk), - }; - let mut ret = vec![]; - for item in self.store.range(first_key..) { - let (key, value) = item?; - if &key[..32] != partition_hash.as_slice() { - break; - } - let keep = match filter { - None => true, - Some(f) => { - let entry = self.decode_entry(value.as_ref())?; - F::matches_filter(&entry, f) - } - }; - if keep { - ret.push(Arc::new(ByteBuf::from(value.as_ref()))); - } - if ret.len() >= limit { - break; - } - } - Ok(ret) - } - - // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ - - pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { - for update_bytes in entries.iter() { - self.update_entry(update_bytes.as_slice())?; - } - Ok(()) - } - - pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> { - let update = self.decode_entry(update_bytes)?; - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; - - if Some(&new_entry) != old_entry.as_ref() { - let new_bytes = rmp_to_vec_all_named(&new_entry) - .map_err(Error::RMPEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; - db.insert(tree_key.clone(), new_bytes)?; - Ok(Some((old_entry, new_entry))) - } else { - Ok(None) - } - })?; - - if let Some((old_entry, new_entry)) = changed { - self.instance.updated(old_entry, Some(new_entry)); - self.syncer.load_full().unwrap().invalidate(&tree_key[..]); - } - - Ok(()) - } - - pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { - if let Some(cur_v) = txn.get(k)? { - if cur_v == v { - txn.remove(k)?; - mkl_todo.insert(k, vec![])?; - return Ok(true); - } - } - Ok(false) - })?; - - if removed { - let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None); - self.syncer.load_full().unwrap().invalidate(k); - } - Ok(removed) - } - - fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret - } - - fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { - match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { - Ok(x) => Ok(x), - Err(e) => match F::try_migrate(bytes) { - Some(x) => Ok(x), - None => { - warn!("Unable to decode entry of {}: {}", self.name, e); - for line in hexdump::hexdump_iter(bytes) { - debug!("{}", line); - } - Err(e.into()) - } - }, - } - } } diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 51f8cd6f..7394be1b 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -16,18 +16,22 @@ use garage_util::data::*; use garage_util::error::Error; use crate::*; +use crate::data::*; +use crate::replication::*; const MAX_DEPTH: usize = 16; + const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -// Scan & sync every 12 hours -const SCAN_INTERVAL: Duration = Duration::from_secs(12 * 60 * 60); +// Do anti-entropy every 10 minutes +const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60); -// Expire cache after 30 minutes -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(30 * 60); +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60); pub struct TableSyncer<F: TableSchema, R: TableReplication> { - table: Arc<Table<F, R>>, + data: Arc<TableData<F>>, + aux: Arc<TableAux<F, R>>, + todo: Mutex<SyncTodo>, cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>, } @@ -106,10 +110,13 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { - let todo = SyncTodo { todo: Vec::new() }; - let syncer = Arc::new(TableSyncer { - table: table.clone(), + pub(crate) fn launch(data: Arc<TableData<F>>, + aux: Arc<TableAux<F, R>>) -> Arc<Self> { + let todo = SyncTodo{ todo: vec![] }; + + let syncer = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), todo: Mutex::new(todo), cache: (0..MAX_DEPTH) .map(|_| Mutex::new(BTreeMap::new())) @@ -119,21 +126,21 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - table.system.background.spawn_worker( - format!("table sync watcher for {}", table.name), + aux.system.background.spawn_worker( + format!("table sync watcher for {}", data.name), move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), ); let s2 = syncer.clone(); - table.system.background.spawn_worker( - format!("table syncer for {}", table.name), + aux.system.background.spawn_worker( + format!("table syncer for {}", data.name), move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), ); let s3 = syncer.clone(); tokio::spawn(async move { tokio::time::delay_for(Duration::from_secs(20)).await; - s3.add_full_scan().await; + s3.add_full_scan(); }); syncer @@ -144,8 +151,8 @@ where mut must_exit: watch::Receiver<bool>, mut busy_rx: mpsc::UnboundedReceiver<bool>, ) -> Result<(), Error> { - let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); + let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { @@ -158,8 +165,8 @@ where select! { new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { - debug!("({}) Adding ring difference to syncer todo list", self.table.name); - self.todo.lock().unwrap().add_ring_difference(&self.table, &prev_ring, &new_ring); + debug!("({}) Adding ring difference to syncer todo list", self.data.name); + self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux); prev_ring = new_ring; } } @@ -182,8 +189,8 @@ where _ = s_timeout => { if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; - debug!("({}) Adding full scan to syncer todo list", self.table.name); - self.add_full_scan().await; + debug!("({}) Adding full scan to syncer todo list", self.data.name); + self.add_full_scan(); } } } @@ -191,8 +198,8 @@ where Ok(()) } - pub async fn add_full_scan(&self) { - self.todo.lock().unwrap().add_full_scan(&self.table); + pub fn add_full_scan(&self) { + self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux); } async fn syncer_task( @@ -211,7 +218,7 @@ where if let Err(e) = res { warn!( "({}) Error while syncing {:?}: {}", - self.table.name, partition, e + self.data.name, partition, e ); } } else { @@ -228,18 +235,18 @@ where must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { if partition.retain { - let my_id = self.table.system.id; + let my_id = self.aux.system.id; let nodes = self - .table + .aux .replication - .write_nodes(&partition.begin, &self.table.system) + .write_nodes(&partition.begin, &self.aux.system) .into_iter() .filter(|node| *node != my_id) .collect::<Vec<_>>(); debug!( "({}) Preparing to sync {:?} with {:?}...", - self.table.name, partition, nodes + self.data.name, partition, nodes ); let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?; @@ -259,10 +266,10 @@ where while let Some(r) = sync_futures.next().await { if let Err(e) = r { n_errors += 1; - warn!("({}) Sync error: {}", self.table.name, e); + warn!("({}) Sync error: {}", self.data.name, e); } } - if n_errors > self.table.replication.max_write_errors() { + if n_errors > self.aux.replication.max_write_errors() { return Err(Error::Message(format!( "Sync failed with too many nodes (should have been: {:?}).", nodes @@ -293,7 +300,7 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.table.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec()) { let (key, value) = item?; items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); @@ -304,12 +311,12 @@ where if items.len() > 0 { let nodes = self - .table + .aux .replication - .write_nodes(&begin, &self.table.system) + .write_nodes(&begin, &self.aux.system) .into_iter() .collect::<Vec<_>>(); - if nodes.contains(&self.table.system.id) { + if nodes.contains(&self.aux.system.id) { warn!("Interrupting offload as partitions seem to have changed"); break; } @@ -340,7 +347,7 @@ where let update_msg = Arc::new(TableRPC::<F>::Update(values)); for res in join_all(nodes.iter().map(|to| { - self.table + self.aux .rpc_client .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) })) @@ -352,7 +359,7 @@ where // All remote nodes have written those items, now we can delete them locally let mut not_removed = 0; for (k, v) in items.iter() { - if !self.table.delete_if_equal(&k[..], &v[..])? { + if !self.data.delete_if_equal(&k[..], &v[..])? { not_removed += 1; } } @@ -399,7 +406,7 @@ where if range.level == 1 { let mut children = vec![]; for item in self - .table + .data .store .range(range.begin.clone()..range.end.clone()) { @@ -516,7 +523,7 @@ where let v = self.range_checksum(&range, must_exit)?; trace!( "({}) New checksum calculated for {}-{}/{}, {} children", - self.table.name, + self.data.name, hex::encode(&range.begin) .chars() .take(16) @@ -553,7 +560,7 @@ where // If their root checksum has level > than us, use that as a reference let root_cks_resp = self - .table + .aux .rpc_client .call( who, @@ -582,7 +589,7 @@ where let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); trace!( "({}) Sync with {:?}: {} ({}) remaining", - self.table.name, + self.data.name, who, todo.len(), total_children @@ -592,7 +599,7 @@ where let step = todo.drain(..step_size).collect::<Vec<_>>(); let rpc_resp = self - .table + .aux .rpc_client .call( who, @@ -606,7 +613,7 @@ where if diff_ranges.len() > 0 || diff_items.len() > 0 { info!( "({}) Sync with {:?}: difference {} ranges, {} items", - self.table.name, + self.data.name, who, diff_ranges.len(), diff_items.len() @@ -622,7 +629,7 @@ where } } if diff_items.len() > 0 { - self.table.handle_update(&diff_items[..])?; + self.data.update_many(&diff_items[..])?; } if items_to_send.len() > 0 { self.send_items(who, items_to_send).await?; @@ -640,19 +647,19 @@ where async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", - self.table.name, + self.data.name, item_list.len(), who ); let mut values = vec![]; for item in item_list.iter() { - if let Some(v) = self.table.store.get(&item[..])? { + if let Some(v) = self.data.store.get(&item[..])? { values.push(Arc::new(ByteBuf::from(v.as_ref()))); } } let rpc_resp = self - .table + .aux .rpc_client .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) .await?; @@ -714,7 +721,7 @@ where ret_ranges.push(their_range.clone()); if their_range.level == 0 { if let Some(item_bytes) = - self.table.store.get(their_range.begin.as_slice())? + self.data.store.get(their_range.begin.as_slice())? { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } @@ -738,7 +745,7 @@ where } if our_range.level == 0 { if let Some(item_bytes) = - self.table.store.get(our_range.begin.as_slice())? + self.data.store.get(our_range.begin.as_slice())? { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } @@ -753,7 +760,7 @@ where if ret_ranges.len() > 0 || ret_items.len() > 0 { trace!( "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.table.name, + self.data.name, ret_ranges.len(), ret_items.len(), n_checksums @@ -782,13 +789,13 @@ where } impl SyncTodo { - fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) { - let my_id = table.system.id; + fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) { + let my_id = aux.system.id; self.todo.clear(); - let ring = table.system.ring.borrow().clone(); - let split_points = table.replication.split_points(&ring); + let ring = aux.system.ring.borrow().clone(); + let split_points = aux.replication.split_points(&ring); for i in 0..split_points.len() - 1 { let begin = split_points[i]; @@ -797,12 +804,12 @@ impl SyncTodo { continue; } - let nodes = table.replication.replication_nodes(&begin, &ring); + let nodes = aux.replication.replication_nodes(&begin, &ring); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if table.store.range(begin..end).next().is_none() { + if data.store.range(begin..end).next().is_none() { continue; } } @@ -813,25 +820,25 @@ impl SyncTodo { fn add_ring_difference<F: TableSchema, R: TableReplication>( &mut self, - table: &Table<F, R>, old_ring: &Ring, new_ring: &Ring, + data: &TableData<F>, aux: &TableAux<F, R>, ) { - let my_id = table.system.id; + let my_id = aux.system.id; // If it is us who are entering or leaving the system, // initiate a full sync instead of incremental sync if old_ring.config.members.contains_key(&my_id) != new_ring.config.members.contains_key(&my_id) { - self.add_full_scan(table); + self.add_full_scan(data, aux); return; } let mut all_points = None .into_iter() - .chain(table.replication.split_points(old_ring).drain(..)) - .chain(table.replication.split_points(new_ring).drain(..)) + .chain(aux.replication.split_points(old_ring).drain(..)) + .chain(aux.replication.split_points(new_ring).drain(..)) .chain(self.todo.iter().map(|x| x.begin)) .chain(self.todo.iter().map(|x| x.end)) .collect::<Vec<_>>(); @@ -845,11 +852,11 @@ impl SyncTodo { for i in 0..all_points.len() - 1 { let begin = all_points[i]; let end = all_points[i + 1]; - let was_ours = table + let was_ours = aux .replication .replication_nodes(&begin, &old_ring) .contains(&my_id); - let is_ours = table + let is_ours = aux .replication .replication_nodes(&begin, &new_ring) .contains(&my_id); |