aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-21 16:05:55 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-21 16:05:55 +0000
commit2a84d965abc07e4438bd0936dd6cf3307a8a2024 (patch)
tree4a8f20b05a8911e10cedb9383d0a25969b5b2b27 /src/table_sync.rs
parent0226561035bc90dbf45b993f755b1eee47c8a790 (diff)
downloadgarage-2a84d965abc07e4438bd0936dd6cf3307a8a2024.tar.gz
garage-2a84d965abc07e4438bd0936dd6cf3307a8a2024.zip
Improve table sync
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs226
1 files changed, 140 insertions, 86 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 26c5bed8..bfbde285 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -18,14 +18,14 @@ use crate::membership::Ring;
use crate::table::*;
const MAX_DEPTH: usize = 16;
-const SCAN_INTERVAL: Duration = Duration::from_secs(60);
+const SCAN_INTERVAL: Duration = Duration::from_secs(1800);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
table: Arc<Table<F, R>>,
todo: Mutex<SyncTodo>,
- cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
+ cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
}
#[derive(Serialize, Deserialize)]
@@ -47,6 +47,15 @@ struct TodoPartition {
retain: bool,
}
+// A SyncRange defines a query on the dataset stored by a node, in the following way:
+// - all items whose key are >= `begin`
+// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
+// - except if the first item of the range has such many leading zero bytes
+// - and stopping at `end` (excluded) if such an item is not found
+// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
+// i.e. of ranges of level `level-1` that cover the same range
+// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
+// See RangeChecksum for the struct that stores this information.
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
pub struct SyncRange {
begin: Vec<u8>,
@@ -61,7 +70,10 @@ impl std::cmp::PartialOrd for SyncRange {
}
impl std::cmp::Ord for SyncRange {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.begin.cmp(&other.begin)
+ self.begin
+ .cmp(&other.begin)
+ .then(self.level.cmp(&other.level))
+ .then(self.end.cmp(&other.end))
}
}
@@ -75,6 +87,13 @@ pub struct RangeChecksum {
time: Instant,
}
+#[derive(Debug, Clone)]
+pub struct RangeChecksumCache {
+ hash: Option<Hash>, // None if no children
+ found_limit: Option<Vec<u8>>,
+ time: Instant,
+}
+
impl<F, R> TableSyncer<F, R>
where
F: TableSchema + 'static,
@@ -159,7 +178,7 @@ where
if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Adding full scan to syncer todo list", self.table.name);
- self.todo.lock().await.add_full_scan(&self.table);
+ self.add_full_scan().await;
}
}
}
@@ -167,6 +186,10 @@ where
Ok(())
}
+ pub async fn add_full_scan(&self) {
+ self.todo.lock().await.add_full_scan(&self.table);
+ }
+
async fn syncer_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
@@ -273,47 +296,17 @@ where
}
}
Err(Error::Message(format!(
- "Unable to compute root checksum (this should never happen"
+ "Unable to compute root checksum (this should never happen)"
)))
}
- fn range_checksum<'a>(
- self: &'a Arc<Self>,
- range: &'a SyncRange,
- must_exit: &'a mut watch::Receiver<bool>,
- ) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
- async move {
- let mut cache = self.cache[range.level].lock().await;
- if let Some(v) = cache.get(&range) {
- if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
- return Ok(v.clone());
- }
- }
- cache.remove(&range);
- drop(cache);
-
- let v = self.range_checksum_inner(&range, must_exit).await?;
- trace!(
- "({}) New checksum calculated for {}-{}/{}, {} children",
- self.table.name,
- hex::encode(&range.begin[..]),
- hex::encode(&range.end[..]),
- range.level,
- v.children.len()
- );
-
- let mut cache = self.cache[range.level].lock().await;
- cache.insert(range.clone(), v.clone());
- Ok(v)
- }
- .boxed()
- }
-
- async fn range_checksum_inner(
+ async fn range_checksum(
self: &Arc<Self>,
range: &SyncRange,
must_exit: &mut watch::Receiver<bool>,
) -> Result<RangeChecksum, Error> {
+ assert!(range.level != 0);
+
if range.level == 1 {
let mut children = vec![];
for item in self
@@ -323,7 +316,10 @@ where
{
let (key, value) = item?;
let key_hash = hash(&key[..]);
- if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0)
+ if children.len() > 0
+ && key_hash.as_slice()[0..range.level]
+ .iter()
+ .all(|x| *x == 0u8)
{
return Ok(RangeChecksum {
bounds: range.clone(),
@@ -354,17 +350,18 @@ where
};
let mut time = Instant::now();
while !*must_exit.borrow() {
- let sub_ck = self.range_checksum(&sub_range, must_exit).await?;
+ let sub_ck = self
+ .range_checksum_cached_hash(&sub_range, must_exit)
+ .await?;
- if sub_ck.children.len() > 0 {
- let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]);
- children.push((sub_range.clone(), sub_ck_hash));
+ if let Some(hash) = &sub_ck.hash {
+ children.push((sub_range.clone(), hash.clone()));
if sub_ck.time < time {
time = sub_ck.time;
}
}
- if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 {
+ if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
return Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -377,7 +374,7 @@ where
let actual_limit_hash = hash(&found_limit[..]);
if actual_limit_hash.as_slice()[0..range.level]
.iter()
- .all(|x| *x == 0)
+ .all(|x| *x == 0u8)
{
return Ok(RangeChecksum {
bounds: range.clone(),
@@ -393,6 +390,52 @@ where
}
}
+ fn range_checksum_cached_hash<'a>(
+ self: &'a Arc<Self>,
+ range: &'a SyncRange,
+ must_exit: &'a mut watch::Receiver<bool>,
+ ) -> BoxFuture<'a, Result<RangeChecksumCache, Error>> {
+ async move {
+ let mut cache = self.cache[range.level].lock().await;
+ if let Some(v) = cache.get(&range) {
+ if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
+ return Ok(v.clone());
+ }
+ }
+ cache.remove(&range);
+ drop(cache);
+
+ let v = self.range_checksum(&range, must_exit).await?;
+ trace!(
+ "({}) New checksum calculated for {}-{}/{}, {} children",
+ self.table.name,
+ hex::encode(&range.begin)
+ .chars()
+ .take(16)
+ .collect::<String>(),
+ hex::encode(&range.end).chars().take(16).collect::<String>(),
+ range.level,
+ v.children.len()
+ );
+
+ let hash = if v.children.len() > 0 {
+ Some(hash(&rmp_to_vec_all_named(&v)?[..]))
+ } else {
+ None
+ };
+ let cache_entry = RangeChecksumCache {
+ hash,
+ found_limit: v.found_limit,
+ time: v.time,
+ };
+
+ let mut cache = self.cache[range.level].lock().await;
+ cache.insert(range.clone(), cache_entry.clone());
+ Ok(cache_entry)
+ }
+ .boxed()
+ }
+
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
@@ -423,6 +466,11 @@ where
} else {
todo.push_back(root_ck);
}
+ } else {
+ return Err(Error::BadRequest(format!(
+ "Invalid respone to GetRootChecksumRange RPC: {}",
+ debug_serialize(root_cks_resp)
+ )));
}
while !todo.is_empty() && !*must_exit.borrow() {
@@ -435,8 +483,8 @@ where
total_children
);
- let end = std::cmp::min(16, todo.len());
- let step = todo.drain(..end).collect::<Vec<_>>();
+ let step_size = std::cmp::min(16, todo.len());
+ let step = todo.drain(..step_size).collect::<Vec<_>>();
let rpc_resp = self
.table
@@ -472,10 +520,7 @@ where
self.table.handle_update(diff_items).await?;
}
if items_to_send.len() > 0 {
- self.table
- .system
- .background
- .spawn(self.clone().send_items(who.clone(), items_to_send));
+ self.send_items(who.clone(), items_to_send).await?;
}
} else {
return Err(Error::BadRequest(format!(
@@ -487,7 +532,7 @@ where
Ok(())
}
- async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.table.name,
@@ -542,56 +587,56 @@ where
) -> Result<SyncRPC, Error> {
let mut ret_ranges = vec![];
let mut ret_items = vec![];
- for ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&ckr.bounds, must_exit).await?;
- for (range, hash) in ckr.children.iter() {
- // Only consider items that are in the intersection of the two ranges
- // (other ranges will be exchanged at some point)
- if our_ckr
- .found_limit
- .as_ref()
- .map(|x| range.begin.as_slice() >= x.as_slice())
- .unwrap_or(false)
- {
- break;
- }
+ for their_ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?;
+ for (their_range, their_hash) in their_ckr.children.iter() {
let differs = match our_ckr
.children
- .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
{
- Err(_) => true,
- Ok(i) => our_ckr.children[i].1 != *hash,
+ Err(_) => {
+ if their_range.level >= 1 {
+ let cached_hash = self
+ .range_checksum_cached_hash(&their_range, must_exit)
+ .await?;
+ cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
+ } else {
+ true
+ }
+ }
+ Ok(i) => our_ckr.children[i].1 != *their_hash,
};
if differs {
- ret_ranges.push(range.clone());
- if retain && range.level == 0 {
- if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_ranges.push(their_range.clone());
+ if retain && their_range.level == 0 {
+ if let Some(item_bytes) =
+ self.table.store.get(their_range.begin.as_slice())?
+ {
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
}
- for (range, _hash) in our_ckr.children.iter() {
- if ckr
- .found_limit
- .as_ref()
- .map(|x| range.begin.as_slice() >= x.as_slice())
- .unwrap_or(false)
- {
- break;
+ for (our_range, _hash) in our_ckr.children.iter() {
+ if let Some(their_found_limit) = &their_ckr.found_limit {
+ if our_range.begin.as_slice() > their_found_limit.as_slice() {
+ break;
+ }
}
- let not_present = ckr
+ let not_present = our_ckr
.children
- .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
+ .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
.is_err();
if not_present {
- if range.level > 0 {
- ret_ranges.push(range.clone());
+ if our_range.level > 0 {
+ ret_ranges.push(our_range.clone());
}
- if retain && range.level == 0 {
- if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ if retain && our_range.level == 0 {
+ if let Some(item_bytes) =
+ self.table.store.get(our_range.begin.as_slice())?
+ {
ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
@@ -673,6 +718,15 @@ impl SyncTodo {
) {
let my_id = table.system.id.clone();
+ // If it is us who are entering or leaving the system,
+ // initiate a full sync instead of incremental sync
+ if old_ring.config.members.contains_key(&my_id)
+ != new_ring.config.members.contains_key(&my_id)
+ {
+ self.add_full_scan(table);
+ return;
+ }
+
let mut all_points = None
.into_iter()
.chain(table.replication.split_points(old_ring).drain(..))