diff options
author | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
---|---|---|
committer | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
commit | 829f815a897b04986559910bbcbf53625adcdf20 (patch) | |
tree | 6db3c27cff2aded754a641d1f2b05c83be701267 /src/model | |
parent | 99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff) | |
parent | a096ced35562bd0a8877a1ee2f755be1edafe343 (diff) | |
download | garage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz garage-829f815a897b04986559910bbcbf53625adcdf20.zip |
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/model')
27 files changed, 2174 insertions, 147 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 007cec89..2c2e2bfe 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" @@ -14,22 +14,22 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.7.0", path = "../rpc" } -garage_table = { version = "0.7.0", path = "../table" } -garage_block = { version = "0.7.0", path = "../block" } -garage_util = { version = "0.7.0", path = "../util" } -garage_model_050 = { package = "garage_model", version = "0.5.1" } +garage_db = { version = "0.8.0", path = "../db" } +garage_rpc = { version = "0.8.0", path = "../rpc" } +garage_table = { version = "0.8.0", path = "../table" } +garage_block = { version = "0.8.0", path = "../block" } +garage_util = { version = "0.8.0", path = "../util" } async-trait = "0.1.7" arc-swap = "1.0" +blake2 = "0.9" err-derive = "0.3" hex = "0.4" +base64 = "0.13" tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" @@ -39,6 +39,10 @@ futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp" } -netapp = "0.4" +netapp = "0.5" + +[features] +k2v = [ "garage_util/k2v" ] +lmdb = [ "garage_db/lmdb" ] +sled = [ "garage_db/sled" ] +sqlite = [ "garage_db/sqlite" ] diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs index fce03d04..fcd1536e 100644 --- a/src/model/bucket_alias_table.rs +++ b/src/model/bucket_alias_table.rs @@ -7,7 +7,7 @@ use garage_table::*; /// The bucket alias table holds the names given to buckets /// in the global namespace. -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BucketAlias { name: String, pub state: crdt::Lww<Option<Uuid>>, diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 7c7b9f30..7be42702 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use garage_table::crdt::Crdt; +use garage_table::crdt::*; use garage_table::*; use garage_util::data::*; use garage_util::time::*; @@ -12,7 +12,7 @@ use crate::permission::BucketKeyPerm; /// Its parameters are not directly accessible as: /// - It must be possible to merge paramaters, hence the use of a LWW CRDT. /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { /// ID of the bucket pub id: Uuid, @@ -21,7 +21,7 @@ pub struct Bucket { } /// Configuration for a bucket -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { /// Bucket's creation date pub creation_date: u64, @@ -44,6 +44,9 @@ pub struct BucketParams { pub website_config: crdt::Lww<Option<WebsiteConfig>>, /// CORS rules pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>, + /// Bucket quotas + #[serde(default)] + pub quotas: crdt::Lww<BucketQuotas>, } #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] @@ -62,6 +65,18 @@ pub struct CorsRule { pub expose_headers: Vec<String>, } +#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct BucketQuotas { + /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket) + pub max_size: Option<u64>, + /// Maximum number of non-deleted objects in the bucket + pub max_objects: Option<u64>, +} + +impl AutoCrdt for BucketQuotas { + const WARN_IF_DIFFERENT: bool = true; +} + impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss pub fn new() -> Self { @@ -72,6 +87,7 @@ impl BucketParams { local_aliases: crdt::LwwMap::new(), website_config: crdt::Lww::new(None), cors_config: crdt::Lww::new(None), + quotas: crdt::Lww::new(BucketQuotas::default()), } } } @@ -86,6 +102,7 @@ impl Crdt for BucketParams { self.website_config.merge(&o.website_config); self.cors_config.merge(&o.cors_config); + self.quotas.merge(&o.quotas); } } diff --git a/src/model/garage.rs b/src/model/garage.rs index abdb920a..75012952 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -2,8 +2,11 @@ use std::sync::Arc; use netapp::NetworkKey; +use garage_db as db; + use garage_util::background::*; use garage_util::config::*; +use garage_util::error::*; use garage_rpc::system::System; @@ -13,13 +16,18 @@ use garage_table::replication::TableFullReplication; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::block_ref_table::*; +use crate::s3::block_ref_table::*; +use crate::s3::object_table::*; +use crate::s3::version_table::*; + use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; +use crate::index_counter::*; use crate::key_table::*; -use crate::object_table::*; -use crate::version_table::*; + +#[cfg(feature = "k2v")] +use crate::k2v::{item_table::*, poll::*, rpc::*}; /// An entire Garage full of data pub struct Garage { @@ -27,7 +35,7 @@ pub struct Garage { pub config: Config, /// The local database - pub db: sled::Db, + pub db: db::Db, /// A background job runner pub background: Arc<BackgroundRunner>, /// The membership manager @@ -35,21 +43,118 @@ pub struct Garage { /// The block manager pub block_manager: Arc<BlockManager>, - /// Table containing informations about buckets + /// Table containing buckets pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>, - /// Table containing informations about bucket aliases + /// Table containing bucket aliases pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>, - /// Table containing informations about api keys + /// Table containing api keys pub key_table: Arc<Table<KeyTable, TableFullReplication>>, + /// Table containing S3 objects pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, + /// Counting table containing object counters + pub object_counter_table: Arc<IndexCounter<Object>>, + /// Table containing S3 object versions pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + /// Table containing S3 block references (not blocks themselves) pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, + + #[cfg(feature = "k2v")] + pub k2v: GarageK2V, +} + +#[cfg(feature = "k2v")] +pub struct GarageK2V { + /// Table containing K2V items + pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + /// Indexing table containing K2V item counters + pub counter_table: Arc<IndexCounter<K2VItem>>, + /// K2V RPC handler + pub rpc: Arc<K2VRpcHandler>, } impl Garage { /// Create and run garage - pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> { + // Create meta dir and data dir if they don't exist already + std::fs::create_dir_all(&config.metadata_dir) + .ok_or_message("Unable to create Garage metadata directory")?; + std::fs::create_dir_all(&config.data_dir) + .ok_or_message("Unable to create Garage data directory")?; + + info!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + let db = match config.db_engine.as_str() { + // ---- Sled DB ---- + #[cfg(feature = "sled")] + "sled" => { + db_path.push("db"); + info!("Opening Sled database at: {}", db_path.display()); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + #[cfg(not(feature = "sled"))] + "sled" => return Err(Error::Message("sled db not available in this build".into())), + // ---- Sqlite DB ---- + #[cfg(feature = "sqlite")] + "sqlite" | "sqlite3" | "rusqlite" => { + db_path.push("db.sqlite"); + info!("Opening Sqlite database at: {}", db_path.display()); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + #[cfg(not(feature = "sqlite"))] + "sqlite" | "sqlite3" | "rusqlite" => { + return Err(Error::Message( + "sqlite db not available in this build".into(), + )) + } + // ---- LMDB DB ---- + #[cfg(feature = "lmdb")] + "lmdb" | "heed" => { + db_path.push("db.lmdb"); + info!("Opening LMDB database at: {}", db_path.display()); + std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); + let map_size = garage_db::lmdb_adapter::recommended_map_size(); + + use db::lmdb_adapter::heed; + let mut env_builder = heed::EnvOpenOptions::new(); + env_builder.max_dbs(100); + env_builder.max_readers(500); + env_builder.map_size(map_size); + unsafe { + env_builder.flag(heed::flags::Flags::MdbNoSync); + env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + } + let db = env_builder.open(&db_path).expect("Unable to open LMDB DB"); + db::lmdb_adapter::LmdbDb::init(db) + } + #[cfg(not(feature = "lmdb"))] + "lmdb" | "heed" => return Err(Error::Message("lmdb db not available in this build".into())), + // ---- Unavailable DB engine ---- + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: {})", + e, + vec![ + #[cfg(feature = "sled")] + "sled", + #[cfg(feature = "sqlite")] + "sqlite", + #[cfg(feature = "lmdb")] + "lmdb", + ] + .join(", ") + ))); + } + }; + let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -64,7 +169,7 @@ impl Garage { background.clone(), replication_mode.replication_factor(), &config, - ); + )?; let data_rep_param = TableShardedReplication { system: system.clone(), @@ -90,11 +195,25 @@ impl Garage { &db, config.data_dir.clone(), config.compression_level, - config.block_manager_background_tranquility, data_rep_param, system.clone(), ); + // ---- admin tables ---- + info!("Initialize bucket_table..."); + let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); + + info!("Initialize bucket_alias_table..."); + let bucket_alias_table = Table::new( + BucketAliasTable, + control_rep_param.clone(), + system.clone(), + &db, + ); + info!("Initialize key_table_table..."); + let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); + + // ---- S3 tables ---- info!("Initialize block_ref_table..."); let block_ref_table = Table::new( BlockRefTable { @@ -116,34 +235,28 @@ impl Garage { &db, ); + info!("Initialize object counter table..."); + let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + info!("Initialize object_table..."); + #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), + object_counter_table: object_counter_table.clone(), }, - meta_rep_param, - system.clone(), - &db, - ); - - info!("Initialize bucket_table..."); - let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); - - info!("Initialize bucket_alias_table..."); - let bucket_alias_table = Table::new( - BucketAliasTable, - control_rep_param.clone(), + meta_rep_param.clone(), system.clone(), &db, ); - info!("Initialize key_table_table..."); - let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); - - info!("Initialize Garage..."); + // ---- K2V ---- + #[cfg(feature = "k2v")] + let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); - Arc::new(Self { + // -- done -- + Ok(Arc::new(Self { config, db, background, @@ -153,12 +266,46 @@ impl Garage { bucket_alias_table, key_table, object_table, + object_counter_table, version_table, block_ref_table, - }) + #[cfg(feature = "k2v")] + k2v, + })) } pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } + + pub fn key_helper(&self) -> helper::key::KeyHelper { + helper::key::KeyHelper(self) + } +} + +#[cfg(feature = "k2v")] +impl GarageK2V { + fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { + info!("Initialize K2V counter table..."); + let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + info!("Initialize K2V subscription manager..."); + let subscriptions = Arc::new(SubscriptionManager::new()); + info!("Initialize K2V item table..."); + let item_table = Table::new( + K2VItemTable { + counter_table: counter_table.clone(), + subscriptions: subscriptions.clone(), + }, + meta_rep_param, + system.clone(), + db, + ); + let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions); + + Self { + item_table, + counter_table, + rpc, + } + } } diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 706faf26..130ba5be 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -1,15 +1,18 @@ -use garage_table::util::EmptyKey; use garage_util::crdt::*; use garage_util::data::*; use garage_util::error::{Error as GarageError, OkOrMessage}; use garage_util::time::*; +use garage_table::util::*; + use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::garage::Garage; use crate::helper::error::*; -use crate::key_table::{Key, KeyFilter}; +use crate::helper::key::KeyHelper; +use crate::key_table::*; use crate::permission::BucketKeyPerm; +use crate::s3::object_table::ObjectFilter; pub struct BucketHelper<'a>(pub(crate) &'a Garage); @@ -49,6 +52,23 @@ impl<'a> BucketHelper<'a> { } } + #[allow(clippy::ptr_arg)] + pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> { + let api_key_params = api_key + .state + .as_option() + .ok_or_message("Key should not be deleted at this point")?; + + if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) { + Ok(*bucket_id) + } else { + Ok(self + .resolve_global_bucket_name(bucket_name) + .await? + .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?) + } + } + /// Returns a Bucket if it is present in bucket table, /// even if it is in deleted state. Querying a non-existing /// bucket ID returns an internal error. @@ -71,63 +91,7 @@ impl<'a> BucketHelper<'a> { .get(&EmptyKey, &bucket_id) .await? .filter(|b| !b.is_deleted()) - .ok_or_bad_request(format!( - "Bucket {:?} does not exist or has been deleted", - bucket_id - )) - } - - /// Returns a Key if it is present in key table, - /// even if it is in deleted state. Querying a non-existing - /// key ID returns an internal error. - pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> { - Ok(self - .0 - .key_table - .get(&EmptyKey, key_id) - .await? - .ok_or_message(format!("Key {} does not exist", key_id))?) - } - - /// Returns a Key if it is present in key table, - /// only if it is in non-deleted state. - /// Querying a non-existing key ID or a deleted key - /// returns a bad request error. - pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> { - self.0 - .key_table - .get(&EmptyKey, key_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id)) - } - - /// Returns a Key if it is present in key table, - /// looking it up by key ID or by a match on its name, - /// only if it is in non-deleted state. - /// Querying a non-existing key ID or a deleted key - /// returns a bad request error. - pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> { - let candidates = self - .0 - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), - 10, - ) - .await? - .into_iter() - .collect::<Vec<_>>(); - if candidates.len() != 1 { - Err(Error::BadRequest(format!( - "{} matching keys", - candidates.len() - ))) - } else { - Ok(candidates.into_iter().next().unwrap()) - } + .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id))) } /// Sets a new alias for a bucket in global namespace. @@ -141,10 +105,7 @@ impl<'a> BucketHelper<'a> { alias_name: &String, ) -> Result<(), Error> { if !is_valid_bucket_name(alias_name) { - return Err(Error::BadRequest(format!( - "{}: {}", - alias_name, INVALID_BUCKET_NAME_MESSAGE - ))); + return Err(Error::InvalidBucketName(alias_name.to_string())); } let mut bucket = self.get_existing_bucket(bucket_id).await?; @@ -175,7 +136,7 @@ impl<'a> BucketHelper<'a> { let alias = match alias { None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id)) - .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?, + .ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?, Some(mut a) => { a.state = Lww::raw(alias_ts, Some(bucket_id)); a @@ -263,7 +224,7 @@ impl<'a> BucketHelper<'a> { .bucket_alias_table .get(&EmptyKey, alias_name) .await? - .ok_or_message(format!("Alias {} not found", alias_name))?; + .ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?; // Checks ok, remove alias let alias_ts = match bucket.state.as_option() { @@ -302,15 +263,14 @@ impl<'a> BucketHelper<'a> { key_id: &String, alias_name: &String, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + if !is_valid_bucket_name(alias_name) { - return Err(Error::BadRequest(format!( - "{}: {}", - alias_name, INVALID_BUCKET_NAME_MESSAGE - ))); + return Err(Error::InvalidBucketName(alias_name.to_string())); } let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut key = self.get_existing_key(key_id).await?; + let mut key = key_helper.get_existing_key(key_id).await?; let mut key_param = key.state.as_option_mut().unwrap(); @@ -359,8 +319,10 @@ impl<'a> BucketHelper<'a> { key_id: &String, alias_name: &String, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + let mut bucket = self.get_existing_bucket(bucket_id).await?; - let mut key = self.get_existing_key(key_id).await?; + let mut key = key_helper.get_existing_key(key_id).await?; let mut bucket_p = bucket.state.as_option_mut().unwrap(); @@ -428,8 +390,10 @@ impl<'a> BucketHelper<'a> { key_id: &String, mut perm: BucketKeyPerm, ) -> Result<(), Error> { + let key_helper = KeyHelper(self.0); + let mut bucket = self.get_internal_bucket(bucket_id).await?; - let mut key = self.get_internal_key(key_id).await?; + let mut key = key_helper.get_internal_key(key_id).await?; if let Some(bstate) = bucket.state.as_option() { if let Some(kp) = bstate.authorized_keys.get(key_id) { @@ -465,4 +429,47 @@ impl<'a> BucketHelper<'a> { Ok(()) } + + pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> { + let objects = self + .0 + .object_table + .get_range( + &bucket_id, + None, + Some(ObjectFilter::IsData), + 10, + EnumerationOrder::Forward, + ) + .await?; + if !objects.is_empty() { + return Ok(false); + } + + #[cfg(feature = "k2v")] + { + use garage_rpc::ring::Ring; + use std::sync::Arc; + + let ring: Arc<Ring> = self.0.system.ring.borrow().clone(); + let k2vindexes = self + .0 + .k2v + .counter_table + .table + .get_range( + &bucket_id, + None, + Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + 10, + EnumerationOrder::Forward, + ) + .await?; + if !k2vindexes.is_empty() { + return Ok(false); + } + } + + Ok(true) + } } diff --git a/src/model/helper/error.rs b/src/model/helper/error.rs index 30b2ba32..3ca8f55c 100644 --- a/src/model/helper/error.rs +++ b/src/model/helper/error.rs @@ -10,6 +10,16 @@ pub enum Error { #[error(display = "Bad request: {}", _0)] BadRequest(String), + + /// Bucket name is not valid according to AWS S3 specs + #[error(display = "Invalid bucket name: {}", _0)] + InvalidBucketName(String), + + #[error(display = "Access key not found: {}", _0)] + NoSuchAccessKey(String), + + #[error(display = "Bucket not found: {}", _0)] + NoSuchBucket(String), } impl From<netapp::error::Error> for Error { diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs new file mode 100644 index 00000000..c1a8e974 --- /dev/null +++ b/src/model/helper/key.rs @@ -0,0 +1,102 @@ +use garage_table::util::*; +use garage_util::crdt::*; +use garage_util::error::OkOrMessage; + +use crate::garage::Garage; +use crate::helper::bucket::BucketHelper; +use crate::helper::error::*; +use crate::key_table::{Key, KeyFilter}; +use crate::permission::BucketKeyPerm; + +pub struct KeyHelper<'a>(pub(crate) &'a Garage); + +#[allow(clippy::ptr_arg)] +impl<'a> KeyHelper<'a> { + /// Returns a Key if it is present in key table, + /// even if it is in deleted state. Querying a non-existing + /// key ID returns an internal error. + pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> { + Ok(self + .0 + .key_table + .get(&EmptyKey, key_id) + .await? + .ok_or_message(format!("Key {} does not exist", key_id))?) + } + + /// Returns a Key if it is present in key table, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> { + self.0 + .key_table + .get(&EmptyKey, key_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string())) + } + + /// Returns a Key if it is present in key table, + /// looking it up by key ID or by a match on its name, + /// only if it is in non-deleted state. + /// Querying a non-existing key ID or a deleted key + /// returns a bad request error. + pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> { + let candidates = self + .0 + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())), + 10, + EnumerationOrder::Forward, + ) + .await? + .into_iter() + .collect::<Vec<_>>(); + if candidates.len() != 1 { + Err(Error::BadRequest(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } + } + + /// Deletes an API access key + pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> { + let bucket_helper = BucketHelper(self.0); + + let state = key.state.as_option_mut().unwrap(); + + // --- done checking, now commit --- + // (the step at unset_local_bucket_alias will fail if a bucket + // does not have another alias, the deletion will be + // interrupted in the middle if that happens) + + // 1. Delete local aliases + for (alias, _, to) in state.local_aliases.items().iter() { + if let Some(bucket_id) = to { + bucket_helper + .unset_local_bucket_alias(*bucket_id, &key.key_id, alias) + .await?; + } + } + + // 2. Remove permissions on all authorized buckets + for (ab_id, _auth) in state.authorized_buckets.items().iter() { + bucket_helper + .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS) + .await?; + } + + // 3. Actually delete key + key.state = Deletable::delete(); + self.0.key_table.insert(key).await?; + + Ok(()) + } +} diff --git a/src/model/helper/mod.rs b/src/model/helper/mod.rs index 2f4e8898..dd947c86 100644 --- a/src/model/helper/mod.rs +++ b/src/model/helper/mod.rs @@ -1,2 +1,3 @@ pub mod bucket; pub mod error; +pub mod key; diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs new file mode 100644 index 00000000..e6394f0c --- /dev/null +++ b/src/model/index_counter.rs @@ -0,0 +1,496 @@ +use core::ops::Bound; +use std::collections::{hash_map, BTreeMap, HashMap}; +use std::marker::PhantomData; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, watch}; + +use garage_db as db; + +use garage_rpc::ring::Ring; +use garage_rpc::system::System; +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::time::*; + +use garage_table::crdt::*; +use garage_table::replication::*; +use garage_table::*; + +pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { + const COUNTER_TABLE_NAME: &'static str; + + type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + fn counter_partition_key(&self) -> &Self::CP; + fn counter_sort_key(&self) -> &Self::CS; + fn counts(&self) -> Vec<(&'static str, i64)>; +} + +/// A counter entry in the global table +#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] +pub struct CounterEntry<T: CountedItem> { + pub pk: T::CP, + pub sk: T::CS, + pub values: BTreeMap<String, CounterValue>, +} + +impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { + fn partition_key(&self) -> &T::CP { + &self.pk + } + fn sort_key(&self) -> &T::CS { + &self.sk + } + fn is_tombstone(&self) -> bool { + self.values + .iter() + .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0)) + } +} + +impl<T: CountedItem> CounterEntry<T> { + pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { + let nodes = &ring.layout.node_id_vec[..]; + self.filtered_values_with_nodes(nodes) + } + + pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> { + let mut ret = HashMap::new(); + for (name, vals) in self.values.iter() { + let new_vals = vals + .node_values + .iter() + .filter(|(n, _)| nodes.contains(n)) + .map(|(_, (_, v))| *v) + .collect::<Vec<_>>(); + if !new_vals.is_empty() { + ret.insert( + name.clone(), + new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)), + ); + } + } + + ret + } +} + +/// A counter entry in the global table +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct CounterValue { + pub node_values: BTreeMap<Uuid, (u64, i64)>, +} + +impl<T: CountedItem> Crdt for CounterEntry<T> { + fn merge(&mut self, other: &Self) { + for (name, e2) in other.values.iter() { + if let Some(e) = self.values.get_mut(name) { + e.merge(e2); + } else { + self.values.insert(name.clone(), e2.clone()); + } + } + } +} + +impl Crdt for CounterValue { + fn merge(&mut self, other: &Self) { + for (node, (t2, e2)) in other.node_values.iter() { + if let Some((t, e)) = self.node_values.get_mut(node) { + if t2 > t { + *e = *e2; + } + } else { + self.node_values.insert(*node, (*t2, *e2)); + } + } + } +} + +pub struct CounterTable<T: CountedItem> { + _phantom_t: PhantomData<T>, +} + +impl<T: CountedItem> TableSchema for CounterTable<T> { + const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; + + type P = T::CP; + type S = T::CS; + type E = CounterEntry<T>; + type Filter = (DeletedFilter, Vec<Uuid>); + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + if filter.0 == DeletedFilter::Any { + return true; + } + + let is_tombstone = entry + .filtered_values_with_nodes(&filter.1[..]) + .iter() + .all(|(_, v)| *v == 0); + filter.0.apply(is_tombstone) + } +} + +// ---- + +pub struct IndexCounter<T: CountedItem> { + this_node: Uuid, + local_counter: db::Tree, + propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>, + pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, +} + +impl<T: CountedItem> IndexCounter<T> { + pub fn new( + system: Arc<System>, + replication: TableShardedReplication, + db: &db::Db, + ) -> Arc<Self> { + let background = system.background.clone(); + + let (propagate_tx, propagate_rx) = mpsc::unbounded_channel(); + + let this = Arc::new(Self { + this_node: system.id, + local_counter: db + .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) + .expect("Unable to open local counter tree"), + propagate_tx, + table: Table::new( + CounterTable { + _phantom_t: Default::default(), + }, + replication, + system, + db, + ), + }); + + background.spawn_worker(IndexPropagatorWorker { + index_counter: this.clone(), + propagate_rx, + buf: HashMap::new(), + errors: 0, + }); + + this + } + + pub fn count( + &self, + tx: &mut db::Transaction, + old: Option<&T>, + new: Option<&T>, + ) -> db::TxResult<(), Error> { + let pk = old + .map(|e| e.counter_partition_key()) + .unwrap_or_else(|| new.unwrap().counter_partition_key()); + let sk = old + .map(|e| e.counter_sort_key()) + .unwrap_or_else(|| new.unwrap().counter_sort_key()); + + // calculate counter differences + let mut counts = HashMap::new(); + for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) -= v; + } + for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { + *counts.entry(k).or_insert(0) += v; + } + + // update local counter table + let tree_key = self.table.data.tree_key(pk, sk); + + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { + Some(old_bytes) => { + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)? + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + + let now = now_msec(); + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 = std::cmp::max(ent.0 + 1, now); + ent.1 += *inc; + } + + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; + + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { + error!( + "Could not propagate updated counter values, failed to send to channel: {}", + e + ); + } + + Ok(()) + } + + pub fn offline_recount_all<TS, TR>( + &self, + counted_table: &Arc<Table<TS, TR>>, + ) -> Result<(), Error> + where + TS: TableSchema<E = T>, + TR: TableReplication, + { + let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> { + let entry_k = self + .table + .data + .tree_key(entry.partition_key(), entry.sort_key()); + self.table + .data + .update_entry_with(&entry_k, |ent| match ent { + Some(mut ent) => { + ent.merge(&entry); + ent + } + None => entry.clone(), + })?; + Ok(()) + }; + + // 1. Set all old local counters to zero + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in self.local_counter.range((low_bound, Bound::Unbounded))? { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); + for (local_counter_k, local_counter) in batch { + let mut local_counter = + rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?; + + for (_, tv) in local_counter.values.iter_mut() { + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 = 0; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_k, &local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(local_counter_k); + } + } + + // 2. Recount all table entries + let now = now_msec(); + let mut next_start: Option<Vec<u8>> = None; + loop { + let low_bound = match next_start.take() { + Some(v) => Bound::Excluded(v), + None => Bound::Unbounded, + }; + let mut batch = vec![]; + for item in counted_table + .data + .store + .range((low_bound, Bound::Unbounded))? + { + batch.push(item?); + if batch.len() > 1000 { + break; + } + } + + if batch.is_empty() { + break; + } + + info!("counting entries... ({})", hex::encode(&batch[0].0)); + for (counted_entry_k, counted_entry) in batch { + let counted_entry = counted_table.data.decode_entry(&counted_entry)?; + + let pk = counted_entry.counter_partition_key(); + let sk = counted_entry.counter_sort_key(); + let counts = counted_entry.counts(); + + let local_counter_key = self.table.data.tree_key(pk, sk); + let mut local_counter = match self.local_counter.get(&local_counter_key)? { + Some(old_bytes) => { + let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>( + &old_bytes, + )?; + assert!(ent.pk == *pk); + assert!(ent.sk == *sk); + ent + } + None => LocalCounterEntry { + pk: pk.clone(), + sk: sk.clone(), + values: BTreeMap::new(), + }, + }; + for (s, v) in counts.iter() { + let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); + tv.0 = std::cmp::max(tv.0 + 1, now); + tv.1 += v; + } + + let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; + self.local_counter + .insert(&local_counter_key, local_counter_bytes)?; + + let counter_entry = local_counter.into_counter_entry(self.this_node); + save_counter_entry(counter_entry)?; + + next_start = Some(counted_entry_k); + } + } + + // Done + Ok(()) + } +} + +struct IndexPropagatorWorker<T: CountedItem> { + index_counter: Arc<IndexCounter<T>>, + propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>, + + buf: HashMap<Vec<u8>, CounterEntry<T>>, + errors: usize, +} + +impl<T: CountedItem> IndexPropagatorWorker<T> { + fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) { + let tree_key = self.index_counter.table.data.tree_key(&pk, &sk); + let dist_entry = counters.into_counter_entry(self.index_counter.this_node); + match self.buf.entry(tree_key) { + hash_map::Entry::Vacant(e) => { + e.insert(dist_entry); + } + hash_map::Entry::Occupied(mut e) => { + e.get_mut().merge(&dist_entry); + } + } + } +} + +#[async_trait] +impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { + fn name(&self) -> String { + format!("{} index counter propagator", T::COUNTER_TABLE_NAME) + } + + fn info(&self) -> Option<String> { + if !self.buf.is_empty() { + Some(format!("{} items in queue", self.buf.len())) + } else { + None + } + } + + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + // This loop batches updates to counters to be sent all at once. + // They are sent once the propagate_rx channel has been emptied (or is closed). + let closed = loop { + match self.propagate_rx.try_recv() { + Ok((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + } + Err(mpsc::error::TryRecvError::Empty) => break false, + Err(mpsc::error::TryRecvError::Disconnected) => break true, + } + }; + + if !self.buf.is_empty() { + let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>(); + let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap()); + if let Err(e) = self.index_counter.table.insert_many(entries).await { + self.errors += 1; + if self.errors >= 2 && *must_exit.borrow() { + error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); + return Ok(WorkerState::Done); + } + // Propagate error up to worker manager, it will log it, increment a counter, + // and sleep for a certain delay (with exponential backoff), waiting for + // things to go back to normal + return Err(e); + } else { + for k in entries_k { + self.buf.remove(&k); + } + self.errors = 0; + } + + return Ok(WorkerState::Busy); + } else if closed { + return Ok(WorkerState::Done); + } else { + return Ok(WorkerState::Idle); + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { + match self.propagate_rx.recv().await { + Some((pk, sk, counters)) => { + self.add_ent(pk, sk, counters); + WorkerState::Busy + } + None => match self.buf.is_empty() { + false => WorkerState::Busy, + true => WorkerState::Done, + }, + } + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +struct LocalCounterEntry<T: CountedItem> { + pk: T::CP, + sk: T::CS, + values: BTreeMap<String, (u64, i64)>, +} + +impl<T: CountedItem> LocalCounterEntry<T> { + fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { + CounterEntry { + pk: self.pk, + sk: self.sk, + values: self + .values + .into_iter() + .map(|(name, (ts, v))| { + let mut node_values = BTreeMap::new(); + node_values.insert(this_node, (ts, v)); + (name, CounterValue { node_values }) + }) + .collect(), + } + } +} diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs new file mode 100644 index 00000000..9a692870 --- /dev/null +++ b/src/model/k2v/causality.rs @@ -0,0 +1,96 @@ +use std::collections::BTreeMap; +use std::convert::TryInto; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +/// Node IDs used in K2V are u64 integers that are the abbreviation +/// of full Garage node IDs which are 256-bit UUIDs. +pub type K2VNodeId = u64; + +pub fn make_node_id(node_id: Uuid) -> K2VNodeId { + let mut tmp = [0u8; 8]; + tmp.copy_from_slice(&node_id.as_slice()[..8]); + u64::from_be_bytes(tmp) +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub struct CausalContext { + pub vector_clock: BTreeMap<K2VNodeId, u64>, +} + +impl CausalContext { + /// Empty causality context + pub fn new_empty() -> Self { + Self { + vector_clock: BTreeMap::new(), + } + } + /// Make binary representation and encode in base64 + pub fn serialize(&self) -> String { + let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); + for (node, time) in self.vector_clock.iter() { + ints.push(*node); + ints.push(*time); + } + let checksum = ints.iter().fold(0, |acc, v| acc ^ *v); + + let mut bytes = u64::to_be_bytes(checksum).to_vec(); + for i in ints { + bytes.extend(u64::to_be_bytes(i)); + } + + base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) + } + /// Parse from base64-encoded binary representation + pub fn parse(s: &str) -> Result<Self, String> { + let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) + .map_err(|e| format!("bad causality token base64: {}", e))?; + if bytes.len() % 16 != 8 || bytes.len() < 8 { + return Err("bad causality token length".into()); + } + + let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); + let mut ret = CausalContext { + vector_clock: BTreeMap::new(), + }; + + for i in 0..(bytes.len() / 16) { + let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap()); + let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap()); + ret.vector_clock.insert(node_id, time); + } + + let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); + + if check != checksum { + return Err("bad causality token checksum".into()); + } + + Ok(ret) + } + /// Check if this causal context contains newer items than another one + pub fn is_newer_than(&self, other: &Self) -> bool { + self.vector_clock + .iter() + .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_causality_token_serialization() { + let ct = CausalContext { + vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)] + .iter() + .cloned() + .collect(), + }; + + assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct); + } +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs new file mode 100644 index 00000000..7860cb17 --- /dev/null +++ b/src/model/k2v/item_table.rs @@ -0,0 +1,305 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::Arc; + +use garage_db as db; +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::*; + +use crate::index_counter::*; +use crate::k2v::causality::*; +use crate::k2v::poll::*; + +pub const ENTRIES: &str = "entries"; +pub const CONFLICTS: &str = "conflicts"; +pub const VALUES: &str = "values"; +pub const BYTES: &str = "bytes"; + +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct K2VItem { + pub partition: K2VItemPartition, + pub sort_key: String, + + items: BTreeMap<K2VNodeId, DvvsEntry>, +} + +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] +pub struct K2VItemPartition { + pub bucket_id: Uuid, + pub partition_key: String, +} + +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +struct DvvsEntry { + t_discard: u64, + values: Vec<(u64, DvvsValue)>, +} + +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub enum DvvsValue { + Value(#[serde(with = "serde_bytes")] Vec<u8>), + Deleted, +} + +impl K2VItem { + /// Creates a new K2VItem when no previous entry existed in the db + pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { + Self { + partition: K2VItemPartition { + bucket_id, + partition_key, + }, + sort_key, + items: BTreeMap::new(), + } + } + /// Updates a K2VItem with a new value or a deletion event + pub fn update( + &mut self, + this_node: Uuid, + context: &Option<CausalContext>, + new_value: DvvsValue, + ) { + if let Some(context) = context { + for (node, t_discard) in context.vector_clock.iter() { + if let Some(e) = self.items.get_mut(node) { + e.t_discard = std::cmp::max(e.t_discard, *t_discard); + } else { + self.items.insert( + *node, + DvvsEntry { + t_discard: *t_discard, + values: vec![], + }, + ); + } + } + } + + self.discard(); + + let node_id = make_node_id(this_node); + let e = self.items.entry(node_id).or_insert(DvvsEntry { + t_discard: 0, + values: vec![], + }); + let t_prev = e.max_time(); + e.values.push((t_prev + 1, new_value)); + } + + /// Extract the causality context of a K2V Item + pub fn causal_context(&self) -> CausalContext { + let mut cc = CausalContext::new_empty(); + for (node, ent) in self.items.iter() { + cc.vector_clock.insert(*node, ent.max_time()); + } + cc + } + + /// Extract the list of values + pub fn values(&'_ self) -> Vec<&'_ DvvsValue> { + let mut ret = vec![]; + for (_, ent) in self.items.iter() { + for (_, v) in ent.values.iter() { + if !ret.contains(&v) { + ret.push(v); + } + } + } + ret + } + + fn discard(&mut self) { + for (_, ent) in self.items.iter_mut() { + ent.discard(); + } + } +} + +impl DvvsEntry { + fn max_time(&self) -> u64 { + self.values + .iter() + .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts)) + } + + fn discard(&mut self) { + self.values = std::mem::take(&mut self.values) + .into_iter() + .filter(|(t, _)| *t > self.t_discard) + .collect::<Vec<_>>(); + } +} + +impl Crdt for K2VItem { + fn merge(&mut self, other: &Self) { + for (node, e2) in other.items.iter() { + if let Some(e) = self.items.get_mut(node) { + e.merge(e2); + } else { + self.items.insert(*node, e2.clone()); + } + } + } +} + +impl Crdt for DvvsEntry { + fn merge(&mut self, other: &Self) { + self.t_discard = std::cmp::max(self.t_discard, other.t_discard); + self.discard(); + + let t_max = self.max_time(); + for (vt, vv) in other.values.iter() { + if *vt > t_max { + self.values.push((*vt, vv.clone())); + } + } + } +} + +impl PartitionKey for K2VItemPartition { + fn hash(&self) -> Hash { + use blake2::{Blake2b, Digest}; + + let mut hasher = Blake2b::new(); + hasher.update(self.bucket_id.as_slice()); + hasher.update(self.partition_key.as_bytes()); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hasher.finalize()[..32]); + hash.into() + } +} + +impl Entry<K2VItemPartition, String> for K2VItem { + fn partition_key(&self) -> &K2VItemPartition { + &self.partition + } + fn sort_key(&self) -> &String { + &self.sort_key + } + fn is_tombstone(&self) -> bool { + self.values() + .iter() + .all(|v| matches!(v, DvvsValue::Deleted)) + } +} + +pub struct K2VItemTable { + pub(crate) counter_table: Arc<IndexCounter<K2VItem>>, + pub(crate) subscriptions: Arc<SubscriptionManager>, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct ItemFilter { + pub exclude_only_tombstones: bool, + pub conflicts_only: bool, +} + +impl TableSchema for K2VItemTable { + const TABLE_NAME: &'static str = "k2v_item"; + + type P = K2VItemPartition; + type S = String; + type E = K2VItem; + type Filter = ItemFilter; + + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + // This result can be returned by `counter_table.count()` for instance + // if messagepack serialization or deserialization fails at some step. + // Warn admin but ignore this error for now, that's all we can do. + error!( + "Unable to update K2V item counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Notify + if let Some(new_ent) = new { + self.subscriptions.notify(new_ent); + } + + Ok(()) + } + + #[allow(clippy::nonminimal_bool)] + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + let v = entry.values(); + !(filter.conflicts_only && v.len() < 2) + && !(filter.exclude_only_tombstones && entry.is_tombstone()) + } +} + +impl CountedItem for K2VItem { + const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = K2V item's partition key + type CS = String; + + fn counter_partition_key(&self) -> &Uuid { + &self.partition.bucket_id + } + fn counter_sort_key(&self) -> &String { + &self.partition.partition_key + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let values = self.values(); + + let n_entries = if self.is_tombstone() { 0 } else { 1 }; + let n_conflicts = if values.len() > 1 { 1 } else { 0 }; + let n_values = values + .iter() + .filter(|v| matches!(v, DvvsValue::Value(_))) + .count() as i64; + let n_bytes = values + .iter() + .map(|v| match v { + DvvsValue::Deleted => 0, + DvvsValue::Value(v) => v.len() as i64, + }) + .sum(); + + vec![ + (ENTRIES, n_entries), + (CONFLICTS, n_conflicts), + (VALUES, n_values), + (BYTES, n_bytes), + ] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dvvsentry_merge_simple() { + let e1 = DvvsEntry { + t_discard: 4, + values: vec![ + (5, DvvsValue::Value(vec![15])), + (6, DvvsValue::Value(vec![16])), + ], + }; + let e2 = DvvsEntry { + t_discard: 5, + values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)], + }; + + let mut e3 = e1.clone(); + e3.merge(&e2); + assert_eq!(e2, e3); + } +} diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs new file mode 100644 index 00000000..f6a96151 --- /dev/null +++ b/src/model/k2v/mod.rs @@ -0,0 +1,6 @@ +pub mod causality; + +pub mod item_table; + +pub mod poll; +pub mod rpc; diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs new file mode 100644 index 00000000..93105207 --- /dev/null +++ b/src/model/k2v/poll.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; +use std::sync::Mutex; + +use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; + +use crate::k2v::item_table::*; + +#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PollKey { + pub partition: K2VItemPartition, + pub sort_key: String, +} + +#[derive(Default)] +pub struct SubscriptionManager { + subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>, +} + +impl SubscriptionManager { + pub fn new() -> Self { + Self::default() + } + + pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> { + let mut subs = self.subscriptions.lock().unwrap(); + if let Some(s) = subs.get(key) { + s.subscribe() + } else { + let (tx, rx) = broadcast::channel(8); + subs.insert(key.clone(), tx); + rx + } + } + + pub fn notify(&self, item: &K2VItem) { + let key = PollKey { + partition: item.partition.clone(), + sort_key: item.sort_key.clone(), + }; + let mut subs = self.subscriptions.lock().unwrap(); + if let Some(s) = subs.get(&key) { + if s.send(item.clone()).is_err() { + // no more subscribers, remove channel from here + // (we will re-create it later if we need to subscribe again) + subs.remove(&key); + } + } + } +} diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs new file mode 100644 index 00000000..a74df277 --- /dev/null +++ b/src/model/k2v/rpc.rs @@ -0,0 +1,341 @@ +//! Module that implements RPCs specific to K2V. +//! This is necessary for insertions into the K2V store, +//! as they have to be transmitted to one of the nodes responsible +//! for storing the entry to be processed (the API entry +//! node does not process the entry directly, as this would +//! mean the vector clock gets much larger than needed). + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use serde::{Deserialize, Serialize}; +use tokio::select; + +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::*; + +use garage_rpc::system::System; +use garage_rpc::*; + +use garage_table::replication::{TableReplication, TableShardedReplication}; +use garage_table::{PartitionKey, Table}; + +use crate::k2v::causality::*; +use crate::k2v::item_table::*; +use crate::k2v::poll::*; + +/// RPC messages for K2V +#[derive(Debug, Serialize, Deserialize)] +enum K2VRpc { + Ok, + InsertItem(InsertedItem), + InsertManyItems(Vec<InsertedItem>), + PollItem { + key: PollKey, + causal_context: CausalContext, + timeout_msec: u64, + }, + PollItemResponse(Option<K2VItem>), +} + +#[derive(Debug, Serialize, Deserialize)] +struct InsertedItem { + partition: K2VItemPartition, + sort_key: String, + causal_context: Option<CausalContext>, + value: DvvsValue, +} + +impl Rpc for K2VRpc { + type Response = Result<K2VRpc, Error>; +} + +/// The block manager, handling block exchange between nodes, and block storage on local node +pub struct K2VRpcHandler { + system: Arc<System>, + item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + endpoint: Arc<Endpoint<K2VRpc, Self>>, + subscriptions: Arc<SubscriptionManager>, +} + +impl K2VRpcHandler { + pub fn new( + system: Arc<System>, + item_table: Arc<Table<K2VItemTable, TableShardedReplication>>, + subscriptions: Arc<SubscriptionManager>, + ) -> Arc<Self> { + let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string()); + + let rpc_handler = Arc::new(Self { + system, + item_table, + endpoint, + subscriptions, + }); + rpc_handler.endpoint.set_handler(rpc_handler.clone()); + + rpc_handler + } + + // ---- public interface ---- + + pub async fn insert( + &self, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causal_context: Option<CausalContext>, + value: DvvsValue, + ) -> Result<(), Error> { + let partition = K2VItemPartition { + bucket_id, + partition_key, + }; + let mut who = self + .item_table + .data + .replication + .write_nodes(&partition.hash()); + who.sort(); + + self.system + .rpc + .try_call_many( + &self.endpoint, + &who[..], + K2VRpc::InsertItem(InsertedItem { + partition, + sort_key, + causal_context, + value, + }), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) + .interrupt_after_quorum(true), + ) + .await?; + + Ok(()) + } + + pub async fn insert_batch( + &self, + bucket_id: Uuid, + items: Vec<(String, String, Option<CausalContext>, DvvsValue)>, + ) -> Result<(), Error> { + let n_items = items.len(); + + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); + + for (partition_key, sort_key, causal_context, value) in items { + let partition = K2VItemPartition { + bucket_id, + partition_key, + }; + let mut who = self + .item_table + .data + .replication + .write_nodes(&partition.hash()); + who.sort(); + + call_list.entry(who).or_default().push(InsertedItem { + partition, + sort_key, + causal_context, + value, + }); + } + + debug!( + "K2V insert_batch: {} requests to insert {} items", + call_list.len(), + n_items + ); + let call_futures = call_list.into_iter().map(|(nodes, items)| async move { + let resp = self + .system + .rpc + .try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::InsertManyItems(items), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) + .interrupt_after_quorum(true), + ) + .await?; + Ok::<_, Error>((nodes, resp)) + }); + + let mut resps = call_futures.collect::<FuturesUnordered<_>>(); + while let Some(resp) = resps.next().await { + resp?; + } + + Ok(()) + } + + pub async fn poll( + &self, + bucket_id: Uuid, + partition_key: String, + sort_key: String, + causal_context: CausalContext, + timeout_msec: u64, + ) -> Result<Option<K2VItem>, Error> { + let poll_key = PollKey { + partition: K2VItemPartition { + bucket_id, + partition_key, + }, + sort_key, + }; + let nodes = self + .item_table + .data + .replication + .write_nodes(&poll_key.partition.hash()); + + let rpc = self.system.rpc.try_call_many( + &self.endpoint, + &nodes[..], + K2VRpc::PollItem { + key: poll_key, + causal_context, + timeout_msec, + }, + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.item_table.data.replication.read_quorum()) + .without_timeout(), + ); + let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let resps = select! { + r = rpc => r?, + _ = tokio::time::sleep(timeout_duration) => return Ok(None), + }; + + let mut resp: Option<K2VItem> = None; + for v in resps { + match v { + K2VRpc::PollItemResponse(Some(x)) => { + if let Some(y) = &mut resp { + y.merge(&x); + } else { + resp = Some(x); + } + } + K2VRpc::PollItemResponse(None) => { + return Ok(None); + } + v => return Err(Error::unexpected_rpc_message(v)), + } + } + + Ok(resp) + } + + // ---- internal handlers ---- + + async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> { + let new = self.local_insert(item)?; + + // Propagate to rest of network + if let Some(updated) = new { + self.item_table.insert(&updated).await?; + } + + Ok(K2VRpc::Ok) + } + + async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> { + let mut updated_vec = vec![]; + + for item in items { + let new = self.local_insert(item)?; + + if let Some(updated) = new { + updated_vec.push(updated); + } + } + + // Propagate to rest of network + if !updated_vec.is_empty() { + self.item_table.insert_many(&updated_vec).await?; + } + + Ok(K2VRpc::Ok) + } + + fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> { + let tree_key = self + .item_table + .data + .tree_key(&item.partition, &item.sort_key); + + self.item_table + .data + .update_entry_with(&tree_key[..], |ent| { + let mut ent = ent.unwrap_or_else(|| { + K2VItem::new( + item.partition.bucket_id, + item.partition.partition_key.clone(), + item.sort_key.clone(), + ) + }); + ent.update(self.system.id, &item.causal_context, item.value.clone()); + ent + }) + } + + async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> { + let mut chan = self.subscriptions.subscribe(key); + + let mut value = self + .item_table + .data + .read_entry(&key.partition, &key.sort_key)? + .map(|bytes| self.item_table.data.decode_entry(&bytes[..])) + .transpose()? + .unwrap_or_else(|| { + K2VItem::new( + key.partition.bucket_id, + key.partition.partition_key.clone(), + key.sort_key.clone(), + ) + }); + + while !value.causal_context().is_newer_than(ct) { + value = chan.recv().await?; + } + + Ok(value) + } +} + +#[async_trait] +impl EndpointHandler<K2VRpc> for K2VRpcHandler { + async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> { + match message { + K2VRpc::InsertItem(item) => self.handle_insert(item).await, + K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await, + K2VRpc::PollItem { + key, + causal_context, + timeout_msec, + } => { + let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec)); + select! { + ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse), + _ = delay => Ok(K2VRpc::PollItemResponse(None)), + } + } + m => Err(Error::unexpected_rpc_message(m)), + } + } +} diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 330e83f0..9d2fc783 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -6,10 +6,10 @@ use garage_util::data::*; use crate::permission::BucketKeyPerm; -use garage_model_050::key_table as old; +use crate::prev::v051::key_table as old; /// An api key -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Key { /// The id of the key (immutable), used as partition key pub key_id: String, @@ -19,7 +19,7 @@ pub struct Key { } /// Configuration for a key -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct KeyParams { /// The secret_key associated (immutable) pub secret_key: String, diff --git a/src/model/lib.rs b/src/model/lib.rs index 05a4cdc7..4f20ea46 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -1,14 +1,20 @@ #[macro_use] extern crate tracing; +// For migration from previous versions +pub(crate) mod prev; + pub mod permission; -pub mod block_ref_table; +pub mod index_counter; + pub mod bucket_alias_table; pub mod bucket_table; pub mod key_table; -pub mod object_table; -pub mod version_table; + +#[cfg(feature = "k2v")] +pub mod k2v; +pub mod s3; pub mod garage; pub mod helper; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 7e61957a..cd6ad26a 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -5,7 +5,7 @@ use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; -use garage_model_050::bucket_table as old_bucket; +use crate::prev::v051::bucket_table as old_bucket; use crate::bucket_alias_table::*; use crate::bucket_table::*; @@ -25,11 +25,15 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; - for res in tree.iter() { + let mut old_buckets = vec![]; + for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; + old_buckets.push(bucket); + } + for bucket in old_buckets { if let old_bucket::BucketState::Present(p) = bucket.state.get() { self.migrate_buckets050_do_bucket(&bucket, p).await?; } @@ -73,6 +77,7 @@ impl Migrate { local_aliases: LwwMap::new(), website_config: Lww::new(website), cors_config: Lww::new(None), + quotas: Lww::new(Default::default()), }), }) .await?; diff --git a/src/model/prev/mod.rs b/src/model/prev/mod.rs new file mode 100644 index 00000000..68bb1502 --- /dev/null +++ b/src/model/prev/mod.rs @@ -0,0 +1 @@ +pub(crate) mod v051; diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs new file mode 100644 index 00000000..628a49dd --- /dev/null +++ b/src/model/prev/v051/bucket_table.rs @@ -0,0 +1,63 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::Crdt; +use garage_table::*; + +use super::key_table::PermissionSet; + +/// A bucket is a collection of objects +/// +/// Its parameters are not directly accessible as: +/// - It must be possible to merge paramaters, hence the use of a LWW CRDT. +/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct Bucket { + /// Name of the bucket + pub name: String, + /// State, and configuration if not deleted, of the bucket + pub state: crdt::Lww<BucketState>, +} + +/// State of a bucket +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub enum BucketState { + /// The bucket is deleted + Deleted, + /// The bucket exists + Present(BucketParams), +} + +impl Crdt for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_params) => { + if let BucketState::Present(params) = self { + params.merge(other_params); + } + } + } + } +} + +/// Configuration for a bucket +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct BucketParams { + /// Map of key with access to the bucket, and what kind of access they give + pub authorized_keys: crdt::LwwMap<String, PermissionSet>, + /// Is the bucket served as http + pub website: crdt::Lww<bool>, +} + +impl Crdt for BucketParams { + fn merge(&mut self, o: &Self) { + self.authorized_keys.merge(&o.authorized_keys); + self.website.merge(&o.website); + } +} + +impl Crdt for Bucket { + fn merge(&mut self, other: &Self) { + self.state.merge(&other.state); + } +} diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs new file mode 100644 index 00000000..37516b1c --- /dev/null +++ b/src/model/prev/v051/key_table.rs @@ -0,0 +1,50 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::*; +use garage_table::*; + +/// An api key +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct Key { + /// The id of the key (immutable), used as partition key + pub key_id: String, + + /// The secret_key associated + pub secret_key: String, + + /// Name for the key + pub name: crdt::Lww<String>, + + /// Is the key deleted + pub deleted: crdt::Bool, + + /// Buckets in which the key is authorized. Empty if `Key` is deleted + // CRDT interaction: deleted implies authorized_buckets is empty + pub authorized_buckets: crdt::LwwMap<String, PermissionSet>, +} + +/// Permission given to a key in a bucket +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, +} + +impl AutoCrdt for PermissionSet { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for Key { + fn merge(&mut self, other: &Self) { + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.authorized_buckets.clear(); + } else { + self.authorized_buckets.merge(&other.authorized_buckets); + } + } +} diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs new file mode 100644 index 00000000..7a954752 --- /dev/null +++ b/src/model/prev/v051/mod.rs @@ -0,0 +1,4 @@ +pub(crate) mod bucket_table; +pub(crate) mod key_table; +pub(crate) mod object_table; +pub(crate) mod version_table; diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs new file mode 100644 index 00000000..e79e5787 --- /dev/null +++ b/src/model/prev/v051/object_table.rs @@ -0,0 +1,149 @@ +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +use garage_util::data::*; + +use garage_table::crdt::*; + +/// An object +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket: String, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + versions: Vec<ObjectVersion>, +} + +impl Object { + /// Get a list of currently stored versions of `Object` + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } +} + +/// Informations about a version of an object +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, +} + +/// State of an object version +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionState { + /// The version is being received + Uploading(ObjectVersionHeaders), + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, +} + +impl Crdt for ObjectVersionState { + fn merge(&mut self, other: &Self) { + use ObjectVersionState::*; + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => match self { + Aborted => {} + Complete(a) => { + a.merge(b); + } + Uploading(_) => { + *self = Complete(b.clone()); + } + }, + Uploading(_) => {} + } + } +} + +/// Data stored in object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub enum ObjectVersionData { + /// The object was deleted, this Version is a tombstone to mark it as such + DeleteMarker, + /// The object is short, it's stored inlined + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), + /// The object is not short, Hash of first block is stored here, next segments hashes are + /// stored in the version table + FirstBlock(ObjectVersionMeta, Hash), +} + +impl AutoCrdt for ObjectVersionData { + const WARN_IF_DIFFERENT: bool = true; +} + +/// Metadata about the object version +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + /// Headers to send to the client + pub headers: ObjectVersionHeaders, + /// Size of the object + pub size: u64, + /// etag of the object + pub etag: String, +} + +/// Additional headers for an object +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionHeaders { + /// Content type of the object + pub content_type: String, + /// Any other http headers to send + pub other: BTreeMap<String, String>, +} + +impl ObjectVersion { + fn cmp_key(&self) -> (u64, Uuid) { + (self.timestamp, self.uuid) + } + + /// Is the object version completely received + pub fn is_complete(&self) -> bool { + matches!(self.state, ObjectVersionState::Complete(_)) + } +} + +impl Crdt for Object { + fn merge(&mut self, other: &Self) { + // Merge versions from other into here + for other_v in other.versions.iter() { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) + { + Ok(i) => { + self.versions[i].state.merge(&other_v.state); + } + Err(i) => { + self.versions.insert(i, other_v.clone()); + } + } + } + + // Remove versions which are obsolete, i.e. those that come + // before the last version which .is_complete(). + let last_complete = self + .versions + .iter() + .enumerate() + .rev() + .find(|(_, v)| v.is_complete()) + .map(|(vi, _)| vi); + + if let Some(last_vi) = last_complete { + self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); + } + } +} diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs new file mode 100644 index 00000000..c11c62d5 --- /dev/null +++ b/src/model/prev/v051/version_table.rs @@ -0,0 +1,79 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +use garage_table::crdt::*; +use garage_table::*; + +/// A version of an object +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + /// Etag of each part in case of a multipart upload, empty otherwise + pub parts_etags: crdt::Map<u64, String>, + + // Back link to bucket+key so that we can figure if + // this was deleted later on + /// Bucket in which the related object is stored + pub bucket: String, + /// Key in which the related object is stored + pub key: String, +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlockKey { + /// Number of the part + pub part_number: u64, + /// Offset of this sub-segment in its part + pub offset: u64, +} + +impl Ord for VersionBlockKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.offset.cmp(&other.offset)) + } +} + +impl PartialOrd for VersionBlockKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +/// Informations about a single block +#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct VersionBlock { + /// Blake2 sum of the block + pub hash: Hash, + /// Size of the block + pub size: u64, +} + +impl AutoCrdt for VersionBlock { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for Version { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.blocks.clear(); + self.parts_etags.clear(); + } else { + self.blocks.merge(&other.blocks); + self.parts_etags.merge(&other.parts_etags); + } + } +} diff --git a/src/model/block_ref_table.rs b/src/model/s3/block_ref_table.rs index b6945403..c7017409 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::data::*; use garage_table::crdt::Crdt; @@ -8,7 +10,7 @@ use garage_table::*; use garage_block::manager::*; -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { /// Hash (blake2 sum) of the block, used as partition key pub block: Hash, @@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = DeletedFilter; - fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { - #[allow(clippy::or_fun_call)] - let block = &old.as_ref().or(new.as_ref()).unwrap().block; - let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); - let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false); + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + let block = old.or(new).unwrap().block; + let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); + let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - warn!("block_incref failed for block {:?}: {}", block, e); - } + self.block_manager.block_incref(tx, block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - warn!("block_decref failed for block {:?}: {}", block, e); - } + self.block_manager.block_decref(tx, block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs new file mode 100644 index 00000000..4e94337d --- /dev/null +++ b/src/model/s3/mod.rs @@ -0,0 +1,3 @@ +pub mod block_ref_table; +pub mod object_table; +pub mod version_table; diff --git a/src/model/object_table.rs b/src/model/s3/object_table.rs index da53878e..26ff57f6 100644 --- a/src/model/object_table.rs +++ b/src/model/s3/object_table.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -9,12 +11,17 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::version_table::*; +use crate::index_counter::*; +use crate::s3::version_table::*; + +use crate::prev::v051::object_table as old; -use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; /// An object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key pub bucket_id: Uuid, @@ -63,7 +70,7 @@ impl Object { } /// Informations about a version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct ObjectVersion { /// Id of the version pub uuid: Uuid, @@ -74,7 +81,7 @@ pub struct ObjectVersion { } /// State of an object version -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { /// The version is being received Uploading(ObjectVersionHeaders), @@ -216,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { pub background: Arc<BackgroundRunner>, pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub object_counter_table: Arc<IndexCounter<Object>>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -232,8 +240,26 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = ObjectFilter; - fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagates deletions to version table let version_table = self.version_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions @@ -256,7 +282,8 @@ impl TableSchema for ObjectTable { } } Ok(()) - }) + }); + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { @@ -272,6 +299,49 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let versions = self.versions(); + let n_objects = if versions.iter().any(|v| v.is_data()) { + 1 + } else { + 0 + }; + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::<u64>(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) diff --git a/src/model/version_table.rs b/src/model/s3/version_table.rs index 839b1f4f..6bc2ecd1 100644 --- a/src/model/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -8,12 +10,12 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; -use crate::block_ref_table::*; +use crate::s3::block_ref_table::*; -use garage_model_050::version_table as old; +use crate::prev::v051::version_table as old; /// A version of an object -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { /// UUID of the version, used as partition key pub uuid: Uuid, @@ -137,8 +139,16 @@ impl TableSchema for VersionTable { type E = Version; type Filter = DeletedFilter; - fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let block_ref_table = self.block_ref_table.clone(); + let old = old.cloned(); + let new = new.cloned(); + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks @@ -157,7 +167,9 @@ impl TableSchema for VersionTable { } } Ok(()) - }) + }); + + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { |