aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block_ref_table.rs15
-rw-r--r--src/object_table.rs14
-rw-r--r--src/table.rs19
-rw-r--r--src/table_sync.rs2
-rw-r--r--src/version_table.rs14
5 files changed, 39 insertions, 25 deletions
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index 4364b646..21fe4658 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -45,20 +45,21 @@ impl TableSchema for BlockRefTable {
type S = UUID;
type E = BlockRef;
- async fn updated(&self, old: Option<Self::E>, new: Self::E) {
- let was_before = old.map(|x| !x.deleted).unwrap_or(false);
- let is_after = !new.deleted;
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ let block = &old.as_ref().or(new.as_ref()).unwrap().block;
+ let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
+ let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
if is_after && !was_before {
- if let Err(e) = self.block_manager.block_incref(&new.block) {
- eprintln!("Failed to incref block {:?}: {}", &new.block, e);
+ if let Err(e) = self.block_manager.block_incref(block) {
+ eprintln!("Failed to incref block {:?}: {}", block, e);
}
}
if was_before && !is_after {
if let Err(e) = self
.block_manager
- .block_decref(&new.block, &self.background)
+ .block_decref(block, &self.background)
{
- eprintln!("Failed to decref block {:?}: {}", &new.block, e);
+ eprintln!("Failed to decref block {:?}: {}", block, e);
}
}
}
diff --git a/src/object_table.rs b/src/object_table.rs
index c04a8090..fbacf2dc 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -97,13 +97,13 @@ impl TableSchema for ObjectTable {
type S = String;
type E = Object;
- async fn updated(&self, old: Option<Self::E>, new: Self::E) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
- self.background.spawn(async move {
+ if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
- if let Some(old_v) = old {
+ self.background.spawn(async move {
for v in old_v.versions.iter() {
- if new
+ if new_v
.versions
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
.is_err()
@@ -118,8 +118,8 @@ impl TableSchema for ObjectTable {
version_table.insert(&deleted_version).await?;
}
}
- }
- Ok(())
- });
+ Ok(())
+ });
+ }
}
}
diff --git a/src/table.rs b/src/table.rs
index 33364514..6b7d1779 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -119,7 +119,7 @@ pub trait TableSchema: Send + Sync {
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
- async fn updated(&self, old: Option<Self::E>, new: Self::E);
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
}
impl<F: TableSchema + 'static> Table<F> {
@@ -370,10 +370,10 @@ impl<F: TableSchema + 'static> Table<F> {
.map_err(Error::RMPEncode)
.map_err(sled::ConflictableTransactionError::Abort)?;
db.insert(tree_key.clone(), new_bytes)?;
- Ok((old_entry, new_entry))
+ Ok((old_entry, Some(new_entry)))
})?;
- if old_entry.as_ref() != Some(&new_entry) {
+ if old_entry != new_entry {
self.instance.updated(old_entry, new_entry).await;
let syncer = self.syncer.read().await.as_ref().unwrap().clone();
@@ -385,7 +385,18 @@ impl<F: TableSchema + 'static> Table<F> {
pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
eprintln!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
- // TODO
+ let mut count = 0;
+ while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
+ if key.as_ref() < begin.as_slice() {
+ break;
+ }
+ if let Some(old_val) = self.store.remove(&key)? {
+ let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
+ self.instance.updated(Some(old_entry), None).await;
+ count += 1;
+ }
+ }
+ eprintln!("({}) {} entries deleted", self.name, count);
Ok(())
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 013e6358..c1d3bea8 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -346,6 +346,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
}
}
+ let n_checksums = checksums.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
+ eprintln!("({}) Checksum comparison RPC: {} different out of {}", self.table.name, ret.len(), n_checksums);
Ok(ret)
}
diff --git a/src/version_table.rs b/src/version_table.rs
index 797b9348..77a7560d 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -63,12 +63,12 @@ impl TableSchema for VersionTable {
type S = EmptySortKey;
type E = Version;
- async fn updated(&self, old: Option<Self::E>, new: Self::E) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone();
- self.background.spawn(async move {
+ if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- if let Some(old_v) = old {
- if new.deleted && !old_v.deleted {
+ self.background.spawn(async move {
+ if new_v.deleted && !old_v.deleted {
let deleted_block_refs = old_v
.blocks
.iter()
@@ -80,8 +80,8 @@ impl TableSchema for VersionTable {
.collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?;
}
- }
- Ok(())
- });
+ Ok(())
+ });
+ }
}
}