use std::sync::Arc; use garage_db as db; use garage_util::data::*; use garage_util::error::*; use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::s3::block_ref_table::*; mod v05 { use garage_util::crdt; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; /// A version of an object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags // of individual parts and check them when doing CompleteMultipartUpload /// Is this version deleted pub deleted: crdt::Bool, /// list of blocks of data composing the version pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, /// Etag of each part in case of a multipart upload, empty otherwise pub parts_etags: crdt::Map<u64, String>, // Back link to bucket+key so that we can figure if // this was deleted later on /// Bucket in which the related object is stored pub bucket: String, /// Key in which the related object is stored pub key: String, } #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlockKey { /// Number of the part pub part_number: u64, /// Offset of this sub-segment in its part pub offset: u64, } /// Informations about a single block #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] pub struct VersionBlock { /// Blake2 sum of the block pub hash: Hash, /// Size of the block pub size: u64, } impl garage_util::migrate::InitialFormat for Version {} } mod v08 { use garage_util::crdt; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; use super::v05; pub use v05::{VersionBlock, VersionBlockKey}; /// A version of an object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags // of individual parts and check them when doing CompleteMultipartUpload /// Is this version deleted pub deleted: crdt::Bool, /// list of blocks of data composing the version pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, /// Etag of each part in case of a multipart upload, empty otherwise pub parts_etags: crdt::Map<u64, String>, // Back link to bucket+key so that we can figure if // this was deleted later on /// Bucket in which the related object is stored pub bucket_id: Uuid, /// Key in which the related object is stored pub key: String, } impl garage_util::migrate::Migrate for Version { type Previous = v05::Version; fn migrate(old: v05::Version) -> Version { use garage_util::data::blake2sum; Version { uuid: old.uuid, deleted: old.deleted, blocks: old.blocks, parts_etags: old.parts_etags, bucket_id: blake2sum(old.bucket.as_bytes()), key: old.key, } } } } pub(crate) mod v09 { use garage_util::crdt; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; use super::v08; pub use v08::{VersionBlock, VersionBlockKey}; /// A version of an object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, // Actual data: the blocks for this version // In the case of a multipart upload, also store the etags // of individual parts and check them when doing CompleteMultipartUpload /// Is this version deleted pub deleted: crdt::Bool, /// list of blocks of data composing the version pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, // Back link to bucket+key so that we can figure if // this was deleted later on pub backlink: VersionBacklink, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum VersionBacklink { Object { /// Bucket in which the related object is stored bucket_id: Uuid, /// Key in which the related object is stored key: String, }, MultipartUpload { upload_id: Uuid, }, } impl garage_util::migrate::Migrate for Version { const VERSION_MARKER: &'static [u8] = b"G09s3v"; type Previous = v08::Version; fn migrate(old: v08::Version) -> Version { Version { uuid: old.uuid, deleted: old.deleted, blocks: old.blocks, backlink: VersionBacklink::Object { bucket_id: old.bucket_id, key: old.key, }, } } } } pub use v09::*; impl Version { pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self { Self { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), backlink, } } pub fn has_part_number(&self, part_number: u64) -> bool { self.blocks .items() .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) .is_ok() } pub fn n_parts(&self) -> Result<u64, Error> { Ok(self .blocks .items() .last() .ok_or_message("version has no parts")? .0 .part_number) } } impl Ord for VersionBlockKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.part_number .cmp(&other.part_number) .then(self.offset.cmp(&other.offset)) } } impl PartialOrd for VersionBlockKey { fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { Some(self.cmp(other)) } } impl AutoCrdt for VersionBlock { const WARN_IF_DIFFERENT: bool = true; } impl Entry<Uuid, EmptyKey> for Version { fn partition_key(&self) -> &Uuid { &self.uuid } fn sort_key(&self) -> &EmptyKey { &EmptyKey } fn is_tombstone(&self) -> bool { self.deleted.get() } } impl Crdt for Version { fn merge(&mut self, other: &Self) { self.deleted.merge(&other.deleted); if self.deleted.get() { self.blocks.clear(); } else { self.blocks.merge(&other.blocks); } } } pub struct VersionTable { pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, } impl TableSchema for VersionTable { const TABLE_NAME: &'static str = "version"; type P = Uuid; type S = EmptyKey; type E = Version; type Filter = DeletedFilter; fn updated( &self, tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks if new_v.deleted.get() && !old_v.deleted.get() { let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef { block: vb.hash, version: old_v.uuid, deleted: true.into(), }); for block_ref in deleted_block_refs { let res = self.block_ref_table.queue_insert(tx, &block_ref); if let Err(e) = db::unabort(res)? { error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e); } } } } Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } }