diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 14 | ||||
-rw-r--r-- | src/model/bucket_alias_table.rs | 24 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 133 | ||||
-rw-r--r-- | src/model/garage.rs | 41 | ||||
-rw-r--r-- | src/model/index_counter.rs | 232 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 54 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 7 | ||||
-rw-r--r-- | src/model/key_table.rs | 154 | ||||
-rw-r--r-- | src/model/migrate.rs | 5 | ||||
-rw-r--r-- | src/model/prev/v051/bucket_table.rs | 2 | ||||
-rw-r--r-- | src/model/prev/v051/key_table.rs | 50 | ||||
-rw-r--r-- | src/model/prev/v051/mod.rs | 3 | ||||
-rw-r--r-- | src/model/prev/v051/object_table.rs | 149 | ||||
-rw-r--r-- | src/model/prev/v051/version_table.rs | 79 | ||||
-rw-r--r-- | src/model/s3/block_ref_table.rs | 29 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 296 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 219 |
17 files changed, 598 insertions, 893 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 2c2e2bfe..323c2d64 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.8.0" +version = "0.8.1" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,11 +14,11 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_db = { version = "0.8.0", path = "../db" } -garage_rpc = { version = "0.8.0", path = "../rpc" } -garage_table = { version = "0.8.0", path = "../table" } -garage_block = { version = "0.8.0", path = "../block" } -garage_util = { version = "0.8.0", path = "../util" } +garage_db = { version = "0.8.1", default-features = false, path = "../db" } +garage_rpc = { version = "0.8.1", path = "../rpc" } +garage_table = { version = "0.8.1", path = "../table" } +garage_block = { version = "0.8.1", path = "../block" } +garage_util = { version = "0.8.1", path = "../util" } async-trait = "0.1.7" arc-swap = "1.0" @@ -30,7 +30,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" @@ -42,6 +41,7 @@ opentelemetry = "0.17" netapp = "0.5" [features] +default = [ "sled" ] k2v = [ "garage_util/k2v" ] lmdb = [ "garage_db/lmdb" ] sled = [ "garage_db/sled" ] diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index fcd1536e..54d7fbad 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -1,18 +1,26 @@ -use serde::{Deserialize, Serialize}; - use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; -/// The bucket alias table holds the names given to buckets -/// in the global namespace. -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BucketAlias { - name: String, - pub state: crdt::Lww<Option<Uuid>>, +mod v08 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// The bucket alias table holds the names given to buckets + /// in the global namespace. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketAlias { + pub(super) name: String, + pub state: crdt::Lww<Option<Uuid>>, + } + + impl garage_util::migrate::InitialFormat for BucketAlias {} } +pub use v08::*; + impl BucketAlias { pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> { if !is_valid_bucket_name(&name) { diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 7be42702..ac163736 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,5 +1,3 @@ -use serde::{Deserialize, Serialize}; - use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; @@ -7,72 +5,83 @@ use garage_util::time::*; use crate::permission::BucketKeyPerm; -/// A bucket is a collection of objects -/// -/// Its parameters are not directly accessible as: -/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. -/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Bucket { - /// ID of the bucket - pub id: Uuid, - /// State, and configuration if not deleted, of the bucket - pub state: crdt::Deletable<BucketParams>, -} +mod v08 { + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// A bucket is a collection of objects + /// + /// Its parameters are not directly accessible as: + /// - It must be possible to merge paramaters, hence the use of a LWW CRDT. + /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Bucket { + /// ID of the bucket + pub id: Uuid, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Deletable<BucketParams>, + } -/// Configuration for a bucket -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BucketParams { - /// Bucket's creation date - pub creation_date: u64, - /// Map of key with access to the bucket, and what kind of access they give - pub authorized_keys: crdt::Map<String, BucketKeyPerm>, - - /// Map of aliases that are or have been given to this bucket - /// in the global namespace - /// (not authoritative: this is just used as an indication to - /// map back to aliases when doing ListBuckets) - pub aliases: crdt::LwwMap<String, bool>, - /// Map of aliases that are or have been given to this bucket - /// in namespaces local to keys - /// key = (access key id, alias name) - pub local_aliases: crdt::LwwMap<(String, String), bool>, - - /// Whether this bucket is allowed for website access - /// (under all of its global alias names), - /// and if so, the website configuration XML document - pub website_config: crdt::Lww<Option<WebsiteConfig>>, - /// CORS rules - pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, - /// Bucket quotas - #[serde(default)] - pub quotas: crdt::Lww<BucketQuotas>, -} + /// Configuration for a bucket + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BucketParams { + /// Bucket's creation date + pub creation_date: u64, + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::Map<String, BucketKeyPerm>, + + /// Map of aliases that are or have been given to this bucket + /// in the global namespace + /// (not authoritative: this is just used as an indication to + /// map back to aliases when doing ListBuckets) + pub aliases: crdt::LwwMap<String, bool>, + /// Map of aliases that are or have been given to this bucket + /// in namespaces local to keys + /// key = (access key id, alias name) + pub local_aliases: crdt::LwwMap<(String, String), bool>, + + /// Whether this bucket is allowed for website access + /// (under all of its global alias names), + /// and if so, the website configuration XML document + pub website_config: crdt::Lww<Option<WebsiteConfig>>, + /// CORS rules + pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww<BucketQuotas>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct WebsiteConfig { - pub index_document: String, - pub error_document: Option<String>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct WebsiteConfig { + pub index_document: String, + pub error_document: Option<String>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CorsRule { - pub id: Option<String>, - pub max_age_seconds: Option<u64>, - pub allow_origins: Vec<String>, - pub allow_methods: Vec<String>, - pub allow_headers: Vec<String>, - pub expose_headers: Vec<String>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CorsRule { + pub id: Option<String>, + pub max_age_seconds: Option<u64>, + pub allow_origins: Vec<String>, + pub allow_methods: Vec<String>, + pub allow_headers: Vec<String>, + pub expose_headers: Vec<String>, + } -#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct BucketQuotas { - /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) - pub max_size: Option<u64>, - /// Maximum number of non-deleted objects in the bucket - pub max_objects: Option<u64>, + #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct BucketQuotas { + /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) + pub max_size: Option<u64>, + /// Maximum number of non-deleted objects in the bucket + pub max_objects: Option<u64>, + } + + impl garage_util::migrate::InitialFormat for Bucket {} } +pub use v08::*; + impl AutoCrdt for BucketQuotas { const WARN_IF_DIFFERENT: bool = true; } diff --git a/src/model/garage.rs b/src/model/garage.rs index 75012952..5bea6b4f 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -8,10 +8,10 @@ use garage_util::background::*; use garage_util::config::*; use garage_util::error::*; +use garage_rpc::replication_mode::ReplicationMode; use garage_rpc::system::System; use garage_block::manager::*; -use garage_table::replication::ReplicationMode; use garage_table::replication::TableFullReplication; use garage_table::replication::TableShardedReplication; use garage_table::*; @@ -34,10 +34,11 @@ pub struct Garage { /// The parsed configuration Garage is running pub config: Config, + /// The replication mode of this cluster + pub replication_mode: ReplicationMode, + /// The local database pub db: db::Db, - /// A background job runner - pub background: Arc<BackgroundRunner>, /// The membership manager pub system: Arc<System>, /// The block manager @@ -75,7 +76,7 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> { + pub fn new(config: Config) -> Result<Arc<Self>, Error> { // Create meta dir and data dir if they don't exist already std::fs::create_dir_all(&config.metadata_dir) .ok_or_message("Unable to create Garage metadata directory")?; @@ -164,12 +165,7 @@ impl Garage { .expect("Invalid replication_mode in config file."); info!("Initialize membership management system..."); - let system = System::new( - network_key, - background.clone(), - replication_mode.replication_factor(), - &config, - )?; + let system = System::new(network_key, replication_mode, &config)?; let data_rep_param = TableShardedReplication { system: system.clone(), @@ -227,7 +223,6 @@ impl Garage { info!("Initialize version_table..."); let version_table = Table::new( VersionTable { - background: background.clone(), block_ref_table: block_ref_table.clone(), }, meta_rep_param.clone(), @@ -242,7 +237,6 @@ impl Garage { #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { - background: background.clone(), version_table: version_table.clone(), object_counter_table: object_counter_table.clone(), }, @@ -258,8 +252,8 @@ impl Garage { // -- done -- Ok(Arc::new(Self { config, + replication_mode, db, - background, system, block_manager, bucket_table, @@ -274,6 +268,22 @@ impl Garage { })) } + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.block_manager.spawn_workers(bg); + + self.bucket_table.spawn_workers(bg); + self.bucket_alias_table.spawn_workers(bg); + self.key_table.spawn_workers(bg); + + self.object_table.spawn_workers(bg); + self.object_counter_table.spawn_workers(bg); + self.version_table.spawn_workers(bg); + self.block_ref_table.spawn_workers(bg); + + #[cfg(feature = "k2v")] + self.k2v.spawn_workers(bg); + } + pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } @@ -308,4 +318,9 @@ impl GarageK2V { rpc, } } + + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.item_table.spawn_workers(bg); + self.counter_table.spawn_workers(bg); + } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index e6394f0c..35d6596d 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -1,19 +1,18 @@ use core::ops::Bound; -use std::collections::{hash_map, BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use tokio::sync::{mpsc, watch}; use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; -use garage_util::background::*; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; +use garage_util::migrate::Migrate; use garage_util::time::*; use garage_table::crdt::*; @@ -31,14 +30,44 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { fn counts(&self) -> Vec<(&'static str, i64)>; } -/// A counter entry in the global table -#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] -pub struct CounterEntry<T: CountedItem> { - pub pk: T::CP, - pub sk: T::CS, - pub values: BTreeMap<String, CounterValue>, +mod v08 { + use super::CountedItem; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + // ---- Global part (the table everyone queries) ---- + + /// A counter entry in the global table + #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] + pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, + pub values: BTreeMap<String, CounterValue>, + } + + /// A counter entry in the global table + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct CounterValue { + pub node_values: BTreeMap<Uuid, (u64, i64)>, + } + + impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {} + + // ---- Local part (the counter we maintain transactionnaly on each node) ---- + + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] + pub(super) struct LocalCounterEntry<T: CountedItem> { + pub(super) pk: T::CP, + pub(super) sk: T::CS, + pub(super) values: BTreeMap<String, (u64, i64)>, + } + + impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {} } +pub use v08::*; + impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { fn partition_key(&self) -> &T::CP { &self.pk @@ -80,12 +109,6 @@ impl<T: CountedItem> CounterEntry<T> { } } -/// A counter entry in the global table -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct CounterValue { - pub node_values: BTreeMap<Uuid, (u64, i64)>, -} - impl<T: CountedItem> Crdt for CounterEntry<T> { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { @@ -142,7 +165,6 @@ impl<T: CountedItem> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CountedItem> { this_node: Uuid, local_counter: db::Tree, - propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -152,16 +174,11 @@ impl<T: CountedItem> IndexCounter<T> { replication: TableShardedReplication, db: &db::Db, ) -> Arc<Self> { - let background = system.background.clone(); - - let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); - - let this = Arc::new(Self { + Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), - propagate_tx, table: Table::new( CounterTable { _phantom_t: Default::default(), @@ -170,16 +187,11 @@ impl<T: CountedItem> IndexCounter<T> { system, db, ), - }); - - background.spawn_worker(IndexPropagatorWorker { - index_counter: this.clone(), - propagate_rx, - buf: HashMap::new(), - errors: 0, - }); + }) + } - this + pub fn spawn_workers(&self, bg: &BackgroundRunner) { + self.table.spawn_workers(bg); } pub fn count( @@ -208,11 +220,9 @@ impl<T: CountedItem> IndexCounter<T> { let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)? - } + Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry") + .map_err(db::TxError::Abort)?, None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), @@ -227,17 +237,14 @@ impl<T: CountedItem> IndexCounter<T> { ent.1 += *inc; } - let new_entry_bytes = rmp_to_vec_all_named(&entry) + let new_entry_bytes = entry + .encode() .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { - error!( - "Could not propagate updated counter values, failed to send to channel: {}", - e - ); - } + let dist_entry = entry.into_counter_entry(self.this_node); + self.table.queue_insert(tx, &dist_entry)?; Ok(()) } @@ -250,23 +257,6 @@ impl<T: CountedItem> IndexCounter<T> { TS: TableSchema<E = T>, TR: TableReplication, { - let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { - let entry_k = self - .table - .data - .tree_key(entry.partition_key(), entry.sort_key()); - self.table - .data - .update_entry_with(&entry_k, |ent| match ent { - Some(mut ent) => { - ent.merge(&entry); - ent - } - None => entry.clone(), - })?; - Ok(()) - }; - // 1. Set all old local counters to zero let now = now_msec(); let mut next_start: Option<Vec<u8>> = None; @@ -289,20 +279,22 @@ impl<T: CountedItem> IndexCounter<T> { info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); for (local_counter_k, local_counter) in batch { - let mut local_counter = - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter) + .ok_or_message("Cannot decode local counter entry")?; for (_, tv) in local_counter.values.iter_mut() { tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 = 0; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_k, &local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); - save_counter_entry(counter_entry)?; + self.local_counter + .db() + .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(local_counter_k); } @@ -343,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> { let local_counter_key = self.table.data.tree_key(pk, sk); let mut local_counter = match self.local_counter.get(&local_counter_key)? { Some(old_bytes) => { - let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( - &old_bytes, - )?; + let ent = LocalCounterEntry::<T>::decode(&old_bytes) + .ok_or_message("Cannot decode local counter entry")?; assert!(ent.pk == *pk); assert!(ent.sk == *sk); ent @@ -362,12 +353,14 @@ impl<T: CountedItem> IndexCounter<T> { tv.1 += v; } - let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + let local_counter_bytes = local_counter.encode()?; self.local_counter .insert(&local_counter_key, local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); - save_counter_entry(counter_entry)?; + self.local_counter + .db() + .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(counted_entry_k); } @@ -378,104 +371,7 @@ impl<T: CountedItem> IndexCounter<T> { } } -struct IndexPropagatorWorker<T: CountedItem> { - index_counter: Arc<IndexCounter<T>>, - propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, - - buf: HashMap<Vec<u8>, CounterEntry<T>>, - errors: usize, -} - -impl<T: CountedItem> IndexPropagatorWorker<T> { - fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) { - let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); - let dist_entry = counters.into_counter_entry(self.index_counter.this_node); - match self.buf.entry(tree_key) { - hash_map::Entry::Vacant(e) => { - e.insert(dist_entry); - } - hash_map::Entry::Occupied(mut e) => { - e.get_mut().merge(&dist_entry); - } - } - } -} - -#[async_trait] -impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { - fn name(&self) -> String { - format!("{} index counter propagator", T::COUNTER_TABLE_NAME) - } - - fn info(&self) -> Option<String> { - if !self.buf.is_empty() { - Some(format!("{} items in queue", self.buf.len())) - } else { - None - } - } - - async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - // This loop batches updates to counters to be sent all at once. - // They are sent once the propagate_rx channel has been emptied (or is closed). - let closed = loop { - match self.propagate_rx.try_recv() { - Ok((pk, sk, counters)) => { - self.add_ent(pk, sk, counters); - } - Err(mpsc::error::TryRecvError::Empty) => break false, - Err(mpsc::error::TryRecvError::Disconnected) => break true, - } - }; - - if !self.buf.is_empty() { - let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>(); - let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); - if let Err(e) = self.index_counter.table.insert_many(entries).await { - self.errors += 1; - if self.errors >= 2 && *must_exit.borrow() { - error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); - return Ok(WorkerState::Done); - } - // Propagate error up to worker manager, it will log it, increment a counter, - // and sleep for a certain delay (with exponential backoff), waiting for - // things to go back to normal - return Err(e); - } else { - for k in entries_k { - self.buf.remove(&k); - } - self.errors = 0; - } - - return Ok(WorkerState::Busy); - } else if closed { - return Ok(WorkerState::Done); - } else { - return Ok(WorkerState::Idle); - } - } - - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { - match self.propagate_rx.recv().await { - Some((pk, sk, counters)) => { - self.add_ent(pk, sk, counters); - WorkerState::Busy - } - None => match self.buf.is_empty() { - false => WorkerState::Busy, - true => WorkerState::Done, - }, - } - } -} - -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -struct LocalCounterEntry<T: CountedItem> { - pk: T::CP, - sk: T::CS, - values: BTreeMap<String, (u64, i64)>, -} +// ---- impl<T: CountedItem> LocalCounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 7860cb17..ce3e4129 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -1,7 +1,8 @@ -use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use serde::{Deserialize, Serialize}; + use garage_db as db; use garage_util::data::*; @@ -17,32 +18,43 @@ pub const CONFLICTS: &str = "conflicts"; pub const VALUES: &str = "values"; pub const BYTES: &str = "bytes"; -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct K2VItem { - pub partition: K2VItemPartition, - pub sort_key: String, +mod v08 { + use crate::k2v::causality::K2VNodeId; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; - items: BTreeMap<K2VNodeId, DvvsEntry>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct K2VItem { + pub partition: K2VItemPartition, + pub sort_key: String, -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] -pub struct K2VItemPartition { - pub bucket_id: Uuid, - pub partition_key: String, -} + pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -struct DvvsEntry { - t_discard: u64, - values: Vec<(u64, DvvsValue)>, -} + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] + pub struct K2VItemPartition { + pub bucket_id: Uuid, + pub partition_key: String, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct DvvsEntry { + pub(super) t_discard: u64, + pub(super) values: Vec<(u64, DvvsValue)>, + } -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum DvvsValue { - Value(#[serde(with = "serde_bytes")] Vec<u8>), - Deleted, + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum DvvsValue { + Value(#[serde(with = "serde_bytes")] Vec<u8>), + Deleted, + } + + impl garage_util::migrate::InitialFormat for K2VItem {} } +pub use v08::*; + impl K2VItem { /// Creates a new K2VItem when no previous entry existed in the db pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index a74df277..f64a7984 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -273,14 +273,9 @@ impl K2VRpcHandler { } fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { - let tree_key = self - .item_table - .data - .tree_key(&item.partition, &item.sort_key); - self.item_table .data - .update_entry_with(&tree_key[..], |ent| { + .update_entry_with(&item.partition, &item.sort_key, |ent| { let mut ent = ent.unwrap_or_else(|| { K2VItem::new( item.partition.bucket_id, diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 9d2fc783..bb5334a3 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,45 +1,121 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::*; -use garage_table::*; +use garage_util::crdt::{self, Crdt}; use garage_util::data::*; +use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema}; + use crate::permission::BucketKeyPerm; -use crate::prev::v051::key_table as old; +pub(crate) mod v05 { + use garage_util::crdt; + use serde::{Deserialize, Serialize}; -/// An api key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - /// The id of the key (immutable), used as partition key - pub key_id: String, + /// An api key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, - /// Internal state of the key - pub state: crdt::Deletable<KeyParams>, -} + /// The secret_key associated + pub secret_key: String, -/// Configuration for a key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct KeyParams { - /// The secret_key associated (immutable) - pub secret_key: String, + /// Name for the key + pub name: crdt::Lww<String>, - /// Name for the key - pub name: crdt::Lww<String>, + /// Is the key deleted + pub deleted: crdt::Bool, + + /// Buckets in which the key is authorized. Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LwwMap<String, PermissionSet>, + } - /// Flag to allow users having this key to create buckets - pub allow_create_bucket: crdt::Lww<bool>, + /// Permission given to a key in a bucket + #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct PermissionSet { + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, + } + + impl crdt::AutoCrdt for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; + } - /// If the key is present: it gives some permissions, - /// a map of bucket IDs (uuids) to permissions. - /// Otherwise no permissions are granted to key - pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>, + impl garage_util::migrate::InitialFormat for Key {} +} - /// A key can have a local view of buckets names it is - /// the only one to see, this is the namespace for these aliases - pub local_aliases: crdt::LwwMap<String, Option<Uuid>>, +mod v08 { + use super::v05; + use crate::permission::BucketKeyPerm; + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// An api key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, + + /// Internal state of the key + pub state: crdt::Deletable<KeyParams>, + } + + /// Configuration for a key + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct KeyParams { + /// The secret_key associated (immutable) + pub secret_key: String, + + /// Name for the key + pub name: crdt::Lww<String>, + + /// Flag to allow users having this key to create buckets + pub allow_create_bucket: crdt::Lww<bool>, + + /// If the key is present: it gives some permissions, + /// a map of bucket IDs (uuids) to permissions. + /// Otherwise no permissions are granted to key + pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>, + + /// A key can have a local view of buckets names it is + /// the only one to see, this is the namespace for these aliases + pub local_aliases: crdt::LwwMap<String, Option<Uuid>>, + } + + impl garage_util::migrate::Migrate for Key { + type Previous = v05::Key; + + fn migrate(old_k: v05::Key) -> Key { + let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); + + let state = if old_k.deleted.get() { + crdt::Deletable::Deleted + } else { + // Authorized buckets is ignored here, + // migration is performed in specific migration code in + // garage/migrate.rs + crdt::Deletable::Present(KeyParams { + secret_key: old_k.secret_key, + name, + allow_create_bucket: crdt::Lww::new(false), + authorized_buckets: crdt::Map::new(), + local_aliases: crdt::LwwMap::new(), + }) + }; + Key { + key_id: old_k.key_id, + state, + } + } + } } +pub use v08::*; + impl KeyParams { fn new(secret_key: &str, name: &str) -> Self { KeyParams { @@ -173,28 +249,4 @@ impl TableSchema for KeyTable { } } } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?; - let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); - - let state = if old_k.deleted.get() { - crdt::Deletable::Deleted - } else { - // Authorized buckets is ignored here, - // migration is performed in specific migration code in - // garage/migrate.rs - crdt::Deletable::Present(KeyParams { - secret_key: old_k.secret_key, - name, - allow_create_bucket: crdt::Lww::new(false), - authorized_buckets: crdt::Map::new(), - local_aliases: crdt::LwwMap::new(), - }) - }; - Some(Key { - key_id: old_k.key_id, - state, - }) - } } diff --git a/src/model/migrate.rs b/src/model/migrate.rs index cd6ad26a..6b4c3eed 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use garage_util::crdt::*; use garage_util::data::*; +use garage_util::encode::nonversioned_decode; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -28,8 +29,8 @@ impl Migrate { let mut old_buckets = vec![]; for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; - let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) - .map_err(GarageError::from)?; + let bucket = + nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?; old_buckets.push(bucket); } diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs index 628a49dd..19893458 100644 --- a/src/model/prev/v051/bucket_table.rs +++ b/src/model/prev/v051/bucket_table.rs @@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::Crdt; use garage_table::*; -use super::key_table::PermissionSet; +use crate::key_table::v05::PermissionSet; /// A bucket is a collection of objects /// diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs deleted file mode 100644 index 37516b1c..00000000 --- a/src/model/prev/v051/key_table.rs +++ /dev/null @@ -1,50 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_table::crdt::*; -use garage_table::*; - -/// An api key -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Key { - /// The id of the key (immutable), used as partition key - pub key_id: String, - - /// The secret_key associated - pub secret_key: String, - - /// Name for the key - pub name: crdt::Lww<String>, - - /// Is the key deleted - pub deleted: crdt::Bool, - - /// Buckets in which the key is authorized. Empty if `Key` is deleted - // CRDT interaction: deleted implies authorized_buckets is empty - pub authorized_buckets: crdt::LwwMap<String, PermissionSet>, -} - -/// Permission given to a key in a bucket -#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct PermissionSet { - /// The key can be used to read the bucket - pub allow_read: bool, - /// The key can be used to write in the bucket - pub allow_write: bool, -} - -impl AutoCrdt for PermissionSet { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for Key { - fn merge(&mut self, other: &Self) { - self.name.merge(&other.name); - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.authorized_buckets.clear(); - } else { - self.authorized_buckets.merge(&other.authorized_buckets); - } - } -} diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs index 7a954752..8c1335a5 100644 --- a/src/model/prev/v051/mod.rs +++ b/src/model/prev/v051/mod.rs @@ -1,4 +1 @@ pub(crate) mod bucket_table; -pub(crate) mod key_table; -pub(crate) mod object_table; -pub(crate) mod version_table; diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs deleted file mode 100644 index e79e5787..00000000 --- a/src/model/prev/v051/object_table.rs +++ /dev/null @@ -1,149 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; - -use garage_util::data::*; - -use garage_table::crdt::*; - -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket: String, - - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, - - /// The list of currenty stored versions of the object - versions: Vec<ObjectVersion>, -} - -impl Object { - /// Get a list of currently stored versions of `Object` - pub fn versions(&self) -> &[ObjectVersion] { - &self.versions[..] - } -} - -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - -impl Crdt for ObjectVersionState { - fn merge(&mut self, other: &Self) { - use ObjectVersionState::*; - match other { - Aborted => { - *self = Aborted; - } - Complete(b) => match self { - Aborted => {} - Complete(a) => { - a.merge(b); - } - Uploading(_) => { - *self = Complete(b.clone()); - } - }, - Uploading(_) => {} - } - } -} - -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - -impl AutoCrdt for ObjectVersionData { - const WARN_IF_DIFFERENT: bool = true; -} - -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap<String, String>, -} - -impl ObjectVersion { - fn cmp_key(&self) -> (u64, Uuid) { - (self.timestamp, self.uuid) - } - - /// Is the object version completely received - pub fn is_complete(&self) -> bool { - matches!(self.state, ObjectVersionState::Complete(_)) - } -} - -impl Crdt for Object { - fn merge(&mut self, other: &Self) { - // Merge versions from other into here - for other_v in other.versions.iter() { - match self - .versions - .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) - { - Ok(i) => { - self.versions[i].state.merge(&other_v.state); - } - Err(i) => { - self.versions.insert(i, other_v.clone()); - } - } - } - - // Remove versions which are obsolete, i.e. those that come - // before the last version which .is_complete(). - let last_complete = self - .versions - .iter() - .enumerate() - .rev() - .find(|(_, v)| v.is_complete()) - .map(|(vi, _)| vi); - - if let Some(last_vi) = last_complete { - self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); - } - } -} diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs deleted file mode 100644 index c11c62d5..00000000 --- a/src/model/prev/v051/version_table.rs +++ /dev/null @@ -1,79 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_util::data::*; - -use garage_table::crdt::*; -use garage_table::*; - -/// A version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map<u64, String>, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket: String, - /// Key in which the related object is stored - pub key: String, -} - -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - -impl Ord for VersionBlockKey { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.part_number - .cmp(&other.part_number) - .then(self.offset.cmp(&other.offset)) - } -} - -impl PartialOrd for VersionBlockKey { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - -impl AutoCrdt for VersionBlock { - const WARN_IF_DIFFERENT: bool = true; -} - -impl Crdt for Version { - fn merge(&mut self, other: &Self) { - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.blocks.clear(); - self.parts_etags.clear(); - } else { - self.blocks.merge(&other.blocks); - self.parts_etags.merge(&other.parts_etags); - } - } -} diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index c7017409..7b023d87 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_db as db; @@ -10,19 +9,29 @@ use garage_table::*; use garage_block::manager::*; -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct BlockRef { - /// Hash (blake2 sum) of the block, used as partition key - pub block: Hash, +mod v08 { + use garage_util::crdt; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; - /// Id of the Version for the object containing this block, used as sorting key - pub version: Uuid, + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct BlockRef { + /// Hash (blake2 sum) of the block, used as partition key + pub block: Hash, - // Keep track of deleted status - /// Is the Version that contains this block deleted - pub deleted: crdt::Bool, + /// Id of the Version for the object containing this block, used as sorting key + pub version: Uuid, + + // Keep track of deleted status + /// Is the Version that contains this block deleted + pub deleted: crdt::Bool, + } + + impl garage_util::migrate::InitialFormat for BlockRef {} } +pub use v08::*; + impl Entry<Hash, Uuid> for BlockRef { fn partition_key(&self) -> &Hash { &self.block diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 26ff57f6..518acc95 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -1,10 +1,8 @@ use serde::{Deserialize, Serialize}; -use std::collections::BTreeMap; use std::sync::Arc; use garage_db as db; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; @@ -14,25 +12,126 @@ use garage_table::*; use crate::index_counter::*; use crate::s3::version_table::*; -use crate::prev::v051::object_table as old; - pub const OBJECTS: &str = "objects"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const BYTES: &str = "bytes"; -/// An object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Object { - /// The bucket in which the object is stored, used as partition key - pub bucket_id: Uuid, +mod v05 { + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } + + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + + /// Data stored in object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), + } + + /// Metadata about the object version + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, + } - /// The key at which the object is stored in its bucket, used as sorting key - pub key: String, + /// Additional headers for an object + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap<String, String>, + } - /// The list of currenty stored versions of the object - versions: Vec<ObjectVersion>, + impl garage_util::migrate::InitialFormat for Object {} } +mod v08 { + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; + + pub use v05::{ + ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta, + ObjectVersionState, + }; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } + + impl garage_util::migrate::Migrate for Object { + type Previous = v05::Object; + + fn migrate(old: v05::Object) -> Object { + use garage_util::data::blake2sum; + + Object { + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + versions: old.versions, + } + } + } +} + +pub use v08::*; + impl Object { /// Initialize an Object struct from parts pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self { @@ -69,28 +168,6 @@ impl Object { } } -/// Informations about a version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersion { - /// Id of the version - pub uuid: Uuid, - /// Timestamp of when the object was created - pub timestamp: u64, - /// State of the version - pub state: ObjectVersionState, -} - -/// State of an object version -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionState { - /// The version is being received - Uploading(ObjectVersionHeaders), - /// The version is fully received - Complete(ObjectVersionData), - /// The version uploaded containded errors or the upload was explicitly aborted - Aborted, -} - impl Crdt for ObjectVersionState { fn merge(&mut self, other: &Self) { use ObjectVersionState::*; @@ -112,42 +189,10 @@ impl Crdt for ObjectVersionState { } } -/// Data stored in object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub enum ObjectVersionData { - /// The object was deleted, this Version is a tombstone to mark it as such - DeleteMarker, - /// The object is short, it's stored inlined - Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), - /// The object is not short, Hash of first block is stored here, next segments hashes are - /// stored in the version table - FirstBlock(ObjectVersionMeta, Hash), -} - impl AutoCrdt for ObjectVersionData { const WARN_IF_DIFFERENT: bool = true; } -/// Metadata about the object version -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionMeta { - /// Headers to send to the client - pub headers: ObjectVersionHeaders, - /// Size of the object - pub size: u64, - /// etag of the object - pub etag: String, -} - -/// Additional headers for an object -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct ObjectVersionHeaders { - /// Content type of the object - pub content_type: String, - /// Any other http headers to send - pub other: BTreeMap<String, String>, -} - impl ObjectVersion { fn cmp_key(&self) -> (u64, Uuid) { (self.timestamp, self.uuid) @@ -221,7 +266,6 @@ impl Crdt for Object { } pub struct ObjectTable { - pub background: Arc<BackgroundRunner>, pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, pub object_counter_table: Arc<IndexCounter<Object>>, } @@ -255,34 +299,34 @@ impl TableSchema for ObjectTable { ); } - // 2. Spawn threads that propagates deletions to version table - let version_table = self.version_table.clone(); - let old = old.cloned(); - let new = new.cloned(); - - self.background.spawn(async move { - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions - for v in old_v.versions.iter() { - let newly_deleted = match new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { - Err(_) => true, - Ok(i) => { - new_v.versions[i].state == ObjectVersionState::Aborted - && v.state != ObjectVersionState::Aborted - } - }; - if newly_deleted { - let deleted_version = - Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); - version_table.insert(&deleted_version).await?; + // 2. Enqueue propagation deletions to version table + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + let newly_deleted = match new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + { + Err(_) => true, + Ok(i) => { + new_v.versions[i].state == ObjectVersionState::Aborted + && v.state != ObjectVersionState::Aborted + } + }; + if newly_deleted { + let deleted_version = + Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.version_table.queue_insert(tx, &deleted_version); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue version deletion propagation: {}. A repair will be needed.", + e + ); } } } - Ok(()) - }); + } + Ok(()) } @@ -292,11 +336,6 @@ impl TableSchema for ObjectTable { ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), } } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?; - Some(migrate_object(old_obj)) - } } impl CountedItem for Object { @@ -341,64 +380,3 @@ impl CountedItem for Object { ] } } - -// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv -// (we just want to change bucket into bucket_id by hashing it) - -fn migrate_object(o: old::Object) -> Object { - let versions = o - .versions() - .iter() - .cloned() - .map(migrate_object_version) - .collect(); - Object { - bucket_id: blake2sum(o.bucket.as_bytes()), - key: o.key, - versions, - } -} - -fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion { - ObjectVersion { - uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(), - timestamp: v.timestamp, - state: match v.state { - old::ObjectVersionState::Uploading(h) => { - ObjectVersionState::Uploading(migrate_object_version_headers(h)) - } - old::ObjectVersionState::Complete(d) => { - ObjectVersionState::Complete(migrate_object_version_data(d)) - } - old::ObjectVersionState::Aborted => ObjectVersionState::Aborted, - }, - } -} - -fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders { - ObjectVersionHeaders { - content_type: h.content_type, - other: h.other, - } -} - -fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData { - match d { - old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker, - old::ObjectVersionData::Inline(m, b) => { - ObjectVersionData::Inline(migrate_object_version_meta(m), b) - } - old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock( - migrate_object_version_meta(m), - Hash::try_from(h.as_slice()).unwrap(), - ), - } -} - -fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta { - ObjectVersionMeta { - headers: migrate_object_version_headers(m.headers), - size: m.size, - etag: m.etag, - } -} diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 6bc2ecd1..6edc83f4 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,9 +1,7 @@ -use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_db as db; -use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_table::crdt::*; @@ -12,32 +10,108 @@ use garage_table::*; use crate::s3::block_ref_table::*; -use crate::prev::v051::version_table as old; - -/// A version of an object -#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct Version { - /// UUID of the version, used as partition key - pub uuid: Uuid, - - // Actual data: the blocks for this version - // In the case of a multipart upload, also store the etags - // of individual parts and check them when doing CompleteMultipartUpload - /// Is this version deleted - pub deleted: crdt::Bool, - /// list of blocks of data composing the version - pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, - /// Etag of each part in case of a multipart upload, empty otherwise - pub parts_etags: crdt::Map<u64, String>, - - // Back link to bucket+key so that we can figure if - // this was deleted later on - /// Bucket in which the related object is stored - pub bucket_id: Uuid, - /// Key in which the related object is stored - pub key: String, +mod v05 { + use garage_util::crdt; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map<u64, String>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket: String, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, + } + + /// Informations about a single block + #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, + } + + impl garage_util::migrate::InitialFormat for Version {} } +mod v08 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v05; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map<u64, String>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + pub use v05::{VersionBlock, VersionBlockKey}; + + impl garage_util::migrate::Migrate for Version { + type Previous = v05::Version; + + fn migrate(old: v05::Version) -> Version { + use garage_util::data::blake2sum; + + Version { + uuid: old.uuid, + deleted: old.deleted, + blocks: old.blocks, + parts_etags: old.parts_etags, + bucket_id: blake2sum(old.bucket.as_bytes()), + key: old.key, + } + } + } +} + +pub use v08::*; + impl Version { pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { Self { @@ -65,14 +139,6 @@ impl Version { } } -#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlockKey { - /// Number of the part - pub part_number: u64, - /// Offset of this sub-segment in its part - pub offset: u64, -} - impl Ord for VersionBlockKey { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.part_number @@ -87,15 +153,6 @@ impl PartialOrd for VersionBlockKey { } } -/// Informations about a single block -#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] -pub struct VersionBlock { - /// Blake2 sum of the block - pub hash: Hash, - /// Size of the block - pub size: u64, -} - impl AutoCrdt for VersionBlock { const WARN_IF_DIFFERENT: bool = true; } @@ -127,7 +184,6 @@ impl Crdt for Version { } pub struct VersionTable { - pub background: Arc<BackgroundRunner>, pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, } @@ -141,33 +197,26 @@ impl TableSchema for VersionTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { - let block_ref_table = self.block_ref_table.clone(); - let old = old.cloned(); - let new = new.cloned(); - - self.background.spawn(async move { - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of version blocks - if new_v.deleted.get() && !old_v.deleted.get() { - let deleted_block_refs = old_v - .blocks - .items() - .iter() - .map(|(_k, vb)| BlockRef { - block: vb.hash, - version: old_v.uuid, - deleted: true.into(), - }) - .collect::<Vec<_>>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of version blocks + if new_v.deleted.get() && !old_v.deleted.get() { + let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef { + block: vb.hash, + version: old_v.uuid, + deleted: true.into(), + }); + for block_ref in deleted_block_refs { + let res = self.block_ref_table.queue_insert(tx, &block_ref); + if let Err(e) = db::unabort(res)? { + error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e); + } } } - Ok(()) - }); + } Ok(()) } @@ -175,42 +224,4 @@ impl TableSchema for VersionTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } - - fn try_migrate(bytes: &[u8]) -> Option<Self::E> { - let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?; - - let blocks = old - .blocks - .items() - .iter() - .map(|(k, v)| { - ( - VersionBlockKey { - part_number: k.part_number, - offset: k.offset, - }, - VersionBlock { - hash: Hash::try_from(v.hash.as_slice()).unwrap(), - size: v.size, - }, - ) - }) - .collect::<crdt::Map<_, _>>(); - - let parts_etags = old - .parts_etags - .items() - .iter() - .map(|(k, v)| (*k, v.clone())) - .collect::<crdt::Map<_, _>>(); - - Some(Version { - uuid: Hash::try_from(old.uuid.as_slice()).unwrap(), - deleted: crdt::Bool::new(old.deleted.get()), - blocks, - parts_etags, - bucket_id: blake2sum(old.bucket.as_bytes()), - key: old.key, - }) - } } |