aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block_ref_table.rs12
-rw-r--r--src/bucket_table.rs5
-rw-r--r--src/object_table.rs37
-rw-r--r--src/table.rs6
-rw-r--r--src/version_table.rs31
5 files changed, 45 insertions, 46 deletions
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index 17efa155..6a256aa3 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::*;
use crate::data::*;
+use crate::error::Error;
use crate::table::*;
use crate::block::*;
@@ -47,20 +48,17 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = ();
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
if is_after && !was_before {
- if let Err(e) = self.block_manager.block_incref(block) {
- eprintln!("Failed to incref block {:?}: {}", block, e);
- }
+ self.block_manager.block_incref(block)?;
}
if was_before && !is_after {
- if let Err(e) = self.block_manager.block_decref(block) {
- eprintln!("Failed to decref block {:?}: {}", block, e);
- }
+ self.block_manager.block_decref(block)?;
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
diff --git a/src/bucket_table.rs b/src/bucket_table.rs
index be7dd348..5604049c 100644
--- a/src/bucket_table.rs
+++ b/src/bucket_table.rs
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use crate::error::Error;
use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -71,7 +72,9 @@ impl TableSchema for BucketTable {
type E = Bucket;
type Filter = ();
- async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+ async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+ Ok(())
+ }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
!entry.deleted
diff --git a/src/object_table.rs b/src/object_table.rs
index 59ce3b7f..0d0de146 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
+use crate::error::Error;
use crate::table::*;
use crate::table_sharded::*;
@@ -101,30 +102,28 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ();
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let version_table = self.version_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
- self.background.spawn(async move {
- for v in old_v.versions.iter() {
- if new_v
- .versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- .is_err()
- {
- let deleted_version = Version {
- uuid: v.uuid.clone(),
- deleted: true,
- blocks: vec![],
- bucket: old_v.bucket.clone(),
- key: old_v.key.clone(),
- };
- version_table.insert(&deleted_version).await?;
- }
+ for v in old_v.versions.iter() {
+ if new_v
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ .is_err()
+ {
+ let deleted_version = Version {
+ uuid: v.uuid.clone(),
+ deleted: true,
+ blocks: vec![],
+ bucket: old_v.bucket.clone(),
+ key: old_v.key.clone(),
+ };
+ version_table.insert(&deleted_version).await?;
}
- Ok(())
- });
+ }
}
+ Ok(())
}
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
diff --git a/src/table.rs b/src/table.rs
index 80364d17..d9b505c9 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -105,7 +105,7 @@ pub trait TableSchema: Send + Sync {
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
@@ -469,7 +469,7 @@ where
epidemic_propagate.push(new_entry.clone());
}
- self.instance.updated(old_entry, Some(new_entry)).await;
+ self.instance.updated(old_entry, Some(new_entry)).await?;
self.system
.background
.spawn(syncer.clone().invalidate(tree_key));
@@ -497,7 +497,7 @@ where
}
if let Some(old_val) = self.store.remove(&key)? {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
- self.instance.updated(Some(old_entry), None).await;
+ self.instance.updated(Some(old_entry), None).await?;
self.system
.background
.spawn(syncer.clone().invalidate(key.to_vec()));
diff --git a/src/version_table.rs b/src/version_table.rs
index dfd27812..230b7f1c 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
+use crate::error::Error;
use crate::table::*;
use crate::table_sharded::*;
@@ -67,26 +68,24 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = ();
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block_ref_table = self.block_ref_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- self.background.spawn(async move {
- if new_v.deleted && !old_v.deleted {
- let deleted_block_refs = old_v
- .blocks
- .iter()
- .map(|vb| BlockRef {
- block: vb.hash.clone(),
- version: old_v.uuid.clone(),
- deleted: true,
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
- }
- Ok(())
- });
+ if new_v.deleted && !old_v.deleted {
+ let deleted_block_refs = old_v
+ .blocks
+ .iter()
+ .map(|vb| BlockRef {
+ block: vb.hash.clone(),
+ version: old_v.uuid.clone(),
+ deleted: true,
+ })
+ .collect::<Vec<_>>();
+ block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ }
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {