diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-17 17:09:57 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-17 17:09:57 +0200 |
commit | 69f1d8fef23149e45189c296e0c0d23e040cbb0e (patch) | |
tree | b213f119ae5eea620ab140cd712362707af28ddb /src/table_sync.rs | |
parent | e41ce4d81528388f043c1c5e6608df45347ea70d (diff) | |
download | garage-69f1d8fef23149e45189c296e0c0d23e040cbb0e.tar.gz garage-69f1d8fef23149e45189c296e0c0d23e040cbb0e.zip |
WIP
TODOs:
- ensure sync goes both way
- finish sending blocks to other nodes when they need them before deleting
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 78 |
1 files changed, 41 insertions, 37 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 8eb08074..5ef13d6d 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -27,6 +27,12 @@ pub struct TableSyncer<F: TableSchema> { pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>, } +#[derive(Serialize, Deserialize)] +pub enum SyncRPC { + Checksums(Vec<RangeChecksum>), + DifferentSet(Vec<SyncRange>), +} + pub struct SyncTodo { pub todo: Vec<Partition>, } @@ -166,13 +172,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .root_checksum(&partition.begin, &partition.end, must_exit) .await?; - let nodes = self - .table - .system - .ring - .borrow() - .clone() - .walk_ring(&partition.begin, self.table.param.replication_factor); + let ring = self.table.system.ring.borrow().clone(); + let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor); let mut sync_futures = nodes .iter() .map(|node| { @@ -361,9 +362,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let rpc_resp = self .table - .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)) + .rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step))) .await?; - if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp { + if let TableRPC::<F>::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp { let mut items = vec![]; for differing in s.drain(..) { if differing.level == 0 { @@ -381,7 +382,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } else { return Err(Error::Message(format!( - "Unexpected response to RPC SyncChecksums: {}", + "Unexpected response to sync RPC checksums: {}", debug_serialize(&rpc_resp) ))); } @@ -417,41 +418,44 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } - pub async fn handle_checksum_rpc( + pub async fn handle_rpc( self: &Arc<Self>, - checksums: &[RangeChecksum], + message: &SyncRPC, mut must_exit: watch::Receiver<bool>, - ) -> Result<Vec<SyncRange>, Error> { - let mut ret = vec![]; - for ckr in checksums.iter() { - let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; - for (range, hash) in ckr.children.iter() { - match our_ckr - .children - .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) - { - Err(_) => { - ret.push(range.clone()); - } - Ok(i) => { - if our_ckr.children[i].1 != *hash { + ) -> Result<SyncRPC, Error> { + if let SyncRPC::Checksums(checksums) = message { + let mut ret = vec![]; + for ckr in checksums.iter() { + let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; + for (range, hash) in ckr.children.iter() { + match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + { + Err(_) => { ret.push(range.clone()); } + Ok(i) => { + if our_ckr.children[i].1 != *hash { + ret.push(range.clone()); + } + } } } } + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + eprintln!( + "({}) Checksum comparison RPC: {} different out of {}", + self.table.name, + ret.len(), + n_checksums + ); + return Ok(SyncRPC::DifferentSet(ret)); } - let n_checksums = checksums - .iter() - .map(|x| x.children.len()) - .fold(0, |x, y| x + y); - eprintln!( - "({}) Checksum comparison RPC: {} different out of {}", - self.table.name, - ret.len(), - n_checksums - ); - Ok(ret) + Err(Error::Message(format!("Unexpected sync RPC"))) } pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { |