diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-03 14:44:47 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-03 14:44:47 +0100 |
commit | cdb2a591e9d393d24ab5c49bb905b0589b193299 (patch) | |
tree | 10c95206d0bd7b30c1fcd14ccc188be374cb1066 /src/model/s3/object_table.rs | |
parent | 582b0761790b7958a3ba10c4b549b466997d2dcd (diff) | |
download | garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.tar.gz garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.zip |
Refactor how things are migrated
Diffstat (limited to 'src/model/s3/object_table.rs')
-rw-r--r-- | src/model/s3/object_table.rs | 241 |
1 files changed, 112 insertions, 129 deletions
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 1b2f0014..616e0d35 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; use std::sync::Arc; use garage_db as db; @@ -13,25 +12,126 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use crate::prev::v051::object_table as old; - pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket_id: Uuid, +mod v05 { + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } + + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + + /// Data stored in object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), + } + + /// Metadata about the object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, + } + + /// Additional headers for an object + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap<String, String>, + } + + impl garage_util::migrate::InitialFormat for Object {} +} + +mod v08 { + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, + pub use v05::{ + ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta, + ObjectVersionState, + }; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } - /// The list of currenty stored versions of the object - versions: Vec<ObjectVersion>, + impl garage_util::migrate::Migrate for Object { + type Previous = v05::Object; + + fn migrate(old: v05::Object) -> Object { + use garage_util::data::blake2sum; + + Object { + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + versions: old.versions, + } + } + } } +pub use v08::*; + impl Object { /// Initialize an Object struct from parts pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self { @@ -68,28 +168,6 @@ impl Object { } } -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - impl Crdt for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; @@ -111,42 +189,10 @@ impl Crdt for ObjectVersionState { } } -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - impl AutoCrdt for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap<String, String>, -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, Uuid) { (self.timestamp, self.uuid) @@ -290,11 +336,6 @@ impl TableSchema for ObjectTable { ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), } } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; - Some(migrate_object(old_obj)) - } } impl CountedItem for Object { @@ -342,61 +383,3 @@ impl CountedItem for Object { // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) - -fn migrate_object(o: old::Object) -> Object { - let versions = o - .versions() - .iter() - .cloned() - .map(migrate_object_version) - .collect(); - Object { - bucket_id: blake2sum(o.bucket.as_bytes()), - key: o.key, - versions, - } -} - -fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { - ObjectVersion { - uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), - timestamp: v.timestamp, - state: match v.state { - old::ObjectVersionState::Uploading(h) => { - ObjectVersionState::Uploading(migrate_object_version_headers(h)) - } - old::ObjectVersionState::Complete(d) => { - ObjectVersionState::Complete(migrate_object_version_data(d)) - } - old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - }, - } -} - -fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { - ObjectVersionHeaders { - content_type: h.content_type, - other: h.other, - } -} - -fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { - match d { - old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, - old::ObjectVersionData::Inline(m, b) => { - ObjectVersionData::Inline(migrate_object_version_meta(m), b) - } - old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( - migrate_object_version_meta(m), - Hash::try_from(h.as_slice()).unwrap(), - ), - } -} - -fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { - ObjectVersionMeta { - headers: migrate_object_version_headers(m.headers), - size: m.size, - etag: m.etag, - } -} |