aboutsummaryrefslogtreecommitdiff
path: root/src/model/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/s3')
-rw-r--r--src/model/s3/object_table.rs50
-rw-r--r--src/model/s3/version_table.rs37
2 files changed, 40 insertions, 47 deletions
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 26ff57f6..05b27fb4 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -255,34 +255,34 @@ impl TableSchema for ObjectTable {
);
}
- // 2. Spawn threads that propagates deletions to version table
- let version_table = self.version_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of old versions
- for v in old_v.versions.iter() {
- let newly_deleted = match new_v
- .versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- {
- Err(_) => true,
- Ok(i) => {
- new_v.versions[i].state == ObjectVersionState::Aborted
- && v.state != ObjectVersionState::Aborted
- }
- };
- if newly_deleted {
- let deleted_version =
- Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
- version_table.insert(&deleted_version).await?;
+ // 2. Enqueue propagation deletions to version table
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of old versions
+ for v in old_v.versions.iter() {
+ let newly_deleted = match new_v
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ {
+ Err(_) => true,
+ Ok(i) => {
+ new_v.versions[i].state == ObjectVersionState::Aborted
+ && v.state != ObjectVersionState::Aborted
+ }
+ };
+ if newly_deleted {
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ let res = self.version_table.queue_insert(tx, &deleted_version);
+ if let Err(e) = db::unabort(res)? {
+ error!(
+ "Unable to enqueue version deletion propagation: {}. A repair will be needed.",
+ e
+ );
}
}
}
- Ok(())
- });
+ }
+
Ok(())
}
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6bc2ecd1..0cfaa954 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -141,33 +141,26 @@ impl TableSchema for VersionTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
- let block_ref_table = self.block_ref_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of version blocks
- if new_v.deleted.get() && !old_v.deleted.get() {
- let deleted_block_refs = old_v
- .blocks
- .items()
- .iter()
- .map(|(_k, vb)| BlockRef {
- block: vb.hash,
- version: old_v.uuid,
- deleted: true.into(),
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of version blocks
+ if new_v.deleted.get() && !old_v.deleted.get() {
+ let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef {
+ block: vb.hash,
+ version: old_v.uuid,
+ deleted: true.into(),
+ });
+ for block_ref in deleted_block_refs {
+ let res = self.block_ref_table.queue_insert(tx, &block_ref);
+ if let Err(e) = db::unabort(res)? {
+ error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e);
+ }
}
}
- Ok(())
- });
+ }
Ok(())
}