diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-03 14:44:47 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-03 14:44:47 +0100 |
commit | cdb2a591e9d393d24ab5c49bb905b0589b193299 (patch) | |
tree | 10c95206d0bd7b30c1fcd14ccc188be374cb1066 /src/model | |
parent | 582b0761790b7958a3ba10c4b549b466997d2dcd (diff) | |
download | garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.tar.gz garage-cdb2a591e9d393d24ab5c49bb905b0589b193299.zip |
Refactor how things are migrated
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/bucket_alias_table.rs | 20 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 128 | ||||
-rw-r--r-- | src/model/index_counter.rs | 50 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 54 | ||||
-rw-r--r-- | src/model/key_table.rs | 154 | ||||
-rw-r--r-- | src/model/prev/v051/bucket_table.rs | 2 | ||||
-rw-r--r-- | src/model/prev/v051/key_table.rs | 50 | ||||
-rw-r--r-- | src/model/prev/v051/mod.rs | 3 | ||||
-rw-r--r-- | src/model/prev/v051/object_table.rs | 149 | ||||
-rw-r--r-- | src/model/prev/v051/version_table.rs | 79 | ||||
-rw-r--r-- | src/model/s3/block_ref_table.rs | 29 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 241 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 180 |
13 files changed, 480 insertions, 659 deletions
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index fcd1536e..d07394f6 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -5,14 +5,22 @@ use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; -/// The bucket alias table holds the names given to buckets -/// in the global namespace. -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BucketAlias { - name: String, - pub state: crdt::Lww<Option<Uuid>>, +mod v08 { + use super::*; + + /// The bucket alias table holds the names given to buckets + /// in the global namespace. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketAlias { + pub(super) name: String, + pub state: crdt::Lww<Option<Uuid>>, + } + + impl garage_util::migrate::InitialFormat for BucketAlias {} } +pub use v08::*; + impl BucketAlias { pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> { if !is_valid_bucket_name(&name) { diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 7be42702..38ed88ee 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -7,72 +7,80 @@ use garage_util::time::*; use crate::permission::BucketKeyPerm; -/// A bucket is a collection of objects -/// -/// Its parameters are not directly accessible as: -/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. -/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Bucket { - /// ID of the bucket - pub id: Uuid, - /// State, and configuration if not deleted, of the bucket - pub state: crdt::Deletable<BucketParams>, -} +mod v08 { + use super::*; + + /// A bucket is a collection of objects + /// + /// Its parameters are not directly accessible as: + /// - It must be possible to merge paramaters, hence the use of a LWW CRDT. + /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Bucket { + /// ID of the bucket + pub id: Uuid, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Deletable<BucketParams>, + } -/// Configuration for a bucket -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BucketParams { - /// Bucket's creation date - pub creation_date: u64, - /// Map of key with access to the bucket, and what kind of access they give - pub authorized_keys: crdt::Map<String, BucketKeyPerm>, - - /// Map of aliases that are or have been given to this bucket - /// in the global namespace - /// (not authoritative: this is just used as an indication to - /// map back to aliases when doing ListBuckets) - pub aliases: crdt::LwwMap<String, bool>, - /// Map of aliases that are or have been given to this bucket - /// in namespaces local to keys - /// key = (access key id, alias name) - pub local_aliases: crdt::LwwMap<(String, String), bool>, - - /// Whether this bucket is allowed for website access - /// (under all of its global alias names), - /// and if so, the website configuration XML document - pub website_config: crdt::Lww<Option<WebsiteConfig>>, - /// CORS rules - pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, - /// Bucket quotas - #[serde(default)] - pub quotas: crdt::Lww<BucketQuotas>, -} + /// Configuration for a bucket + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketParams { + /// Bucket's creation date + pub creation_date: u64, + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::Map<String, BucketKeyPerm>, + + /// Map of aliases that are or have been given to this bucket + /// in the global namespace + /// (not authoritative: this is just used as an indication to + /// map back to aliases when doing ListBuckets) + pub aliases: crdt::LwwMap<String, bool>, + /// Map of aliases that are or have been given to this bucket + /// in namespaces local to keys + /// key = (access key id, alias name) + pub local_aliases: crdt::LwwMap<(String, String), bool>, + + /// Whether this bucket is allowed for website access + /// (under all of its global alias names), + /// and if so, the website configuration XML document + pub website_config: crdt::Lww<Option<WebsiteConfig>>, + /// CORS rules + pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww<BucketQuotas>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct WebsiteConfig { - pub index_document: String, - pub error_document: Option<String>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct WebsiteConfig { + pub index_document: String, + pub error_document: Option<String>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CorsRule { - pub id: Option<String>, - pub max_age_seconds: Option<u64>, - pub allow_origins: Vec<String>, - pub allow_methods: Vec<String>, - pub allow_headers: Vec<String>, - pub expose_headers: Vec<String>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CorsRule { + pub id: Option<String>, + pub max_age_seconds: Option<u64>, + pub allow_origins: Vec<String>, + pub allow_methods: Vec<String>, + pub allow_headers: Vec<String>, + pub expose_headers: Vec<String>, + } -#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct BucketQuotas { - /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) - pub max_size: Option<u64>, - /// Maximum number of non-deleted objects in the bucket - pub max_objects: Option<u64>, + #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct BucketQuotas { + /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) + pub max_size: Option<u64>, + /// Maximum number of non-deleted objects in the bucket + pub max_objects: Option<u64>, + } + + impl garage_util::migrate::InitialFormat for Bucket {} } +pub use v08::*; + impl AutoCrdt for BucketQuotas { const WARN_IF_DIFFERENT: bool = true; } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 6303ea3e..c3ed29c7 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -12,6 +12,7 @@ use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; +use garage_util::migrate::Migrate; use garage_util::time::*; use garage_table::crdt::*; @@ -29,14 +30,28 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { fn counts(&self) -> Vec<(&'static str, i64)>; } -/// A counter entry in the global table -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CountedItem> { - pub pk: T::CP, - pub sk: T::CS, - pub values: BTreeMap<String, CounterValue>, +mod v08 { + use super::*; + + /// A counter entry in the global table + #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] + pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, + pub values: BTreeMap<String, CounterValue>, + } + + /// A counter entry in the global table + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CounterValue { + pub node_values: BTreeMap<Uuid, (u64, i64)>, + } + + impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {} } +pub use v08::*; + impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { fn partition_key(&self) -> &T::CP { &self.pk @@ -78,12 +93,6 @@ impl<T: CountedItem> CounterEntry<T> { } } -/// A counter entry in the global table -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterValue { - pub node_values: BTreeMap<Uuid, (u64, i64)>, -} - impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { @@ -195,11 +204,9 @@ impl<T: CountedItem> IndexCounter<T> { let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)? - } + Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry") + .map_err(db::TxError::Abort)?, None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), @@ -214,7 +221,8 @@ impl<T: CountedItem> IndexCounter<T> { ent.1 += *inc; } - let new_entry_bytes = rmp_to_vec_all_named(&entry) + let new_entry_bytes = entry + .encode() .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; @@ -263,7 +271,7 @@ impl<T: CountedItem> IndexCounter<T> { tv.1 = 0; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_k, &local_counter_bytes)?; @@ -330,7 +338,7 @@ impl<T: CountedItem> IndexCounter<T> { tv.1 += v; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_key, local_counter_bytes)?; @@ -357,6 +365,8 @@ struct LocalCounterEntry<T: CountedItem> { values: BTreeMap<String, (u64, i64)>, } +impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {} + impl<T: CountedItem> LocalCounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { CounterEntry { diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 7860cb17..ce3e4129 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -1,7 +1,8 @@ -use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use serde::{Deserialize, Serialize}; + use garage_db as db; use garage_util::data::*; @@ -17,32 +18,43 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct K2VItem { - pub partition: K2VItemPartition, - pub sort_key: String, +mod v08 { + use crate::k2v::causality::K2VNodeId; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; - items: BTreeMap<K2VNodeId, DvvsEntry>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VItem { + pub partition: K2VItemPartition, + pub sort_key: String, -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] -pub struct K2VItemPartition { - pub bucket_id: Uuid, - pub partition_key: String, -} + pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -struct DvvsEntry { - t_discard: u64, - values: Vec<(u64, DvvsValue)>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] + pub struct K2VItemPartition { + pub bucket_id: Uuid, + pub partition_key: String, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct DvvsEntry { + pub(super) t_discard: u64, + pub(super) values: Vec<(u64, DvvsValue)>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum DvvsValue { - Value(#[serde(with = "serde_bytes")] Vec<u8>), - Deleted, + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum DvvsValue { + Value(#[serde(with = "serde_bytes")] Vec<u8>), + Deleted, + } + + impl garage_util::migrate::InitialFormat for K2VItem {} } +pub use v08::*; + impl K2VItem { /// Creates a new K2VItem when no previous entry existed in the db pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 9d2fc783..bb5334a3 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,45 +1,121 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::*; -use garage_table::*; +use garage_util::crdt::{self, Crdt}; use garage_util::data::*; +use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema}; + use crate::permission::BucketKeyPerm; -use crate::prev::v051::key_table as old; +pub(crate) mod v05 { + use garage_util::crdt; + use serde::{Deserialize, Serialize}; -/// An api key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - /// The id of the key (immutable), used as partition key - pub key_id: String, + /// An api key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, - /// Internal state of the key - pub state: crdt::Deletable<KeyParams>, -} + /// The secret_key associated + pub secret_key: String, -/// Configuration for a key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct KeyParams { - /// The secret_key associated (immutable) - pub secret_key: String, + /// Name for the key + pub name: crdt::Lww<String>, - /// Name for the key - pub name: crdt::Lww<String>, + /// Is the key deleted + pub deleted: crdt::Bool, + + /// Buckets in which the key is authorized. Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LwwMap<String, PermissionSet>, + } - /// Flag to allow users having this key to create buckets - pub allow_create_bucket: crdt::Lww<bool>, + /// Permission given to a key in a bucket + #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct PermissionSet { + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, + } + + impl crdt::AutoCrdt for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; + } - /// If the key is present: it gives some permissions, - /// a map of bucket IDs (uuids) to permissions. - /// Otherwise no permissions are granted to key - pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>, + impl garage_util::migrate::InitialFormat for Key {} +} - /// A key can have a local view of buckets names it is - /// the only one to see, this is the namespace for these aliases - pub local_aliases: crdt::LwwMap<String, Option<Uuid>>, +mod v08 { + use super::v05; + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// An api key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, + + /// Internal state of the key + pub state: crdt::Deletable<KeyParams>, + } + + /// Configuration for a key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct KeyParams { + /// The secret_key associated (immutable) + pub secret_key: String, + + /// Name for the key + pub name: crdt::Lww<String>, + + /// Flag to allow users having this key to create buckets + pub allow_create_bucket: crdt::Lww<bool>, + + /// If the key is present: it gives some permissions, + /// a map of bucket IDs (uuids) to permissions. + /// Otherwise no permissions are granted to key + pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>, + + /// A key can have a local view of buckets names it is + /// the only one to see, this is the namespace for these aliases + pub local_aliases: crdt::LwwMap<String, Option<Uuid>>, + } + + impl garage_util::migrate::Migrate for Key { + type Previous = v05::Key; + + fn migrate(old_k: v05::Key) -> Key { + let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); + + let state = if old_k.deleted.get() { + crdt::Deletable::Deleted + } else { + // Authorized buckets is ignored here, + // migration is performed in specific migration code in + // garage/migrate.rs + crdt::Deletable::Present(KeyParams { + secret_key: old_k.secret_key, + name, + allow_create_bucket: crdt::Lww::new(false), + authorized_buckets: crdt::Map::new(), + local_aliases: crdt::LwwMap::new(), + }) + }; + Key { + key_id: old_k.key_id, + state, + } + } + } } +pub use v08::*; + impl KeyParams { fn new(secret_key: &str, name: &str) -> Self { KeyParams { @@ -173,28 +249,4 @@ impl TableSchema for KeyTable { } } } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?; - let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); - - let state = if old_k.deleted.get() { - crdt::Deletable::Deleted - } else { - // Authorized buckets is ignored here, - // migration is performed in specific migration code in - // garage/migrate.rs - crdt::Deletable::Present(KeyParams { - secret_key: old_k.secret_key, - name, - allow_create_bucket: crdt::Lww::new(false), - authorized_buckets: crdt::Map::new(), - local_aliases: crdt::LwwMap::new(), - }) - }; - Some(Key { - key_id: old_k.key_id, - state, - }) - } } diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs index 628a49dd..19893458 100644 --- a/src/model/prev/v051/bucket_table.rs +++ b/src/model/prev/v051/bucket_table.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::Crdt; use garage_table::*; -use super::key_table::PermissionSet; +use crate::key_table::v05::PermissionSet; /// A bucket is a collection of objects /// diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs deleted file mode 100644 index 37516b1c..00000000 --- a/src/model/prev/v051/key_table.rs +++ /dev/null @@ -1,50 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_table::crdt::*; -use garage_table::*; - -/// An api key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - /// The id of the key (immutable), used as partition key - pub key_id: String, - - /// The secret_key associated - pub secret_key: String, - - /// Name for the key - pub name: crdt::Lww<String>, - - /// Is the key deleted - pub deleted: crdt::Bool, - - /// Buckets in which the key is authorized. Empty if `Key` is deleted - // CRDT interaction: deleted implies authorized_buckets is empty - pub authorized_buckets: crdt::LwwMap<String, PermissionSet>, -} - -/// Permission given to a key in a bucket -#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct PermissionSet { - /// The key can be used to read the bucket - pub allow_read: bool, - /// The key can be used to write in the bucket - pub allow_write: bool, -} - -impl AutoCrdt for PermissionSet { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for Key { - fn merge(&mut self, other: &Self) { - self.name.merge(&other.name); - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.authorized_buckets.clear(); - } else { - self.authorized_buckets.merge(&other.authorized_buckets); - } - } -} diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs index 7a954752..8c1335a5 100644 --- a/src/model/prev/v051/mod.rs +++ b/src/model/prev/v051/mod.rs @@ -1,4 +1 @@ pub(crate) mod bucket_table; -pub(crate) mod key_table; -pub(crate) mod object_table; -pub(crate) mod version_table; diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs deleted file mode 100644 index e79e5787..00000000 --- a/src/model/prev/v051/object_table.rs +++ /dev/null @@ -1,149 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; - -use garage_util::data::*; - -use garage_table::crdt::*; - -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket: String, - - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, - - /// The list of currenty stored versions of the object - versions: Vec<ObjectVersion>, -} - -impl Object { - /// Get a list of currently stored versions of `Object` - pub fn versions(&self) -> &[ObjectVersion] { - &self.versions[..] - } -} - -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - -impl Crdt for ObjectVersionState { - fn merge(&mut self, other: &Self) { - use ObjectVersionState::*; - match other { - Aborted => { - *self = Aborted; - } - Complete(b) => match self { - Aborted => {} - Complete(a) => { - a.merge(b); - } - Uploading(_) => { - *self = Complete(b.clone()); - } - }, - Uploading(_) => {} - } - } -} - -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - -impl AutoCrdt for ObjectVersionData { - const WARN_IF_DIFFERENT: bool = true; -} - -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap<String, String>, -} - -impl ObjectVersion { - fn cmp_key(&self) -> (u64, Uuid) { - (self.timestamp, self.uuid) - } - - /// Is the object version completely received - pub fn is_complete(&self) -> bool { - matches!(self.state, ObjectVersionState::Complete(_)) - } -} - -impl Crdt for Object { - fn merge(&mut self, other: &Self) { - // Merge versions from other into here - for other_v in other.versions.iter() { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) - { - Ok(i) => { - self.versions[i].state.merge(&other_v.state); - } - Err(i) => { - self.versions.insert(i, other_v.clone()); - } - } - } - - // Remove versions which are obsolete, i.e. those that come - // before the last version which .is_complete(). - let last_complete = self - .versions - .iter() - .enumerate() - .rev() - .find(|(_, v)| v.is_complete()) - .map(|(vi, _)| vi); - - if let Some(last_vi) = last_complete { - self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); - } - } -} diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs deleted file mode 100644 index c11c62d5..00000000 --- a/src/model/prev/v051/version_table.rs +++ /dev/null @@ -1,79 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_util::data::*; - -use garage_table::crdt::*; -use garage_table::*; - -/// A version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map<u64, String>, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket: String, - /// Key in which the related object is stored - pub key: String, -} - -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - -impl Ord for VersionBlockKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.part_number - .cmp(&other.part_number) - .then(self.offset.cmp(&other.offset)) - } -} - -impl PartialOrd for VersionBlockKey { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - -impl AutoCrdt for VersionBlock { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for Version { - fn merge(&mut self, other: &Self) { - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.blocks.clear(); - self.parts_etags.clear(); - } else { - self.blocks.merge(&other.blocks); - self.parts_etags.merge(&other.parts_etags); - } - } -} diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index c7017409..7b023d87 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_db as db; @@ -10,19 +9,29 @@ use garage_table::*; use garage_block::manager::*; -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BlockRef { - /// Hash (blake2 sum) of the block, used as partition key - pub block: Hash, +mod v08 { + use garage_util::crdt; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; - /// Id of the Version for the object containing this block, used as sorting key - pub version: Uuid, + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BlockRef { + /// Hash (blake2 sum) of the block, used as partition key + pub block: Hash, - // Keep track of deleted status - /// Is the Version that contains this block deleted - pub deleted: crdt::Bool, + /// Id of the Version for the object containing this block, used as sorting key + pub version: Uuid, + + // Keep track of deleted status + /// Is the Version that contains this block deleted + pub deleted: crdt::Bool, + } + + impl garage_util::migrate::InitialFormat for BlockRef {} } +pub use v08::*; + impl Entry<Hash, Uuid> for BlockRef { fn partition_key(&self) -> &Hash { &self.block diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 1b2f0014..616e0d35 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; use std::sync::Arc; use garage_db as db; @@ -13,25 +12,126 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use crate::prev::v051::object_table as old; - pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket_id: Uuid, +mod v05 { + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } + + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + + /// Data stored in object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), + } + + /// Metadata about the object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, + } + + /// Additional headers for an object + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap<String, String>, + } + + impl garage_util::migrate::InitialFormat for Object {} +} + +mod v08 { + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, + pub use v05::{ + ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta, + ObjectVersionState, + }; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } - /// The list of currenty stored versions of the object - versions: Vec<ObjectVersion>, + impl garage_util::migrate::Migrate for Object { + type Previous = v05::Object; + + fn migrate(old: v05::Object) -> Object { + use garage_util::data::blake2sum; + + Object { + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + versions: old.versions, + } + } + } } +pub use v08::*; + impl Object { /// Initialize an Object struct from parts pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self { @@ -68,28 +168,6 @@ impl Object { } } -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - impl Crdt for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; @@ -111,42 +189,10 @@ impl Crdt for ObjectVersionState { } } -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - impl AutoCrdt for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap<String, String>, -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, Uuid) { (self.timestamp, self.uuid) @@ -290,11 +336,6 @@ impl TableSchema for ObjectTable { ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), } } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; - Some(migrate_object(old_obj)) - } } impl CountedItem for Object { @@ -342,61 +383,3 @@ impl CountedItem for Object { // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) - -fn migrate_object(o: old::Object) -> Object { - let versions = o - .versions() - .iter() - .cloned() - .map(migrate_object_version) - .collect(); - Object { - bucket_id: blake2sum(o.bucket.as_bytes()), - key: o.key, - versions, - } -} - -fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { - ObjectVersion { - uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), - timestamp: v.timestamp, - state: match v.state { - old::ObjectVersionState::Uploading(h) => { - ObjectVersionState::Uploading(migrate_object_version_headers(h)) - } - old::ObjectVersionState::Complete(d) => { - ObjectVersionState::Complete(migrate_object_version_data(d)) - } - old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - }, - } -} - -fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { - ObjectVersionHeaders { - content_type: h.content_type, - other: h.other, - } -} - -fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { - match d { - old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, - old::ObjectVersionData::Inline(m, b) => { - ObjectVersionData::Inline(migrate_object_version_meta(m), b) - } - old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( - migrate_object_version_meta(m), - Hash::try_from(h.as_slice()).unwrap(), - ), - } -} - -fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { - ObjectVersionMeta { - headers: migrate_object_version_headers(m.headers), - size: m.size, - etag: m.etag, - } -} diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 0486512b..6edc83f4 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_db as db; @@ -11,32 +10,108 @@ use garage_table::*; use crate::s3::block_ref_table::*; -use crate::prev::v051::version_table as old; - -/// A version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map<u64, String>, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket_id: Uuid, - /// Key in which the related object is stored - pub key: String, +mod v05 { + use garage_util::crdt; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map<u64, String>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket: String, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, + } + + /// Informations about a single block + #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, + } + + impl garage_util::migrate::InitialFormat for Version {} +} + +mod v08 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map<u64, String>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + pub use v05::{VersionBlock, VersionBlockKey}; + + impl garage_util::migrate::Migrate for Version { + type Previous = v05::Version; + + fn migrate(old: v05::Version) -> Version { + use garage_util::data::blake2sum; + + Version { + uuid: old.uuid, + deleted: old.deleted, + blocks: old.blocks, + parts_etags: old.parts_etags, + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + } + } + } } +pub use v08::*; + impl Version { pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { Self { @@ -64,14 +139,6 @@ impl Version { } } -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - impl Ord for VersionBlockKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.part_number @@ -86,15 +153,6 @@ impl PartialOrd for VersionBlockKey { } } -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - impl AutoCrdt for VersionBlock { const WARN_IF_DIFFERENT: bool = true; } @@ -166,42 +224,4 @@ impl TableSchema for VersionTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?; - - let blocks = old - .blocks - .items() - .iter() - .map(|(k, v)| { - ( - VersionBlockKey { - part_number: k.part_number, - offset: k.offset, - }, - VersionBlock { - hash: Hash::try_from(v.hash.as_slice()).unwrap(), - size: v.size, - }, - ) - }) - .collect::<crdt::Map<_, _>>(); - - let parts_etags = old - .parts_etags - .items() - .iter() - .map(|(k, v)| (*k, v.clone())) - .collect::<crdt::Map<_, _>>(); - - Some(Version { - uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), - deleted: crdt::Bool::new(old.deleted.get()), - blocks, - parts_etags, - bucket_id: blake2sum(old.bucket.as_bytes()), - key: old.key, - }) - } } |