diff options
-rw-r--r-- | src/block_ref_table.rs | 15 | ||||
-rw-r--r-- | src/object_table.rs | 14 | ||||
-rw-r--r-- | src/table.rs | 19 | ||||
-rw-r--r-- | src/table_sync.rs | 2 | ||||
-rw-r--r-- | src/version_table.rs | 14 |
5 files changed, 39 insertions, 25 deletions
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index 4364b646..21fe4658 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -45,20 +45,21 @@ impl TableSchema for BlockRefTable { type S = UUID; type E = BlockRef; - async fn updated(&self, old: Option<Self::E>, new: Self::E) { - let was_before = old.map(|x| !x.deleted).unwrap_or(false); - let is_after = !new.deleted; + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { + let block = &old.as_ref().or(new.as_ref()).unwrap().block; + let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); + let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(&new.block) { - eprintln!("Failed to incref block {:?}: {}", &new.block, e); + if let Err(e) = self.block_manager.block_incref(block) { + eprintln!("Failed to incref block {:?}: {}", block, e); } } if was_before && !is_after { if let Err(e) = self .block_manager - .block_decref(&new.block, &self.background) + .block_decref(block, &self.background) { - eprintln!("Failed to decref block {:?}: {}", &new.block, e); + eprintln!("Failed to decref block {:?}: {}", block, e); } } } diff --git a/src/object_table.rs b/src/object_table.rs index c04a8090..fbacf2dc 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -97,13 +97,13 @@ impl TableSchema for ObjectTable { type S = String; type E = Object; - async fn updated(&self, old: Option<Self::E>, new: Self::E) { + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let version_table = self.version_table.clone(); - self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions - if let Some(old_v) = old { + self.background.spawn(async move { for v in old_v.versions.iter() { - if new + if new_v .versions .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) .is_err() @@ -118,8 +118,8 @@ impl TableSchema for ObjectTable { version_table.insert(&deleted_version).await?; } } - } - Ok(()) - }); + Ok(()) + }); + } } } diff --git a/src/table.rs b/src/table.rs index 33364514..6b7d1779 100644 --- a/src/table.rs +++ b/src/table.rs @@ -119,7 +119,7 @@ pub trait TableSchema: Send + Sync { type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type E: Entry<Self::P, Self::S>; - async fn updated(&self, old: Option<Self::E>, new: Self::E); + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>); } impl<F: TableSchema + 'static> Table<F> { @@ -370,10 +370,10 @@ impl<F: TableSchema + 'static> Table<F> { .map_err(Error::RMPEncode) .map_err(sled::ConflictableTransactionError::Abort)?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) + Ok((old_entry, Some(new_entry))) })?; - if old_entry.as_ref() != Some(&new_entry) { + if old_entry != new_entry { self.instance.updated(old_entry, new_entry).await; let syncer = self.syncer.read().await.as_ref().unwrap().clone(); @@ -385,7 +385,18 @@ impl<F: TableSchema + 'static> Table<F> { pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end); - // TODO + let mut count = 0; + while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { + if key.as_ref() < begin.as_slice() { + break; + } + if let Some(old_val) = self.store.remove(&key)? { + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; + self.instance.updated(Some(old_entry), None).await; + count += 1; + } + } + eprintln!("({}) {} entries deleted", self.name, count); Ok(()) } diff --git a/src/table_sync.rs b/src/table_sync.rs index 013e6358..c1d3bea8 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -346,6 +346,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } } } + let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); + eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums); Ok(ret) } diff --git a/src/version_table.rs b/src/version_table.rs index 797b9348..77a7560d 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -63,12 +63,12 @@ impl TableSchema for VersionTable { type S = EmptySortKey; type E = Version; - async fn updated(&self, old: Option<Self::E>, new: Self::E) { + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let block_ref_table = self.block_ref_table.clone(); - self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - if let Some(old_v) = old { - if new.deleted && !old_v.deleted { + self.background.spawn(async move { + if new_v.deleted && !old_v.deleted { let deleted_block_refs = old_v .blocks .iter() @@ -80,8 +80,8 @@ impl TableSchema for VersionTable { .collect::<Vec<_>>(); block_ref_table.insert_many(&deleted_block_refs[..]).await?; } - } - Ok(()) - }); + Ok(()) + }); + } } } |