diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/block.rs | 23 | ||||
-rw-r--r-- | src/table.rs | 43 | ||||
-rw-r--r-- | src/table_sync.rs | 62 |
3 files changed, 88 insertions, 40 deletions
diff --git a/src/block.rs b/src/block.rs index 4be8df37..e209dab6 100644 --- a/src/block.rs +++ b/src/block.rs @@ -2,11 +2,11 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use futures::stream::*; use tokio::fs; use tokio::prelude::*; use tokio::sync::{watch, Mutex}; -use arc_swap::ArcSwapOption; use crate::data; use crate::data::*; @@ -48,8 +48,7 @@ impl BlockManager { pub async fn spawn_background_worker(self: Arc<Self>) { let bm2 = self.clone(); - self - .system + self.system .background .spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) .await; @@ -139,7 +138,11 @@ impl BlockManager { while !*must_exit.borrow() { if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? { let time_msec = u64_from_bytes(&time_bytes[0..8]); - eprintln!("First in resync queue: {} (now = {})", time_msec, now_msec()); + eprintln!( + "First in resync queue: {} (now = {})", + time_msec, + now_msec() + ); if now_msec() >= time_msec { let mut hash = [0u8; 32]; hash.copy_from_slice(hash_bytes.as_ref()); @@ -147,7 +150,7 @@ impl BlockManager { match self.resync_iter(&hash).await { Ok(_) => { - self.resync_queue.remove(&hash_bytes)?; + self.resync_queue.remove(&time_bytes)?; } Err(e) => { eprintln!( @@ -175,11 +178,17 @@ impl BlockManager { .map(|x| u64_from_bytes(x.as_ref()) > 0) .unwrap_or(false); - eprintln!("Resync block {:?}: exists {}, needed {}", hash, exists, needed); + eprintln!( + "Resync block {:?}: exists {}, needed {}", + hash, exists, needed + ); if exists && !needed { let garage = self.garage.load_full().unwrap(); - let active_refs = garage.block_ref_table.get_range(&hash, &[0u8; 32].into(), Some(()), 1).await?; + let active_refs = garage + .block_ref_table + .get_range(&hash, &[0u8; 32].into(), Some(()), 1) + .await?; let needed_by_others = !active_refs.is_empty(); if needed_by_others { // TODO check they have it and send it if not diff --git a/src/table.rs b/src/table.rs index bd26a79d..2ae70398 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,12 +1,12 @@ -use std::collections::{HashMap, BTreeMap}; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use arc_swap::ArcSwapOption; use crate::data::*; use crate::error::Error; @@ -122,7 +122,9 @@ pub trait TableSchema: Send + Sync { type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>); - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + true + } } impl<F: TableSchema + 'static> Table<F> { @@ -244,9 +246,7 @@ impl<F: TableSchema + 'static> Table<F> { let ent2 = ret_entry.clone(); self.system .background - .spawn(async move { - self2.repair_on_read(&who[..], ent2).await - }); + .spawn(async move { self2.repair_on_read(&who[..], ent2).await }); } } Ok(ret) @@ -263,7 +263,8 @@ impl<F: TableSchema + 'static> Table<F> { let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + let rpc = + &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); let resps = self .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; @@ -273,7 +274,8 @@ impl<F: TableSchema + 'static> Table<F> { for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry = + rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -294,16 +296,18 @@ impl<F: TableSchema + 'static> Table<F> { } if !to_repair.is_empty() { let self2 = self.clone(); - self.system - .background - .spawn(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; - } - Ok(()) - }); + self.system.background.spawn(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); } - let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::<Vec<_>>(); + let ret_vec = ret + .iter_mut() + .take(limit) + .map(|(_k, v)| v.take().unwrap()) + .collect::<Vec<_>>(); Ok(ret_vec) } @@ -408,7 +412,10 @@ impl<F: TableSchema + 'static> Table<F> { } } - pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + pub async fn handle_update( + self: &Arc<Self>, + mut entries: Vec<Arc<ByteBuf>>, + ) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; diff --git a/src/table_sync.rs b/src/table_sync.rs index f96e45ff..703dd750 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -29,7 +29,7 @@ pub struct TableSyncer<F: TableSchema> { #[derive(Serialize, Deserialize)] pub enum SyncRPC { - Checksums(Vec<RangeChecksum>), + Checksums(Vec<RangeChecksum>, bool), Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), } @@ -179,8 +179,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .iter() .filter(|node| **node != my_id) .map(|node| { - self.clone() - .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) + self.clone().do_sync_with( + root_cks.clone(), + node.clone(), + partition.retain, + must_exit.clone(), + ) }) .collect::<FuturesUnordered<_>>(); @@ -344,6 +348,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { self: Arc<Self>, root_ck: RangeChecksum, who: UUID, + retain: bool, mut must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let mut todo = VecDeque::new(); @@ -364,10 +369,21 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let rpc_resp = self .table - .rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step))) + .rpc_call( + &who, + &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)), + ) .await?; - if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp { - eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len()); + if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = + rpc_resp + { + eprintln!( + "({}) Sync with {:?}: difference {} ranges, {} items", + self.table.name, + who, + diff_ranges.len(), + diff_items.len() + ); let mut items_to_send = vec![]; for differing in diff_ranges.drain(..) { if differing.level == 0 { @@ -377,7 +393,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { todo.push_back(checksum); } } - if diff_items.len() > 0 { + if retain && diff_items.len() > 0 { self.table.handle_update(diff_items).await?; } if items_to_send.len() > 0 { @@ -429,7 +445,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { message: &SyncRPC, mut must_exit: watch::Receiver<bool>, ) -> Result<SyncRPC, Error> { - if let SyncRPC::Checksums(checksums) = message { + if let SyncRPC::Checksums(checksums, retain) = message { let mut ret_ranges = vec![]; let mut ret_items = vec![]; for ckr in checksums.iter() { @@ -437,7 +453,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> { for (range, hash) in ckr.children.iter() { // Only consider items that are in the intersection of the two ranges // (other ranges will be exchanged at some point) - if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + if our_ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { break; } @@ -450,15 +471,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> { }; if differs { ret_ranges.push(range.clone()); - if range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if *retain && range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } } } for (range, _hash) in our_ckr.children.iter() { - if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) { + if ckr + .found_limit + .as_ref() + .map(|x| range.begin.as_slice() >= x.as_slice()) + .unwrap_or(false) + { break; } @@ -467,9 +495,13 @@ impl<F: TableSchema + 'static> TableSyncer<F> { .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin)) .is_err(); if not_present { - ret_ranges.push(range.clone()); - if range.level == 0 { - if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? { + if range.level > 0 { + ret_ranges.push(range.clone()); + } + if *retain && range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(range.begin.as_slice())? + { ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); } } |