diff options
author | Alex Auvolat <alex@adnab.me> | 2021-12-16 13:17:09 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-01-04 12:46:13 +0100 |
commit | 4d30e62db456097563c574b9dfd22b138d700087 (patch) | |
tree | 00c6790eb7dad952c8e4796731a4ff6e7c3613b6 /src/model | |
parent | 0bbb6673e7ce703e470a3c2aad620ee5f009bc84 (diff) | |
download | garage-4d30e62db456097563c574b9dfd22b138d700087.tar.gz garage-4d30e62db456097563c574b9dfd22b138d700087.zip |
New buckets for 0.6.0: migration code and build files
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 2 | ||||
-rw-r--r-- | src/model/key_table.rs | 27 | ||||
-rw-r--r-- | src/model/lib.rs | 1 | ||||
-rw-r--r-- | src/model/migrate.rs | 93 | ||||
-rw-r--r-- | src/model/object_table.rs | 71 | ||||
-rw-r--r-- | src/model/version_table.rs | 35 |
6 files changed, 228 insertions, 1 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 12c08719..03881f5d 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -17,7 +17,7 @@ path = "lib.rs" garage_rpc = { version = "0.6.0", path = "../rpc" } garage_table = { version = "0.6.0", path = "../table" } garage_util = { version = "0.6.0", path = "../util" } -garage_model_050 = { package = "garage_model", version = "0.5.0" } +garage_model_050 = { package = "garage_model", version = "0.5.1" } async-trait = "0.1.7" arc-swap = "1.0" diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 469dbd49..526ed496 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -171,4 +171,31 @@ impl TableSchema for KeyTable { } } } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old_k = + match rmp_serde::decode::from_read_ref::<_, garage_model_050::key_table::Key>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let state = if old_k.deleted.get() { + crdt::Deletable::Deleted + } else { + // Authorized buckets is ignored here, + // migration is performed in specific migration code in + // garage/migrate.rs + crdt::Deletable::Present(KeyParams { + allow_create_bucket: crdt::Lww::new(false), + authorized_buckets: crdt::Map::new(), + local_aliases: crdt::LwwMap::new(), + }) + }; + let name = crdt::Lww::migrate_from_raw(old_k.name.timestamp(), old_k.name.get().clone()); + Some(Key { + key_id: old_k.key_id, + secret_key: old_k.secret_key, + name, + state, + }) + } } diff --git a/src/model/lib.rs b/src/model/lib.rs index fe8cfdad..3f6b5cd4 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -8,6 +8,7 @@ pub mod bucket_helper; pub mod bucket_table; pub mod garage; pub mod key_table; +pub mod migrate; pub mod object_table; pub mod permission; pub mod version_table; diff --git a/src/model/migrate.rs b/src/model/migrate.rs new file mode 100644 index 00000000..35ff1807 --- /dev/null +++ b/src/model/migrate.rs @@ -0,0 +1,93 @@ +use std::sync::Arc; + +use garage_table::util::EmptyKey; +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::time::*; + +use garage_model_050::bucket_table as old_bucket; + +use crate::bucket_alias_table::*; +use crate::bucket_table::*; +use crate::garage::Garage; +use crate::permission::*; + +pub struct Migrate { + pub garage: Arc<Garage>, +} + +impl Migrate { + pub async fn migrate_buckets050(&self) -> Result<(), Error> { + let tree = self.garage.db.open_tree("bucket:table")?; + + for res in tree.iter() { + let (_k, v) = res?; + let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])?; + + if let old_bucket::BucketState::Present(p) = bucket.state.get() { + self.migrate_buckets050_do_bucket(&bucket, p).await?; + } + } + + Ok(()) + } + + pub async fn migrate_buckets050_do_bucket( + &self, + old_bucket: &old_bucket::Bucket, + old_bucket_p: &old_bucket::BucketParams, + ) -> Result<(), Error> { + let mut new_ak = Map::new(); + for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() { + new_ak.put( + k.to_string(), + BucketKeyPerm { + timestamp: *ts, + allow_read: perm.allow_read, + allow_write: perm.allow_write, + allow_owner: false, + }, + ); + } + + let mut aliases = LwwMap::new(); + aliases.update_in_place(old_bucket.name.clone(), true); + + let new_bucket = Bucket { + id: blake2sum(old_bucket.name.as_bytes()), + state: Deletable::Present(BucketParams { + creation_date: now_msec(), + authorized_keys: new_ak.clone(), + website_access: Lww::new(*old_bucket_p.website.get()), + website_config: Lww::new(None), + aliases, + local_aliases: LwwMap::new(), + }), + }; + self.garage.bucket_table.insert(&new_bucket).await?; + + let new_alias = BucketAlias { + name: old_bucket.name.clone(), + state: Lww::new(Deletable::Present(AliasParams { + bucket_id: new_bucket.id, + })), + }; + self.garage.bucket_alias_table.insert(&new_alias).await?; + + for (k, perm) in new_ak.items().iter() { + let mut key = self + .garage + .key_table + .get(&EmptyKey, k) + .await? + .ok_or_message(format!("Missing key: {}", k))?; + if let Some(p) = key.state.as_option_mut() { + p.authorized_buckets.put(new_bucket.id, *perm); + } + self.garage.key_table.insert(&key).await?; + } + + Ok(()) + } +} diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 285cb5a7..45f0daf4 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -11,6 +11,8 @@ use garage_table::*; use crate::version_table::*; +use garage_model_050::object_table as old; + /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -255,4 +257,73 @@ impl TableSchema for ObjectTable { let deleted = !entry.versions.iter().any(|v| v.is_data()); filter.apply(deleted) } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old_v = match rmp_serde::decode::from_read_ref::<_, old::Object>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + Some(migrate_object(old_v)) + } +} + +// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv +// (we just want to change bucket into bucket_id by hashing it) + +fn migrate_object(o: old::Object) -> Object { + let versions = o + .versions() + .iter() + .cloned() + .map(migrate_object_version) + .collect(); + Object { + bucket_id: blake2sum(o.bucket.as_bytes()), + key: o.key, + versions, + } +} + +fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { + ObjectVersion { + uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), + timestamp: v.timestamp, + state: match v.state { + old::ObjectVersionState::Uploading(h) => { + ObjectVersionState::Uploading(migrate_object_version_headers(h)) + } + old::ObjectVersionState::Complete(d) => { + ObjectVersionState::Complete(migrate_object_version_data(d)) + } + old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + } +} + +fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { + ObjectVersionHeaders { + content_type: h.content_type, + other: h.other, + } +} + +fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { + match d { + old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, + old::ObjectVersionData::Inline(m, b) => { + ObjectVersionData::Inline(migrate_object_version_meta(m), b) + } + old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( + migrate_object_version_meta(m), + Hash::try_from(h.as_slice()).unwrap(), + ), + } +} + +fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { + ObjectVersionMeta { + headers: migrate_object_version_headers(m.headers), + size: m.size, + etag: m.etag, + } } diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 4edea0b7..05cae831 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -147,4 +147,39 @@ impl TableSchema for VersionTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } + + fn try_migrate(bytes: &[u8]) -> Option<Self::E> { + let old = + match rmp_serde::decode::from_read_ref::<_, garage_model_050::version_table::Version>( + bytes, + ) { + Ok(x) => x, + Err(_) => return None, + }; + let mut new_blocks = crdt::Map::new(); + for (k, v) in old.blocks.items().iter() { + new_blocks.put( + VersionBlockKey { + part_number: k.part_number, + offset: k.offset, + }, + VersionBlock { + hash: Hash::try_from(v.hash.as_slice()).unwrap(), + size: v.size, + }, + ); + } + let mut new_parts_etags = crdt::Map::new(); + for (k, v) in old.parts_etags.items().iter() { + new_parts_etags.put(*k, v.clone()); + } + Some(Version { + uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), + deleted: crdt::Bool::new(old.deleted.get()), + blocks: new_blocks, + parts_etags: new_parts_etags, + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + }) + } } |