aboutsummaryrefslogtreecommitdiff
path: root/src/table/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table_sync.rs')
-rw-r--r--src/table/table_sync.rs36
1 files changed, 17 insertions, 19 deletions
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 1a1b328b..11b1c211 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -29,14 +29,14 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
}
#[derive(Serialize, Deserialize)]
-pub enum SyncRPC {
+pub(crate) enum SyncRPC {
GetRootChecksumRange(Hash, Hash),
RootChecksumRange(SyncRange),
Checksums(Vec<RangeChecksum>),
Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
-pub struct SyncTodo {
+struct SyncTodo {
todo: Vec<TodoPartition>,
}
@@ -60,7 +60,7 @@ struct TodoPartition {
// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
// See RangeChecksum for the struct that stores this information.
#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
-pub struct SyncRange {
+pub(crate) struct SyncRange {
begin: Vec<u8>,
end: Vec<u8>,
level: usize,
@@ -81,7 +81,7 @@ impl std::cmp::Ord for SyncRange {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct RangeChecksum {
+pub(crate) struct RangeChecksum {
bounds: SyncRange,
children: Vec<(SyncRange, Hash)>,
found_limit: Option<Vec<u8>>,
@@ -91,7 +91,7 @@ pub struct RangeChecksum {
}
#[derive(Debug, Clone)]
-pub struct RangeChecksumCache {
+struct RangeChecksumCache {
hash: Option<Hash>, // None if no children
found_limit: Option<Vec<u8>>,
time: Instant,
@@ -102,7 +102,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
+ pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer {
table: table.clone(),
@@ -348,15 +348,14 @@ where
}
// All remote nodes have written those items, now we can delete them locally
- for (k, v) in items.iter() {
- self.table.store.transaction(|tx_db| {
- if let Some(curv) = tx_db.get(k)? {
- if curv == &v[..] {
- tx_db.remove(&k[..])?;
- }
- }
- Ok(())
- })?;
+ for was_removed in join_all(
+ items
+ .iter()
+ .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])),
+ )
+ .await
+ {
+ was_removed?;
}
Ok(())
@@ -642,7 +641,7 @@ where
}
}
- pub async fn handle_rpc(
+ pub(crate) async fn handle_rpc(
self: &Arc<Self>,
message: &SyncRPC,
mut must_exit: watch::Receiver<bool>,
@@ -738,7 +737,7 @@ where
Ok(SyncRPC::Difference(ret_ranges, ret_items))
}
- pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
+ pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
for i in 1..MAX_DEPTH {
let needle = SyncRange {
begin: item_key.to_vec(),
@@ -747,14 +746,13 @@ where
};
let mut cache = self.cache[i].lock().unwrap();
if let Some(cache_entry) = cache.range(..=needle).rev().next() {
- if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
+ if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
let index = cache_entry.0.clone();
drop(cache_entry);
cache.remove(&index);
}
}
}
- Ok(())
}
}