diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-17 21:59:07 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-17 21:59:07 +0200 |
commit | 3f40ef149f6dd4d61ceb326b5691e186aec178c3 (patch) | |
tree | f28af6fdb06cbe9b3f91f54aab9c4077bd83276d | |
parent | f62b54f1dffe9b76aab47d32ba7f47411d5a58af (diff) | |
download | garage-3f40ef149f6dd4d61ceb326b5691e186aec178c3.tar.gz garage-3f40ef149f6dd4d61ceb326b5691e186aec178c3.zip |
Fix sync: use max root checksum level
-rw-r--r-- | src/api_server.rs | 10 | ||||
-rw-r--r-- | src/block.rs | 6 | ||||
-rw-r--r-- | src/table.rs | 4 | ||||
-rw-r--r-- | src/table_sync.rs | 171 |
4 files changed, 112 insertions, 79 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index 4ae48720..c6d52d16 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -86,16 +86,14 @@ async fn handler_inner( .to_string(); let version_uuid = handle_put(garage, &mime_type, &bucket, &key, req.into_body()).await?; - Ok(Response::new(Box::new(BytesBody::from(format!( - "{:?}\n", - version_uuid + Ok(Response::new(Box::new(BytesBody::from(hex::encode( + version_uuid, ))))) } &Method::DELETE => { let version_uuid = handle_delete(garage, &bucket, &key).await?; - Ok(Response::new(Box::new(BytesBody::from(format!( - "{:?}\n", - version_uuid + Ok(Response::new(Box::new(BytesBody::from(hex::encode( + version_uuid, ))))) } _ => Err(Error::BadRequest(format!("Invalid method"))), diff --git a/src/block.rs b/src/block.rs index 1e4c52dc..6add24b7 100644 --- a/src/block.rs +++ b/src/block.rs @@ -242,14 +242,14 @@ impl BlockManager { if need_nodes.len() > 0 { let put_block_message = self.read_block(hash).await?; - for resp in rpc_call_many( + let put_responses = rpc_call_many( garage.system.clone(), &need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT, ) - .await - { + .await; + for resp in put_responses { resp?; } } diff --git a/src/table.rs b/src/table.rs index d2c7de94..3ad08cff 100644 --- a/src/table.rs +++ b/src/table.rs @@ -155,7 +155,7 @@ impl<F: TableSchema + 'static> Table<F> { let hash = e.partition_key().hash(); let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.param.replication_factor); - eprintln!("insert who: {:?}", who); + //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let rpc = &TableRPC::<F>::Update(vec![e_enc]); @@ -212,7 +212,7 @@ impl<F: TableSchema + 'static> Table<F> { let hash = partition_key.hash(); let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.param.replication_factor); - eprintln!("get who: {:?}", who); + //eprintln!("get who: {:?}", who); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self diff --git a/src/table_sync.rs b/src/table_sync.rs index 703dd750..024e239f 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -29,6 +29,8 @@ pub struct TableSyncer<F: TableSchema> { #[derive(Serialize, Deserialize)] pub enum SyncRPC { + GetRootChecksumRange(Hash, Hash), + RootChecksumRange(SyncRange), Checksums(Vec<RangeChecksum>, bool), Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), } @@ -180,6 +182,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .filter(|node| **node != my_id) .map(|node| { self.clone().do_sync_with( + partition.clone(), root_cks.clone(), node.clone(), partition.retain, @@ -346,13 +349,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> { async fn do_sync_with( self: Arc<Self>, + partition: Partition, root_ck: RangeChecksum, who: UUID, retain: bool, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let mut todo = VecDeque::new(); - todo.push_back(root_ck); + + // If their root checksum has level > than us, use that as a reference + let root_cks_resp = self + .table + .rpc_call( + &who, + &TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange( + partition.begin.clone(), + partition.end.clone(), + )), + ) + .await?; + if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { + if range.level > root_ck.bounds.level { + let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?; + todo.push_back(their_root_range_ck); + } else { + todo.push_back(root_ck); + } + } while !todo.is_empty() && !*must_exit.borrow() { let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); @@ -445,83 +468,95 @@ impl<F: TableSchema + 'static> TableSyncer<F> { message: &SyncRPC, mut must_exit: watch::Receiver<bool>, ) -> Result<SyncRPC, Error> { - if let SyncRPC::Checksums(checksums, retain) = message { - let mut ret_ranges = vec![]; - let mut ret_items = vec![]; - for ckr in checksums.iter() { - let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; - for (range, hash) in ckr.children.iter() { - // Only consider items that are in the intersection of the two ranges - // (other ranges will be exchanged at some point) - if our_ckr - .found_limit - .as_ref() - .map(|x| range.begin.as_slice() >= x.as_slice()) - .unwrap_or(false) - { - break; - } + match message { + SyncRPC::GetRootChecksumRange(begin, end) => { + let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?; + Ok(SyncRPC::RootChecksumRange(root_cks.bounds)) + } + SyncRPC::Checksums(checksums, retain) => { + self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit) + .await + } + _ => Err(Error::Message(format!("Unexpected sync RPC"))), + } + } - let differs = match our_ckr - .children - .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) - { - Err(_) => true, - Ok(i) => our_ckr.children[i].1 != *hash, - }; - if differs { - ret_ranges.push(range.clone()); - if *retain && range.level == 0 { - if let Some(item_bytes) = - self.table.store.get(range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } + pub async fn handle_checksums_rpc( + self: &Arc<Self>, + checksums: &[RangeChecksum], + retain: bool, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<SyncRPC, Error> { + let mut ret_ranges = vec![]; + let mut ret_items = vec![]; + for ckr in checksums.iter() { + let our_ckr = self.range_checksum(&ckr.bounds, must_exit).await?; + for (range, hash) in ckr.children.iter() { + // Only consider items that are in the intersection of the two ranges + // (other ranges will be exchanged at some point) + if our_ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { + break; + } + + let differs = match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + { + Err(_) => true, + Ok(i) => our_ckr.children[i].1 != *hash, + }; + if differs { + ret_ranges.push(range.clone()); + if retain && range.level == 0 { + if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } - for (range, _hash) in our_ckr.children.iter() { - if ckr - .found_limit - .as_ref() - .map(|x| range.begin.as_slice() >= x.as_slice()) - .unwrap_or(false) - { - break; - } + } + for (range, _hash) in our_ckr.children.iter() { + if ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { + break; + } - let not_present = ckr - .children - .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) - .is_err(); - if not_present { - if range.level > 0 { - ret_ranges.push(range.clone()); - } - if *retain && range.level == 0 { - if let Some(item_bytes) = - self.table.store.get(range.begin.as_slice())? - { - ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); - } + let not_present = ckr + .children + .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) + .is_err(); + if not_present { + if range.level > 0 { + ret_ranges.push(range.clone()); + } + if retain && range.level == 0 { + if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } } - let n_checksums = checksums - .iter() - .map(|x| x.children.len()) - .fold(0, |x, y| x + y); - eprintln!( - "({}) Checksum comparison RPC: {} different + {} items for {} received", - self.table.name, - ret_ranges.len(), - ret_items.len(), - n_checksums - ); - return Ok(SyncRPC::Difference(ret_ranges, ret_items)); } - Err(Error::Message(format!("Unexpected sync RPC"))) + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + eprintln!( + "({}) Checksum comparison RPC: {} different + {} items for {} received", + self.table.name, + ret_ranges.len(), + ret_items.len(), + n_checksums + ); + Ok(SyncRPC::Difference(ret_ranges, ret_items)) } pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { |