diff options
Diffstat (limited to 'src/table/table_sync.rs')
-rw-r--r-- | src/table/table_sync.rs | 36 |
1 files changed, 17 insertions, 19 deletions
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 1a1b328b..11b1c211 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -29,14 +29,14 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> { } #[derive(Serialize, Deserialize)] -pub enum SyncRPC { +pub(crate) enum SyncRPC { GetRootChecksumRange(Hash, Hash), RootChecksumRange(SyncRange), Checksums(Vec<RangeChecksum>), Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), } -pub struct SyncTodo { +struct SyncTodo { todo: Vec<TodoPartition>, } @@ -60,7 +60,7 @@ struct TodoPartition { // (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) // See RangeChecksum for the struct that stores this information. #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub struct SyncRange { +pub(crate) struct SyncRange { begin: Vec<u8>, end: Vec<u8>, level: usize, @@ -81,7 +81,7 @@ impl std::cmp::Ord for SyncRange { } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RangeChecksum { +pub(crate) struct RangeChecksum { bounds: SyncRange, children: Vec<(SyncRange, Hash)>, found_limit: Option<Vec<u8>>, @@ -91,7 +91,7 @@ pub struct RangeChecksum { } #[derive(Debug, Clone)] -pub struct RangeChecksumCache { +struct RangeChecksumCache { hash: Option<Hash>, // None if no children found_limit: Option<Vec<u8>>, time: Instant, @@ -102,7 +102,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { + pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), @@ -348,15 +348,14 @@ where } // All remote nodes have written those items, now we can delete them locally - for (k, v) in items.iter() { - self.table.store.transaction(|tx_db| { - if let Some(curv) = tx_db.get(k)? { - if curv == &v[..] { - tx_db.remove(&k[..])?; - } - } - Ok(()) - })?; + for was_removed in join_all( + items + .iter() + .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])), + ) + .await + { + was_removed?; } Ok(()) @@ -642,7 +641,7 @@ where } } - pub async fn handle_rpc( + pub(crate) async fn handle_rpc( self: &Arc<Self>, message: &SyncRPC, mut must_exit: watch::Receiver<bool>, @@ -738,7 +737,7 @@ where Ok(SyncRPC::Difference(ret_ranges, ret_items)) } - pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { + pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) { for i in 1..MAX_DEPTH { let needle = SyncRange { begin: item_key.to_vec(), @@ -747,14 +746,13 @@ where }; let mut cache = self.cache[i].lock().unwrap(); if let Some(cache_entry) = cache.range(..=needle).rev().next() { - if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { + if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key { let index = cache_entry.0.clone(); drop(cache_entry); cache.remove(&index); } } } - Ok(()) } } |