aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api_server.rs56
-rw-r--r--src/block.rs163
-rw-r--r--src/block_ref_table.rs5
-rw-r--r--src/error.rs5
-rw-r--r--src/rpc_client.rs12
-rw-r--r--src/rpc_server.rs1
-rw-r--r--src/server.rs10
-rw-r--r--src/table.rs11
-rw-r--r--src/table_sync.rs203
9 files changed, 326 insertions, 140 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index f3f7165b..4ae48720 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -9,12 +9,10 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
-use crate::data;
+use crate::block::*;
use crate::data::*;
use crate::error::Error;
use crate::http_util::*;
-use crate::proto::*;
-use crate::rpc_client::*;
use crate::server::Garage;
use crate::table::EmptySortKey;
@@ -155,7 +153,7 @@ async fn handle_put(
let mut next_offset = first_block.len();
let mut put_curr_version_block =
put_block_meta(garage.clone(), &version, 0, first_block_hash.clone());
- let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block);
+ let mut put_curr_block = rpc_put_block(&garage.system, first_block_hash, first_block);
loop {
let (_, _, next_block) =
@@ -169,7 +167,7 @@ async fn handle_put(
next_offset as u64,
block_hash.clone(),
);
- put_curr_block = put_block(garage.clone(), block_hash, block);
+ put_curr_block = rpc_put_block(&garage.system, block_hash, block);
next_offset += block_len;
} else {
break;
@@ -209,24 +207,6 @@ async fn put_block_meta(
Ok(())
}
-async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let who = garage
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, garage.system.config.data_replication_factor);
- rpc_try_call_many(
- garage.system.clone(),
- &who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- (garage.system.config.data_replication_factor + 1) / 2,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
- Ok(())
-}
-
struct BodyChunker {
body: Body,
read_all: bool,
@@ -322,7 +302,7 @@ async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(first_block_hash) => {
- let read_first_block = get_block(garage.clone(), &first_block_hash);
+ let read_first_block = rpc_get_block(&garage.system, &first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
@@ -345,7 +325,7 @@ async fn handle_get(
if let Some(data) = data_opt {
Ok(Bytes::from(data))
} else {
- get_block(garage.clone(), &hash).await.map(Bytes::from)
+ rpc_get_block(&garage.system, &hash).await.map(Bytes::from)
}
}
})
@@ -355,29 +335,3 @@ async fn handle_get(
}
}
}
-
-async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
- let who = garage
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, garage.system.config.data_replication_factor);
- let resps = rpc_try_call_many(
- garage.system.clone(),
- &who[..],
- Message::GetBlock(hash.clone()),
- 1,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
-
- for resp in resps {
- if let Message::PutBlock(pbm) = resp {
- if data::hash(&pbm.data) == *hash {
- return Ok(pbm.data);
- }
- }
- }
- Err(Error::Message(format!("No valid blocks returned")))
-}
diff --git a/src/block.rs b/src/block.rs
index e50dab38..e898ad19 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -1,32 +1,52 @@
use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
-use futures_util::future::*;
+use futures::stream::*;
use tokio::fs;
use tokio::prelude::*;
-use tokio::sync::Mutex;
+use tokio::sync::{watch, Mutex};
-use crate::background::*;
+use crate::data;
use crate::data::*;
use crate::error::Error;
+use crate::membership::System;
use crate::proto::*;
+use crate::rpc_client::*;
pub struct BlockManager {
pub data_dir: PathBuf,
pub rc: sled::Tree,
+ pub resync_queue: sled::Tree,
pub lock: Mutex<()>,
+ pub system: Arc<System>,
}
impl BlockManager {
- pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
+ pub async fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
rc.set_merge_operator(rc_merge);
- Self {
+
+ let resync_queue = db
+ .open_tree("block_local_resync_queue")
+ .expect("Unable to open block_local_resync_queue tree");
+
+ let block_manager = Arc::new(Self {
rc,
+ resync_queue,
data_dir,
lock: Mutex::new(()),
- }
+ system,
+ });
+ let bm2 = block_manager.clone();
+ block_manager
+ .system
+ .background
+ .spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
+ .await;
+ block_manager
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
@@ -51,9 +71,18 @@ impl BlockManager {
let mut path = self.block_dir(hash);
path.push(hex::encode(hash));
- let mut f = fs::File::open(path).await?;
+ let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
+ drop(f);
+
+ if data::hash(&data[..]) != *hash {
+ let _lock = self.lock.lock().await;
+ eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash);
+ fs::remove_file(path).await?;
+ self.resync_queue.insert(hash.to_vec(), vec![1u8])?;
+ return Err(Error::CorruptData(hash.clone()));
+ }
Ok(Message::PutBlock(PutBlockMessage {
hash: hash.clone(),
@@ -73,28 +102,74 @@ impl BlockManager {
Ok(())
}
- pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> {
- match self.rc.merge(&hash, vec![0])? {
- None => {
- let mut path = self.block_dir(hash);
- path.push(hex::encode(hash));
- background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
- Ok(())
+ pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
+ if self.rc.merge(&hash, vec![0])?.is_none() {
+ self.resync_queue.insert(hash.to_vec(), vec![1u8])?;
+ }
+ Ok(())
+ }
+
+ async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ if let Some((hash_bytes, _v)) = self.resync_queue.get_gt(&[])? {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(hash_bytes.as_ref());
+ let hash = Hash::from(hash);
+
+ match self.resync_iter(&hash).await {
+ Ok(_) => {
+ self.resync_queue.remove(&hash_bytes)?;
+ }
+ Err(e) => {
+ eprintln!(
+ "Failed to resync hash {:?}, leaving it in queue: {}",
+ hash, e
+ );
+ }
+ }
+ } else {
+ tokio::time::delay_for(Duration::from_secs(1)).await;
}
- Some(_) => Ok(()),
}
+ Ok(())
}
+
+ async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ let mut path = self.data_dir.clone();
+ path.push(hex::encode(hash.as_ref()));
+
+ let exists = fs::metadata(&path).await.is_ok();
+ let needed = self
+ .rc
+ .get(hash.as_ref())?
+ .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .unwrap_or(false);
+
+ if exists && !needed {
+ // TODO: verify that other nodes that might need it have it ?
+ fs::remove_file(path).await?;
+ self.resync_queue.remove(&hash)?;
+ }
+
+ if needed && !exists {
+ // TODO find a way to not do this if they are sending it to us
+ let block_data = rpc_get_block(&self.system, &hash).await?;
+ self.write_block(hash, &block_data[..]).await?;
+ }
+
+ Ok(())
+ }
+}
+
+fn u64_from_bytes(bytes: &[u8]) -> u64 {
+ assert!(bytes.len() == 8);
+ let mut x8 = [0u8; 8];
+ x8.copy_from_slice(bytes);
+ u64::from_be_bytes(x8)
}
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
- let old = old
- .map(|x| {
- assert!(x.len() == 8);
- let mut x8 = [0u8; 8];
- x8.copy_from_slice(x);
- u64::from_be_bytes(x8)
- })
- .unwrap_or(0);
+ let old = old.map(u64_from_bytes).unwrap_or(0);
assert!(new.len() == 1);
let new = match new[0] {
0 => {
@@ -113,3 +188,45 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
Some(u64::to_be_bytes(new).to_vec())
}
}
+
+pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> {
+ let who = system
+ .ring
+ .borrow()
+ .clone()
+ .walk_ring(&hash, system.config.data_replication_factor);
+ let msg = Message::GetBlock(hash.clone());
+ let mut resp_stream = who
+ .iter()
+ .map(|to| rpc_call(system.clone(), to, &msg, BLOCK_RW_TIMEOUT))
+ .collect::<FuturesUnordered<_>>();
+
+ while let Some(resp) = resp_stream.next().await {
+ if let Ok(Message::PutBlock(msg)) = resp {
+ if data::hash(&msg.data[..]) == *hash {
+ return Ok(msg.data);
+ }
+ }
+ }
+ Err(Error::Message(format!(
+ "Unable to read block {:?}: no valid blocks returned",
+ hash
+ )))
+}
+
+pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ let who = system
+ .ring
+ .borrow()
+ .clone()
+ .walk_ring(&hash, system.config.data_replication_factor);
+ rpc_try_call_many(
+ system.clone(),
+ &who[..],
+ Message::PutBlock(PutBlockMessage { hash, data }),
+ (system.config.data_replication_factor + 1) / 2,
+ BLOCK_RW_TIMEOUT,
+ )
+ .await?;
+ Ok(())
+}
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index 21fe4658..cf24fea7 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -55,10 +55,7 @@ impl TableSchema for BlockRefTable {
}
}
if was_before && !is_after {
- if let Err(e) = self
- .block_manager
- .block_decref(block, &self.background)
- {
+ if let Err(e) = self.block_manager.block_decref(block) {
eprintln!("Failed to decref block {:?}: {}", block, e);
}
}
diff --git a/src/error.rs b/src/error.rs
index c9653f5a..7c648fab 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -2,6 +2,8 @@ use err_derive::Error;
use hyper::StatusCode;
use std::io;
+use crate::data::Hash;
+
#[derive(Debug, Error)]
pub enum Error {
#[error(display = "IO error: {}", _0)]
@@ -50,6 +52,9 @@ pub enum Error {
#[error(display = "Not found")]
NotFound,
+ #[error(display = "Corrupt data: does not match hash {:?}", _0)]
+ CorruptData(Hash),
+
#[error(display = "{}", _0)]
Message(String),
}
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 81d20966..f8da778c 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -1,7 +1,7 @@
+use std::borrow::Borrow;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
-use std::borrow::Borrow;
use bytes::IntoBuf;
use futures::stream::futures_unordered::FuturesUnordered;
@@ -45,7 +45,8 @@ pub async fn rpc_try_call_many(
) -> Result<Vec<Message>, Error> {
let sys2 = sys.clone();
let msg = Arc::new(msg);
- let mut resp_stream = to.to_vec()
+ let mut resp_stream = to
+ .to_vec()
.into_iter()
.map(move |to| rpc_call(sys2.clone(), to.clone(), msg.clone(), timeout))
.collect::<FuturesUnordered<_>>();
@@ -95,7 +96,12 @@ pub async fn rpc_call<M: Borrow<Message>, N: Borrow<UUID>>(
let status = sys.status.borrow().clone();
match status.nodes.get(to.borrow()) {
Some(status) => status.addr.clone(),
- None => return Err(Error::Message(format!("Peer ID not found: {:?}", to.borrow()))),
+ None => {
+ return Err(Error::Message(format!(
+ "Peer ID not found: {:?}",
+ to.borrow()
+ )))
+ }
}
};
sys.rpc_client.call(&addr, msg, timeout).await
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index b18e366a..c473a32d 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -18,7 +18,6 @@ use crate::proto::Message;
use crate::server::Garage;
use crate::tls_util;
-
fn err_to_msg(x: Result<Message, Error>) -> Message {
match x {
Err(e) => Message::Error(format!("{}", e)),
diff --git a/src/server.rs b/src/server.rs
index 78b992f5..287b4386 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -17,7 +17,7 @@ use crate::proto::*;
use crate::rpc_server;
use crate::table::*;
-#[derive(Deserialize, Debug)]
+#[derive(Deserialize, Debug, Clone)]
pub struct Config {
pub metadata_dir: PathBuf,
pub data_dir: PathBuf,
@@ -39,7 +39,7 @@ pub struct Config {
pub rpc_tls: Option<TlsConfig>,
}
-#[derive(Deserialize, Debug)]
+#[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig {
pub ca_cert: String,
pub node_cert: String,
@@ -48,9 +48,9 @@ pub struct TlsConfig {
pub struct Garage {
pub db: sled::Db,
+ pub background: Arc<BackgroundRunner>,
pub system: Arc<System>,
pub block_manager: Arc<BlockManager>,
- pub background: Arc<BackgroundRunner>,
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
@@ -66,9 +66,9 @@ impl Garage {
db: sled::Db,
background: Arc<BackgroundRunner>,
) -> Arc<Self> {
- let block_manager = Arc::new(BlockManager::new(&db, config.data_dir.clone()));
+ let system = Arc::new(System::new(config.clone(), id, background.clone()));
- let system = Arc::new(System::new(config, id, background.clone()));
+ let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()).await;
let data_rep_param = TableReplicationParams {
replication_factor: system.config.data_replication_factor,
diff --git a/src/table.rs b/src/table.rs
index 6b7d1779..6892c9f5 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -4,9 +4,9 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
-use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::sync::RwLock;
use crate::data::*;
use crate::error::Error;
@@ -316,7 +316,10 @@ impl<F: TableSchema + 'static> Table<F> {
return Ok(rmp_serde::decode::from_read_ref(&rep_by)?);
}
}
- Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
+ Err(Error::Message(format!(
+ "Invalid reply to TableRPC: {:?}",
+ resp
+ )))
}
async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
@@ -331,7 +334,9 @@ impl<F: TableSchema + 'static> Table<F> {
}
TableRPC::SyncChecksums(checksums) => {
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
- let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?;
+ let differing = syncer
+ .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone())
+ .await?;
Ok(TableRPC::SyncDifferentSet(differing))
}
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
diff --git a/src/table_sync.rs b/src/table_sync.rs
index c1d3bea8..8eb08074 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,16 +1,16 @@
use rand::Rng;
-use std::collections::{BTreeSet, BTreeMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
-use futures::{pin_mut, select};
use futures::future::BoxFuture;
-use futures_util::stream::*;
+use futures::{pin_mut, select};
use futures_util::future::*;
+use futures_util::stream::*;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
use tokio::sync::watch;
use tokio::sync::Mutex;
-use serde::{Serialize, Deserialize};
-use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
@@ -62,7 +62,7 @@ pub struct RangeChecksum {
pub children: Vec<(SyncRange, Hash)>,
pub found_limit: Option<Vec<u8>>,
- #[serde(skip, default="std::time::Instant::now")]
+ #[serde(skip, default = "std::time::Instant::now")]
pub time: Instant,
}
@@ -72,7 +72,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let syncer = Arc::new(TableSyncer {
table: table.clone(),
todo: Mutex::new(todo),
- cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(),
+ cache: (0..MAX_DEPTH)
+ .map(|_| Mutex::new(BTreeMap::new()))
+ .collect::<Vec<_>>(),
});
let s1 = syncer.clone();
@@ -137,9 +139,15 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
) -> Result<(), Error> {
while !*must_exit.borrow() {
if let Some(partition) = self.todo.lock().await.pop_task() {
- let res = self.clone().sync_partition(&partition, &mut must_exit).await;
+ let res = self
+ .clone()
+ .sync_partition(&partition, &mut must_exit)
+ .await;
if let Err(e) = res {
- eprintln!("({}) Error while syncing {:?}: {}", self.table.name, partition, e);
+ eprintln!(
+ "({}) Error while syncing {:?}: {}",
+ self.table.name, partition, e
+ );
}
} else {
tokio::time::delay_for(Duration::from_secs(1)).await;
@@ -148,13 +156,29 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
Ok(())
}
- async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ async fn sync_partition(
+ self: Arc<Self>,
+ partition: &Partition,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
eprintln!("({}) Preparing to sync {:?}...", self.table.name, partition);
- let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
+ let root_cks = self
+ .root_checksum(&partition.begin, &partition.end, must_exit)
+ .await?;
- let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
- let mut sync_futures = nodes.iter()
- .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()))
+ let nodes = self
+ .table
+ .system
+ .ring
+ .borrow()
+ .clone()
+ .walk_ring(&partition.begin, self.table.param.replication_factor);
+ let mut sync_futures = nodes
+ .iter()
+ .map(|node| {
+ self.clone()
+ .do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())
+ })
.collect::<FuturesUnordered<_>>();
while let Some(r) = sync_futures.next().await {
@@ -163,27 +187,45 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
if !partition.retain {
- self.table.delete_range(&partition.begin, &partition.end).await?;
+ self.table
+ .delete_range(&partition.begin, &partition.end)
+ .await?;
}
Ok(())
}
- async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
+ async fn root_checksum(
+ self: &Arc<Self>,
+ begin: &Hash,
+ end: &Hash,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksum, Error> {
for i in 1..MAX_DEPTH {
- let rc = self.range_checksum(&SyncRange{
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- }, must_exit).await?;
+ let rc = self
+ .range_checksum(
+ &SyncRange {
+ begin: begin.to_vec(),
+ end: end.to_vec(),
+ level: i,
+ },
+ must_exit,
+ )
+ .await?;
if rc.found_limit.is_none() {
return Ok(rc);
}
}
- Err(Error::Message(format!("Unable to compute root checksum (this should never happen")))
+ Err(Error::Message(format!(
+ "Unable to compute root checksum (this should never happen"
+ )))
}
- fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
+ fn range_checksum<'a>(
+ self: &'a Arc<Self>,
+ range: &'a SyncRange,
+ must_exit: &'a mut watch::Receiver<bool>,
+ ) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
async move {
let mut cache = self.cache[range.level].lock().await;
if let Some(v) = cache.get(&range) {
@@ -195,41 +237,53 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
drop(cache);
let v = self.range_checksum_inner(&range, must_exit).await?;
- eprintln!("({}) New checksum calculated for {}-{}/{}, {} children",
+ eprintln!(
+ "({}) New checksum calculated for {}-{}/{}, {} children",
self.table.name,
hex::encode(&range.begin[..]),
hex::encode(&range.end[..]),
range.level,
- v.children.len());
+ v.children.len()
+ );
let mut cache = self.cache[range.level].lock().await;
cache.insert(range.clone(), v.clone());
Ok(v)
- }.boxed()
+ }
+ .boxed()
}
- async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
+ async fn range_checksum_inner(
+ self: &Arc<Self>,
+ range: &SyncRange,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksum, Error> {
if range.level == 1 {
let mut children = vec![];
- for item in self.table.store.range(range.begin.clone()..range.end.clone()) {
+ for item in self
+ .table
+ .store
+ .range(range.begin.clone()..range.end.clone())
+ {
let (key, value) = item?;
let key_hash = hash(&key[..]);
- if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
- return Ok(RangeChecksum{
+ if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0)
+ {
+ return Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: Some(key.to_vec()),
time: Instant::now(),
- })
+ });
}
- let item_range = SyncRange{
+ let item_range = SyncRange {
begin: key.to_vec(),
end: vec![],
level: 0,
};
children.push((item_range, hash(&value[..])));
}
- Ok(RangeChecksum{
+ Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: None,
@@ -237,7 +291,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
})
} else {
let mut children = vec![];
- let mut sub_range = SyncRange{
+ let mut sub_range = SyncRange {
begin: range.begin.clone(),
end: range.end.clone(),
level: range.level - 1,
@@ -255,7 +309,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 {
- return Ok(RangeChecksum{
+ return Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: None,
@@ -265,8 +319,11 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let found_limit = sub_ck.found_limit.unwrap();
let actual_limit_hash = hash(&found_limit[..]);
- if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
- return Ok(RangeChecksum{
+ if actual_limit_hash.as_slice()[0..range.level]
+ .iter()
+ .all(|x| *x == 0)
+ {
+ return Ok(RangeChecksum {
bounds: range.clone(),
children,
found_limit: Some(found_limit.clone()),
@@ -280,18 +337,32 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
- async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ async fn do_sync_with(
+ self: Arc<Self>,
+ root_ck: RangeChecksum,
+ who: UUID,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
let mut todo = VecDeque::new();
todo.push_back(root_ck);
while !todo.is_empty() && !*must_exit.borrow() {
let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- eprintln!("({}) Sync with {:?}: {} ({}) remaining", self.table.name, who, todo.len(), total_children);
+ eprintln!(
+ "({}) Sync with {:?}: {} ({}) remaining",
+ self.table.name,
+ who,
+ todo.len(),
+ total_children
+ );
let end = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>();
- let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?;
+ let rpc_resp = self
+ .table
+ .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step))
+ .await?;
if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
let mut items = vec![];
for differing in s.drain(..) {
@@ -303,17 +374,28 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
if items.len() > 0 {
- self.table.system.background.spawn(self.clone().send_items(who.clone(), items));
+ self.table
+ .system
+ .background
+ .spawn(self.clone().send_items(who.clone(), items));
}
} else {
- return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp))));
+ return Err(Error::Message(format!(
+ "Unexpected response to RPC SyncChecksums: {}",
+ debug_serialize(&rpc_resp)
+ )));
}
}
Ok(())
}
async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- eprintln!("({}) Sending {} items to {:?}", self.table.name, item_list.len(), who);
+ eprintln!(
+ "({}) Sending {} items to {:?}",
+ self.table.name,
+ item_list.len(),
+ who
+ );
let mut values = vec![];
for item in item_list.iter() {
@@ -321,20 +403,33 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
values.push(Arc::new(ByteBuf::from(v.as_ref())));
}
}
- let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?;
+ let rpc_resp = self
+ .table
+ .rpc_call(&who, &TableRPC::<F>::Update(values))
+ .await?;
if let TableRPC::<F>::Ok = rpc_resp {
Ok(())
} else {
- Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp))))
+ Err(Error::Message(format!(
+ "Unexpected response to RPC Update: {}",
+ debug_serialize(&rpc_resp)
+ )))
}
}
- pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
+ pub async fn handle_checksum_rpc(
+ self: &Arc<Self>,
+ checksums: &[RangeChecksum],
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<Vec<SyncRange>, Error> {
let mut ret = vec![];
for ckr in checksums.iter() {
let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
for (range, hash) in ckr.children.iter() {
- match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) {
+ match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ {
Err(_) => {
ret.push(range.clone());
}
@@ -346,14 +441,22 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
}
- let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums);
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ eprintln!(
+ "({}) Checksum comparison RPC: {} different out of {}",
+ self.table.name,
+ ret.len(),
+ n_checksums
+ );
Ok(ret)
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
for i in 1..MAX_DEPTH {
- let needle = SyncRange{
+ let needle = SyncRange {
begin: item_key.to_vec(),
end: vec![],
level: i,