From 2a84d965abc07e4438bd0936dd6cf3307a8a2024 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 21 Apr 2020 16:05:55 +0000 Subject: Improve table sync --- src/table_sync.rs | 226 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 140 insertions(+), 86 deletions(-) (limited to 'src/table_sync.rs') diff --git a/src/table_sync.rs b/src/table_sync.rs index 26c5bed8..bfbde285 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -18,14 +18,14 @@ use crate::membership::Ring; use crate::table::*; const MAX_DEPTH: usize = 16; -const SCAN_INTERVAL: Duration = Duration::from_secs(60); +const SCAN_INTERVAL: Duration = Duration::from_secs(1800); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10); +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); pub struct TableSyncer { table: Arc>, todo: Mutex, - cache: Vec>>, + cache: Vec>>, } #[derive(Serialize, Deserialize)] @@ -47,6 +47,15 @@ struct TodoPartition { retain: bool, } +// A SyncRange defines a query on the dataset stored by a node, in the following way: +// - all items whose key are >= `begin` +// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded) +// - except if the first item of the range has such many leading zero bytes +// - and stopping at `end` (excluded) if such an item is not found +// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges" +// i.e. of ranges of level `level-1` that cover the same range +// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) +// See RangeChecksum for the struct that stores this information. #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] pub struct SyncRange { begin: Vec, @@ -61,7 +70,10 @@ impl std::cmp::PartialOrd for SyncRange { } impl std::cmp::Ord for SyncRange { fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.begin.cmp(&other.begin) + self.begin + .cmp(&other.begin) + .then(self.level.cmp(&other.level)) + .then(self.end.cmp(&other.end)) } } @@ -75,6 +87,13 @@ pub struct RangeChecksum { time: Instant, } +#[derive(Debug, Clone)] +pub struct RangeChecksumCache { + hash: Option, // None if no children + found_limit: Option>, + time: Instant, +} + impl TableSyncer where F: TableSchema + 'static, @@ -159,7 +178,7 @@ where if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; debug!("({}) Adding full scan to syncer todo list", self.table.name); - self.todo.lock().await.add_full_scan(&self.table); + self.add_full_scan().await; } } } @@ -167,6 +186,10 @@ where Ok(()) } + pub async fn add_full_scan(&self) { + self.todo.lock().await.add_full_scan(&self.table); + } + async fn syncer_task( self: Arc, mut must_exit: watch::Receiver, @@ -273,47 +296,17 @@ where } } Err(Error::Message(format!( - "Unable to compute root checksum (this should never happen" + "Unable to compute root checksum (this should never happen)" ))) } - fn range_checksum<'a>( - self: &'a Arc, - range: &'a SyncRange, - must_exit: &'a mut watch::Receiver, - ) -> BoxFuture<'a, Result> { - async move { - let mut cache = self.cache[range.level].lock().await; - if let Some(v) = cache.get(&range) { - if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { - return Ok(v.clone()); - } - } - cache.remove(&range); - drop(cache); - - let v = self.range_checksum_inner(&range, must_exit).await?; - trace!( - "({}) New checksum calculated for {}-{}/{}, {} children", - self.table.name, - hex::encode(&range.begin[..]), - hex::encode(&range.end[..]), - range.level, - v.children.len() - ); - - let mut cache = self.cache[range.level].lock().await; - cache.insert(range.clone(), v.clone()); - Ok(v) - } - .boxed() - } - - async fn range_checksum_inner( + async fn range_checksum( self: &Arc, range: &SyncRange, must_exit: &mut watch::Receiver, ) -> Result { + assert!(range.level != 0); + if range.level == 1 { let mut children = vec![]; for item in self @@ -323,7 +316,10 @@ where { let (key, value) = item?; let key_hash = hash(&key[..]); - if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) + if children.len() > 0 + && key_hash.as_slice()[0..range.level] + .iter() + .all(|x| *x == 0u8) { return Ok(RangeChecksum { bounds: range.clone(), @@ -354,17 +350,18 @@ where }; let mut time = Instant::now(); while !*must_exit.borrow() { - let sub_ck = self.range_checksum(&sub_range, must_exit).await?; + let sub_ck = self + .range_checksum_cached_hash(&sub_range, must_exit) + .await?; - if sub_ck.children.len() > 0 { - let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]); - children.push((sub_range.clone(), sub_ck_hash)); + if let Some(hash) = &sub_ck.hash { + children.push((sub_range.clone(), hash.clone())); if sub_ck.time < time { time = sub_ck.time; } } - if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { + if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { return Ok(RangeChecksum { bounds: range.clone(), children, @@ -377,7 +374,7 @@ where let actual_limit_hash = hash(&found_limit[..]); if actual_limit_hash.as_slice()[0..range.level] .iter() - .all(|x| *x == 0) + .all(|x| *x == 0u8) { return Ok(RangeChecksum { bounds: range.clone(), @@ -393,6 +390,52 @@ where } } + fn range_checksum_cached_hash<'a>( + self: &'a Arc, + range: &'a SyncRange, + must_exit: &'a mut watch::Receiver, + ) -> BoxFuture<'a, Result> { + async move { + let mut cache = self.cache[range.level].lock().await; + if let Some(v) = cache.get(&range) { + if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { + return Ok(v.clone()); + } + } + cache.remove(&range); + drop(cache); + + let v = self.range_checksum(&range, must_exit).await?; + trace!( + "({}) New checksum calculated for {}-{}/{}, {} children", + self.table.name, + hex::encode(&range.begin) + .chars() + .take(16) + .collect::(), + hex::encode(&range.end).chars().take(16).collect::(), + range.level, + v.children.len() + ); + + let hash = if v.children.len() > 0 { + Some(hash(&rmp_to_vec_all_named(&v)?[..])) + } else { + None + }; + let cache_entry = RangeChecksumCache { + hash, + found_limit: v.found_limit, + time: v.time, + }; + + let mut cache = self.cache[range.level].lock().await; + cache.insert(range.clone(), cache_entry.clone()); + Ok(cache_entry) + } + .boxed() + } + async fn do_sync_with( self: Arc, partition: TodoPartition, @@ -423,6 +466,11 @@ where } else { todo.push_back(root_ck); } + } else { + return Err(Error::BadRequest(format!( + "Invalid respone to GetRootChecksumRange RPC: {}", + debug_serialize(root_cks_resp) + ))); } while !todo.is_empty() && !*must_exit.borrow() { @@ -435,8 +483,8 @@ where total_children ); - let end = std::cmp::min(16, todo.len()); - let step = todo.drain(..end).collect::>(); + let step_size = std::cmp::min(16, todo.len()); + let step = todo.drain(..step_size).collect::>(); let rpc_resp = self .table @@ -472,10 +520,7 @@ where self.table.handle_update(diff_items).await?; } if items_to_send.len() > 0 { - self.table - .system - .background - .spawn(self.clone().send_items(who.clone(), items_to_send)); + self.send_items(who.clone(), items_to_send).await?; } } else { return Err(Error::BadRequest(format!( @@ -487,7 +532,7 @@ where Ok(()) } - async fn send_items(self: Arc, who: UUID, item_list: Vec>) -> Result<(), Error> { + async fn send_items(&self, who: UUID, item_list: Vec>) -> Result<(), Error> { info!( "({}) Sending {} items to {:?}", self.table.name, @@ -542,56 +587,56 @@ where ) -> Result { let mut ret_ranges = vec![]; let mut ret_items = vec![]; - for ckr in checksums.iter() { - let our_ckr = self.range_checksum(&ckr.bounds, must_exit).await?; - for (range, hash) in ckr.children.iter() { - // Only consider items that are in the intersection of the two ranges - // (other ranges will be exchanged at some point) - if our_ckr - .found_limit - .as_ref() - .map(|x| range.begin.as_slice() >= x.as_slice()) - .unwrap_or(false) - { - break; - } + for their_ckr in checksums.iter() { + let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?; + for (their_range, their_hash) in their_ckr.children.iter() { let differs = match our_ckr .children - .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + .binary_search_by(|(our_range, _)| our_range.cmp(&their_range)) { - Err(_) => true, - Ok(i) => our_ckr.children[i].1 != *hash, + Err(_) => { + if their_range.level >= 1 { + let cached_hash = self + .range_checksum_cached_hash(&their_range, must_exit) + .await?; + cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true) + } else { + true + } + } + Ok(i) => our_ckr.children[i].1 != *their_hash, }; if differs { - ret_ranges.push(range.clone()); - if retain && range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + ret_ranges.push(their_range.clone()); + if retain && their_range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(their_range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } } - for (range, _hash) in our_ckr.children.iter() { - if ckr - .found_limit - .as_ref() - .map(|x| range.begin.as_slice() >= x.as_slice()) - .unwrap_or(false) - { - break; + for (our_range, _hash) in our_ckr.children.iter() { + if let Some(their_found_limit) = &their_ckr.found_limit { + if our_range.begin.as_slice() > their_found_limit.as_slice() { + break; + } } - let not_present = ckr + let not_present = our_ckr .children - .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) + .binary_search_by(|(their_range, _)| their_range.cmp(&our_range)) .is_err(); if not_present { - if range.level > 0 { - ret_ranges.push(range.clone()); + if our_range.level > 0 { + ret_ranges.push(our_range.clone()); } - if retain && range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if retain && our_range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(our_range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } @@ -673,6 +718,15 @@ impl SyncTodo { ) { let my_id = table.system.id.clone(); + // If it is us who are entering or leaving the system, + // initiate a full sync instead of incremental sync + if old_ring.config.members.contains_key(&my_id) + != new_ring.config.members.contains_key(&my_id) + { + self.add_full_scan(table); + return; + } + let mut all_points = None .into_iter() .chain(table.replication.split_points(old_ring).drain(..)) -- cgit v1.2.3