From 04acaea231a9af77e5ca05068336f4492fe32ac0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Sun, 19 Apr 2020 20:52:20 +0000 Subject: Don't do version & block_ref updates in background on deletion --- TODO | 5 +++++ src/block_ref_table.rs | 12 +++++------- src/bucket_table.rs | 5 ++++- src/object_table.rs | 37 ++++++++++++++++++------------------- src/table.rs | 6 +++--- src/version_table.rs | 31 +++++++++++++++---------------- test_delete.sh | 6 ++++++ 7 files changed, 56 insertions(+), 46 deletions(-) create mode 100755 test_delete.sh diff --git a/TODO b/TODO index 2baa4f77..1372496a 100644 --- a/TODO +++ b/TODO @@ -6,6 +6,11 @@ Finish the thing that sends blocks to other nodes if needed before deleting them How are we going to test that our replication method works correctly? We will have to introduce lots of dummy data and then add/remove nodes many times. +Repair: +- re-propagate object table version deletions to version table +- re-propagate version table deletion to block ref table +- re-propagate block ref table to rc + To do list ---------- diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index 17efa155..6a256aa3 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::background::*; use crate::data::*; +use crate::error::Error; use crate::table::*; use crate::block::*; @@ -47,20 +48,17 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = (); - async fn updated(&self, old: Option, new: Option) { + async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { let block = &old.as_ref().or(new.as_ref()).unwrap().block; let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - eprintln!("Failed to incref block {:?}: {}", block, e); - } + self.block_manager.block_incref(block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - eprintln!("Failed to decref block {:?}: {}", block, e); - } + self.block_manager.block_decref(block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { diff --git a/src/bucket_table.rs b/src/bucket_table.rs index be7dd348..5604049c 100644 --- a/src/bucket_table.rs +++ b/src/bucket_table.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use crate::error::Error; use crate::table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -71,7 +72,9 @@ impl TableSchema for BucketTable { type E = Bucket; type Filter = (); - async fn updated(&self, _old: Option, _new: Option) {} + async fn updated(&self, _old: Option, _new: Option) -> Result<(), Error> { + Ok(()) + } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { !entry.deleted diff --git a/src/object_table.rs b/src/object_table.rs index 59ce3b7f..0d0de146 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; +use crate::error::Error; use crate::table::*; use crate::table_sharded::*; @@ -101,30 +102,28 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = (); - async fn updated(&self, old: Option, new: Option) { + async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { let version_table = self.version_table.clone(); if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions - self.background.spawn(async move { - for v in old_v.versions.iter() { - if new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - .is_err() - { - let deleted_version = Version { - uuid: v.uuid.clone(), - deleted: true, - blocks: vec![], - bucket: old_v.bucket.clone(), - key: old_v.key.clone(), - }; - version_table.insert(&deleted_version).await?; - } + for v in old_v.versions.iter() { + if new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + .is_err() + { + let deleted_version = Version { + uuid: v.uuid.clone(), + deleted: true, + blocks: vec![], + bucket: old_v.bucket.clone(), + key: old_v.key.clone(), + }; + version_table.insert(&deleted_version).await?; } - Ok(()) - }); + } } + Ok(()) } fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { diff --git a/src/table.rs b/src/table.rs index 80364d17..d9b505c9 100644 --- a/src/table.rs +++ b/src/table.rs @@ -105,7 +105,7 @@ pub trait TableSchema: Send + Sync { type E: Entry; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - async fn updated(&self, old: Option, new: Option); + async fn updated(&self, old: Option, new: Option) -> Result<(), Error>; fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } @@ -469,7 +469,7 @@ where epidemic_propagate.push(new_entry.clone()); } - self.instance.updated(old_entry, Some(new_entry)).await; + self.instance.updated(old_entry, Some(new_entry)).await?; self.system .background .spawn(syncer.clone().invalidate(tree_key)); @@ -497,7 +497,7 @@ where } if let Some(old_val) = self.store.remove(&key)? { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; - self.instance.updated(Some(old_entry), None).await; + self.instance.updated(Some(old_entry), None).await?; self.system .background .spawn(syncer.clone().invalidate(key.to_vec())); diff --git a/src/version_table.rs b/src/version_table.rs index dfd27812..230b7f1c 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; +use crate::error::Error; use crate::table::*; use crate::table_sharded::*; @@ -67,26 +68,24 @@ impl TableSchema for VersionTable { type E = Version; type Filter = (); - async fn updated(&self, old: Option, new: Option) { + async fn updated(&self, old: Option, new: Option) -> Result<(), Error> { let block_ref_table = self.block_ref_table.clone(); if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - self.background.spawn(async move { - if new_v.deleted && !old_v.deleted { - let deleted_block_refs = old_v - .blocks - .iter() - .map(|vb| BlockRef { - block: vb.hash.clone(), - version: old_v.uuid.clone(), - deleted: true, - }) - .collect::>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; - } - Ok(()) - }); + if new_v.deleted && !old_v.deleted { + let deleted_block_refs = old_v + .blocks + .iter() + .map(|vb| BlockRef { + block: vb.hash.clone(), + version: old_v.uuid.clone(), + deleted: true, + }) + .collect::>(); + block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } } + Ok(()) } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { diff --git a/test_delete.sh b/test_delete.sh new file mode 100755 index 00000000..5f929786 --- /dev/null +++ b/test_delete.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +for FILE in $(find target/debug/deps); do + curl -v localhost:3900/$FILE -X DELETE -H 'Host: garage' +done + -- cgit v1.2.3