diff options
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 134 |
1 files changed, 37 insertions, 97 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 9c148393..f5c2ef33 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,5 +1,4 @@ use std::collections::VecDeque; -use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -15,7 +14,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -38,20 +37,10 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> { rpc_client: Arc<RpcClient<SyncRPC>>, } -type RootCk = Vec<(MerklePartition, Hash)>; - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct PartitionRange { - begin: MerklePartition, - // if end is None, go all the way to partition 0xFFFF included - end: Option<MerklePartition>, -} - #[derive(Serialize, Deserialize)] pub(crate) enum SyncRPC { - RootCkHash(PartitionRange, Hash), - RootCkList(PartitionRange, RootCk), - CkNoDifference, + RootCkHash(Partition, Hash), + RootCkDifferent(bool), GetNode(MerkleNodeKey), Node(MerkleNodeKey, MerkleNode), Items(Vec<Arc<ByteBuf>>), @@ -66,7 +55,9 @@ struct SyncTodo { #[derive(Debug, Clone)] struct TodoPartition { - range: PartitionRange, + partition: Partition, + begin: Hash, + end: Hash, // Are we a node that stores this partition or not? retain: bool, @@ -222,7 +213,7 @@ where let nodes = self .data .replication - .write_nodes(&hash_of_merkle_partition(partition.range.begin)) + .write_nodes(&partition.begin) .into_iter() .filter(|node| *node != my_id) .collect::<Vec<_>>(); @@ -254,8 +245,8 @@ where } } else { self.offload_partition( - &hash_of_merkle_partition(partition.range.begin), - &hash_of_merkle_partition_opt(partition.range.end), + &partition.begin, + &partition.end, must_exit, ) .await?; @@ -364,30 +355,13 @@ where // side to the other will happen when the other side syncs with us, // which they also do regularly. - fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> { - let begin = u16::from_be_bytes(range.begin); - let range_iter = match range.end { - Some(end) => { - let end = u16::from_be_bytes(end); - begin..=(end - 1) - } - None => begin..=0xFFFF, + fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> { + let key = MerkleNodeKey { + partition, + prefix: vec![], }; - - let mut ret = vec![]; - for i in range_iter { - let key = MerkleNodeKey { - partition: u16::to_be_bytes(i), - prefix: vec![], - }; - match self.merkle.read_node(&key)? { - MerkleNode::Empty => (), - x => { - ret.push((key.partition, hash_of(&x)?)); - } - } - } - Ok(ret) + let node = self.merkle.read_node(&key)?; + Ok((key, node)) } async fn do_sync_with( @@ -396,7 +370,7 @@ where who: UUID, must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { - let root_ck = self.get_root_ck(partition.range)?; + let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; if root_ck.is_empty() { debug!( "({}) Sync {:?} with {:?}: partition is empty.", @@ -404,51 +378,29 @@ where ); return Ok(()); } + let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?; - let root_ck_hash = hash_of(&root_ck)?; - - // If their root checksum has level > than us, use that as a reference + // Check if they have the same root checksum + // If so, do nothing. let root_resp = self .rpc_client .call( who, - SyncRPC::RootCkHash(partition.range, root_ck_hash), + SyncRPC::RootCkHash(partition.partition, root_ck_hash), TABLE_SYNC_RPC_TIMEOUT, ) .await?; let mut todo = match root_resp { - SyncRPC::CkNoDifference => { + SyncRPC::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", self.data.name, partition, who ); return Ok(()); } - SyncRPC::RootCkList(_, their_root_ck) => { - let join = join_ordered(&root_ck[..], &their_root_ck[..]); - let mut todo = VecDeque::new(); - for (p, v1, v2) in join.iter() { - let diff = match (v1, v2) { - (Some(_), None) | (None, Some(_)) => true, - (Some(a), Some(b)) => a != b, - _ => false, - }; - if diff { - todo.push_back(MerkleNodeKey { - partition: **p, - prefix: vec![], - }); - } - } - debug!( - "({}) Sync {:?} with {:?}: todo.len() = {}", - self.data.name, - partition, - who, - todo.len() - ); - todo + SyncRPC::RootCkDifferent(true) => { + VecDeque::from(vec![root_ck_key]) } x => { return Err(Error::Message(format!( @@ -565,13 +517,9 @@ where async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> { match message { SyncRPC::RootCkHash(range, h) => { - let root_ck = self.get_root_ck(*range)?; - let hash = hash_of(&root_ck)?; - if hash == *h { - Ok(SyncRPC::CkNoDifference) - } else { - Ok(SyncRPC::RootCkList(*range, root_ck)) - } + let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; + let hash = hash_of::<MerkleNode>(&root_ck)?; + Ok(SyncRPC::RootCkDifferent(hash != *h)) } SyncRPC::GetNode(k) => { let node = self.merkle.read_node(&k)?; @@ -596,39 +544,31 @@ impl SyncTodo { self.todo.clear(); - let ring = system.ring.borrow().clone(); - let split_points = data.replication.split_points(&ring); + let partitions = data.replication.partitions(); - for i in 0..split_points.len() { - let begin: MerklePartition = { - let b = split_points[i]; - assert_eq!(b.as_slice()[2..], [0u8; 30][..]); - b.as_slice()[..2].try_into().unwrap() - }; + for i in 0..partitions.len() { + let begin = partitions[i].1; - let end: Option<MerklePartition> = if i + 1 < split_points.len() { - let e = split_points[i + 1]; - assert_eq!(e.as_slice()[2..], [0u8; 30][..]); - Some(e.as_slice()[..2].try_into().unwrap()) + let end = if i + 1 < partitions.len() { + partitions[i+1].1 } else { - None + [0xFFu8; 32].into() }; - let begin_hash = hash_of_merkle_partition(begin); - let end_hash = hash_of_merkle_partition_opt(end); - - let nodes = data.replication.write_nodes(&begin_hash); + let nodes = data.replication.write_nodes(&begin); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin_hash..end_hash).next().is_none() { + if data.store.range(begin..end).next().is_none() { continue; } } self.todo.push(TodoPartition { - range: PartitionRange { begin, end }, + partition: partitions[i].0, + begin, + end, retain, }); } |