diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-11 18:28:03 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-11 18:28:27 +0100 |
commit | 046b649bcc3b147140fc2b0af0e071d3dd1b2c8d (patch) | |
tree | f23ea651237ffce9f3aca3a2e7d32c9aab56b450 /src/table | |
parent | 94f3d287742ff90f179f528421c690b00b71a912 (diff) | |
download | garage-046b649bcc3b147140fc2b0af0e071d3dd1b2c8d.tar.gz garage-046b649bcc3b147140fc2b0af0e071d3dd1b2c8d.zip |
(not well tested) use merkle tree for sync
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/data.rs | 15 | ||||
-rw-r--r-- | src/table/lib.rs | 4 | ||||
-rw-r--r-- | src/table/merkle.rs | 107 | ||||
-rw-r--r-- | src/table/replication/fullcopy.rs | 1 | ||||
-rw-r--r-- | src/table/replication/sharded.rs | 6 | ||||
-rw-r--r-- | src/table/sync.rs | 632 | ||||
-rw-r--r-- | src/table/table.rs | 50 | ||||
-rw-r--r-- | src/table/table_sync.rs | 898 |
8 files changed, 752 insertions, 961 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index fa89fc27..6217bf6d 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -1,16 +1,16 @@ use std::sync::Arc; use log::warn; -use sled::Transactional; use serde_bytes::ByteBuf; +use sled::Transactional; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; -use garage_util::background::BackgroundRunner; -use crate::schema::*; -use crate::merkle::*; use crate::crdt::CRDT; +use crate::merkle::*; +use crate::schema::*; pub struct TableData<F: TableSchema> { pub name: String, @@ -20,7 +20,10 @@ pub struct TableData<F: TableSchema> { pub(crate) merkle_updater: Arc<MerkleUpdater>, } -impl<F> TableData<F> where F: TableSchema { +impl<F> TableData<F> +where + F: TableSchema, +{ pub fn new( name: String, instance: F, @@ -45,7 +48,7 @@ impl<F> TableData<F> where F: TableSchema { merkle_tree_store, ); - Arc::new(Self{ + Arc::new(Self { name, instance, store, diff --git a/src/table/lib.rs b/src/table/lib.rs index bb249a56..18c29c35 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -7,11 +7,11 @@ pub mod crdt; pub mod schema; pub mod util; +pub mod data; pub mod merkle; pub mod replication; -pub mod data; +pub mod sync; pub mod table; -pub mod table_sync; pub use schema::*; pub use table::*; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index ef197dc8..92c18e09 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -15,6 +15,19 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +pub type MerklePartition = [u8; 2]; + +pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { + let mut partition_pos = [0u8; 32]; + partition_pos[0..2].copy_from_slice(&p[..]); + partition_pos.into() +} + +pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash { + p.map(hash_of_merkle_partition) + .unwrap_or([0xFFu8; 32].into()) +} + // This modules partitions the data in 2**16 partitions, based on the top // 16 bits (two bytes) of item's partition keys' hashes. // It builds one Merkle tree for each of these 2**16 partitions. @@ -37,10 +50,10 @@ pub(crate) struct MerkleUpdater { empty_node_hash: Hash, } -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct MerkleNodeKey { // partition: first 16 bits (two bytes) of the partition_key's hash - pub partition: [u8; 2], + pub partition: MerklePartition, // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) pub prefix: Vec<u8>, @@ -214,8 +227,11 @@ impl MerkleUpdater { // insertion and replace current node by an intermediary node let (pos1, h1) = { let key2 = key.next_key(blake2sum(&exlf_key[..])); - let subhash = - self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?; + let subhash = self.put_node_txn( + tx, + &key2, + &MerkleNode::Leaf(exlf_key, exlf_hash), + )?; (key2.prefix[i], subhash) }; let (pos2, h2) = { @@ -280,14 +296,11 @@ impl MerkleUpdater { } // Access a node in the Merkle tree, used by the sync protocol - pub(crate) fn read_node( - &self, - k: &MerkleNodeKey, - ) -> Result<MerkleNode, Error> { + pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { let ent = self.merkle_tree.get(k.encode())?; match ent { None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?) + Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), } } } @@ -339,31 +352,77 @@ fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) { #[test] fn test_intermediate_aux() { let mut v = vec![]; - + intermediate_set_child(&mut v, 12u8, [12u8; 32].into()); - assert!(v == vec![(12u8, [12u8; 32].into())]); - + assert_eq!(v, vec![(12u8, [12u8; 32].into())]); + intermediate_set_child(&mut v, 42u8, [42u8; 32].into()); - assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); - + assert_eq!( + v, + vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())] + ); + intermediate_set_child(&mut v, 4u8, [4u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); - + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (12u8, [12u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + intermediate_set_child(&mut v, 12u8, [8u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); - + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (12u8, [8u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); + intermediate_set_child(&mut v, 6u8, [6u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()), + (42u8, [42u8; 32].into()) + ] + ); intermediate_rm_child(&mut v, 42u8); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); intermediate_rm_child(&mut v, 11u8); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [6u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); intermediate_rm_child(&mut v, 6u8); - assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); - + assert_eq!(v, vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); + intermediate_set_child(&mut v, 6u8, [7u8; 32].into()); - assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]); + assert_eq!( + v, + vec![ + (4u8, [4u8; 32].into()), + (6u8, [7u8; 32].into()), + (12u8, [8u8; 32].into()) + ] + ); } diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a62a6c3c..a20f20b7 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -53,7 +53,6 @@ impl TableReplication for TableFullReplication { fn split_points(&self, _ring: &Ring) -> Vec<Hash> { let mut ret = vec![]; ret.push([0u8; 32].into()); - ret.push([0xFFu8; 32].into()); ret } } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 42a742cd..886c7c08 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -44,11 +44,13 @@ impl TableReplication for TableShardedReplication { fn split_points(&self, ring: &Ring) -> Vec<Hash> { let mut ret = vec![]; - ret.push([0u8; 32].into()); for entry in ring.ring.iter() { ret.push(entry.location); } - ret.push([0xFFu8; 32].into()); + if ret.len() > 0 { + assert_eq!(ret[0], [0u8; 32].into()); + } + ret } } diff --git a/src/table/sync.rs b/src/table/sync.rs new file mode 100644 index 00000000..9c37c286 --- /dev/null +++ b/src/table/sync.rs @@ -0,0 +1,632 @@ +use std::collections::VecDeque; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use futures::future::join_all; +use futures::{pin_mut, select}; +use futures_util::future::*; +use futures_util::stream::*; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use tokio::sync::{mpsc, watch}; + +use garage_rpc::ring::Ring; +use garage_util::data::*; +use garage_util::error::Error; + +use crate::data::*; +use crate::merkle::*; +use crate::replication::*; +use crate::*; + +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +// Do anti-entropy every 10 minutes +const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); + +pub struct TableSyncer<F: TableSchema, R: TableReplication> { + data: Arc<TableData<F>>, + aux: Arc<TableAux<F, R>>, + + todo: Mutex<SyncTodo>, +} + +type RootCk = Vec<(MerklePartition, Hash)>; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub struct PartitionRange { + begin: MerklePartition, + // if end is None, go all the way to partition 0xFFFF included + end: Option<MerklePartition>, +} + +#[derive(Serialize, Deserialize)] +pub(crate) enum SyncRPC { + RootCkHash(PartitionRange, Hash), + RootCkList(PartitionRange, RootCk), + CkNoDifference, + GetNode(MerkleNodeKey), + Node(MerkleNodeKey, MerkleNode), + Items(Vec<Arc<ByteBuf>>), +} + +struct SyncTodo { + todo: Vec<TodoPartition>, +} + +#[derive(Debug, Clone)] +struct TodoPartition { + range: PartitionRange, + + // Are we a node that stores this partition or not? + retain: bool, +} + +impl<F, R> TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> { + let todo = SyncTodo { todo: vec![] }; + + let syncer = Arc::new(Self { + data: data.clone(), + aux: aux.clone(), + todo: Mutex::new(todo), + }); + + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); + + let s1 = syncer.clone(); + aux.system.background.spawn_worker( + format!("table sync watcher for {}", data.name), + move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), + ); + + let s2 = syncer.clone(); + aux.system.background.spawn_worker( + format!("table syncer for {}", data.name), + move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), + ); + + let s3 = syncer.clone(); + tokio::spawn(async move { + tokio::time::delay_for(Duration::from_secs(20)).await; + s3.add_full_sync(); + }); + + syncer + } + + async fn watcher_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + mut busy_rx: mpsc::UnboundedReceiver<bool>, + ) -> Result<(), Error> { + let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); + let mut nothing_to_do_since = Some(Instant::now()); + + while !*must_exit.borrow() { + let s_ring_recv = ring_recv.recv().fuse(); + let s_busy = busy_rx.recv().fuse(); + let s_must_exit = must_exit.recv().fuse(); + let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); + pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); + + select! { + new_ring_r = s_ring_recv => { + if new_ring_r.is_some() { + debug!("({}) Adding ring difference to syncer todo list", self.data.name); + self.add_full_sync(); + } + } + busy_opt = s_busy => { + if let Some(busy) = busy_opt { + if busy { + nothing_to_do_since = None; + } else { + if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); + } + } + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + break; + } + } + _ = s_timeout => { + if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { + nothing_to_do_since = None; + debug!("({}) Adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + } + } + } + } + Ok(()) + } + + pub fn add_full_sync(&self) { + self.todo + .lock() + .unwrap() + .add_full_sync(&self.data, &self.aux); + } + + async fn syncer_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + busy_tx: mpsc::UnboundedSender<bool>, + ) -> Result<(), Error> { + while !*must_exit.borrow() { + let task = self.todo.lock().unwrap().pop_task(); + if let Some(partition) = task { + busy_tx.send(true)?; + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; + if let Err(e) = res { + warn!( + "({}) Error while syncing {:?}: {}", + self.data.name, partition, e + ); + } + } else { + busy_tx.send(false)?; + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + Ok(()) + } + + async fn sync_partition( + self: Arc<Self>, + partition: &TodoPartition, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { + if partition.retain { + let my_id = self.aux.system.id; + + let nodes = self + .aux + .replication + .write_nodes( + &hash_of_merkle_partition(partition.range.begin), + &self.aux.system, + ) + .into_iter() + .filter(|node| *node != my_id) + .collect::<Vec<_>>(); + + debug!( + "({}) Syncing {:?} with {:?}...", + self.data.name, partition, nodes + ); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone() + .do_sync_with(partition.clone(), *node, must_exit.clone()) + }) + .collect::<FuturesUnordered<_>>(); + + let mut n_errors = 0; + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + n_errors += 1; + warn!("({}) Sync error: {}", self.data.name, e); + } + } + if n_errors > self.aux.replication.max_write_errors() { + return Err(Error::Message(format!( + "Sync failed with too many nodes (should have been: {:?}).", + nodes + ))); + } + } else { + self.offload_partition( + &hash_of_merkle_partition(partition.range.begin), + &hash_of_merkle_partition_opt(partition.range.end), + must_exit, + ) + .await?; + } + + Ok(()) + } + + // Offload partition: this partition is not something we are storing, + // so send it out to all other nodes that store it and delete items locally. + // We don't bother checking if the remote nodes already have the items, + // we just batch-send everything. Offloading isn't supposed to happen very often. + // If any of the nodes that are supposed to store the items is unable to + // save them, we interrupt the process. + async fn offload_partition( + self: &Arc<Self>, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { + let mut counter: usize = 0; + + while !*must_exit.borrow() { + let mut items = Vec::new(); + + for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + let (key, value) = item?; + items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + + if items.len() >= 1024 { + break; + } + } + + if items.len() > 0 { + let nodes = self + .aux + .replication + .write_nodes(&begin, &self.aux.system) + .into_iter() + .collect::<Vec<_>>(); + if nodes.contains(&self.aux.system.id) { + warn!("Interrupting offload as partitions seem to have changed"); + break; + } + + counter += 1; + debug!( + "Offloading {} items from {:?}..{:?} ({})", + items.len(), + begin, + end, + counter + ); + self.offload_items(&items, &nodes[..]).await?; + } else { + break; + } + } + + Ok(()) + } + + async fn offload_items( + self: &Arc<Self>, + items: &Vec<(Vec<u8>, Arc<ByteBuf>)>, + nodes: &[UUID], + ) -> Result<(), Error> { + let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); + let update_msg = Arc::new(TableRPC::<F>::Update(values)); + + for res in join_all(nodes.iter().map(|to| { + self.aux + .rpc_client + .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) + })) + .await + { + res?; + } + + // All remote nodes have written those items, now we can delete them locally + let mut not_removed = 0; + for (k, v) in items.iter() { + if !self.data.delete_if_equal(&k[..], &v[..])? { + not_removed += 1; + } + } + + if not_removed > 0 { + debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); + } + + Ok(()) + } + + // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ====== + + fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> { + let begin = u16::from_be_bytes(range.begin); + let range_iter = match range.end { + Some(end) => { + let end = u16::from_be_bytes(end); + begin..=(end - 1) + } + None => begin..=0xFFFF, + }; + + let mut ret = vec![]; + for i in range_iter { + let key = MerkleNodeKey { + partition: u16::to_be_bytes(i), + prefix: vec![], + }; + match self.data.merkle_updater.read_node(&key)? { + MerkleNode::Empty => (), + x => { + ret.push((key.partition, hash_of(&x)?)); + } + } + } + Ok(ret) + } + + async fn do_sync_with( + self: Arc<Self>, + partition: TodoPartition, + who: UUID, + must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + let root_ck = self.get_root_ck(partition.range)?; + let root_ck_hash = hash_of(&root_ck)?; + + // If their root checksum has level > than us, use that as a reference + let root_resp = self + .aux + .rpc_client + .call( + who, + TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await?; + + let mut todo = match root_resp { + TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => { + debug!( + "({}) Sync {:?} with {:?}: no difference", + self.data.name, partition, who + ); + return Ok(()); + } + TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => { + let join = join_ordered(&root_ck[..], &their_root_ck[..]); + let mut todo = VecDeque::new(); + for (p, v1, v2) in join.iter() { + let diff = match (v1, v2) { + (Some(_), None) | (None, Some(_)) => true, + (Some(a), Some(b)) => a != b, + _ => false, + }; + if diff { + todo.push_back(MerkleNodeKey { + partition: **p, + prefix: vec![], + }); + } + } + debug!( + "({}) Sync {:?} with {:?}: todo.len() = {}", + self.data.name, + partition, + who, + todo.len() + ); + todo + } + x => { + return Err(Error::Message(format!( + "Invalid respone to RootCkHash RPC: {}", + debug_serialize(x) + ))); + } + }; + + let mut todo_items = vec![]; + + while !todo.is_empty() && !*must_exit.borrow() { + let key = todo.pop_front().unwrap(); + let node = self.data.merkle_updater.read_node(&key)?; + + match node { + MerkleNode::Empty => { + // They have items we don't have. + // We don't request those items from them, they will send them. + // We only bother with pushing items that differ + } + MerkleNode::Leaf(ik, _) => { + // Just send that item directly + if let Some(val) = self.data.store.get(ik)? { + todo_items.push(val.to_vec()); + } + } + MerkleNode::Intermediate(l) => { + let remote_node = match self + .aux + .rpc_client + .call( + who, + TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await? + { + TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node, + x => { + return Err(Error::Message(format!( + "Invalid respone to GetNode RPC: {}", + debug_serialize(x) + ))); + } + }; + let int_l2 = match remote_node { + MerkleNode::Intermediate(l2) => l2, + _ => vec![], + }; + + let join = join_ordered(&l[..], &int_l2[..]); + for (p, v1, v2) in join.into_iter() { + let diff = match (v1, v2) { + (Some(_), None) | (None, Some(_)) => true, + (Some(a), Some(b)) => a != b, + _ => false, + }; + if diff { + todo.push_back(key.add_byte(*p)); + } + } + } + } + + if todo_items.len() >= 256 { + self.send_items(who, std::mem::replace(&mut todo_items, vec![])) + .await?; + } + } + + if !todo_items.is_empty() { + self.send_items(who, todo_items).await?; + } + + Ok(()) + } + + async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { + info!( + "({}) Sending {} items to {:?}", + self.data.name, + item_list.len(), + who + ); + + let mut values = vec![]; + for item in item_list.iter() { + if let Some(v) = self.data.store.get(&item[..])? { + values.push(Arc::new(ByteBuf::from(v.as_ref()))); + } + } + let rpc_resp = self + .aux + .rpc_client + .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) + .await?; + if let TableRPC::<F>::Ok = rpc_resp { + Ok(()) + } else { + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) + } + } + + // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== + + pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { + match message { + SyncRPC::RootCkHash(range, h) => { + let root_ck = self.get_root_ck(*range)?; + let hash = hash_of(&root_ck)?; + if hash == *h { + Ok(SyncRPC::CkNoDifference) + } else { + Ok(SyncRPC::RootCkList(*range, root_ck)) + } + } + SyncRPC::GetNode(k) => { + let node = self.data.merkle_updater.read_node(&k)?; + Ok(SyncRPC::Node(k.clone(), node)) + } + _ => Err(Error::Message(format!("Unexpected sync RPC"))), + } + } +} + +impl SyncTodo { + fn add_full_sync<F: TableSchema, R: TableReplication>( + &mut self, + data: &TableData<F>, + aux: &TableAux<F, R>, + ) { + let my_id = aux.system.id; + + self.todo.clear(); + + let ring = aux.system.ring.borrow().clone(); + let split_points = aux.replication.split_points(&ring); + + for i in 0..split_points.len() { + let begin: MerklePartition = { + let b = split_points[i]; + assert_eq!(b.as_slice()[2..], [0u8; 30][..]); + b.as_slice()[..2].try_into().unwrap() + }; + + let end: Option<MerklePartition> = if i + 1 < split_points.len() { + let e = split_points[i + 1]; + assert_eq!(e.as_slice()[2..], [0u8; 30][..]); + Some(e.as_slice()[..2].try_into().unwrap()) + } else { + None + }; + + let begin_hash = hash_of_merkle_partition(begin); + let end_hash = hash_of_merkle_partition_opt(end); + + let nodes = aux.replication.replication_nodes(&begin_hash, &ring); + + let retain = nodes.contains(&my_id); + if !retain { + // Check if we have some data to send, otherwise skip + if data.store.range(begin_hash..end_hash).next().is_none() { + continue; + } + } + + self.todo.push(TodoPartition { + range: PartitionRange { begin, end }, + retain, + }); + } + } + + fn pop_task(&mut self) -> Option<TodoPartition> { + if self.todo.is_empty() { + return None; + } + + let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len()); + if i == self.todo.len() - 1 { + self.todo.pop() + } else { + let replacement = self.todo.pop().unwrap(); + let ret = std::mem::replace(&mut self.todo[i], replacement); + Some(ret) + } + } +} + +fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { + Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) +} + +fn join_ordered<'a, K: Ord + Eq, V1, V2>( + x: &'a [(K, V1)], + y: &'a [(K, V2)], +) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> { + let mut ret = vec![]; + let mut i = 0; + let mut j = 0; + while i < x.len() || j < y.len() { + if i < x.len() && j < y.len() && x[i].0 == y[j].0 { + ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1))); + i += 1; + j += 1; + } else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) { + ret.push((&x[i].0, Some(&x[i].1), None)); + i += 1; + } else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) { + ret.push((&x[i].0, None, Some(&y[j].1))); + j += 1; + } else { + unreachable!(); + } + } + ret +} diff --git a/src/table/table.rs b/src/table/table.rs index a4cb4b24..516c9358 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*; use crate::crdt::CRDT; use crate::data::*; -use crate::schema::*; -use crate::table_sync::*; use crate::replication::*; +use crate::schema::*; +use crate::sync::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -50,7 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> { impl<F: TableSchema> RpcMessage for TableRPC<F> {} - impl<F, R> Table<F, R> where F: TableSchema + 'static, @@ -69,29 +68,17 @@ where let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); - let data = TableData::new( - name, - instance, - db, - system.background.clone(), - ); + let data = TableData::new(name, instance, db, system.background.clone()); - let aux = Arc::new(TableAux{ + let aux = Arc::new(TableAux { system, replication, rpc_client, }); - let syncer = TableSyncer::launch( - data.clone(), - aux.clone(), - ); + let syncer = TableSyncer::launch(data.clone(), aux.clone()); - let table = Arc::new(Self { - data, - aux, - syncer, - }); + let table = Arc::new(Self { data, aux, syncer }); table.clone().register_handler(rpc_server, rpc_path); @@ -106,7 +93,8 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = TableRPC::<F>::Update(vec![e_enc]); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], rpc, @@ -135,7 +123,11 @@ where let call_futures = call_list.drain().map(|(node, entries)| async move { let rpc = TableRPC::<F>::Update(entries); - let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + let resp = self + .aux + .rpc_client + .call(node, rpc, TABLE_RPC_TIMEOUT) + .await?; Ok::<_, Error>((node, resp)) }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); @@ -200,7 +192,8 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.aux.system + self.aux + .system .background .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); } @@ -221,7 +214,8 @@ where let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); let resps = self - .aux.rpc_client + .aux + .rpc_client .try_call_many( &who[..], rpc, @@ -276,7 +270,8 @@ where async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); - self.aux.rpc_client + self.aux + .rpc_client .try_call_many( &who[..], TableRPC::<F>::Update(vec![what_enc]), @@ -296,7 +291,8 @@ where }); let self2 = self.clone(); - self.aux.rpc_client + self.aux + .rpc_client .set_local_handler(self.aux.system.id, move |msg| { let self2 = self2.clone(); async move { self2.handle(&msg).await } @@ -318,9 +314,7 @@ where Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let response = self.syncer - .handle_rpc(rpc, self.aux.system.background.stop_signal.clone()) - .await?; + let response = self.syncer.handle_rpc(rpc).await?; Ok(TableRPC::SyncRPC(response)) } _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs deleted file mode 100644 index 7394be1b..00000000 --- a/src/table/table_sync.rs +++ /dev/null @@ -1,898 +0,0 @@ -use rand::Rng; -use std::collections::{BTreeMap, VecDeque}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; - -use futures::future::join_all; -use futures::{pin_mut, select}; -use futures_util::future::*; -use futures_util::stream::*; -use serde::{Deserialize, Serialize}; -use serde_bytes::ByteBuf; -use tokio::sync::{mpsc, watch}; - -use garage_rpc::ring::Ring; -use garage_util::data::*; -use garage_util::error::Error; - -use crate::*; -use crate::data::*; -use crate::replication::*; - -const MAX_DEPTH: usize = 16; - -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); - -// Do anti-entropy every 10 minutes -const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60); - -const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60); - -pub struct TableSyncer<F: TableSchema, R: TableReplication> { - data: Arc<TableData<F>>, - aux: Arc<TableAux<F, R>>, - - todo: Mutex<SyncTodo>, - cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>, -} - -#[derive(Serialize, Deserialize)] -pub(crate) enum SyncRPC { - GetRootChecksumRange(Hash, Hash), - RootChecksumRange(SyncRange), - Checksums(Vec<RangeChecksum>), - Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), -} - -struct SyncTodo { - todo: Vec<TodoPartition>, -} - -#[derive(Debug, Clone)] -struct TodoPartition { - // Partition consists in hashes between begin included and end excluded - begin: Hash, - end: Hash, - - // Are we a node that stores this partition or not? - retain: bool, -} - -// A SyncRange defines a query on the dataset stored by a node, in the following way: -// - all items whose key are >= `begin` -// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded) -// - except if the first item of the range has such many leading zero bytes -// - and stopping at `end` (excluded) if such an item is not found -// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges" -// i.e. of ranges of level `level-1` that cover the same range -// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) -// See RangeChecksum for the struct that stores this information. -#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub(crate) struct SyncRange { - begin: Vec<u8>, - end: Vec<u8>, - level: usize, -} - -impl std::cmp::PartialOrd for SyncRange { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} -impl std::cmp::Ord for SyncRange { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.begin - .cmp(&other.begin) - .then(self.level.cmp(&other.level)) - .then(self.end.cmp(&other.end)) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct RangeChecksum { - bounds: SyncRange, - children: Vec<(SyncRange, Hash)>, - found_limit: Option<Vec<u8>>, - - #[serde(skip, default = "std::time::Instant::now")] - time: Instant, -} - -#[derive(Debug, Clone)] -struct RangeChecksumCache { - hash: Option<Hash>, // None if no children - found_limit: Option<Vec<u8>>, - time: Instant, -} - -impl<F, R> TableSyncer<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - pub(crate) fn launch(data: Arc<TableData<F>>, - aux: Arc<TableAux<F, R>>) -> Arc<Self> { - let todo = SyncTodo{ todo: vec![] }; - - let syncer = Arc::new(Self { - data: data.clone(), - aux: aux.clone(), - todo: Mutex::new(todo), - cache: (0..MAX_DEPTH) - .map(|_| Mutex::new(BTreeMap::new())) - .collect::<Vec<_>>(), - }); - - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - aux.system.background.spawn_worker( - format!("table sync watcher for {}", data.name), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - aux.system.background.spawn_worker( - format!("table syncer for {}", data.name), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(20)).await; - s3.add_full_scan(); - }); - - syncer - } - - async fn watcher_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) -> Result<(), Error> { - let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - let s_ring_recv = ring_recv.recv().fuse(); - let s_busy = busy_rx.recv().fuse(); - let s_must_exit = must_exit.recv().fuse(); - let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); - pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); - - select! { - new_ring_r = s_ring_recv => { - if let Some(new_ring) = new_ring_r { - debug!("({}) Adding ring difference to syncer todo list", self.data.name); - self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux); - prev_ring = new_ring; - } - } - busy_opt = s_busy => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else { - if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - break; - } - } - _ = s_timeout => { - if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Adding full scan to syncer todo list", self.data.name); - self.add_full_scan(); - } - } - } - } - Ok(()) - } - - pub fn add_full_scan(&self) { - self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux); - } - - async fn syncer_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - busy_tx: mpsc::UnboundedSender<bool>, - ) -> Result<(), Error> { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true)?; - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - self.data.name, partition, e - ); - } - } else { - busy_tx.send(false)?; - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } - Ok(()) - } - - async fn sync_partition( - self: Arc<Self>, - partition: &TodoPartition, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<(), Error> { - if partition.retain { - let my_id = self.aux.system.id; - let nodes = self - .aux - .replication - .write_nodes(&partition.begin, &self.aux.system) - .into_iter() - .filter(|node| *node != my_id) - .collect::<Vec<_>>(); - - debug!( - "({}) Preparing to sync {:?} with {:?}...", - self.data.name, partition, nodes - ); - let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?; - - let mut sync_futures = nodes - .iter() - .map(|node| { - self.clone().do_sync_with( - partition.clone(), - root_cks.clone(), - *node, - must_exit.clone(), - ) - }) - .collect::<FuturesUnordered<_>>(); - - let mut n_errors = 0; - while let Some(r) = sync_futures.next().await { - if let Err(e) = r { - n_errors += 1; - warn!("({}) Sync error: {}", self.data.name, e); - } - } - if n_errors > self.aux.replication.max_write_errors() { - return Err(Error::Message(format!( - "Sync failed with too many nodes (should have been: {:?}).", - nodes - ))); - } - } else { - self.offload_partition(&partition.begin, &partition.end, must_exit) - .await?; - } - - Ok(()) - } - - // Offload partition: this partition is not something we are storing, - // so send it out to all other nodes that store it and delete items locally. - // We don't bother checking if the remote nodes already have the items, - // we just batch-send everything. Offloading isn't supposed to happen very often. - // If any of the nodes that are supposed to store the items is unable to - // save them, we interrupt the process. - async fn offload_partition( - self: &Arc<Self>, - begin: &Hash, - end: &Hash, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<(), Error> { - let mut counter: usize = 0; - - while !*must_exit.borrow() { - let mut items = Vec::new(); - - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { - let (key, value) = item?; - items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); - - if items.len() >= 1024 { - break; - } - } - - if items.len() > 0 { - let nodes = self - .aux - .replication - .write_nodes(&begin, &self.aux.system) - .into_iter() - .collect::<Vec<_>>(); - if nodes.contains(&self.aux.system.id) { - warn!("Interrupting offload as partitions seem to have changed"); - break; - } - - counter += 1; - debug!( - "Offloading {} items from {:?}..{:?} ({})", - items.len(), - begin, - end, - counter - ); - self.offload_items(&items, &nodes[..]).await?; - } else { - break; - } - } - - Ok(()) - } - - async fn offload_items( - self: &Arc<Self>, - items: &Vec<(Vec<u8>, Arc<ByteBuf>)>, - nodes: &[UUID], - ) -> Result<(), Error> { - let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); - let update_msg = Arc::new(TableRPC::<F>::Update(values)); - - for res in join_all(nodes.iter().map(|to| { - self.aux - .rpc_client - .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT) - })) - .await - { - res?; - } - - // All remote nodes have written those items, now we can delete them locally - let mut not_removed = 0; - for (k, v) in items.iter() { - if !self.data.delete_if_equal(&k[..], &v[..])? { - not_removed += 1; - } - } - - if not_removed > 0 { - debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed); - } - - Ok(()) - } - - fn root_checksum( - self: &Arc<Self>, - begin: &Hash, - end: &Hash, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<RangeChecksum, Error> { - for i in 1..MAX_DEPTH { - let rc = self.range_checksum( - &SyncRange { - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, - must_exit, - )?; - if rc.found_limit.is_none() { - return Ok(rc); - } - } - Err(Error::Message(format!( - "Unable to compute root checksum (this should never happen)" - ))) - } - - fn range_checksum( - self: &Arc<Self>, - range: &SyncRange, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<RangeChecksum, Error> { - assert!(range.level != 0); - trace!("Call range_checksum {:?}", range); - - if range.level == 1 { - let mut children = vec![]; - for item in self - .data - .store - .range(range.begin.clone()..range.end.clone()) - { - let (key, value) = item?; - let key_hash = blake2sum(&key[..]); - if children.len() > 0 - && key_hash.as_slice()[0..range.level] - .iter() - .all(|x| *x == 0u8) - { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: Some(key.to_vec()), - time: Instant::now(), - }); - } - let item_range = SyncRange { - begin: key.to_vec(), - end: vec![], - level: 0, - }; - children.push((item_range, blake2sum(&value[..]))); - } - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: None, - time: Instant::now(), - }) - } else { - let mut children = vec![]; - let mut sub_range = SyncRange { - begin: range.begin.clone(), - end: range.end.clone(), - level: range.level - 1, - }; - let mut time = Instant::now(); - while !*must_exit.borrow() { - let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?; - - if let Some(hash) = sub_ck.hash { - children.push((sub_range.clone(), hash)); - if sub_ck.time < time { - time = sub_ck.time; - } - } - - if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: None, - time, - }); - } - let found_limit = sub_ck.found_limit.unwrap(); - - let actual_limit_hash = blake2sum(&found_limit[..]); - if actual_limit_hash.as_slice()[0..range.level] - .iter() - .all(|x| *x == 0u8) - { - trace!( - "range_checksum {:?} returning {} items", - range, - children.len() - ); - return Ok(RangeChecksum { - bounds: range.clone(), - children, - found_limit: Some(found_limit.clone()), - time, - }); - } - - sub_range.begin = found_limit; - } - trace!("range_checksum {:?} exiting due to must_exit", range); - Err(Error::Message(format!("Exiting."))) - } - } - - fn range_checksum_cached_hash( - self: &Arc<Self>, - range: &SyncRange, - must_exit: &mut watch::Receiver<bool>, - ) -> Result<RangeChecksumCache, Error> { - { - let mut cache = self.cache[range.level].lock().unwrap(); - if let Some(v) = cache.get(&range) { - if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { - return Ok(v.clone()); - } - } - cache.remove(&range); - } - - let v = self.range_checksum(&range, must_exit)?; - trace!( - "({}) New checksum calculated for {}-{}/{}, {} children", - self.data.name, - hex::encode(&range.begin) - .chars() - .take(16) - .collect::<String>(), - hex::encode(&range.end).chars().take(16).collect::<String>(), - range.level, - v.children.len() - ); - - let hash = if v.children.len() > 0 { - Some(blake2sum(&rmp_to_vec_all_named(&v)?[..])) - } else { - None - }; - let cache_entry = RangeChecksumCache { - hash, - found_limit: v.found_limit, - time: v.time, - }; - - let mut cache = self.cache[range.level].lock().unwrap(); - cache.insert(range.clone(), cache_entry.clone()); - Ok(cache_entry) - } - - async fn do_sync_with( - self: Arc<Self>, - partition: TodoPartition, - root_ck: RangeChecksum, - who: UUID, - mut must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - let mut todo = VecDeque::new(); - - // If their root checksum has level > than us, use that as a reference - let root_cks_resp = self - .aux - .rpc_client - .call( - who, - TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange( - partition.begin.clone(), - partition.end.clone(), - )), - TABLE_SYNC_RPC_TIMEOUT, - ) - .await?; - if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { - if range.level > root_ck.bounds.level { - let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?; - todo.push_back(their_root_range_ck); - } else { - todo.push_back(root_ck); - } - } else { - return Err(Error::Message(format!( - "Invalid respone to GetRootChecksumRange RPC: {}", - debug_serialize(root_cks_resp) - ))); - } - - while !todo.is_empty() && !*must_exit.borrow() { - let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - trace!( - "({}) Sync with {:?}: {} ({}) remaining", - self.data.name, - who, - todo.len(), - total_children - ); - - let step_size = std::cmp::min(16, todo.len()); - let step = todo.drain(..step_size).collect::<Vec<_>>(); - - let rpc_resp = self - .aux - .rpc_client - .call( - who, - TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)), - TABLE_SYNC_RPC_TIMEOUT, - ) - .await?; - if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = - rpc_resp - { - if diff_ranges.len() > 0 || diff_items.len() > 0 { - info!( - "({}) Sync with {:?}: difference {} ranges, {} items", - self.data.name, - who, - diff_ranges.len(), - diff_items.len() - ); - } - let mut items_to_send = vec![]; - for differing in diff_ranges.drain(..) { - if differing.level == 0 { - items_to_send.push(differing.begin); - } else { - let checksum = self.range_checksum(&differing, &mut must_exit)?; - todo.push_back(checksum); - } - } - if diff_items.len() > 0 { - self.data.update_many(&diff_items[..])?; - } - if items_to_send.len() > 0 { - self.send_items(who, items_to_send).await?; - } - } else { - return Err(Error::Message(format!( - "Unexpected response to sync RPC checksums: {}", - debug_serialize(&rpc_resp) - ))); - } - } - Ok(()) - } - - async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - info!( - "({}) Sending {} items to {:?}", - self.data.name, - item_list.len(), - who - ); - - let mut values = vec![]; - for item in item_list.iter() { - if let Some(v) = self.data.store.get(&item[..])? { - values.push(Arc::new(ByteBuf::from(v.as_ref()))); - } - } - let rpc_resp = self - .aux - .rpc_client - .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) - .await?; - if let TableRPC::<F>::Ok = rpc_resp { - Ok(()) - } else { - Err(Error::Message(format!( - "Unexpected response to RPC Update: {}", - debug_serialize(&rpc_resp) - ))) - } - } - - pub(crate) async fn handle_rpc( - self: &Arc<Self>, - message: &SyncRPC, - mut must_exit: watch::Receiver<bool>, - ) -> Result<SyncRPC, Error> { - match message { - SyncRPC::GetRootChecksumRange(begin, end) => { - let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?; - Ok(SyncRPC::RootChecksumRange(root_cks.bounds)) - } - SyncRPC::Checksums(checksums) => { - self.handle_checksums_rpc(&checksums[..], &mut must_exit) - .await - } - _ => Err(Error::Message(format!("Unexpected sync RPC"))), - } - } - - async fn handle_checksums_rpc( - self: &Arc<Self>, - checksums: &[RangeChecksum], - must_exit: &mut watch::Receiver<bool>, - ) -> Result<SyncRPC, Error> { - let mut ret_ranges = vec![]; - let mut ret_items = vec![]; - - for their_ckr in checksums.iter() { - let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?; - for (their_range, their_hash) in their_ckr.children.iter() { - let differs = match our_ckr - .children - .binary_search_by(|(our_range, _)| our_range.cmp(&their_range)) - { - Err(_) => { - if their_range.level >= 1 { - let cached_hash = - self.range_checksum_cached_hash(&their_range, must_exit)?; - cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true) - } else { - true - } - } - Ok(i) => our_ckr.children[i].1 != *their_hash, - }; - if differs { - ret_ranges.push(their_range.clone()); - if their_range.level == 0 { - if let Some(item_bytes) = - self.data.store.get(their_range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } - } - } - } - for (our_range, _hash) in our_ckr.children.iter() { - if let Some(their_found_limit) = &their_ckr.found_limit { - if our_range.begin.as_slice() > their_found_limit.as_slice() { - break; - } - } - - let not_present = our_ckr - .children - .binary_search_by(|(their_range, _)| their_range.cmp(&our_range)) - .is_err(); - if not_present { - if our_range.level > 0 { - ret_ranges.push(our_range.clone()); - } - if our_range.level == 0 { - if let Some(item_bytes) = - self.data.store.get(our_range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } - } - } - } - } - let n_checksums = checksums - .iter() - .map(|x| x.children.len()) - .fold(0, |x, y| x + y); - if ret_ranges.len() > 0 || ret_items.len() > 0 { - trace!( - "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.data.name, - ret_ranges.len(), - ret_items.len(), - n_checksums - ); - } - Ok(SyncRPC::Difference(ret_ranges, ret_items)) - } - - pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) { - for i in 1..MAX_DEPTH { - let needle = SyncRange { - begin: item_key.to_vec(), - end: vec![], - level: i, - }; - let mut cache = self.cache[i].lock().unwrap(); - if let Some(cache_entry) = cache.range(..=needle).rev().next() { - if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key { - let index = cache_entry.0.clone(); - drop(cache_entry); - cache.remove(&index); - } - } - } - } -} - -impl SyncTodo { - fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) { - let my_id = aux.system.id; - - self.todo.clear(); - - let ring = aux.system.ring.borrow().clone(); - let split_points = aux.replication.split_points(&ring); - - for i in 0..split_points.len() - 1 { - let begin = split_points[i]; - let end = split_points[i + 1]; - if begin == end { - continue; - } - - let nodes = aux.replication.replication_nodes(&begin, &ring); - - let retain = nodes.contains(&my_id); - if !retain { - // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { - continue; - } - } - - self.todo.push(TodoPartition { begin, end, retain }); - } - } - - fn add_ring_difference<F: TableSchema, R: TableReplication>( - &mut self, - old_ring: &Ring, - new_ring: &Ring, - data: &TableData<F>, aux: &TableAux<F, R>, - ) { - let my_id = aux.system.id; - - // If it is us who are entering or leaving the system, - // initiate a full sync instead of incremental sync - if old_ring.config.members.contains_key(&my_id) - != new_ring.config.members.contains_key(&my_id) - { - self.add_full_scan(data, aux); - return; - } - - let mut all_points = None - .into_iter() - .chain(aux.replication.split_points(old_ring).drain(..)) - .chain(aux.replication.split_points(new_ring).drain(..)) - .chain(self.todo.iter().map(|x| x.begin)) - .chain(self.todo.iter().map(|x| x.end)) - .collect::<Vec<_>>(); - all_points.sort(); - all_points.dedup(); - - let mut old_todo = std::mem::replace(&mut self.todo, vec![]); - old_todo.sort_by(|x, y| x.begin.cmp(&y.begin)); - let mut new_todo = vec![]; - - for i in 0..all_points.len() - 1 { - let begin = all_points[i]; - let end = all_points[i + 1]; - let was_ours = aux - .replication - .replication_nodes(&begin, &old_ring) - .contains(&my_id); - let is_ours = aux - .replication - .replication_nodes(&begin, &new_ring) - .contains(&my_id); - - let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) { - Ok(_) => true, - Err(j) => { - (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end) - || (j < old_todo.len() - && old_todo[j].begin < end && begin < old_todo[j].end) - } - }; - if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { - new_todo.push(TodoPartition { - begin, - end, - retain: is_ours, - }); - } - } - - self.todo = new_todo; - } - - fn pop_task(&mut self) -> Option<TodoPartition> { - if self.todo.is_empty() { - return None; - } - - let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len()); - if i == self.todo.len() - 1 { - self.todo.pop() - } else { - let replacement = self.todo.pop().unwrap(); - let ret = std::mem::replace(&mut self.todo[i], replacement); - Some(ret) - } - } -} |