diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-17 15:36:16 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-17 15:40:13 +0200 |
commit | e41ce4d81528388f043c1c5e6608df45347ea70d (patch) | |
tree | ee25f06b6f7da356c53d5f0a8fc8ec9e81d4bb23 /src/table_sync.rs | |
parent | 867646093b24a9bb7e4b24a7f2248615c6e03fde (diff) | |
download | garage-e41ce4d81528388f043c1c5e6608df45347ea70d.tar.gz garage-e41ce4d81528388f043c1c5e6608df45347ea70d.zip |
Implement getting missing blocks when RC increases
Issue: RC increases also when the block ref entry is first put by the actual client.
At that point the client is probably already sending us the block content,
so we don't need to do a get...
We should add a delay before the task is added or find something to do.
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 203 |
1 files changed, 153 insertions, 50 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index c1d3bea8..8eb08074 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,16 +1,16 @@ use rand::Rng; -use std::collections::{BTreeSet, BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; -use futures::{pin_mut, select}; use futures::future::BoxFuture; -use futures_util::stream::*; +use futures::{pin_mut, select}; use futures_util::future::*; +use futures_util::stream::*; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; use tokio::sync::watch; use tokio::sync::Mutex; -use serde::{Serialize, Deserialize}; -use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; @@ -62,7 +62,7 @@ pub struct RangeChecksum { pub children: Vec<(SyncRange, Hash)>, pub found_limit: Option<Vec<u8>>, - #[serde(skip, default="std::time::Instant::now")] + #[serde(skip, default = "std::time::Instant::now")] pub time: Instant, } @@ -72,7 +72,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), - cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(), + cache: (0..MAX_DEPTH) + .map(|_| Mutex::new(BTreeMap::new())) + .collect::<Vec<_>>(), }); let s1 = syncer.clone(); @@ -137,9 +139,15 @@ impl<F: TableSchema + 'static> TableSyncer<F> { ) -> Result<(), Error> { while !*must_exit.borrow() { if let Some(partition) = self.todo.lock().await.pop_task() { - let res = self.clone().sync_partition(&partition, &mut must_exit).await; + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; if let Err(e) = res { - eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e); + eprintln!( + "({}) Error while syncing {:?}: {}", + self.table.name, partition, e + ); } } else { tokio::time::delay_for(Duration::from_secs(1)).await; @@ -148,13 +156,29 @@ impl<F: TableSchema + 'static> TableSyncer<F> { Ok(()) } - async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + async fn sync_partition( + self: Arc<Self>, + partition: &Partition, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition); - let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; + let root_cks = self + .root_checksum(&partition.begin, &partition.end, must_exit) + .await?; - let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); - let mut sync_futures = nodes.iter() - .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())) + let nodes = self + .table + .system + .ring + .borrow() + .clone() + .walk_ring(&partition.begin, self.table.param.replication_factor); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone() + .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) + }) .collect::<FuturesUnordered<_>>(); while let Some(r) = sync_futures.next().await { @@ -163,27 +187,45 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } if !partition.retain { - self.table.delete_range(&partition.begin, &partition.end).await?; + self.table + .delete_range(&partition.begin, &partition.end) + .await?; } Ok(()) } - async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + async fn root_checksum( + self: &Arc<Self>, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { for i in 1..MAX_DEPTH { - let rc = self.range_checksum(&SyncRange{ - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, must_exit).await?; + let rc = self + .range_checksum( + &SyncRange { + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, + must_exit, + ) + .await?; if rc.found_limit.is_none() { return Ok(rc); } } - Err(Error::Message(format!("Unable to compute root checksum (this should never happen"))) + Err(Error::Message(format!( + "Unable to compute root checksum (this should never happen" + ))) } - fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> { + fn range_checksum<'a>( + self: &'a Arc<Self>, + range: &'a SyncRange, + must_exit: &'a mut watch::Receiver<bool>, + ) -> BoxFuture<'a, Result<RangeChecksum, Error>> { async move { let mut cache = self.cache[range.level].lock().await; if let Some(v) = cache.get(&range) { @@ -195,41 +237,53 @@ impl<F: TableSchema + 'static> TableSyncer<F> { drop(cache); let v = self.range_checksum_inner(&range, must_exit).await?; - eprintln!("({}) New checksum calculated for {}-{}/{}, {} children", + eprintln!( + "({}) New checksum calculated for {}-{}/{}, {} children", self.table.name, hex::encode(&range.begin[..]), hex::encode(&range.end[..]), range.level, - v.children.len()); + v.children.len() + ); let mut cache = self.cache[range.level].lock().await; cache.insert(range.clone(), v.clone()); Ok(v) - }.boxed() + } + .boxed() } - async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + async fn range_checksum_inner( + self: &Arc<Self>, + range: &SyncRange, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { if range.level == 1 { let mut children = vec![]; - for item in self.table.store.range(range.begin.clone()..range.end.clone()) { + for item in self + .table + .store + .range(range.begin.clone()..range.end.clone()) + { let (key, value) = item?; let key_hash = hash(&key[..]); - if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { - return Ok(RangeChecksum{ + if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) + { + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: Some(key.to_vec()), time: Instant::now(), - }) + }); } - let item_range = SyncRange{ + let item_range = SyncRange { begin: key.to_vec(), end: vec![], level: 0, }; children.push((item_range, hash(&value[..]))); } - Ok(RangeChecksum{ + Ok(RangeChecksum { bounds: range.clone(), children, found_limit: None, @@ -237,7 +291,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { }) } else { let mut children = vec![]; - let mut sub_range = SyncRange{ + let mut sub_range = SyncRange { begin: range.begin.clone(), end: range.end.clone(), level: range.level - 1, @@ -255,7 +309,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { - return Ok(RangeChecksum{ + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: None, @@ -265,8 +319,11 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let found_limit = sub_ck.found_limit.unwrap(); let actual_limit_hash = hash(&found_limit[..]); - if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { - return Ok(RangeChecksum{ + if actual_limit_hash.as_slice()[0..range.level] + .iter() + .all(|x| *x == 0) + { + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: Some(found_limit.clone()), @@ -280,18 +337,32 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } - async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + async fn do_sync_with( + self: Arc<Self>, + root_ck: RangeChecksum, + who: UUID, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { let mut todo = VecDeque::new(); todo.push_back(root_ck); while !todo.is_empty() && !*must_exit.borrow() { let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!("({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, todo.len(), total_children); + eprintln!( + "({}) Sync with {:?}: {} ({}) remaining", + self.table.name, + who, + todo.len(), + total_children + ); let end = std::cmp::min(16, todo.len()); let step = todo.drain(..end).collect::<Vec<_>>(); - let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?; + let rpc_resp = self + .table + .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)) + .await?; if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp { let mut items = vec![]; for differing in s.drain(..) { @@ -303,17 +374,28 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } if items.len() > 0 { - self.table.system.background.spawn(self.clone().send_items(who.clone(), items)); + self.table + .system + .background + .spawn(self.clone().send_items(who.clone(), items)); } } else { - return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp)))); + return Err(Error::Message(format!( + "Unexpected response to RPC SyncChecksums: {}", + debug_serialize(&rpc_resp) + ))); } } Ok(()) } async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who); + eprintln!( + "({}) Sending {} items to {:?}", + self.table.name, + item_list.len(), + who + ); let mut values = vec![]; for item in item_list.iter() { @@ -321,20 +403,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> { values.push(Arc::new(ByteBuf::from(v.as_ref()))); } } - let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?; + let rpc_resp = self + .table + .rpc_call(&who, &TableRPC::<F>::Update(values)) + .await?; if let TableRPC::<F>::Ok = rpc_resp { Ok(()) } else { - Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp)))) + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) } } - pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> { + pub async fn handle_checksum_rpc( + self: &Arc<Self>, + checksums: &[RangeChecksum], + mut must_exit: watch::Receiver<bool>, + ) -> Result<Vec<SyncRange>, Error> { let mut ret = vec![]; for ckr in checksums.iter() { let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; for (range, hash) in ckr.children.iter() { - match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) { + match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + { Err(_) => { ret.push(range.clone()); } @@ -346,14 +441,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } } - let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums); + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + eprintln!( + "({}) Checksum comparison RPC: {} different out of {}", + self.table.name, + ret.len(), + n_checksums + ); Ok(ret) } pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { for i in 1..MAX_DEPTH { - let needle = SyncRange{ + let needle = SyncRange { begin: item_key.to_vec(), end: vec![], level: i, |