aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO5
-rw-r--r--src/block_ref_table.rs12
-rw-r--r--src/bucket_table.rs5
-rw-r--r--src/object_table.rs37
-rw-r--r--src/table.rs6
-rw-r--r--src/version_table.rs31
-rwxr-xr-xtest_delete.sh6
7 files changed, 56 insertions, 46 deletions
diff --git a/TODO b/TODO
index 2baa4f77..1372496a 100644
--- a/TODO
+++ b/TODO
@@ -6,6 +6,11 @@ Finish the thing that sends blocks to other nodes if needed before deleting them
How are we going to test that our replication method works correctly?
We will have to introduce lots of dummy data and then add/remove nodes many times.
+Repair:
+- re-propagate object table version deletions to version table
+- re-propagate version table deletion to block ref table
+- re-propagate block ref table to rc
+
To do list
----------
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index 17efa155..6a256aa3 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::*;
use crate::data::*;
+use crate::error::Error;
use crate::table::*;
use crate::block::*;
@@ -47,20 +48,17 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = ();
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block = &old.as_ref().or(new.as_ref()).unwrap().block;
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false);
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false);
if is_after && !was_before {
- if let Err(e) = self.block_manager.block_incref(block) {
- eprintln!("Failed to incref block {:?}: {}", block, e);
- }
+ self.block_manager.block_incref(block)?;
}
if was_before && !is_after {
- if let Err(e) = self.block_manager.block_decref(block) {
- eprintln!("Failed to decref block {:?}: {}", block, e);
- }
+ self.block_manager.block_decref(block)?;
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
diff --git a/src/bucket_table.rs b/src/bucket_table.rs
index be7dd348..5604049c 100644
--- a/src/bucket_table.rs
+++ b/src/bucket_table.rs
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use crate::error::Error;
use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -71,7 +72,9 @@ impl TableSchema for BucketTable {
type E = Bucket;
type Filter = ();
- async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+ async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+ Ok(())
+ }
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
!entry.deleted
diff --git a/src/object_table.rs b/src/object_table.rs
index 59ce3b7f..0d0de146 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
+use crate::error::Error;
use crate::table::*;
use crate::table_sharded::*;
@@ -101,30 +102,28 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ();
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let version_table = self.version_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
- self.background.spawn(async move {
- for v in old_v.versions.iter() {
- if new_v
- .versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- .is_err()
- {
- let deleted_version = Version {
- uuid: v.uuid.clone(),
- deleted: true,
- blocks: vec![],
- bucket: old_v.bucket.clone(),
- key: old_v.key.clone(),
- };
- version_table.insert(&deleted_version).await?;
- }
+ for v in old_v.versions.iter() {
+ if new_v
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ .is_err()
+ {
+ let deleted_version = Version {
+ uuid: v.uuid.clone(),
+ deleted: true,
+ blocks: vec![],
+ bucket: old_v.bucket.clone(),
+ key: old_v.key.clone(),
+ };
+ version_table.insert(&deleted_version).await?;
}
- Ok(())
- });
+ }
}
+ Ok(())
}
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
diff --git a/src/table.rs b/src/table.rs
index 80364d17..d9b505c9 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -105,7 +105,7 @@ pub trait TableSchema: Send + Sync {
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
@@ -469,7 +469,7 @@ where
epidemic_propagate.push(new_entry.clone());
}
- self.instance.updated(old_entry, Some(new_entry)).await;
+ self.instance.updated(old_entry, Some(new_entry)).await?;
self.system
.background
.spawn(syncer.clone().invalidate(tree_key));
@@ -497,7 +497,7 @@ where
}
if let Some(old_val) = self.store.remove(&key)? {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
- self.instance.updated(Some(old_entry), None).await;
+ self.instance.updated(Some(old_entry), None).await?;
self.system
.background
.spawn(syncer.clone().invalidate(key.to_vec()));
diff --git a/src/version_table.rs b/src/version_table.rs
index dfd27812..230b7f1c 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
+use crate::error::Error;
use crate::table::*;
use crate::table_sharded::*;
@@ -67,26 +68,24 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = ();
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> {
let block_ref_table = self.block_ref_table.clone();
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
- self.background.spawn(async move {
- if new_v.deleted && !old_v.deleted {
- let deleted_block_refs = old_v
- .blocks
- .iter()
- .map(|vb| BlockRef {
- block: vb.hash.clone(),
- version: old_v.uuid.clone(),
- deleted: true,
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
- }
- Ok(())
- });
+ if new_v.deleted && !old_v.deleted {
+ let deleted_block_refs = old_v
+ .blocks
+ .iter()
+ .map(|vb| BlockRef {
+ block: vb.hash.clone(),
+ version: old_v.uuid.clone(),
+ deleted: true,
+ })
+ .collect::<Vec<_>>();
+ block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ }
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
diff --git a/test_delete.sh b/test_delete.sh
new file mode 100755
index 00000000..5f929786
--- /dev/null
+++ b/test_delete.sh
@@ -0,0 +1,6 @@
+#!/bin/bash
+
+for FILE in $(find target/debug/deps); do
+ curl -v localhost:3900/$FILE -X DELETE -H 'Host: garage'
+done
+