aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml6
-rw-r--r--src/model/garage.rs37
-rw-r--r--src/model/helper/bucket.rs12
-rw-r--r--src/model/index_counter.rs8
-rw-r--r--src/model/k2v/rpc.rs45
-rw-r--r--src/model/key_table.rs68
-rw-r--r--src/model/lib.rs4
-rw-r--r--src/model/migrate.rs108
-rw-r--r--src/model/prev/mod.rs1
-rw-r--r--src/model/prev/v051/bucket_table.rs63
-rw-r--r--src/model/prev/v051/mod.rs1
-rw-r--r--src/model/s3/block_ref_table.rs39
-rw-r--r--src/model/s3/lifecycle_worker.rs8
-rw-r--r--src/model/s3/object_table.rs215
-rw-r--r--src/model/s3/version_table.rs60
15 files changed, 278 insertions, 397 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index bde354b5..1e1ce0f7 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_model"
-version = "0.9.3"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -27,6 +27,7 @@ blake2.workspace = true
chrono.workspace = true
err-derive.workspace = true
hex.workspace = true
+http.workspace = true
base64.workspace = true
parse_duration.workspace = true
tracing.workspace = true
@@ -42,8 +43,7 @@ tokio.workspace = true
opentelemetry.workspace = true
[features]
-default = [ "sled", "lmdb", "sqlite" ]
+default = [ "lmdb", "sqlite" ]
k2v = [ "garage_util/k2v" ]
lmdb = [ "garage_db/lmdb" ]
-sled = [ "garage_db/sled" ]
sqlite = [ "garage_db/sqlite" ]
diff --git a/src/model/garage.rs b/src/model/garage.rs
index a6f60546..273690db 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -10,7 +10,7 @@ use garage_util::config::*;
use garage_util::error::*;
use garage_util::persister::PersisterShared;
-use garage_rpc::replication_mode::ReplicationMode;
+use garage_rpc::replication_mode::*;
use garage_rpc::system::System;
use garage_block::manager::*;
@@ -40,8 +40,8 @@ pub struct Garage {
/// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars,
- /// The replication mode of this cluster
- pub replication_mode: ReplicationMode,
+ /// The replication factor of this cluster
+ pub replication_factor: ReplicationFactor,
/// The local database
pub db: db::Db,
@@ -118,9 +118,6 @@ impl Garage {
.ok_or_message("Invalid `db_engine` value in configuration file")?;
let mut db_path = config.metadata_dir.clone();
match db_engine {
- db::Engine::Sled => {
- db_path.push("db");
- }
db::Engine::Sqlite => {
db_path.push("db.sqlite");
}
@@ -134,8 +131,6 @@ impl Garage {
v if v == usize::default() => None,
v => Some(v),
},
- sled_cache_capacity: config.sled_cache_capacity,
- sled_flush_every_ms: config.sled_flush_every_ms,
};
let db = db::open_db(&db_path, db_engine, &db_opt)
.ok_or_message("Unable to open metadata db")?;
@@ -148,32 +143,30 @@ impl Garage {
.and_then(|x| NetworkKey::from_slice(&x))
.ok_or_message("Invalid RPC secret key")?;
- let replication_mode = ReplicationMode::parse(&config.replication_mode)
- .ok_or_message("Invalid replication_mode in config file.")?;
+ let (replication_factor, consistency_mode) = parse_replication_mode(&config)?;
info!("Initialize background variable system...");
let mut bg_vars = vars::BgVars::new();
info!("Initialize membership management system...");
- let system = System::new(network_key, replication_mode, &config)?;
+ let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
- replication_factor: replication_mode.replication_factor(),
- write_quorum: replication_mode.write_quorum(),
+ replication_factor: replication_factor.into(),
+ write_quorum: replication_factor.write_quorum(consistency_mode),
read_quorum: 1,
};
let meta_rep_param = TableShardedReplication {
system: system.clone(),
- replication_factor: replication_mode.replication_factor(),
- write_quorum: replication_mode.write_quorum(),
- read_quorum: replication_mode.read_quorum(),
+ replication_factor: replication_factor.into(),
+ write_quorum: replication_factor.write_quorum(consistency_mode),
+ read_quorum: replication_factor.read_quorum(consistency_mode),
};
let control_rep_param = TableFullReplication {
system: system.clone(),
- max_faults: replication_mode.control_write_max_faults(),
};
info!("Initialize block manager...");
@@ -254,11 +247,19 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
+ // ---- setup block refcount recalculation ----
+ // this function can be used to fix inconsistencies in the RC table
+ block_manager.set_recalc_rc(vec![
+ block_ref_recount_fn(&block_ref_table),
+ // other functions could be added here if we had other tables
+ // that hold references to data blocks
+ ]);
+
// -- done --
Ok(Arc::new(Self {
config,
bg_vars,
- replication_mode,
+ replication_factor,
db,
system,
block_manager,
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 4ae9122f..e5506d7e 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -155,10 +155,12 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")]
{
- use garage_rpc::ring::Ring;
- use std::sync::Arc;
-
- let ring: Arc<Ring> = self.0.system.ring.borrow().clone();
+ let node_id_vec = self
+ .0
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .to_vec();
let k2vindexes = self
.0
.k2v
@@ -167,7 +169,7 @@ impl<'a> BucketHelper<'a> {
.get_range(
&bucket_id,
None,
- Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ Some((DeletedFilter::NotDeleted, node_id_vec)),
10,
EnumerationOrder::Forward,
)
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index c0bf38d8..aa13ee7b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
-use garage_rpc::ring::Ring;
+use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -83,9 +83,9 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
}
impl<T: CountedItem> CounterEntry<T> {
- pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
- let nodes = &ring.layout.node_id_vec[..];
- self.filtered_values_with_nodes(nodes)
+ pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap<String, i64> {
+ let nodes = layout.all_nongateway_nodes();
+ self.filtered_values_with_nodes(&nodes)
}
pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 4ab44c22..e15f2df8 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -127,23 +127,21 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
K2VRpc::InsertItem(InsertedItem {
partition,
sort_key,
causal_context,
value,
}),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .interrupt_after_quorum(true),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@@ -168,7 +166,7 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
call_list.entry(who).or_default().push(InsertedItem {
@@ -187,14 +185,12 @@ impl K2VRpcHandler {
let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::InsertManyItems(items),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .interrupt_after_quorum(true),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
Ok::<_, Error>((nodes, resp))
@@ -223,15 +219,16 @@ impl K2VRpcHandler {
},
sort_key,
};
+ // TODO figure this out with write sets, is it still appropriate???
let nodes = self
.item_table
.data
.replication
- .write_nodes(&poll_key.partition.hash());
+ .read_nodes(&poll_key.partition.hash());
- let rpc = self.system.rpc.try_call_many(
+ let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
K2VRpc::PollItem {
key: poll_key,
causal_context,
@@ -239,9 +236,11 @@ impl K2VRpcHandler {
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
+ .send_all_at_once(true)
.without_timeout(),
);
- let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let timeout_duration =
+ Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
let resps = select! {
r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
@@ -283,11 +282,12 @@ impl K2VRpcHandler {
seen.restrict(&range);
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ // TODO figure this out with write sets, does it still work????
let nodes = self
.item_table
.data
.replication
- .write_nodes(&range.partition.hash());
+ .read_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let msg = K2VRpc::PollRange {
range,
@@ -300,7 +300,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc_helper()
+ .call(&self.endpoint, *node, msg.clone(), rs)
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
@@ -316,8 +320,9 @@ impl K2VRpcHandler {
// kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks)
- let mut deadline =
- Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut deadline = Instant::now()
+ + Duration::from_millis(timeout_msec)
+ + self.system.rpc_helper().rpc_timeout();
let mut resps = vec![];
let mut errors = vec![];
loop {
@@ -339,7 +344,7 @@ impl K2VRpcHandler {
}
if errors.len() > nodes.len() - quorum {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ return Err(Error::Quorum(quorum, None, resps.len(), nodes.len(), errors).into());
}
// Take all returned items into account to produce the response.
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index a9762f1b..efb95f08 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -7,48 +7,7 @@ use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema};
use crate::permission::BucketKeyPerm;
-pub(crate) mod v05 {
- use garage_util::crdt;
- use serde::{Deserialize, Serialize};
-
- /// An api key
- #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
- pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
-
- /// The secret_key associated
- pub secret_key: String,
-
- /// Name for the key
- pub name: crdt::Lww<String>,
-
- /// Is the key deleted
- pub deleted: crdt::Bool,
-
- /// Buckets in which the key is authorized. Empty if `Key` is deleted
- // CRDT interaction: deleted implies authorized_buckets is empty
- pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
- }
-
- /// Permission given to a key in a bucket
- #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
- pub struct PermissionSet {
- /// The key can be used to read the bucket
- pub allow_read: bool,
- /// The key can be used to write in the bucket
- pub allow_write: bool,
- }
-
- impl crdt::AutoCrdt for PermissionSet {
- const WARN_IF_DIFFERENT: bool = true;
- }
-
- impl garage_util::migrate::InitialFormat for Key {}
-}
-
mod v08 {
- use super::v05;
use crate::permission::BucketKeyPerm;
use garage_util::crdt;
use garage_util::data::Uuid;
@@ -86,32 +45,7 @@ mod v08 {
pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
}
- impl garage_util::migrate::Migrate for Key {
- type Previous = v05::Key;
-
- fn migrate(old_k: v05::Key) -> Key {
- let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
-
- let state = if old_k.deleted.get() {
- crdt::Deletable::Deleted
- } else {
- // Authorized buckets is ignored here,
- // migration is performed in specific migration code in
- // garage/migrate.rs
- crdt::Deletable::Present(KeyParams {
- secret_key: old_k.secret_key,
- name,
- allow_create_bucket: crdt::Lww::new(false),
- authorized_buckets: crdt::Map::new(),
- local_aliases: crdt::LwwMap::new(),
- })
- };
- Key {
- key_id: old_k.key_id,
- state,
- }
- }
- }
+ impl garage_util::migrate::InitialFormat for Key {}
}
pub use v08::*;
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 8ec338da..1939a7a9 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -1,9 +1,6 @@
#[macro_use]
extern crate tracing;
-// For migration from previous versions
-pub(crate) mod prev;
-
pub mod permission;
pub mod index_counter;
@@ -18,5 +15,4 @@ pub mod s3;
pub mod garage;
pub mod helper;
-pub mod migrate;
pub mod snapshot;
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
deleted file mode 100644
index 8528382a..00000000
--- a/src/model/migrate.rs
+++ /dev/null
@@ -1,108 +0,0 @@
-use std::sync::Arc;
-
-use garage_util::crdt::*;
-use garage_util::data::*;
-use garage_util::encode::nonversioned_decode;
-use garage_util::error::Error as GarageError;
-use garage_util::time::*;
-
-use crate::prev::v051::bucket_table as old_bucket;
-
-use crate::bucket_alias_table::*;
-use crate::bucket_table::*;
-use crate::garage::Garage;
-use crate::helper::error::*;
-use crate::permission::*;
-
-pub struct Migrate {
- pub garage: Arc<Garage>,
-}
-
-impl Migrate {
- pub async fn migrate_buckets050(&self) -> Result<(), Error> {
- let tree = self
- .garage
- .db
- .open_tree("bucket:table")
- .map_err(GarageError::from)?;
-
- let mut old_buckets = vec![];
- for res in tree.iter().map_err(GarageError::from)? {
- let (_k, v) = res.map_err(GarageError::from)?;
- let bucket =
- nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?;
- old_buckets.push(bucket);
- }
-
- for bucket in old_buckets {
- if let old_bucket::BucketState::Present(p) = bucket.state.get() {
- self.migrate_buckets050_do_bucket(&bucket, p).await?;
- }
- }
-
- Ok(())
- }
-
- pub async fn migrate_buckets050_do_bucket(
- &self,
- old_bucket: &old_bucket::Bucket,
- old_bucket_p: &old_bucket::BucketParams,
- ) -> Result<(), Error> {
- let bucket_id = blake2sum(old_bucket.name.as_bytes());
-
- let new_name = if is_valid_bucket_name(&old_bucket.name) {
- old_bucket.name.clone()
- } else {
- // if old bucket name was not valid, replace it by
- // a hex-encoded name derived from its identifier
- hex::encode(&bucket_id.as_slice()[..16])
- };
-
- let website = if *old_bucket_p.website.get() {
- Some(WebsiteConfig {
- index_document: "index.html".into(),
- error_document: None,
- })
- } else {
- None
- };
-
- let helper = self.garage.locked_helper().await;
-
- self.garage
- .bucket_table
- .insert(&Bucket {
- id: bucket_id,
- state: Deletable::Present(BucketParams {
- creation_date: now_msec(),
- authorized_keys: Map::new(),
- aliases: LwwMap::new(),
- local_aliases: LwwMap::new(),
- website_config: Lww::new(website),
- cors_config: Lww::new(None),
- lifecycle_config: Lww::new(None),
- quotas: Lww::new(Default::default()),
- }),
- })
- .await?;
-
- helper.set_global_bucket_alias(bucket_id, &new_name).await?;
-
- for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() {
- helper
- .set_bucket_key_permissions(
- bucket_id,
- k,
- BucketKeyPerm {
- timestamp: *ts,
- allow_read: perm.allow_read,
- allow_write: perm.allow_write,
- allow_owner: false,
- },
- )
- .await?;
- }
-
- Ok(())
- }
-}
diff --git a/src/model/prev/mod.rs b/src/model/prev/mod.rs
deleted file mode 100644
index 68bb1502..00000000
--- a/src/model/prev/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub(crate) mod v051;
diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs
deleted file mode 100644
index 19893458..00000000
--- a/src/model/prev/v051/bucket_table.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_table::crdt::Crdt;
-use garage_table::*;
-
-use crate::key_table::v05::PermissionSet;
-
-/// A bucket is a collection of objects
-///
-/// Its parameters are not directly accessible as:
-/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
-/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Bucket {
- /// Name of the bucket
- pub name: String,
- /// State, and configuration if not deleted, of the bucket
- pub state: crdt::Lww<BucketState>,
-}
-
-/// State of a bucket
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum BucketState {
- /// The bucket is deleted
- Deleted,
- /// The bucket exists
- Present(BucketParams),
-}
-
-impl Crdt for BucketState {
- fn merge(&mut self, o: &Self) {
- match o {
- BucketState::Deleted => *self = BucketState::Deleted,
- BucketState::Present(other_params) => {
- if let BucketState::Present(params) = self {
- params.merge(other_params);
- }
- }
- }
- }
-}
-
-/// Configuration for a bucket
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketParams {
- /// Map of key with access to the bucket, and what kind of access they give
- pub authorized_keys: crdt::LwwMap<String, PermissionSet>,
- /// Is the bucket served as http
- pub website: crdt::Lww<bool>,
-}
-
-impl Crdt for BucketParams {
- fn merge(&mut self, o: &Self) {
- self.authorized_keys.merge(&o.authorized_keys);
- self.website.merge(&o.website);
- }
-}
-
-impl Crdt for Bucket {
- fn merge(&mut self, other: &Self) {
- self.state.merge(&other.state);
- }
-}
diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs
deleted file mode 100644
index 8c1335a5..00000000
--- a/src/model/prev/v051/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-pub(crate) mod bucket_table;
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index 7b023d87..57eb7b16 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -3,8 +3,12 @@ use std::sync::Arc;
use garage_db as db;
use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::migrate::Migrate;
+use garage_block::CalculateRefcount;
use garage_table::crdt::Crdt;
+use garage_table::replication::TableShardedReplication;
use garage_table::*;
use garage_block::manager::*;
@@ -84,3 +88,38 @@ impl TableSchema for BlockRefTable {
filter.apply(entry.deleted.get())
}
}
+
+pub fn block_ref_recount_fn(
+ block_ref_table: &Arc<Table<BlockRefTable, TableShardedReplication>>,
+) -> CalculateRefcount {
+ let table = Arc::downgrade(block_ref_table);
+ Box::new(move |tx: &db::Transaction, block: &Hash| {
+ let table = table
+ .upgrade()
+ .ok_or_message("cannot upgrade weak ptr to block_ref_table")
+ .map_err(db::TxError::Abort)?;
+ Ok(calculate_refcount(&table, tx, block)?)
+ })
+}
+
+fn calculate_refcount(
+ block_ref_table: &Table<BlockRefTable, TableShardedReplication>,
+ tx: &db::Transaction,
+ block: &Hash,
+) -> db::TxResult<usize, Error> {
+ let mut result = 0;
+ for entry in tx.range(&block_ref_table.data.store, block.as_slice()..)? {
+ let (key, value) = entry?;
+ if &key[..32] != block.as_slice() {
+ break;
+ }
+ let value = BlockRef::decode(&value)
+ .ok_or_message("could not decode block_ref")
+ .map_err(db::TxError::Abort)?;
+ assert_eq!(value.block, *block);
+ if !value.deleted.get() {
+ result += 1;
+ }
+ }
+ Ok(result)
+}
diff --git a/src/model/s3/lifecycle_worker.rs b/src/model/s3/lifecycle_worker.rs
index 50d4283f..9ecf168c 100644
--- a/src/model/s3/lifecycle_worker.rs
+++ b/src/model/s3/lifecycle_worker.rs
@@ -121,13 +121,7 @@ impl Worker for LifecycleWorker {
mpu_aborted,
..
} => {
- let n_objects = self
- .garage
- .object_table
- .data
- .store
- .fast_len()
- .unwrap_or(None);
+ let n_objects = self.garage.object_table.data.store.len().ok();
let progress = match n_objects {
None => "...".to_string(),
Some(total) => format!(
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index ebea04bd..eedb9615 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -17,7 +17,7 @@ pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes";
-mod v05 {
+mod v08 {
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@@ -26,7 +26,7 @@ mod v05 {
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
- pub bucket: String,
+ pub bucket_id: Uuid,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
@@ -92,16 +92,13 @@ mod v05 {
impl garage_util::migrate::InitialFormat for Object {}
}
-mod v08 {
+mod v09 {
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
- use super::v05;
+ use super::v08;
- pub use v05::{
- ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta,
- ObjectVersionState,
- };
+ pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@@ -116,28 +113,69 @@ mod v08 {
pub(super) versions: Vec<ObjectVersion>,
}
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading {
+ /// Indicates whether this is a multipart upload
+ multipart: bool,
+ /// Headers to be included in the final object
+ headers: ObjectVersionHeaders,
+ },
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
impl garage_util::migrate::Migrate for Object {
- type Previous = v05::Object;
+ const VERSION_MARKER: &'static [u8] = b"G09s3o";
- fn migrate(old: v05::Object) -> Object {
- use garage_util::data::blake2sum;
+ type Previous = v08::Object;
+ fn migrate(old: v08::Object) -> Object {
+ let versions = old
+ .versions
+ .into_iter()
+ .map(|x| ObjectVersion {
+ uuid: x.uuid,
+ timestamp: x.timestamp,
+ state: match x.state {
+ v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading {
+ multipart: false,
+ headers: h,
+ },
+ v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d),
+ v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
+ },
+ })
+ .collect();
Object {
- bucket_id: blake2sum(old.bucket.as_bytes()),
+ bucket_id: old.bucket_id,
key: old.key,
- versions: old.versions,
+ versions,
}
}
}
}
-mod v09 {
- use garage_util::data::Uuid;
+mod v010 {
+ use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
- use super::v08;
-
- pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
+ use super::v09;
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@@ -170,8 +208,8 @@ mod v09 {
Uploading {
/// Indicates whether this is a multipart upload
multipart: bool,
- /// Headers to be included in the final object
- headers: ObjectVersionHeaders,
+ /// Encryption params + headers to be included in the final object
+ encryption: ObjectVersionEncryption,
},
/// The version is fully received
Complete(ObjectVersionData),
@@ -179,38 +217,133 @@ mod v09 {
Aborted,
}
+ /// Data stored in object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
+ DeleteMarker,
+ /// The object is short, it's stored inlined.
+ /// It is never compressed. For encrypted objects, it is encrypted using
+ /// AES256-GCM, like the encrypted headers.
+ Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
+ FirstBlock(ObjectVersionMeta, Hash),
+ }
+
+ /// Metadata about the object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionMeta {
+ /// Size of the object. If object is encrypted/compressed,
+ /// this is always the size of the unencrypted/uncompressed data
+ pub size: u64,
+ /// etag of the object
+ pub etag: String,
+ /// Encryption params + headers (encrypted or plaintext)
+ pub encryption: ObjectVersionEncryption,
+ }
+
+ /// Encryption information + metadata
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionEncryption {
+ SseC {
+ /// Encrypted serialized ObjectVersionHeaders struct.
+ /// This is never compressed, just encrypted using AES256-GCM.
+ #[serde(with = "serde_bytes")]
+ headers: Vec<u8>,
+ /// Whether data blocks are compressed in addition to being encrypted
+ /// (compression happens before encryption, whereas for non-encrypted
+ /// objects, compression is handled at the level of the block manager)
+ compressed: bool,
+ },
+ Plaintext {
+ /// Plain-text headers
+ headers: ObjectVersionHeaders,
+ },
+ }
+
+ /// Vector of headers, as tuples of the format (header name, header value)
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionHeaders(pub Vec<(String, String)>);
+
impl garage_util::migrate::Migrate for Object {
- const VERSION_MARKER: &'static [u8] = b"G09s3o";
+ const VERSION_MARKER: &'static [u8] = b"G010s3ob";
- type Previous = v08::Object;
+ type Previous = v09::Object;
- fn migrate(old: v08::Object) -> Object {
- let versions = old
- .versions
- .into_iter()
- .map(|x| ObjectVersion {
- uuid: x.uuid,
- timestamp: x.timestamp,
- state: match x.state {
- v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading {
- multipart: false,
- headers: h,
- },
- v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d),
- v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
- },
- })
- .collect();
+ fn migrate(old: v09::Object) -> Object {
Object {
bucket_id: old.bucket_id,
key: old.key,
- versions,
+ versions: old.versions.into_iter().map(migrate_version).collect(),
}
}
}
+
+ fn migrate_version(old: v09::ObjectVersion) -> ObjectVersion {
+ ObjectVersion {
+ uuid: old.uuid,
+ timestamp: old.timestamp,
+ state: match old.state {
+ v09::ObjectVersionState::Uploading { multipart, headers } => {
+ ObjectVersionState::Uploading {
+ multipart,
+ encryption: migrate_headers(headers),
+ }
+ }
+ v09::ObjectVersionState::Complete(d) => {
+ ObjectVersionState::Complete(migrate_data(d))
+ }
+ v09::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
+ },
+ }
+ }
+
+ fn migrate_data(old: v09::ObjectVersionData) -> ObjectVersionData {
+ match old {
+ v09::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
+ v09::ObjectVersionData::Inline(meta, data) => {
+ ObjectVersionData::Inline(migrate_meta(meta), data)
+ }
+ v09::ObjectVersionData::FirstBlock(meta, fb) => {
+ ObjectVersionData::FirstBlock(migrate_meta(meta), fb)
+ }
+ }
+ }
+
+ fn migrate_meta(old: v09::ObjectVersionMeta) -> ObjectVersionMeta {
+ ObjectVersionMeta {
+ size: old.size,
+ etag: old.etag,
+ encryption: migrate_headers(old.headers),
+ }
+ }
+
+ fn migrate_headers(old: v09::ObjectVersionHeaders) -> ObjectVersionEncryption {
+ use http::header::CONTENT_TYPE;
+
+ let mut new_headers = Vec::with_capacity(old.other.len() + 1);
+ if old.content_type != "blob" {
+ new_headers.push((CONTENT_TYPE.as_str().to_string(), old.content_type));
+ }
+ for (name, value) in old.other.into_iter() {
+ new_headers.push((name, value));
+ }
+
+ ObjectVersionEncryption::Plaintext {
+ headers: ObjectVersionHeaders(new_headers),
+ }
+ }
+
+ // Since ObjectVersionHeaders can now be serialized independently, for the
+ // purpose of being encrypted, we need it to support migrations on its own
+ // as well.
+ impl garage_util::migrate::InitialFormat for ObjectVersionHeaders {
+ const VERSION_MARKER: &'static [u8] = b"G010s3oh";
+ }
}
-pub use v09::*;
+pub use v010::*;
impl Object {
/// Initialize an Object struct from parts
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 5c032f9f..d611a9e3 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -11,7 +11,7 @@ use garage_table::*;
use crate::s3::block_ref_table::*;
-mod v05 {
+mod v08 {
use garage_util::crdt;
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
@@ -35,7 +35,7 @@ mod v05 {
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
- pub bucket: String,
+ pub bucket_id: Uuid,
/// Key in which the related object is stored
pub key: String,
}
@@ -44,7 +44,8 @@ mod v05 {
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
- /// Offset of this sub-segment in its part
+ /// Offset of this sub-segment in its part as sent by the client
+ /// (before any kind of compression or encryption)
pub offset: u64,
}
@@ -53,64 +54,13 @@ mod v05 {
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
- /// Size of the block
+ /// Size of the block, before any kind of compression or encryption
pub size: u64,
}
impl garage_util::migrate::InitialFormat for Version {}
}
-mod v08 {
- use garage_util::crdt;
- use garage_util::data::Uuid;
- use serde::{Deserialize, Serialize};
-
- use super::v05;
-
- pub use v05::{VersionBlock, VersionBlockKey};
-
- /// A version of an object
- #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
- pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket_id: Uuid,
- /// Key in which the related object is stored
- pub key: String,
- }
-
- impl garage_util::migrate::Migrate for Version {
- type Previous = v05::Version;
-
- fn migrate(old: v05::Version) -> Version {
- use garage_util::data::blake2sum;
-
- Version {
- uuid: old.uuid,
- deleted: old.deleted,
- blocks: old.blocks,
- parts_etags: old.parts_etags,
- bucket_id: blake2sum(old.bucket.as_bytes()),
- key: old.key,
- }
- }
- }
-}
-
pub(crate) mod v09 {
use garage_util::crdt;
use garage_util::data::Uuid;