diff options
-rw-r--r-- | src/api_server.rs | 56 | ||||
-rw-r--r-- | src/block.rs | 163 | ||||
-rw-r--r-- | src/block_ref_table.rs | 5 | ||||
-rw-r--r-- | src/error.rs | 5 | ||||
-rw-r--r-- | src/rpc_client.rs | 12 | ||||
-rw-r--r-- | src/rpc_server.rs | 1 | ||||
-rw-r--r-- | src/server.rs | 10 | ||||
-rw-r--r-- | src/table.rs | 11 | ||||
-rw-r--r-- | src/table_sync.rs | 203 |
9 files changed, 326 insertions, 140 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index f3f7165b..4ae48720 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -9,12 +9,10 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use crate::data; +use crate::block::*; use crate::data::*; use crate::error::Error; use crate::http_util::*; -use crate::proto::*; -use crate::rpc_client::*; use crate::server::Garage; use crate::table::EmptySortKey; @@ -155,7 +153,7 @@ async fn handle_put( let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash.clone()); - let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); + let mut put_curr_block = rpc_put_block(&garage.system, first_block_hash, first_block); loop { let (_, _, next_block) = @@ -169,7 +167,7 @@ async fn handle_put( next_offset as u64, block_hash.clone(), ); - put_curr_block = put_block(garage.clone(), block_hash, block); + put_curr_block = rpc_put_block(&garage.system, block_hash, block); next_offset += block_len; } else { break; @@ -209,24 +207,6 @@ async fn put_block_meta( Ok(()) } -async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { - let who = garage - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, garage.system.config.data_replication_factor); - rpc_try_call_many( - garage.system.clone(), - &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), - (garage.system.config.data_replication_factor + 1) / 2, - BLOCK_RW_TIMEOUT, - ) - .await?; - Ok(()) -} - struct BodyChunker { body: Body, read_all: bool, @@ -322,7 +302,7 @@ async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(first_block_hash) => { - let read_first_block = get_block(garage.clone(), &first_block_hash); + let read_first_block = rpc_get_block(&garage.system, &first_block_hash); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey); let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; @@ -345,7 +325,7 @@ async fn handle_get( if let Some(data) = data_opt { Ok(Bytes::from(data)) } else { - get_block(garage.clone(), &hash).await.map(Bytes::from) + rpc_get_block(&garage.system, &hash).await.map(Bytes::from) } } }) @@ -355,29 +335,3 @@ async fn handle_get( } } } - -async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> { - let who = garage - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, garage.system.config.data_replication_factor); - let resps = rpc_try_call_many( - garage.system.clone(), - &who[..], - Message::GetBlock(hash.clone()), - 1, - BLOCK_RW_TIMEOUT, - ) - .await?; - - for resp in resps { - if let Message::PutBlock(pbm) = resp { - if data::hash(&pbm.data) == *hash { - return Ok(pbm.data); - } - } - } - Err(Error::Message(format!("No valid blocks returned"))) -} diff --git a/src/block.rs b/src/block.rs index e50dab38..e898ad19 100644 --- a/src/block.rs +++ b/src/block.rs @@ -1,32 +1,52 @@ use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; -use futures_util::future::*; +use futures::stream::*; use tokio::fs; use tokio::prelude::*; -use tokio::sync::Mutex; +use tokio::sync::{watch, Mutex}; -use crate::background::*; +use crate::data; use crate::data::*; use crate::error::Error; +use crate::membership::System; use crate::proto::*; +use crate::rpc_client::*; pub struct BlockManager { pub data_dir: PathBuf, pub rc: sled::Tree, + pub resync_queue: sled::Tree, pub lock: Mutex<()>, + pub system: Arc<System>, } impl BlockManager { - pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self { + pub async fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); rc.set_merge_operator(rc_merge); - Self { + + let resync_queue = db + .open_tree("block_local_resync_queue") + .expect("Unable to open block_local_resync_queue tree"); + + let block_manager = Arc::new(Self { rc, + resync_queue, data_dir, lock: Mutex::new(()), - } + system, + }); + let bm2 = block_manager.clone(); + block_manager + .system + .background + .spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) + .await; + block_manager } pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { @@ -51,9 +71,18 @@ impl BlockManager { let mut path = self.block_dir(hash); path.push(hex::encode(hash)); - let mut f = fs::File::open(path).await?; + let mut f = fs::File::open(&path).await?; let mut data = vec![]; f.read_to_end(&mut data).await?; + drop(f); + + if data::hash(&data[..]) != *hash { + let _lock = self.lock.lock().await; + eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash); + fs::remove_file(path).await?; + self.resync_queue.insert(hash.to_vec(), vec![1u8])?; + return Err(Error::CorruptData(hash.clone())); + } Ok(Message::PutBlock(PutBlockMessage { hash: hash.clone(), @@ -73,28 +102,74 @@ impl BlockManager { Ok(()) } - pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> { - match self.rc.merge(&hash, vec![0])? { - None => { - let mut path = self.block_dir(hash); - path.push(hex::encode(hash)); - background.spawn(tokio::fs::remove_file(path).map_err(Into::into)); - Ok(()) + pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> { + if self.rc.merge(&hash, vec![0])?.is_none() { + self.resync_queue.insert(hash.to_vec(), vec![1u8])?; + } + Ok(()) + } + + async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { + while !*must_exit.borrow() { + if let Some((hash_bytes, _v)) = self.resync_queue.get_gt(&[])? { + let mut hash = [0u8; 32]; + hash.copy_from_slice(hash_bytes.as_ref()); + let hash = Hash::from(hash); + + match self.resync_iter(&hash).await { + Ok(_) => { + self.resync_queue.remove(&hash_bytes)?; + } + Err(e) => { + eprintln!( + "Failed to resync hash {:?}, leaving it in queue: {}", + hash, e + ); + } + } + } else { + tokio::time::delay_for(Duration::from_secs(1)).await; } - Some(_) => Ok(()), } + Ok(()) } + + async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + let mut path = self.data_dir.clone(); + path.push(hex::encode(hash.as_ref())); + + let exists = fs::metadata(&path).await.is_ok(); + let needed = self + .rc + .get(hash.as_ref())? + .map(|x| u64_from_bytes(x.as_ref()) > 0) + .unwrap_or(false); + + if exists && !needed { + // TODO: verify that other nodes that might need it have it ? + fs::remove_file(path).await?; + self.resync_queue.remove(&hash)?; + } + + if needed && !exists { + // TODO find a way to not do this if they are sending it to us + let block_data = rpc_get_block(&self.system, &hash).await?; + self.write_block(hash, &block_data[..]).await?; + } + + Ok(()) + } +} + +fn u64_from_bytes(bytes: &[u8]) -> u64 { + assert!(bytes.len() == 8); + let mut x8 = [0u8; 8]; + x8.copy_from_slice(bytes); + u64::from_be_bytes(x8) } fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { - let old = old - .map(|x| { - assert!(x.len() == 8); - let mut x8 = [0u8; 8]; - x8.copy_from_slice(x); - u64::from_be_bytes(x8) - }) - .unwrap_or(0); + let old = old.map(u64_from_bytes).unwrap_or(0); assert!(new.len() == 1); let new = match new[0] { 0 => { @@ -113,3 +188,45 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { Some(u64::to_be_bytes(new).to_vec()) } } + +pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> { + let who = system + .ring + .borrow() + .clone() + .walk_ring(&hash, system.config.data_replication_factor); + let msg = Message::GetBlock(hash.clone()); + let mut resp_stream = who + .iter() + .map(|to| rpc_call(system.clone(), to, &msg, BLOCK_RW_TIMEOUT)) + .collect::<FuturesUnordered<_>>(); + + while let Some(resp) = resp_stream.next().await { + if let Ok(Message::PutBlock(msg)) = resp { + if data::hash(&msg.data[..]) == *hash { + return Ok(msg.data); + } + } + } + Err(Error::Message(format!( + "Unable to read block {:?}: no valid blocks returned", + hash + ))) +} + +pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + let who = system + .ring + .borrow() + .clone() + .walk_ring(&hash, system.config.data_replication_factor); + rpc_try_call_many( + system.clone(), + &who[..], + Message::PutBlock(PutBlockMessage { hash, data }), + (system.config.data_replication_factor + 1) / 2, + BLOCK_RW_TIMEOUT, + ) + .await?; + Ok(()) +} diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index 21fe4658..cf24fea7 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -55,10 +55,7 @@ impl TableSchema for BlockRefTable { } } if was_before && !is_after { - if let Err(e) = self - .block_manager - .block_decref(block, &self.background) - { + if let Err(e) = self.block_manager.block_decref(block) { eprintln!("Failed to decref block {:?}: {}", block, e); } } diff --git a/src/error.rs b/src/error.rs index c9653f5a..7c648fab 100644 --- a/src/error.rs +++ b/src/error.rs @@ -2,6 +2,8 @@ use err_derive::Error; use hyper::StatusCode; use std::io; +use crate::data::Hash; + #[derive(Debug, Error)] pub enum Error { #[error(display = "IO error: {}", _0)] @@ -50,6 +52,9 @@ pub enum Error { #[error(display = "Not found")] NotFound, + #[error(display = "Corrupt data: does not match hash {:?}", _0)] + CorruptData(Hash), + #[error(display = "{}", _0)] Message(String), } diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 81d20966..f8da778c 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -1,7 +1,7 @@ +use std::borrow::Borrow; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; -use std::borrow::Borrow; use bytes::IntoBuf; use futures::stream::futures_unordered::FuturesUnordered; @@ -45,7 +45,8 @@ pub async fn rpc_try_call_many( ) -> Result<Vec<Message>, Error> { let sys2 = sys.clone(); let msg = Arc::new(msg); - let mut resp_stream = to.to_vec() + let mut resp_stream = to + .to_vec() .into_iter() .map(move |to| rpc_call(sys2.clone(), to.clone(), msg.clone(), timeout)) .collect::<FuturesUnordered<_>>(); @@ -95,7 +96,12 @@ pub async fn rpc_call<M: Borrow<Message>, N: Borrow<UUID>>( let status = sys.status.borrow().clone(); match status.nodes.get(to.borrow()) { Some(status) => status.addr.clone(), - None => return Err(Error::Message(format!("Peer ID not found: {:?}", to.borrow()))), + None => { + return Err(Error::Message(format!( + "Peer ID not found: {:?}", + to.borrow() + ))) + } } }; sys.rpc_client.call(&addr, msg, timeout).await diff --git a/src/rpc_server.rs b/src/rpc_server.rs index b18e366a..c473a32d 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -18,7 +18,6 @@ use crate::proto::Message; use crate::server::Garage; use crate::tls_util; - fn err_to_msg(x: Result<Message, Error>) -> Message { match x { Err(e) => Message::Error(format!("{}", e)), diff --git a/src/server.rs b/src/server.rs index 78b992f5..287b4386 100644 --- a/src/server.rs +++ b/src/server.rs @@ -17,7 +17,7 @@ use crate::proto::*; use crate::rpc_server; use crate::table::*; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct Config { pub metadata_dir: PathBuf, pub data_dir: PathBuf, @@ -39,7 +39,7 @@ pub struct Config { pub rpc_tls: Option<TlsConfig>, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct TlsConfig { pub ca_cert: String, pub node_cert: String, @@ -48,9 +48,9 @@ pub struct TlsConfig { pub struct Garage { pub db: sled::Db, + pub background: Arc<BackgroundRunner>, pub system: Arc<System>, pub block_manager: Arc<BlockManager>, - pub background: Arc<BackgroundRunner>, pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>, @@ -66,9 +66,9 @@ impl Garage { db: sled::Db, background: Arc<BackgroundRunner>, ) -> Arc<Self> { - let block_manager = Arc::new(BlockManager::new(&db, config.data_dir.clone())); + let system = Arc::new(System::new(config.clone(), id, background.clone())); - let system = Arc::new(System::new(config, id, background.clone())); + let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()).await; let data_rep_param = TableReplicationParams { replication_factor: system.config.data_replication_factor, diff --git a/src/table.rs b/src/table.rs index 6b7d1779..6892c9f5 100644 --- a/src/table.rs +++ b/src/table.rs @@ -4,9 +4,9 @@ use std::time::Duration; use async_trait::async_trait; use futures::stream::*; -use tokio::sync::RwLock; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use tokio::sync::RwLock; use crate::data::*; use crate::error::Error; @@ -316,7 +316,10 @@ impl<F: TableSchema + 'static> Table<F> { return Ok(rmp_serde::decode::from_read_ref(&rep_by)?); } } - Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + Err(Error::Message(format!( + "Invalid reply to TableRPC: {:?}", + resp + ))) } async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { @@ -331,7 +334,9 @@ impl<F: TableSchema + 'static> Table<F> { } TableRPC::SyncChecksums(checksums) => { let syncer = self.syncer.read().await.as_ref().unwrap().clone(); - let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?; + let differing = syncer + .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()) + .await?; Ok(TableRPC::SyncDifferentSet(differing)) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))), diff --git a/src/table_sync.rs b/src/table_sync.rs index c1d3bea8..8eb08074 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,16 +1,16 @@ use rand::Rng; -use std::collections::{BTreeSet, BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; -use futures::{pin_mut, select}; use futures::future::BoxFuture; -use futures_util::stream::*; +use futures::{pin_mut, select}; use futures_util::future::*; +use futures_util::stream::*; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; use tokio::sync::watch; use tokio::sync::Mutex; -use serde::{Serialize, Deserialize}; -use serde_bytes::ByteBuf; use crate::data::*; use crate::error::Error; @@ -62,7 +62,7 @@ pub struct RangeChecksum { pub children: Vec<(SyncRange, Hash)>, pub found_limit: Option<Vec<u8>>, - #[serde(skip, default="std::time::Instant::now")] + #[serde(skip, default = "std::time::Instant::now")] pub time: Instant, } @@ -72,7 +72,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let syncer = Arc::new(TableSyncer { table: table.clone(), todo: Mutex::new(todo), - cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(), + cache: (0..MAX_DEPTH) + .map(|_| Mutex::new(BTreeMap::new())) + .collect::<Vec<_>>(), }); let s1 = syncer.clone(); @@ -137,9 +139,15 @@ impl<F: TableSchema + 'static> TableSyncer<F> { ) -> Result<(), Error> { while !*must_exit.borrow() { if let Some(partition) = self.todo.lock().await.pop_task() { - let res = self.clone().sync_partition(&partition, &mut must_exit).await; + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; if let Err(e) = res { - eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e); + eprintln!( + "({}) Error while syncing {:?}: {}", + self.table.name, partition, e + ); } } else { tokio::time::delay_for(Duration::from_secs(1)).await; @@ -148,13 +156,29 @@ impl<F: TableSchema + 'static> TableSyncer<F> { Ok(()) } - async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + async fn sync_partition( + self: Arc<Self>, + partition: &Partition, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition); - let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?; + let root_cks = self + .root_checksum(&partition.begin, &partition.end, must_exit) + .await?; - let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor); - let mut sync_futures = nodes.iter() - .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())) + let nodes = self + .table + .system + .ring + .borrow() + .clone() + .walk_ring(&partition.begin, self.table.param.replication_factor); + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone() + .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()) + }) .collect::<FuturesUnordered<_>>(); while let Some(r) = sync_futures.next().await { @@ -163,27 +187,45 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } if !partition.retain { - self.table.delete_range(&partition.begin, &partition.end).await?; + self.table + .delete_range(&partition.begin, &partition.end) + .await?; } Ok(()) } - async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + async fn root_checksum( + self: &Arc<Self>, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { for i in 1..MAX_DEPTH { - let rc = self.range_checksum(&SyncRange{ - begin: begin.to_vec(), - end: end.to_vec(), - level: i, - }, must_exit).await?; + let rc = self + .range_checksum( + &SyncRange { + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, + must_exit, + ) + .await?; if rc.found_limit.is_none() { return Ok(rc); } } - Err(Error::Message(format!("Unable to compute root checksum (this should never happen"))) + Err(Error::Message(format!( + "Unable to compute root checksum (this should never happen" + ))) } - fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> { + fn range_checksum<'a>( + self: &'a Arc<Self>, + range: &'a SyncRange, + must_exit: &'a mut watch::Receiver<bool>, + ) -> BoxFuture<'a, Result<RangeChecksum, Error>> { async move { let mut cache = self.cache[range.level].lock().await; if let Some(v) = cache.get(&range) { @@ -195,41 +237,53 @@ impl<F: TableSchema + 'static> TableSyncer<F> { drop(cache); let v = self.range_checksum_inner(&range, must_exit).await?; - eprintln!("({}) New checksum calculated for {}-{}/{}, {} children", + eprintln!( + "({}) New checksum calculated for {}-{}/{}, {} children", self.table.name, hex::encode(&range.begin[..]), hex::encode(&range.end[..]), range.level, - v.children.len()); + v.children.len() + ); let mut cache = self.cache[range.level].lock().await; cache.insert(range.clone(), v.clone()); Ok(v) - }.boxed() + } + .boxed() } - async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> { + async fn range_checksum_inner( + self: &Arc<Self>, + range: &SyncRange, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { if range.level == 1 { let mut children = vec![]; - for item in self.table.store.range(range.begin.clone()..range.end.clone()) { + for item in self + .table + .store + .range(range.begin.clone()..range.end.clone()) + { let (key, value) = item?; let key_hash = hash(&key[..]); - if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { - return Ok(RangeChecksum{ + if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) + { + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: Some(key.to_vec()), time: Instant::now(), - }) + }); } - let item_range = SyncRange{ + let item_range = SyncRange { begin: key.to_vec(), end: vec![], level: 0, }; children.push((item_range, hash(&value[..]))); } - Ok(RangeChecksum{ + Ok(RangeChecksum { bounds: range.clone(), children, found_limit: None, @@ -237,7 +291,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { }) } else { let mut children = vec![]; - let mut sub_range = SyncRange{ + let mut sub_range = SyncRange { begin: range.begin.clone(), end: range.end.clone(), level: range.level - 1, @@ -255,7 +309,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 { - return Ok(RangeChecksum{ + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: None, @@ -265,8 +319,11 @@ impl<F: TableSchema + 'static> TableSyncer<F> { let found_limit = sub_ck.found_limit.unwrap(); let actual_limit_hash = hash(&found_limit[..]); - if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) { - return Ok(RangeChecksum{ + if actual_limit_hash.as_slice()[0..range.level] + .iter() + .all(|x| *x == 0) + { + return Ok(RangeChecksum { bounds: range.clone(), children, found_limit: Some(found_limit.clone()), @@ -280,18 +337,32 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } - async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + async fn do_sync_with( + self: Arc<Self>, + root_ck: RangeChecksum, + who: UUID, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { let mut todo = VecDeque::new(); todo.push_back(root_ck); while !todo.is_empty() && !*must_exit.borrow() { let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!("({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, todo.len(), total_children); + eprintln!( + "({}) Sync with {:?}: {} ({}) remaining", + self.table.name, + who, + todo.len(), + total_children + ); let end = std::cmp::min(16, todo.len()); let step = todo.drain(..end).collect::<Vec<_>>(); - let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?; + let rpc_resp = self + .table + .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)) + .await?; if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp { let mut items = vec![]; for differing in s.drain(..) { @@ -303,17 +374,28 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } if items.len() > 0 { - self.table.system.background.spawn(self.clone().send_items(who.clone(), items)); + self.table + .system + .background + .spawn(self.clone().send_items(who.clone(), items)); } } else { - return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp)))); + return Err(Error::Message(format!( + "Unexpected response to RPC SyncChecksums: {}", + debug_serialize(&rpc_resp) + ))); } } Ok(()) } async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { - eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who); + eprintln!( + "({}) Sending {} items to {:?}", + self.table.name, + item_list.len(), + who + ); let mut values = vec![]; for item in item_list.iter() { @@ -321,20 +403,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> { values.push(Arc::new(ByteBuf::from(v.as_ref()))); } } - let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?; + let rpc_resp = self + .table + .rpc_call(&who, &TableRPC::<F>::Update(values)) + .await?; if let TableRPC::<F>::Ok = rpc_resp { Ok(()) } else { - Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp)))) + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) } } - pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> { + pub async fn handle_checksum_rpc( + self: &Arc<Self>, + checksums: &[RangeChecksum], + mut must_exit: watch::Receiver<bool>, + ) -> Result<Vec<SyncRange>, Error> { let mut ret = vec![]; for ckr in checksums.iter() { let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?; for (range, hash) in ckr.children.iter() { - match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) { + match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) + { Err(_) => { ret.push(range.clone()); } @@ -346,14 +441,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } } - let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); - eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums); + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + eprintln!( + "({}) Checksum comparison RPC: {} different out of {}", + self.table.name, + ret.len(), + n_checksums + ); Ok(ret) } pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { for i in 1..MAX_DEPTH { - let needle = SyncRange{ + let needle = SyncRange { begin: item_key.to_vec(), end: vec![], level: i, |