diff options
Diffstat (limited to 'src/model/s3')
-rw-r--r-- | src/model/s3/block_ref_table.rs | 39 | ||||
-rw-r--r-- | src/model/s3/lifecycle_worker.rs | 8 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 215 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 60 |
4 files changed, 219 insertions, 103 deletions
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 7b023d87..57eb7b16 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -3,8 +3,12 @@ use std::sync::Arc; use garage_db as db; use garage_util::data::*; +use garage_util::error::*; +use garage_util::migrate::Migrate; +use garage_block::CalculateRefcount; use garage_table::crdt::Crdt; +use garage_table::replication::TableShardedReplication; use garage_table::*; use garage_block::manager::*; @@ -84,3 +88,38 @@ impl TableSchema for BlockRefTable { filter.apply(entry.deleted.get()) } } + +pub fn block_ref_recount_fn( + block_ref_table: &Arc<Table<BlockRefTable, TableShardedReplication>>, +) -> CalculateRefcount { + let table = Arc::downgrade(block_ref_table); + Box::new(move |tx: &db::Transaction, block: &Hash| { + let table = table + .upgrade() + .ok_or_message("cannot upgrade weak ptr to block_ref_table") + .map_err(db::TxError::Abort)?; + Ok(calculate_refcount(&table, tx, block)?) + }) +} + +fn calculate_refcount( + block_ref_table: &Table<BlockRefTable, TableShardedReplication>, + tx: &db::Transaction, + block: &Hash, +) -> db::TxResult<usize, Error> { + let mut result = 0; + for entry in tx.range(&block_ref_table.data.store, block.as_slice()..)? { + let (key, value) = entry?; + if &key[..32] != block.as_slice() { + break; + } + let value = BlockRef::decode(&value) + .ok_or_message("could not decode block_ref") + .map_err(db::TxError::Abort)?; + assert_eq!(value.block, *block); + if !value.deleted.get() { + result += 1; + } + } + Ok(result) +} diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs index 50d4283f..9ecf168c 100644 --- a/src/model/s3/lifecycle_worker.rs +++ b/src/model/s3/lifecycle_worker.rs @@ -121,13 +121,7 @@ impl Worker for LifecycleWorker { mpu_aborted, .. } => { - let n_objects = self - .garage - .object_table - .data - .store - .fast_len() - .unwrap_or(None); + let n_objects = self.garage.object_table.data.store.len().ok(); let progress = match n_objects { None => "...".to_string(), Some(total) => format!( diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index ebea04bd..eedb9615 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -17,7 +17,7 @@ pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; -mod v05 { +mod v08 { use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; @@ -26,7 +26,7 @@ mod v05 { #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key - pub bucket: String, + pub bucket_id: Uuid, /// The key at which the object is stored in its bucket, used as sorting key pub key: String, @@ -92,16 +92,13 @@ mod v05 { impl garage_util::migrate::InitialFormat for Object {} } -mod v08 { +mod v09 { use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; - use super::v05; + use super::v08; - pub use v05::{ - ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta, - ObjectVersionState, - }; + pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta}; /// An object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] @@ -116,28 +113,69 @@ mod v08 { pub(super) versions: Vec<ObjectVersion>, } + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading { + /// Indicates whether this is a multipart upload + multipart: bool, + /// Headers to be included in the final object + headers: ObjectVersionHeaders, + }, + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + impl garage_util::migrate::Migrate for Object { - type Previous = v05::Object; + const VERSION_MARKER: &'static [u8] = b"G09s3o"; - fn migrate(old: v05::Object) -> Object { - use garage_util::data::blake2sum; + type Previous = v08::Object; + fn migrate(old: v08::Object) -> Object { + let versions = old + .versions + .into_iter() + .map(|x| ObjectVersion { + uuid: x.uuid, + timestamp: x.timestamp, + state: match x.state { + v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading { + multipart: false, + headers: h, + }, + v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d), + v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + }) + .collect(); Object { - bucket_id: blake2sum(old.bucket.as_bytes()), + bucket_id: old.bucket_id, key: old.key, - versions: old.versions, + versions, } } } } -mod v09 { - use garage_util::data::Uuid; +mod v010 { + use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; - use super::v08; - - pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta}; + use super::v09; /// An object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] @@ -170,8 +208,8 @@ mod v09 { Uploading { /// Indicates whether this is a multipart upload multipart: bool, - /// Headers to be included in the final object - headers: ObjectVersionHeaders, + /// Encryption params + headers to be included in the final object + encryption: ObjectVersionEncryption, }, /// The version is fully received Complete(ObjectVersionData), @@ -179,38 +217,133 @@ mod v09 { Aborted, } + /// Data stored in object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined. + /// It is never compressed. For encrypted objects, it is encrypted using + /// AES256-GCM, like the encrypted headers. + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), + } + + /// Metadata about the object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionMeta { + /// Size of the object. If object is encrypted/compressed, + /// this is always the size of the unencrypted/uncompressed data + pub size: u64, + /// etag of the object + pub etag: String, + /// Encryption params + headers (encrypted or plaintext) + pub encryption: ObjectVersionEncryption, + } + + /// Encryption information + metadata + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionEncryption { + SseC { + /// Encrypted serialized ObjectVersionHeaders struct. + /// This is never compressed, just encrypted using AES256-GCM. + #[serde(with = "serde_bytes")] + headers: Vec<u8>, + /// Whether data blocks are compressed in addition to being encrypted + /// (compression happens before encryption, whereas for non-encrypted + /// objects, compression is handled at the level of the block manager) + compressed: bool, + }, + Plaintext { + /// Plain-text headers + headers: ObjectVersionHeaders, + }, + } + + /// Vector of headers, as tuples of the format (header name, header value) + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionHeaders(pub Vec<(String, String)>); + impl garage_util::migrate::Migrate for Object { - const VERSION_MARKER: &'static [u8] = b"G09s3o"; + const VERSION_MARKER: &'static [u8] = b"G010s3ob"; - type Previous = v08::Object; + type Previous = v09::Object; - fn migrate(old: v08::Object) -> Object { - let versions = old - .versions - .into_iter() - .map(|x| ObjectVersion { - uuid: x.uuid, - timestamp: x.timestamp, - state: match x.state { - v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading { - multipart: false, - headers: h, - }, - v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d), - v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - }, - }) - .collect(); + fn migrate(old: v09::Object) -> Object { Object { bucket_id: old.bucket_id, key: old.key, - versions, + versions: old.versions.into_iter().map(migrate_version).collect(), } } } + + fn migrate_version(old: v09::ObjectVersion) -> ObjectVersion { + ObjectVersion { + uuid: old.uuid, + timestamp: old.timestamp, + state: match old.state { + v09::ObjectVersionState::Uploading { multipart, headers } => { + ObjectVersionState::Uploading { + multipart, + encryption: migrate_headers(headers), + } + } + v09::ObjectVersionState::Complete(d) => { + ObjectVersionState::Complete(migrate_data(d)) + } + v09::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + } + } + + fn migrate_data(old: v09::ObjectVersionData) -> ObjectVersionData { + match old { + v09::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, + v09::ObjectVersionData::Inline(meta, data) => { + ObjectVersionData::Inline(migrate_meta(meta), data) + } + v09::ObjectVersionData::FirstBlock(meta, fb) => { + ObjectVersionData::FirstBlock(migrate_meta(meta), fb) + } + } + } + + fn migrate_meta(old: v09::ObjectVersionMeta) -> ObjectVersionMeta { + ObjectVersionMeta { + size: old.size, + etag: old.etag, + encryption: migrate_headers(old.headers), + } + } + + fn migrate_headers(old: v09::ObjectVersionHeaders) -> ObjectVersionEncryption { + use http::header::CONTENT_TYPE; + + let mut new_headers = Vec::with_capacity(old.other.len() + 1); + if old.content_type != "blob" { + new_headers.push((CONTENT_TYPE.as_str().to_string(), old.content_type)); + } + for (name, value) in old.other.into_iter() { + new_headers.push((name, value)); + } + + ObjectVersionEncryption::Plaintext { + headers: ObjectVersionHeaders(new_headers), + } + } + + // Since ObjectVersionHeaders can now be serialized independently, for the + // purpose of being encrypted, we need it to support migrations on its own + // as well. + impl garage_util::migrate::InitialFormat for ObjectVersionHeaders { + const VERSION_MARKER: &'static [u8] = b"G010s3oh"; + } } -pub use v09::*; +pub use v010::*; impl Object { /// Initialize an Object struct from parts diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 5c032f9f..d611a9e3 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -11,7 +11,7 @@ use garage_table::*; use crate::s3::block_ref_table::*; -mod v05 { +mod v08 { use garage_util::crdt; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; @@ -35,7 +35,7 @@ mod v05 { // Back link to bucket+key so that we can figure if // this was deleted later on /// Bucket in which the related object is stored - pub bucket: String, + pub bucket_id: Uuid, /// Key in which the related object is stored pub key: String, } @@ -44,7 +44,8 @@ mod v05 { pub struct VersionBlockKey { /// Number of the part pub part_number: u64, - /// Offset of this sub-segment in its part + /// Offset of this sub-segment in its part as sent by the client + /// (before any kind of compression or encryption) pub offset: u64, } @@ -53,64 +54,13 @@ mod v05 { pub struct VersionBlock { /// Blake2 sum of the block pub hash: Hash, - /// Size of the block + /// Size of the block, before any kind of compression or encryption pub size: u64, } impl garage_util::migrate::InitialFormat for Version {} } -mod v08 { - use garage_util::crdt; - use garage_util::data::Uuid; - use serde::{Deserialize, Serialize}; - - use super::v05; - - pub use v05::{VersionBlock, VersionBlockKey}; - - /// A version of an object - #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] - pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map<u64, String>, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket_id: Uuid, - /// Key in which the related object is stored - pub key: String, - } - - impl garage_util::migrate::Migrate for Version { - type Previous = v05::Version; - - fn migrate(old: v05::Version) -> Version { - use garage_util::data::blake2sum; - - Version { - uuid: old.uuid, - deleted: old.deleted, - blocks: old.blocks, - parts_etags: old.parts_etags, - bucket_id: blake2sum(old.bucket.as_bytes()), - key: old.key, - } - } - } -} - pub(crate) mod v09 { use garage_util::crdt; use garage_util::data::Uuid; |