aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs134
1 files changed, 37 insertions, 97 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 9c148393..f5c2ef33 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
-use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -15,7 +14,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
@@ -38,20 +37,10 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
rpc_client: Arc<RpcClient<SyncRPC>>,
}
-type RootCk = Vec<(MerklePartition, Hash)>;
-
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
-pub struct PartitionRange {
- begin: MerklePartition,
- // if end is None, go all the way to partition 0xFFFF included
- end: Option<MerklePartition>,
-}
-
#[derive(Serialize, Deserialize)]
pub(crate) enum SyncRPC {
- RootCkHash(PartitionRange, Hash),
- RootCkList(PartitionRange, RootCk),
- CkNoDifference,
+ RootCkHash(Partition, Hash),
+ RootCkDifferent(bool),
GetNode(MerkleNodeKey),
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
@@ -66,7 +55,9 @@ struct SyncTodo {
#[derive(Debug, Clone)]
struct TodoPartition {
- range: PartitionRange,
+ partition: Partition,
+ begin: Hash,
+ end: Hash,
// Are we a node that stores this partition or not?
retain: bool,
@@ -222,7 +213,7 @@ where
let nodes = self
.data
.replication
- .write_nodes(&hash_of_merkle_partition(partition.range.begin))
+ .write_nodes(&partition.begin)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
@@ -254,8 +245,8 @@ where
}
} else {
self.offload_partition(
- &hash_of_merkle_partition(partition.range.begin),
- &hash_of_merkle_partition_opt(partition.range.end),
+ &partition.begin,
+ &partition.end,
must_exit,
)
.await?;
@@ -364,30 +355,13 @@ where
// side to the other will happen when the other side syncs with us,
// which they also do regularly.
- fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
- let begin = u16::from_be_bytes(range.begin);
- let range_iter = match range.end {
- Some(end) => {
- let end = u16::from_be_bytes(end);
- begin..=(end - 1)
- }
- None => begin..=0xFFFF,
+ fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> {
+ let key = MerkleNodeKey {
+ partition,
+ prefix: vec![],
};
-
- let mut ret = vec![];
- for i in range_iter {
- let key = MerkleNodeKey {
- partition: u16::to_be_bytes(i),
- prefix: vec![],
- };
- match self.merkle.read_node(&key)? {
- MerkleNode::Empty => (),
- x => {
- ret.push((key.partition, hash_of(&x)?));
- }
- }
- }
- Ok(ret)
+ let node = self.merkle.read_node(&key)?;
+ Ok((key, node))
}
async fn do_sync_with(
@@ -396,7 +370,7 @@ where
who: UUID,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
- let root_ck = self.get_root_ck(partition.range)?;
+ let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
if root_ck.is_empty() {
debug!(
"({}) Sync {:?} with {:?}: partition is empty.",
@@ -404,51 +378,29 @@ where
);
return Ok(());
}
+ let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
- let root_ck_hash = hash_of(&root_ck)?;
-
- // If their root checksum has level > than us, use that as a reference
+ // Check if they have the same root checksum
+ // If so, do nothing.
let root_resp = self
.rpc_client
.call(
who,
- SyncRPC::RootCkHash(partition.range, root_ck_hash),
+ SyncRPC::RootCkHash(partition.partition, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
let mut todo = match root_resp {
- SyncRPC::CkNoDifference => {
+ SyncRPC::RootCkDifferent(false) => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who
);
return Ok(());
}
- SyncRPC::RootCkList(_, their_root_ck) => {
- let join = join_ordered(&root_ck[..], &their_root_ck[..]);
- let mut todo = VecDeque::new();
- for (p, v1, v2) in join.iter() {
- let diff = match (v1, v2) {
- (Some(_), None) | (None, Some(_)) => true,
- (Some(a), Some(b)) => a != b,
- _ => false,
- };
- if diff {
- todo.push_back(MerkleNodeKey {
- partition: **p,
- prefix: vec![],
- });
- }
- }
- debug!(
- "({}) Sync {:?} with {:?}: todo.len() = {}",
- self.data.name,
- partition,
- who,
- todo.len()
- );
- todo
+ SyncRPC::RootCkDifferent(true) => {
+ VecDeque::from(vec![root_ck_key])
}
x => {
return Err(Error::Message(format!(
@@ -565,13 +517,9 @@ where
async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message {
SyncRPC::RootCkHash(range, h) => {
- let root_ck = self.get_root_ck(*range)?;
- let hash = hash_of(&root_ck)?;
- if hash == *h {
- Ok(SyncRPC::CkNoDifference)
- } else {
- Ok(SyncRPC::RootCkList(*range, root_ck))
- }
+ let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
+ let hash = hash_of::<MerkleNode>(&root_ck)?;
+ Ok(SyncRPC::RootCkDifferent(hash != *h))
}
SyncRPC::GetNode(k) => {
let node = self.merkle.read_node(&k)?;
@@ -596,39 +544,31 @@ impl SyncTodo {
self.todo.clear();
- let ring = system.ring.borrow().clone();
- let split_points = data.replication.split_points(&ring);
+ let partitions = data.replication.partitions();
- for i in 0..split_points.len() {
- let begin: MerklePartition = {
- let b = split_points[i];
- assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
- b.as_slice()[..2].try_into().unwrap()
- };
+ for i in 0..partitions.len() {
+ let begin = partitions[i].1;
- let end: Option<MerklePartition> = if i + 1 < split_points.len() {
- let e = split_points[i + 1];
- assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
- Some(e.as_slice()[..2].try_into().unwrap())
+ let end = if i + 1 < partitions.len() {
+ partitions[i+1].1
} else {
- None
+ [0xFFu8; 32].into()
};
- let begin_hash = hash_of_merkle_partition(begin);
- let end_hash = hash_of_merkle_partition_opt(end);
-
- let nodes = data.replication.write_nodes(&begin_hash);
+ let nodes = data.replication.write_nodes(&begin);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if data.store.range(begin_hash..end_hash).next().is_none() {
+ if data.store.range(begin..end).next().is_none() {
continue;
}
}
self.todo.push(TodoPartition {
- range: PartitionRange { begin, end },
+ partition: partitions[i].0,
+ begin,
+ end,
retain,
});
}