aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/block.rs100
-rw-r--r--src/block_ref_table.rs5
-rw-r--r--src/object_table.rs6
-rw-r--r--src/server.rs34
-rw-r--r--src/table.rs118
-rw-r--r--src/table_sync.rs78
-rw-r--r--src/version_table.rs5
9 files changed, 236 insertions, 112 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0d4521c8..52e67f5f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -295,6 +295,7 @@ dependencies = [
name = "garage"
version = "0.1.0"
dependencies = [
+ "arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
"async-trait 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)",
"bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index 478aa1bc..a66a712f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,6 +29,7 @@ sha2 = "0.8"
async-trait = "0.1.30"
reduce = "0.1.2"
serde_json = "1.0"
+arc-swap = "0.4"
rustls = "0.17"
tokio-rustls = "0.13"
diff --git a/src/block.rs b/src/block.rs
index e898ad19..25a10910 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -6,6 +6,7 @@ use futures::stream::*;
use tokio::fs;
use tokio::prelude::*;
use tokio::sync::{watch, Mutex};
+use arc_swap::ArcSwapOption;
use crate::data;
use crate::data::*;
@@ -13,6 +14,7 @@ use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
use crate::rpc_client::*;
+use crate::server::Garage;
pub struct BlockManager {
pub data_dir: PathBuf,
@@ -20,10 +22,11 @@ pub struct BlockManager {
pub resync_queue: sled::Tree,
pub lock: Mutex<()>,
pub system: Arc<System>,
+ pub garage: ArcSwapOption<Garage>,
}
impl BlockManager {
- pub async fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> {
+ pub fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -33,20 +36,23 @@ impl BlockManager {
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let block_manager = Arc::new(Self {
+ Arc::new(Self {
rc,
resync_queue,
data_dir,
lock: Mutex::new(()),
system,
- });
- let bm2 = block_manager.clone();
- block_manager
+ garage: ArcSwapOption::from(None),
+ })
+ }
+
+ pub async fn spawn_background_worker(self: Arc<Self>) {
+ let bm2 = self.clone();
+ self
.system
.background
.spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
.await;
- block_manager
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
@@ -80,7 +86,7 @@ impl BlockManager {
let _lock = self.lock.lock().await;
eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash);
fs::remove_file(path).await?;
- self.resync_queue.insert(hash.to_vec(), vec![1u8])?;
+ self.put_to_resync(&hash, 0)?;
return Err(Error::CorruptData(hash.clone()));
}
@@ -98,38 +104,55 @@ impl BlockManager {
}
pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- self.rc.merge(&hash, vec![1])?;
+ let new_rc = self.rc.merge(&hash, vec![1])?;
+ if new_rc.map(|x| u64_from_bytes(&x[..]) == 0).unwrap_or(true) {
+ self.put_to_resync(&hash, BLOCK_RW_TIMEOUT.as_millis() as u64)?;
+ }
Ok(())
}
pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.merge(&hash, vec![0])?.is_none() {
- self.resync_queue.insert(hash.to_vec(), vec![1u8])?;
+ let new_rc = self.rc.merge(&hash, vec![0])?;
+ if new_rc.is_none() {
+ self.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT.as_millis() as u64)?;
}
Ok(())
}
+ fn put_to_resync(&self, hash: &Hash, delay_millis: u64) -> Result<(), Error> {
+ let when = now_msec() + delay_millis;
+ eprintln!("Put resync_queue: {} {:?}", when, hash);
+ let mut key = u64::to_be_bytes(when).to_vec();
+ key.extend(hash.as_ref());
+ self.resync_queue.insert(key, hash.as_ref())?;
+ Ok(())
+ }
+
async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
while !*must_exit.borrow() {
- if let Some((hash_bytes, _v)) = self.resync_queue.get_gt(&[])? {
- let mut hash = [0u8; 32];
- hash.copy_from_slice(hash_bytes.as_ref());
- let hash = Hash::from(hash);
-
- match self.resync_iter(&hash).await {
- Ok(_) => {
- self.resync_queue.remove(&hash_bytes)?;
- }
- Err(e) => {
- eprintln!(
- "Failed to resync hash {:?}, leaving it in queue: {}",
- hash, e
- );
+ if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? {
+ let time_msec = u64_from_bytes(&time_bytes[0..8]);
+ eprintln!("First in resync queue: {} (now = {})", time_msec, now_msec());
+ if now_msec() >= time_msec {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(hash_bytes.as_ref());
+ let hash = Hash::from(hash);
+
+ match self.resync_iter(&hash).await {
+ Ok(_) => {
+ self.resync_queue.remove(&hash_bytes)?;
+ }
+ Err(e) => {
+ eprintln!(
+ "Failed to resync hash {:?}, leaving it in queue: {}",
+ hash, e
+ );
+ }
}
+ continue;
}
- } else {
- tokio::time::delay_for(Duration::from_secs(1)).await;
}
+ tokio::time::delay_for(Duration::from_secs(1)).await;
}
Ok(())
}
@@ -145,14 +168,23 @@ impl BlockManager {
.map(|x| u64_from_bytes(x.as_ref()) > 0)
.unwrap_or(false);
+ eprintln!("Resync block {:?}: exists {}, needed {}", hash, exists, needed);
+
if exists && !needed {
- // TODO: verify that other nodes that might need it have it ?
+ let garage = self.garage.load_full().unwrap();
+ let active_refs = garage.block_ref_table.get_range(&hash, &[0u8; 32].into(), Some(()), 1).await?;
+ let needed_by_others = !active_refs.is_empty();
+ if needed_by_others {
+ // TODO check they have it and send it if not
+ }
fs::remove_file(path).await?;
self.resync_queue.remove(&hash)?;
}
if needed && !exists {
// TODO find a way to not do this if they are sending it to us
+ // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
+ // between the RC being incremented and this part being called.
let block_data = rpc_get_block(&self.system, &hash).await?;
self.write_block(hash, &block_data[..]).await?;
}
@@ -190,11 +222,8 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
}
pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> {
- let who = system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, system.config.data_replication_factor);
+ let ring = system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, system.config.data_replication_factor);
let msg = Message::GetBlock(hash.clone());
let mut resp_stream = who
.iter()
@@ -215,11 +244,8 @@ pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>,
}
pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let who = system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, system.config.data_replication_factor);
+ let ring = system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, system.config.data_replication_factor);
rpc_try_call_many(
system.clone(),
&who[..],
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index cf24fea7..5f6ce21b 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -44,6 +44,7 @@ impl TableSchema for BlockRefTable {
type P = Hash;
type S = UUID;
type E = BlockRef;
+ type Filter = ();
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
@@ -60,4 +61,8 @@ impl TableSchema for BlockRefTable {
}
}
}
+
+ fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+ !entry.deleted
+ }
}
diff --git a/src/object_table.rs b/src/object_table.rs
index fbacf2dc..880543e1 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -96,6 +96,7 @@ impl TableSchema for ObjectTable {
type P = String;
type S = String;
type E = Object;
+ type Filter = ();
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
@@ -122,4 +123,9 @@ impl TableSchema for ObjectTable {
});
}
}
+
+ fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
+ // TODO
+ true
+ }
}
diff --git a/src/server.rs b/src/server.rs
index 287b4386..591a7bf9 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,10 +1,11 @@
-pub use futures_util::future::FutureExt;
-use serde::Deserialize;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
+
+pub use futures_util::future::FutureExt;
+use serde::Deserialize;
use tokio::sync::watch;
use crate::api_server;
@@ -66,9 +67,11 @@ impl Garage {
db: sled::Db,
background: Arc<BackgroundRunner>,
) -> Arc<Self> {
+ println!("Initialize membership management system...");
let system = Arc::new(System::new(config.clone(), id, background.clone()));
- let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone()).await;
+ println!("Initialize block manager...");
+ let block_manager = BlockManager::new(&db, config.data_dir.clone(), system.clone());
let data_rep_param = TableReplicationParams {
replication_factor: system.config.data_replication_factor,
@@ -84,6 +87,7 @@ impl Garage {
timeout: DEFAULT_TIMEOUT,
};
+ println!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
background: background.clone(),
@@ -95,6 +99,8 @@ impl Garage {
data_rep_param.clone(),
)
.await;
+
+ println!("Initialize version_table...");
let version_table = Table::new(
VersionTable {
background: background.clone(),
@@ -106,6 +112,8 @@ impl Garage {
meta_rep_param.clone(),
)
.await;
+
+ println!("Initialize object_table...");
let object_table = Table::new(
ObjectTable {
background: background.clone(),
@@ -118,6 +126,7 @@ impl Garage {
)
.await;
+ println!("Initialize Garage...");
let mut garage = Self {
db,
system: system.clone(),
@@ -142,7 +151,13 @@ impl Garage {
garage.block_ref_table.clone().rpc_handler(),
);
- Arc::new(garage)
+ let garage = Arc::new(garage);
+
+ println!("Start block manager background thread...");
+ garage.block_manager.garage.swap(Some(garage.clone()));
+ garage.block_manager.clone().spawn_background_worker().await;
+
+ garage
}
}
@@ -206,20 +221,25 @@ async fn wait_from(mut chan: watch::Receiver<bool>) -> () {
}
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
+ println!("Loading configuration...");
let config = read_config(config_file).expect("Unable to read config file");
+ let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID");
+ println!("Node ID: {}", hex::encode(&id));
+
+ println!("Opening database...");
let mut db_path = config.metadata_dir.clone();
db_path.push("db");
let db = sled::open(db_path).expect("Unable to open DB");
- let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID");
- println!("Node ID: {}", hex::encode(&id));
-
let (send_cancel, watch_cancel) = watch::channel(false);
+ println!("Initializing background runner...");
let background = BackgroundRunner::new(8, watch_cancel.clone());
+
let garage = Garage::new(config, id, db, background.clone()).await;
+ println!("Initializing RPC and API servers...");
let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
diff --git a/src/table.rs b/src/table.rs
index 6892c9f5..40114aec 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, BTreeMap};
use std::sync::Arc;
use std::time::Duration;
@@ -60,10 +60,11 @@ pub enum TableRPC<F: TableSchema> {
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
+ ReadRange(F::P, F::S, Option<F::Filter>, usize),
+
Update(Vec<Arc<ByteBuf>>),
- SyncChecksums(Vec<RangeChecksum>),
- SyncDifferentSet(Vec<SyncRange>),
+ SyncRPC(SyncRPC),
}
pub trait PartitionKey {
@@ -118,11 +119,15 @@ pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
+ fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true }
}
impl<F: TableSchema + 'static> Table<F> {
+ // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
+
pub async fn new(
instance: F,
system: Arc<System>,
@@ -144,18 +149,10 @@ impl<F: TableSchema + 'static> Table<F> {
table
}
- pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
- Box::new(TableRpcHandlerAdapter::<F> { table: self })
- }
-
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, self.param.replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -171,12 +168,8 @@ impl<F: TableSchema + 'static> Table<F> {
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, self.param.replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -215,12 +208,8 @@ impl<F: TableSchema + 'static> Table<F> {
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, self.param.replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -251,15 +240,76 @@ impl<F: TableSchema + 'static> Table<F> {
}
if let Some(ret_entry) = &ret {
if not_all_same {
+ let self2 = self.clone();
+ let ent2 = ret_entry.clone();
self.system
.background
- .spawn(self.clone().repair_on_read(who, ret_entry.clone()));
+ .spawn(async move {
+ self2.repair_on_read(&who[..], ent2).await
+ });
}
}
Ok(ret)
}
- async fn repair_on_read(self: Arc<Self>, who: Vec<UUID>, what: F::E) -> Result<(), Error> {
+ pub async fn get_range(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ begin_sort_key: &F::S,
+ filter: Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<F::E>, Error> {
+ let hash = partition_key.hash();
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.param.replication_factor);
+
+ let rpc = &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
+ let resps = self
+ .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum)
+ .await?;
+
+ let mut ret = BTreeMap::new();
+ let mut to_repair = BTreeMap::new();
+ for resp in resps {
+ if let TableRPC::Update(entries) = resp {
+ for entry_bytes in entries.iter() {
+ let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
+ let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ match ret.remove(&entry_key) {
+ None => {
+ ret.insert(entry_key, Some(entry));
+ }
+ Some(Some(mut prev)) => {
+ let must_repair = prev != entry;
+ prev.merge(&entry);
+ if must_repair {
+ to_repair.insert(entry_key.clone(), Some(prev.clone()));
+ }
+ ret.insert(entry_key, Some(prev));
+ }
+ Some(None) => unreachable!(),
+ }
+ }
+ }
+ }
+ if !to_repair.is_empty() {
+ let self2 = self.clone();
+ self.system
+ .background
+ .spawn(async move {
+ for (_, v) in to_repair.iter_mut() {
+ self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+ }
+ Ok(())
+ });
+ }
+ let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::<Vec<_>>();
+ Ok(ret_vec)
+ }
+
+ // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
+
+ async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
.await?;
@@ -322,6 +372,12 @@ impl<F: TableSchema + 'static> Table<F> {
)))
}
+ // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ==============
+
+ pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
+ Box::new(TableRpcHandlerAdapter::<F> { table: self })
+ }
+
async fn handle(self: &Arc<Self>, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
match msg {
TableRPC::ReadEntry(key, sort_key) => {
@@ -332,12 +388,12 @@ impl<F: TableSchema + 'static> Table<F> {
self.handle_update(pairs).await?;
Ok(TableRPC::Ok)
}
- TableRPC::SyncChecksums(checksums) => {
+ TableRPC::SyncRPC(rpc) => {
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
- let differing = syncer
- .handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone())
+ let response = syncer
+ .handle_rpc(&rpc, self.system.background.stop_signal.clone())
.await?;
- Ok(TableRPC::SyncDifferentSet(differing))
+ Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 8eb08074..5ef13d6d 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -27,6 +27,12 @@ pub struct TableSyncer<F: TableSchema> {
pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
}
+#[derive(Serialize, Deserialize)]
+pub enum SyncRPC {
+ Checksums(Vec<RangeChecksum>),
+ DifferentSet(Vec<SyncRange>),
+}
+
pub struct SyncTodo {
pub todo: Vec<Partition>,
}
@@ -166,13 +172,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
- let nodes = self
- .table
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&partition.begin, self.table.param.replication_factor);
+ let ring = self.table.system.ring.borrow().clone();
+ let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor);
let mut sync_futures = nodes
.iter()
.map(|node| {
@@ -361,9 +362,9 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let rpc_resp = self
.table
- .rpc_call(&who, &TableRPC::<F>::SyncChecksums(step))
+ .rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)))
.await?;
- if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
+ if let TableRPC::<F>::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp {
let mut items = vec![];
for differing in s.drain(..) {
if differing.level == 0 {
@@ -381,7 +382,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
} else {
return Err(Error::Message(format!(
- "Unexpected response to RPC SyncChecksums: {}",
+ "Unexpected response to sync RPC checksums: {}",
debug_serialize(&rpc_resp)
)));
}
@@ -417,41 +418,44 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
- pub async fn handle_checksum_rpc(
+ pub async fn handle_rpc(
self: &Arc<Self>,
- checksums: &[RangeChecksum],
+ message: &SyncRPC,
mut must_exit: watch::Receiver<bool>,
- ) -> Result<Vec<SyncRange>, Error> {
- let mut ret = vec![];
- for ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
- for (range, hash) in ckr.children.iter() {
- match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
- {
- Err(_) => {
- ret.push(range.clone());
- }
- Ok(i) => {
- if our_ckr.children[i].1 != *hash {
+ ) -> Result<SyncRPC, Error> {
+ if let SyncRPC::Checksums(checksums) = message {
+ let mut ret = vec![];
+ for ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
+ for (range, hash) in ckr.children.iter() {
+ match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
+ {
+ Err(_) => {
ret.push(range.clone());
}
+ Ok(i) => {
+ if our_ckr.children[i].1 != *hash {
+ ret.push(range.clone());
+ }
+ }
}
}
}
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ eprintln!(
+ "({}) Checksum comparison RPC: {} different out of {}",
+ self.table.name,
+ ret.len(),
+ n_checksums
+ );
+ return Ok(SyncRPC::DifferentSet(ret));
}
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- eprintln!(
- "({}) Checksum comparison RPC: {} different out of {}",
- self.table.name,
- ret.len(),
- n_checksums
- );
- Ok(ret)
+ Err(Error::Message(format!("Unexpected sync RPC")))
}
pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
diff --git a/src/version_table.rs b/src/version_table.rs
index 77a7560d..22290fd7 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -62,6 +62,7 @@ impl TableSchema for VersionTable {
type P = Hash;
type S = EmptySortKey;
type E = Version;
+ type Filter = ();
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone();
@@ -84,4 +85,8 @@ impl TableSchema for VersionTable {
});
}
}
+
+ fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+ !entry.deleted
+ }
}