aboutsummaryrefslogtreecommitdiff
path: root/src/model/s3/version_table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/s3/version_table.rs')
-rw-r--r--src/model/s3/version_table.rs219
1 files changed, 115 insertions, 104 deletions
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6bc2ecd1..6edc83f4 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,9 +1,7 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -12,32 +10,108 @@ use garage_table::*;
use crate::s3::block_ref_table::*;
-use crate::prev::v051::version_table as old;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket_id: Uuid,
- /// Key in which the related object is stored
- pub key: String,
+mod v05 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket: String,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+ }
+
+ /// Informations about a single block
+ #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for Version {}
}
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ pub use v05::{VersionBlock, VersionBlockKey};
+
+ impl garage_util::migrate::Migrate for Version {
+ type Previous = v05::Version;
+
+ fn migrate(old: v05::Version) -> Version {
+ use garage_util::data::blake2sum;
+
+ Version {
+ uuid: old.uuid,
+ deleted: old.deleted,
+ blocks: old.blocks,
+ parts_etags: old.parts_etags,
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ }
+ }
+ }
+}
+
+pub use v08::*;
+
impl Version {
pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self {
@@ -65,14 +139,6 @@ impl Version {
}
}
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
@@ -87,15 +153,6 @@ impl PartialOrd for VersionBlockKey {
}
}
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -127,7 +184,6 @@ impl Crdt for Version {
}
pub struct VersionTable {
- pub background: Arc<BackgroundRunner>,
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
}
@@ -141,33 +197,26 @@ impl TableSchema for VersionTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
- let block_ref_table = self.block_ref_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of version blocks
- if new_v.deleted.get() && !old_v.deleted.get() {
- let deleted_block_refs = old_v
- .blocks
- .items()
- .iter()
- .map(|(_k, vb)| BlockRef {
- block: vb.hash,
- version: old_v.uuid,
- deleted: true.into(),
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of version blocks
+ if new_v.deleted.get() && !old_v.deleted.get() {
+ let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef {
+ block: vb.hash,
+ version: old_v.uuid,
+ deleted: true.into(),
+ });
+ for block_ref in deleted_block_refs {
+ let res = self.block_ref_table.queue_insert(tx, &block_ref);
+ if let Err(e) = db::unabort(res)? {
+ error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e);
+ }
}
}
- Ok(())
- });
+ }
Ok(())
}
@@ -175,42 +224,4 @@ impl TableSchema for VersionTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get())
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
-
- let blocks = old
- .blocks
- .items()
- .iter()
- .map(|(k, v)| {
- (
- VersionBlockKey {
- part_number: k.part_number,
- offset: k.offset,
- },
- VersionBlock {
- hash: Hash::try_from(v.hash.as_slice()).unwrap(),
- size: v.size,
- },
- )
- })
- .collect::<crdt::Map<_, _>>();
-
- let parts_etags = old
- .parts_etags
- .items()
- .iter()
- .map(|(k, v)| (*k, v.clone()))
- .collect::<crdt::Map<_, _>>();
-
- Some(Version {
- uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
- deleted: crdt::Bool::new(old.deleted.get()),
- blocks,
- parts_etags,
- bucket_id: blake2sum(old.bucket.as_bytes()),
- key: old.key,
- })
- }
}