aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
committerMendes <mendes.oulamara@pm.me>2022-10-04 18:14:49 +0200
commit829f815a897b04986559910bbcbf53625adcdf20 (patch)
tree6db3c27cff2aded754a641d1f2b05c83be701267 /src/model
parent99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff)
parenta096ced35562bd0a8877a1ee2f755be1edafe343 (diff)
downloadgarage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz
garage-829f815a897b04986559910bbcbf53625adcdf20.zip
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml26
-rw-r--r--src/model/bucket_alias_table.rs2
-rw-r--r--src/model/bucket_table.rs23
-rw-r--r--src/model/garage.rs203
-rw-r--r--src/model/helper/bucket.rs151
-rw-r--r--src/model/helper/error.rs10
-rw-r--r--src/model/helper/key.rs102
-rw-r--r--src/model/helper/mod.rs1
-rw-r--r--src/model/index_counter.rs496
-rw-r--r--src/model/k2v/causality.rs96
-rw-r--r--src/model/k2v/item_table.rs305
-rw-r--r--src/model/k2v/mod.rs6
-rw-r--r--src/model/k2v/poll.rs50
-rw-r--r--src/model/k2v/rpc.rs341
-rw-r--r--src/model/key_table.rs6
-rw-r--r--src/model/lib.rs12
-rw-r--r--src/model/migrate.rs9
-rw-r--r--src/model/prev/mod.rs1
-rw-r--r--src/model/prev/v051/bucket_table.rs63
-rw-r--r--src/model/prev/v051/key_table.rs50
-rw-r--r--src/model/prev/v051/mod.rs4
-rw-r--r--src/model/prev/v051/object_table.rs149
-rw-r--r--src/model/prev/v051/version_table.rs79
-rw-r--r--src/model/s3/block_ref_table.rs (renamed from src/model/block_ref_table.rs)27
-rw-r--r--src/model/s3/mod.rs3
-rw-r--r--src/model/s3/object_table.rs (renamed from src/model/object_table.rs)84
-rw-r--r--src/model/s3/version_table.rs (renamed from src/model/version_table.rs)22
27 files changed, 2174 insertions, 147 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 007cec89..2c2e2bfe 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_model"
-version = "0.7.0"
+version = "0.8.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,22 +14,22 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.7.0", path = "../rpc" }
-garage_table = { version = "0.7.0", path = "../table" }
-garage_block = { version = "0.7.0", path = "../block" }
-garage_util = { version = "0.7.0", path = "../util" }
-garage_model_050 = { package = "garage_model", version = "0.5.1" }
+garage_db = { version = "0.8.0", path = "../db" }
+garage_rpc = { version = "0.8.0", path = "../rpc" }
+garage_table = { version = "0.8.0", path = "../table" }
+garage_block = { version = "0.8.0", path = "../block" }
+garage_util = { version = "0.8.0", path = "../util" }
async-trait = "0.1.7"
arc-swap = "1.0"
+blake2 = "0.9"
err-derive = "0.3"
hex = "0.4"
+base64 = "0.13"
tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
@@ -39,6 +39,10 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17"
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+netapp = "0.5"
+
+[features]
+k2v = [ "garage_util/k2v" ]
+lmdb = [ "garage_db/lmdb" ]
+sled = [ "garage_db/sled" ]
+sqlite = [ "garage_db/sqlite" ]
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs
index fce03d04..fcd1536e 100644
--- a/src/model/bucket_alias_table.rs
+++ b/src/model/bucket_alias_table.rs
@@ -7,7 +7,7 @@ use garage_table::*;
/// The bucket alias table holds the names given to buckets
/// in the global namespace.
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketAlias {
name: String,
pub state: crdt::Lww<Option<Uuid>>,
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 7c7b9f30..7be42702 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::Crdt;
+use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -12,7 +12,7 @@ use crate::permission::BucketKeyPerm;
/// Its parameters are not directly accessible as:
/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
/// ID of the bucket
pub id: Uuid,
@@ -21,7 +21,7 @@ pub struct Bucket {
}
/// Configuration for a bucket
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Bucket's creation date
pub creation_date: u64,
@@ -44,6 +44,9 @@ pub struct BucketParams {
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Bucket quotas
+ #[serde(default)]
+ pub quotas: crdt::Lww<BucketQuotas>,
}
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@@ -62,6 +65,18 @@ pub struct CorsRule {
pub expose_headers: Vec<String>,
}
+#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct BucketQuotas {
+ /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
+ pub max_size: Option<u64>,
+ /// Maximum number of non-deleted objects in the bucket
+ pub max_objects: Option<u64>,
+}
+
+impl AutoCrdt for BucketQuotas {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
pub fn new() -> Self {
@@ -72,6 +87,7 @@ impl BucketParams {
local_aliases: crdt::LwwMap::new(),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
+ quotas: crdt::Lww::new(BucketQuotas::default()),
}
}
}
@@ -86,6 +102,7 @@ impl Crdt for BucketParams {
self.website_config.merge(&o.website_config);
self.cors_config.merge(&o.cors_config);
+ self.quotas.merge(&o.quotas);
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index abdb920a..75012952 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -2,8 +2,11 @@ use std::sync::Arc;
use netapp::NetworkKey;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
+use garage_util::error::*;
use garage_rpc::system::System;
@@ -13,13 +16,18 @@ use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
+use crate::s3::object_table::*;
+use crate::s3::version_table::*;
+
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
+use crate::index_counter::*;
use crate::key_table::*;
-use crate::object_table::*;
-use crate::version_table::*;
+
+#[cfg(feature = "k2v")]
+use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -27,7 +35,7 @@ pub struct Garage {
pub config: Config,
/// The local database
- pub db: sled::Db,
+ pub db: db::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
@@ -35,21 +43,118 @@ pub struct Garage {
/// The block manager
pub block_manager: Arc<BlockManager>,
- /// Table containing informations about buckets
+ /// Table containing buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
- /// Table containing informations about bucket aliases
+ /// Table containing bucket aliases
pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>,
- /// Table containing informations about api keys
+ /// Table containing api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
+ /// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ /// Counting table containing object counters
+ pub object_counter_table: Arc<IndexCounter<Object>>,
+ /// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ /// Table containing S3 block references (not blocks themselves)
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
+
+ #[cfg(feature = "k2v")]
+ pub k2v: GarageK2V,
+}
+
+#[cfg(feature = "k2v")]
+pub struct GarageK2V {
+ /// Table containing K2V items
+ pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ /// Indexing table containing K2V item counters
+ pub counter_table: Arc<IndexCounter<K2VItem>>,
+ /// K2V RPC handler
+ pub rpc: Arc<K2VRpcHandler>,
}
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ // Create meta dir and data dir if they don't exist already
+ std::fs::create_dir_all(&config.metadata_dir)
+ .ok_or_message("Unable to create Garage metadata directory")?;
+ std::fs::create_dir_all(&config.data_dir)
+ .ok_or_message("Unable to create Garage data directory")?;
+
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ let db = match config.db_engine.as_str() {
+ // ---- Sled DB ----
+ #[cfg(feature = "sled")]
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ #[cfg(not(feature = "sled"))]
+ "sled" => return Err(Error::Message("sled db not available in this build".into())),
+ // ---- Sqlite DB ----
+ #[cfg(feature = "sqlite")]
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ #[cfg(not(feature = "sqlite"))]
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ return Err(Error::Message(
+ "sqlite db not available in this build".into(),
+ ))
+ }
+ // ---- LMDB DB ----
+ #[cfg(feature = "lmdb")]
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size = garage_db::lmdb_adapter::recommended_map_size();
+
+ use db::lmdb_adapter::heed;
+ let mut env_builder = heed::EnvOpenOptions::new();
+ env_builder.max_dbs(100);
+ env_builder.max_readers(500);
+ env_builder.map_size(map_size);
+ unsafe {
+ env_builder.flag(heed::flags::Flags::MdbNoSync);
+ env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
+ }
+ let db = env_builder.open(&db_path).expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ #[cfg(not(feature = "lmdb"))]
+ "lmdb" | "heed" => return Err(Error::Message("lmdb db not available in this build".into())),
+ // ---- Unavailable DB engine ----
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: {})",
+ e,
+ vec![
+ #[cfg(feature = "sled")]
+ "sled",
+ #[cfg(feature = "sqlite")]
+ "sqlite",
+ #[cfg(feature = "lmdb")]
+ "lmdb",
+ ]
+ .join(", ")
+ )));
+ }
+ };
+
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -64,7 +169,7 @@ impl Garage {
background.clone(),
replication_mode.replication_factor(),
&config,
- );
+ )?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
@@ -90,11 +195,25 @@ impl Garage {
&db,
config.data_dir.clone(),
config.compression_level,
- config.block_manager_background_tranquility,
data_rep_param,
system.clone(),
);
+ // ---- admin tables ----
+ info!("Initialize bucket_table...");
+ let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
+
+ info!("Initialize bucket_alias_table...");
+ let bucket_alias_table = Table::new(
+ BucketAliasTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ );
+ info!("Initialize key_table_table...");
+ let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
+
+ // ---- S3 tables ----
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
@@ -116,34 +235,28 @@ impl Garage {
&db,
);
+ info!("Initialize object counter table...");
+ let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
info!("Initialize object_table...");
+ #[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
+ object_counter_table: object_counter_table.clone(),
},
- meta_rep_param,
- system.clone(),
- &db,
- );
-
- info!("Initialize bucket_table...");
- let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
-
- info!("Initialize bucket_alias_table...");
- let bucket_alias_table = Table::new(
- BucketAliasTable,
- control_rep_param.clone(),
+ meta_rep_param.clone(),
system.clone(),
&db,
);
- info!("Initialize key_table_table...");
- let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
-
- info!("Initialize Garage...");
+ // ---- K2V ----
+ #[cfg(feature = "k2v")]
+ let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- Arc::new(Self {
+ // -- done --
+ Ok(Arc::new(Self {
config,
db,
background,
@@ -153,12 +266,46 @@ impl Garage {
bucket_alias_table,
key_table,
object_table,
+ object_counter_table,
version_table,
block_ref_table,
- })
+ #[cfg(feature = "k2v")]
+ k2v,
+ }))
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
+
+ pub fn key_helper(&self) -> helper::key::KeyHelper {
+ helper::key::KeyHelper(self)
+ }
+}
+
+#[cfg(feature = "k2v")]
+impl GarageK2V {
+ fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
+ info!("Initialize K2V counter table...");
+ let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+ info!("Initialize K2V subscription manager...");
+ let subscriptions = Arc::new(SubscriptionManager::new());
+ info!("Initialize K2V item table...");
+ let item_table = Table::new(
+ K2VItemTable {
+ counter_table: counter_table.clone(),
+ subscriptions: subscriptions.clone(),
+ },
+ meta_rep_param,
+ system.clone(),
+ db,
+ );
+ let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ Self {
+ item_table,
+ counter_table,
+ rpc,
+ }
+ }
}
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 706faf26..130ba5be 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -1,15 +1,18 @@
-use garage_table::util::EmptyKey;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::time::*;
+use garage_table::util::*;
+
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::garage::Garage;
use crate::helper::error::*;
-use crate::key_table::{Key, KeyFilter};
+use crate::helper::key::KeyHelper;
+use crate::key_table::*;
use crate::permission::BucketKeyPerm;
+use crate::s3::object_table::ObjectFilter;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@@ -49,6 +52,23 @@ impl<'a> BucketHelper<'a> {
}
}
+ #[allow(clippy::ptr_arg)]
+ pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> {
+ let api_key_params = api_key
+ .state
+ .as_option()
+ .ok_or_message("Key should not be deleted at this point")?;
+
+ if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
+ Ok(*bucket_id)
+ } else {
+ Ok(self
+ .resolve_global_bucket_name(bucket_name)
+ .await?
+ .ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?)
+ }
+ }
+
/// Returns a Bucket if it is present in bucket table,
/// even if it is in deleted state. Querying a non-existing
/// bucket ID returns an internal error.
@@ -71,63 +91,7 @@ impl<'a> BucketHelper<'a> {
.get(&EmptyKey, &bucket_id)
.await?
.filter(|b| !b.is_deleted())
- .ok_or_bad_request(format!(
- "Bucket {:?} does not exist or has been deleted",
- bucket_id
- ))
- }
-
- /// Returns a Key if it is present in key table,
- /// even if it is in deleted state. Querying a non-existing
- /// key ID returns an internal error.
- pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> {
- Ok(self
- .0
- .key_table
- .get(&EmptyKey, key_id)
- .await?
- .ok_or_message(format!("Key {} does not exist", key_id))?)
- }
-
- /// Returns a Key if it is present in key table,
- /// only if it is in non-deleted state.
- /// Querying a non-existing key ID or a deleted key
- /// returns a bad request error.
- pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> {
- self.0
- .key_table
- .get(&EmptyKey, key_id)
- .await?
- .filter(|b| !b.state.is_deleted())
- .ok_or_bad_request(format!("Key {} does not exist or has been deleted", key_id))
- }
-
- /// Returns a Key if it is present in key table,
- /// looking it up by key ID or by a match on its name,
- /// only if it is in non-deleted state.
- /// Querying a non-existing key ID or a deleted key
- /// returns a bad request error.
- pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> {
- let candidates = self
- .0
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
- 10,
- )
- .await?
- .into_iter()
- .collect::<Vec<_>>();
- if candidates.len() != 1 {
- Err(Error::BadRequest(format!(
- "{} matching keys",
- candidates.len()
- )))
- } else {
- Ok(candidates.into_iter().next().unwrap())
- }
+ .ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id)))
}
/// Sets a new alias for a bucket in global namespace.
@@ -141,10 +105,7 @@ impl<'a> BucketHelper<'a> {
alias_name: &String,
) -> Result<(), Error> {
if !is_valid_bucket_name(alias_name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- alias_name, INVALID_BUCKET_NAME_MESSAGE
- )));
+ return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.get_existing_bucket(bucket_id).await?;
@@ -175,7 +136,7 @@ impl<'a> BucketHelper<'a> {
let alias = match alias {
None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id))
- .ok_or_bad_request(format!("{}: {}", alias_name, INVALID_BUCKET_NAME_MESSAGE))?,
+ .ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?,
Some(mut a) => {
a.state = Lww::raw(alias_ts, Some(bucket_id));
a
@@ -263,7 +224,7 @@ impl<'a> BucketHelper<'a> {
.bucket_alias_table
.get(&EmptyKey, alias_name)
.await?
- .ok_or_message(format!("Alias {} not found", alias_name))?;
+ .ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?;
// Checks ok, remove alias
let alias_ts = match bucket.state.as_option() {
@@ -302,15 +263,14 @@ impl<'a> BucketHelper<'a> {
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
+ let key_helper = KeyHelper(self.0);
+
if !is_valid_bucket_name(alias_name) {
- return Err(Error::BadRequest(format!(
- "{}: {}",
- alias_name, INVALID_BUCKET_NAME_MESSAGE
- )));
+ return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.get_existing_bucket(bucket_id).await?;
- let mut key = self.get_existing_key(key_id).await?;
+ let mut key = key_helper.get_existing_key(key_id).await?;
let mut key_param = key.state.as_option_mut().unwrap();
@@ -359,8 +319,10 @@ impl<'a> BucketHelper<'a> {
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
+ let key_helper = KeyHelper(self.0);
+
let mut bucket = self.get_existing_bucket(bucket_id).await?;
- let mut key = self.get_existing_key(key_id).await?;
+ let mut key = key_helper.get_existing_key(key_id).await?;
let mut bucket_p = bucket.state.as_option_mut().unwrap();
@@ -428,8 +390,10 @@ impl<'a> BucketHelper<'a> {
key_id: &String,
mut perm: BucketKeyPerm,
) -> Result<(), Error> {
+ let key_helper = KeyHelper(self.0);
+
let mut bucket = self.get_internal_bucket(bucket_id).await?;
- let mut key = self.get_internal_key(key_id).await?;
+ let mut key = key_helper.get_internal_key(key_id).await?;
if let Some(bstate) = bucket.state.as_option() {
if let Some(kp) = bstate.authorized_keys.get(key_id) {
@@ -465,4 +429,47 @@ impl<'a> BucketHelper<'a> {
Ok(())
}
+
+ pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> {
+ let objects = self
+ .0
+ .object_table
+ .get_range(
+ &bucket_id,
+ None,
+ Some(ObjectFilter::IsData),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ if !objects.is_empty() {
+ return Ok(false);
+ }
+
+ #[cfg(feature = "k2v")]
+ {
+ use garage_rpc::ring::Ring;
+ use std::sync::Arc;
+
+ let ring: Arc<Ring> = self.0.system.ring.borrow().clone();
+ let k2vindexes = self
+ .0
+ .k2v
+ .counter_table
+ .table
+ .get_range(
+ &bucket_id,
+ None,
+ Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ if !k2vindexes.is_empty() {
+ return Ok(false);
+ }
+ }
+
+ Ok(true)
+ }
}
diff --git a/src/model/helper/error.rs b/src/model/helper/error.rs
index 30b2ba32..3ca8f55c 100644
--- a/src/model/helper/error.rs
+++ b/src/model/helper/error.rs
@@ -10,6 +10,16 @@ pub enum Error {
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
+
+ /// Bucket name is not valid according to AWS S3 specs
+ #[error(display = "Invalid bucket name: {}", _0)]
+ InvalidBucketName(String),
+
+ #[error(display = "Access key not found: {}", _0)]
+ NoSuchAccessKey(String),
+
+ #[error(display = "Bucket not found: {}", _0)]
+ NoSuchBucket(String),
}
impl From<netapp::error::Error> for Error {
diff --git a/src/model/helper/key.rs b/src/model/helper/key.rs
new file mode 100644
index 00000000..c1a8e974
--- /dev/null
+++ b/src/model/helper/key.rs
@@ -0,0 +1,102 @@
+use garage_table::util::*;
+use garage_util::crdt::*;
+use garage_util::error::OkOrMessage;
+
+use crate::garage::Garage;
+use crate::helper::bucket::BucketHelper;
+use crate::helper::error::*;
+use crate::key_table::{Key, KeyFilter};
+use crate::permission::BucketKeyPerm;
+
+pub struct KeyHelper<'a>(pub(crate) &'a Garage);
+
+#[allow(clippy::ptr_arg)]
+impl<'a> KeyHelper<'a> {
+ /// Returns a Key if it is present in key table,
+ /// even if it is in deleted state. Querying a non-existing
+ /// key ID returns an internal error.
+ pub async fn get_internal_key(&self, key_id: &String) -> Result<Key, Error> {
+ Ok(self
+ .0
+ .key_table
+ .get(&EmptyKey, key_id)
+ .await?
+ .ok_or_message(format!("Key {} does not exist", key_id))?)
+ }
+
+ /// Returns a Key if it is present in key table,
+ /// only if it is in non-deleted state.
+ /// Querying a non-existing key ID or a deleted key
+ /// returns a bad request error.
+ pub async fn get_existing_key(&self, key_id: &String) -> Result<Key, Error> {
+ self.0
+ .key_table
+ .get(&EmptyKey, key_id)
+ .await?
+ .filter(|b| !b.state.is_deleted())
+ .ok_or_else(|| Error::NoSuchAccessKey(key_id.to_string()))
+ }
+
+ /// Returns a Key if it is present in key table,
+ /// looking it up by key ID or by a match on its name,
+ /// only if it is in non-deleted state.
+ /// Querying a non-existing key ID or a deleted key
+ /// returns a bad request error.
+ pub async fn get_existing_matching_key(&self, pattern: &str) -> Result<Key, Error> {
+ let candidates = self
+ .0
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::MatchesAndNotDeleted(pattern.to_string())),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?
+ .into_iter()
+ .collect::<Vec<_>>();
+ if candidates.len() != 1 {
+ Err(Error::BadRequest(format!(
+ "{} matching keys",
+ candidates.len()
+ )))
+ } else {
+ Ok(candidates.into_iter().next().unwrap())
+ }
+ }
+
+ /// Deletes an API access key
+ pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> {
+ let bucket_helper = BucketHelper(self.0);
+
+ let state = key.state.as_option_mut().unwrap();
+
+ // --- done checking, now commit ---
+ // (the step at unset_local_bucket_alias will fail if a bucket
+ // does not have another alias, the deletion will be
+ // interrupted in the middle if that happens)
+
+ // 1. Delete local aliases
+ for (alias, _, to) in state.local_aliases.items().iter() {
+ if let Some(bucket_id) = to {
+ bucket_helper
+ .unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
+ .await?;
+ }
+ }
+
+ // 2. Remove permissions on all authorized buckets
+ for (ab_id, _auth) in state.authorized_buckets.items().iter() {
+ bucket_helper
+ .set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 3. Actually delete key
+ key.state = Deletable::delete();
+ self.0.key_table.insert(key).await?;
+
+ Ok(())
+ }
+}
diff --git a/src/model/helper/mod.rs b/src/model/helper/mod.rs
index 2f4e8898..dd947c86 100644
--- a/src/model/helper/mod.rs
+++ b/src/model/helper/mod.rs
@@ -1,2 +1,3 @@
pub mod bucket;
pub mod error;
+pub mod key;
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
new file mode 100644
index 00000000..e6394f0c
--- /dev/null
+++ b/src/model/index_counter.rs
@@ -0,0 +1,496 @@
+use core::ops::Bound;
+use std::collections::{hash_map, BTreeMap, HashMap};
+use std::marker::PhantomData;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use tokio::sync::{mpsc, watch};
+
+use garage_db as db;
+
+use garage_rpc::ring::Ring;
+use garage_rpc::system::System;
+use garage_util::background::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::time::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::*;
+use garage_table::*;
+
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+ const COUNTER_TABLE_NAME: &'static str;
+
+ type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ fn counter_partition_key(&self) -> &Self::CP;
+ fn counter_sort_key(&self) -> &Self::CS;
+ fn counts(&self) -> Vec<(&'static str, i64)>;
+}
+
+/// A counter entry in the global table
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
+ pub values: BTreeMap<String, CounterValue>,
+}
+
+impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::CP {
+ &self.pk
+ }
+ fn sort_key(&self) -> &T::CS {
+ &self.sk
+ }
+ fn is_tombstone(&self) -> bool {
+ self.values
+ .iter()
+ .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0))
+ }
+}
+
+impl<T: CountedItem> CounterEntry<T> {
+ pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
+ let nodes = &ring.layout.node_id_vec[..];
+ self.filtered_values_with_nodes(nodes)
+ }
+
+ pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
+ let mut ret = HashMap::new();
+ for (name, vals) in self.values.iter() {
+ let new_vals = vals
+ .node_values
+ .iter()
+ .filter(|(n, _)| nodes.contains(n))
+ .map(|(_, (_, v))| *v)
+ .collect::<Vec<_>>();
+ if !new_vals.is_empty() {
+ ret.insert(
+ name.clone(),
+ new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)),
+ );
+ }
+ }
+
+ ret
+ }
+}
+
+/// A counter entry in the global table
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+}
+
+impl<T: CountedItem> Crdt for CounterEntry<T> {
+ fn merge(&mut self, other: &Self) {
+ for (name, e2) in other.values.iter() {
+ if let Some(e) = self.values.get_mut(name) {
+ e.merge(e2);
+ } else {
+ self.values.insert(name.clone(), e2.clone());
+ }
+ }
+ }
+}
+
+impl Crdt for CounterValue {
+ fn merge(&mut self, other: &Self) {
+ for (node, (t2, e2)) in other.node_values.iter() {
+ if let Some((t, e)) = self.node_values.get_mut(node) {
+ if t2 > t {
+ *e = *e2;
+ }
+ } else {
+ self.node_values.insert(*node, (*t2, *e2));
+ }
+ }
+ }
+}
+
+pub struct CounterTable<T: CountedItem> {
+ _phantom_t: PhantomData<T>,
+}
+
+impl<T: CountedItem> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
+
+ type P = T::CP;
+ type S = T::CS;
+ type E = CounterEntry<T>;
+ type Filter = (DeletedFilter, Vec<Uuid>);
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ if filter.0 == DeletedFilter::Any {
+ return true;
+ }
+
+ let is_tombstone = entry
+ .filtered_values_with_nodes(&filter.1[..])
+ .iter()
+ .all(|(_, v)| *v == 0);
+ filter.0.apply(is_tombstone)
+ }
+}
+
+// ----
+
+pub struct IndexCounter<T: CountedItem> {
+ this_node: Uuid,
+ local_counter: db::Tree,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
+ pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
+}
+
+impl<T: CountedItem> IndexCounter<T> {
+ pub fn new(
+ system: Arc<System>,
+ replication: TableShardedReplication,
+ db: &db::Db,
+ ) -> Arc<Self> {
+ let background = system.background.clone();
+
+ let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
+
+ let this = Arc::new(Self {
+ this_node: system.id,
+ local_counter: db
+ .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
+ .expect("Unable to open local counter tree"),
+ propagate_tx,
+ table: Table::new(
+ CounterTable {
+ _phantom_t: Default::default(),
+ },
+ replication,
+ system,
+ db,
+ ),
+ });
+
+ background.spawn_worker(IndexPropagatorWorker {
+ index_counter: this.clone(),
+ propagate_rx,
+ buf: HashMap::new(),
+ errors: 0,
+ });
+
+ this
+ }
+
+ pub fn count(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&T>,
+ new: Option<&T>,
+ ) -> db::TxResult<(), Error> {
+ let pk = old
+ .map(|e| e.counter_partition_key())
+ .unwrap_or_else(|| new.unwrap().counter_partition_key());
+ let sk = old
+ .map(|e| e.counter_sort_key())
+ .unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+ // calculate counter differences
+ let mut counts = HashMap::new();
+ for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) -= v;
+ }
+ for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) += v;
+ }
+
+ // update local counter table
+ let tree_key = self.table.data.tree_key(pk, sk);
+
+ let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?
+ }
+ None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
+ values: BTreeMap::new(),
+ },
+ };
+
+ let now = now_msec();
+ for (s, inc) in counts.iter() {
+ let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ ent.0 = std::cmp::max(ent.0 + 1, now);
+ ent.1 += *inc;
+ }
+
+ let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
+
+ if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
+ error!(
+ "Could not propagate updated counter values, failed to send to channel: {}",
+ e
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Rebuilds every counter from scratch by scanning `counted_table`.
+ ///
+ /// First, every value stored in `self.local_counter` is reset to zero (with
+ /// a bumped timestamp). Then each entry of `counted_table` is decoded and
+ /// its `counts()` are added back. After each local update, the resulting
+ /// counter entry is also merged into `self.table.data` through
+ /// `save_counter_entry`.
+ ///
+ /// NOTE(review): judging by its name this is meant to run while the node is
+ /// offline; nothing in this function enforces that — confirm with callers.
+ pub fn offline_recount_all<TS, TR>(
+ &self,
+ counted_table: &Arc<Table<TS, TR>>,
+ ) -> Result<(), Error>
+ where
+ TS: TableSchema<E = T>,
+ TR: TableReplication,
+ {
+ // Merges `entry` into the stored counter entry with the same key, or
+ // stores it as-is when none exists yet.
+ let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+ let entry_k = self
+ .table
+ .data
+ .tree_key(entry.partition_key(), entry.sort_key());
+ self.table
+ .data
+ .update_entry_with(&entry_k, |ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&entry);
+ ent
+ }
+ None => entry.clone(),
+ })?;
+ Ok(())
+ };
+
+ // 1. Set all old local counters to zero
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ // Resume the scan strictly after the last key handled so far
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+ batch.push(item?);
+ // Stop reading once the batch holds more than 1000 items
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
+ for (local_counter_k, local_counter) in batch {
+ let mut local_counter =
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+ // Bump the timestamp and reset the value of every counter
+ for (_, tv) in local_counter.values.iter_mut() {
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 = 0;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_k, &local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(local_counter_k);
+ }
+ }
+
+ // 2. Recount all table entries
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ // Same batched scan as above, this time over the counted table
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in counted_table
+ .data
+ .store
+ .range((low_bound, Bound::Unbounded))?
+ {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("counting entries... ({})", hex::encode(&batch[0].0));
+ for (counted_entry_k, counted_entry) in batch {
+ let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
+
+ let pk = counted_entry.counter_partition_key();
+ let sk = counted_entry.counter_sort_key();
+ let counts = counted_entry.counts();
+
+ let local_counter_key = self.table.data.tree_key(pk, sk);
+ let mut local_counter = match self.local_counter.get(&local_counter_key)? {
+ Some(old_bytes) => {
+ let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
+ &old_bytes,
+ )?;
+ // The stored entry must carry the same counter keys as the item
+ assert!(ent.pk == *pk);
+ assert!(ent.sk == *sk);
+ ent
+ }
+ None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
+ values: BTreeMap::new(),
+ },
+ };
+ // Add this item's counts on top of the values accumulated so far
+ for (s, v) in counts.iter() {
+ let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 += v;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_key, local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(counted_entry_k);
+ }
+ }
+
+ // Done
+ Ok(())
+ }
+}
+
+ /// Background worker that receives updated local counter entries on
+ /// `propagate_rx`, batches them in `buf`, and inserts them into the index
+ /// counter's table.
+ struct IndexPropagatorWorker<T: CountedItem> {
+ index_counter: Arc<IndexCounter<T>>,
+ propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
+
+ // Pending entries, keyed by their tree key in the counter table
+ buf: HashMap<Vec<u8>, CounterEntry<T>>,
+ // Number of consecutive failed insertions (reset to 0 on success)
+ errors: usize,
+ }
+
+ impl<T: CountedItem> IndexPropagatorWorker<T> {
+     /// Buffers a local counter update for later propagation.
+     ///
+     /// The local entry is converted into a counter entry attributed to this
+     /// node and stored in `buf` under its tree key. If an entry is already
+     /// buffered for that key, the new one is merged into it.
+     fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
+         let counter = &self.index_counter;
+         let key = counter.table.data.tree_key(&pk, &sk);
+         let incoming = counters.into_counter_entry(counter.this_node);
+
+         if let Some(pending) = self.buf.get_mut(&key) {
+             pending.merge(&incoming);
+         } else {
+             self.buf.insert(key, incoming);
+         }
+     }
+ }
+
+ #[async_trait]
+ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
+     /// Name of this worker, derived from the counter table name.
+     fn name(&self) -> String {
+         format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
+     }
+
+     /// Reports the number of buffered entries, or `None` when the buffer is empty.
+     fn info(&self) -> Option<String> {
+         if self.buf.is_empty() {
+             return None;
+         }
+         Some(format!("{} items in queue", self.buf.len()))
+     }
+
+     /// Drains the channel into the buffer, then inserts up to 100 buffered
+     /// entries into the counter table.
+     async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+         // This loop batches updates to counters to be sent all at once.
+         // They are sent once the propagate_rx channel has been emptied (or is closed).
+         let mut closed = false;
+         loop {
+             match self.propagate_rx.try_recv() {
+                 Ok((pk, sk, counters)) => self.add_ent(pk, sk, counters),
+                 Err(mpsc::error::TryRecvError::Empty) => break,
+                 Err(mpsc::error::TryRecvError::Disconnected) => {
+                     closed = true;
+                     break;
+                 }
+             }
+         }
+
+         // Nothing to send: finished if the channel is closed, idle otherwise
+         if self.buf.is_empty() {
+             return Ok(if closed {
+                 WorkerState::Done
+             } else {
+                 WorkerState::Idle
+             });
+         }
+
+         let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
+         let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
+         match self.index_counter.table.insert_many(entries).await {
+             Ok(_) => {
+                 for k in entries_k {
+                     self.buf.remove(&k);
+                 }
+                 self.errors = 0;
+                 Ok(WorkerState::Busy)
+             }
+             Err(e) => {
+                 self.errors += 1;
+                 if self.errors >= 2 && *must_exit.borrow() {
+                     error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
+                     return Ok(WorkerState::Done);
+                 }
+                 // Propagate error up to worker manager, it will log it, increment a counter,
+                 // and sleep for a certain delay (with exponential backoff), waiting for
+                 // things to go back to normal
+                 Err(e)
+             }
+         }
+     }
+
+     /// Waits for the next update on the channel and buffers it.
+     ///
+     /// When the channel is closed, the worker stays busy until the buffer
+     /// has been emptied and is done afterwards.
+     async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+         if let Some((pk, sk, counters)) = self.propagate_rx.recv().await {
+             self.add_ent(pk, sk, counters);
+             return WorkerState::Busy;
+         }
+         if self.buf.is_empty() {
+             WorkerState::Done
+         } else {
+             WorkerState::Busy
+         }
+     }
+ }
+
+ /// Counter values maintained by a single node for one
+ /// (counter partition key, counter sort key) pair.
+ #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+ struct LocalCounterEntry<T: CountedItem> {
+ pk: T::CP,
+ sk: T::CS,
+ // Counter name -> (timestamp of last update, current value)
+ values: BTreeMap<String, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> LocalCounterEntry<T> {
+     /// Converts this local entry into a `CounterEntry` in which every
+     /// `(timestamp, value)` pair is attributed to `this_node`.
+     fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
+         let LocalCounterEntry { pk, sk, values } = self;
+         let values = values
+             .into_iter()
+             .map(|(name, ts_and_value)| {
+                 // Each counter gets a single-entry map holding this node's value
+                 let node_values = std::iter::once((this_node, ts_and_value)).collect();
+                 (name, CounterValue { node_values })
+             })
+             .collect();
+         CounterEntry { pk, sk, values }
+     }
+ }
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
new file mode 100644
index 00000000..9a692870
--- /dev/null
+++ b/src/model/k2v/causality.rs
@@ -0,0 +1,96 @@
+use std::collections::BTreeMap;
+use std::convert::TryInto;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+/// Node IDs used in K2V are u64 integers that are the abbreviation
+/// of full Garage node IDs which are 256-bit UUIDs.
+pub type K2VNodeId = u64;
+
+ /// Abbreviates a full node ID into a K2V node ID: the first 8 bytes of
+ /// `node_id`, read as a big-endian `u64`.
+ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
+     let prefix: [u8; 8] = node_id.as_slice()[..8]
+         .try_into()
+         .expect("a slice of exactly 8 bytes converts to [u8; 8]");
+     u64::from_be_bytes(prefix)
+ }
+
+ /// A causality context: a vector clock that maps K2V node IDs to timestamps.
+ #[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+ pub struct CausalContext {
+ pub vector_clock: BTreeMap<K2VNodeId, u64>,
+ }
+
+ impl CausalContext {
+     /// Empty causality context
+     pub fn new_empty() -> Self {
+         Self {
+             vector_clock: BTreeMap::new(),
+         }
+     }
+
+     /// Make binary representation and encode in base64
+     ///
+     /// The binary form is an 8-byte checksum (the XOR of all node IDs and
+     /// timestamps) followed by one (node ID, timestamp) pair per entry, every
+     /// integer being written as a big-endian `u64`. It is then encoded as
+     /// URL-safe base64 without padding.
+     pub fn serialize(&self) -> String {
+         let checksum = self
+             .vector_clock
+             .iter()
+             .fold(0u64, |acc, (node, time)| acc ^ *node ^ *time);
+
+         let mut bytes = Vec::with_capacity(8 + 16 * self.vector_clock.len());
+         bytes.extend(checksum.to_be_bytes());
+         for (node, time) in self.vector_clock.iter() {
+             bytes.extend(node.to_be_bytes());
+             bytes.extend(time.to_be_bytes());
+         }
+
+         base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
+     }
+
+     /// Parse from base64-encoded binary representation
+     ///
+     /// # Errors
+     ///
+     /// Returns a message describing the problem if `s` is not valid base64,
+     /// if the decoded length is not 8 bytes plus a multiple of 16, or if the
+     /// checksum does not match the decoded vector clock.
+     pub fn parse(s: &str) -> Result<Self, String> {
+         let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
+             .map_err(|e| format!("bad causality token base64: {}", e))?;
+         // `len % 16 == 8` already implies that the 8 checksum bytes are present
+         if bytes.len() % 16 != 8 {
+             return Err("bad causality token length".into());
+         }
+
+         let (checksum_bytes, pairs) = bytes.split_at(8);
+         let checksum = u64::from_be_bytes(checksum_bytes.try_into().unwrap());
+
+         let mut ret = CausalContext::new_empty();
+         for pair in pairs.chunks_exact(16) {
+             let node_id = u64::from_be_bytes(pair[..8].try_into().unwrap());
+             let time = u64::from_be_bytes(pair[8..].try_into().unwrap());
+             ret.vector_clock.insert(node_id, time);
+         }
+
+         let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
+         if check != checksum {
+             return Err("bad causality token checksum".into());
+         }
+
+         Ok(ret)
+     }
+
+     /// Check if this causal context contains newer items than another one
+     ///
+     /// A node that is absent from `other` counts as having timestamp 0.
+     pub fn is_newer_than(&self, other: &Self) -> bool {
+         self.vector_clock
+             .iter()
+             .any(|(node, time)| *time > other.vector_clock.get(node).copied().unwrap_or(0))
+     }
+ }
+
+ #[cfg(test)]
+ mod tests {
+     use super::*;
+
+     /// A context must come back unchanged from a `serialize` / `parse` round trip.
+     #[test]
+     fn test_causality_token_serialization() {
+         let mut ct = CausalContext::new_empty();
+         for (node, time) in [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)] {
+             ct.vector_clock.insert(node, time);
+         }
+
+         let token = ct.serialize();
+         assert_eq!(CausalContext::parse(&token).unwrap(), ct);
+     }
+ }
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
new file mode 100644
index 00000000..7860cb17
--- /dev/null
+++ b/src/model/k2v/item_table.rs
@@ -0,0 +1,305 @@
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+use std::sync::Arc;
+
+use garage_db as db;
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+use crate::index_counter::*;
+use crate::k2v::causality::*;
+use crate::k2v::poll::*;
+
+ // Names of the counters returned by `K2VItem::counts()`
+ pub const ENTRIES: &str = "entries";
+ pub const CONFLICTS: &str = "conflicts";
+ pub const VALUES: &str = "values";
+ pub const BYTES: &str = "bytes";
+
+ /// A K2V item, identified by its partition and its sort key.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VItem {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+
+ // Values written through each node, keyed by the abbreviated node ID
+ items: BTreeMap<K2VNodeId, DvvsEntry>,
+ }
+
+ /// Partition of a K2V item: a bucket ID together with a partition key.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
+ pub struct K2VItemPartition {
+ pub bucket_id: Uuid,
+ pub partition_key: String,
+ }
+
+ /// Values recorded for a single node, with their timestamps.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ struct DvvsEntry {
+ // Values whose timestamp is not greater than this are dropped by `discard()`
+ t_discard: u64,
+ // (timestamp, value) pairs
+ values: Vec<(u64, DvvsValue)>,
+ }
+
+ /// A single value of a K2V item: either a byte payload or a deletion marker.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum DvvsValue {
+ Value(#[serde(with = "serde_bytes")] Vec<u8>),
+ Deleted,
+ }
+
+ impl K2VItem {
+     /// Creates a new K2VItem when no previous entry existed in the db
+     pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
+         Self {
+             partition: K2VItemPartition {
+                 bucket_id,
+                 partition_key,
+             },
+             sort_key,
+             items: BTreeMap::new(),
+         }
+     }
+
+     /// Updates a K2VItem with a new value or a deletion event
+     ///
+     /// When a causal context is given, each node's discard time is first
+     /// raised to the timestamp found in the context, and the values this makes
+     /// obsolete are dropped. The new value is then appended to the entry of
+     /// `this_node`, with a timestamp one above that entry's current maximum.
+     pub fn update(
+         &mut self,
+         this_node: Uuid,
+         context: &Option<CausalContext>,
+         new_value: DvvsValue,
+     ) {
+         if let Some(context) = context {
+             for (node, t_discard) in context.vector_clock.iter() {
+                 // A node seen for the first time starts from an empty entry
+                 let e = self.items.entry(*node).or_insert_with(|| DvvsEntry {
+                     t_discard: 0,
+                     values: vec![],
+                 });
+                 e.t_discard = std::cmp::max(e.t_discard, *t_discard);
+             }
+         }
+
+         self.discard();
+
+         let node_id = make_node_id(this_node);
+         let e = self.items.entry(node_id).or_insert_with(|| DvvsEntry {
+             t_discard: 0,
+             values: vec![],
+         });
+         let t_prev = e.max_time();
+         e.values.push((t_prev + 1, new_value));
+     }
+
+     /// Extract the causality context of a K2V Item
+     pub fn causal_context(&self) -> CausalContext {
+         let mut cc = CausalContext::new_empty();
+         for (node, ent) in self.items.iter() {
+             cc.vector_clock.insert(*node, ent.max_time());
+         }
+         cc
+     }
+
+     /// Extract the list of values
+     ///
+     /// Values that compare equal are returned only once.
+     pub fn values(&'_ self) -> Vec<&'_ DvvsValue> {
+         let mut ret = vec![];
+         for ent in self.items.values() {
+             for (_, v) in ent.values.iter() {
+                 if !ret.contains(&v) {
+                     ret.push(v);
+                 }
+             }
+         }
+         ret
+     }
+
+     /// Drops, in every node entry, the values made obsolete by its discard time
+     fn discard(&mut self) {
+         for ent in self.items.values_mut() {
+             ent.discard();
+         }
+     }
+ }
+
+ impl DvvsEntry {
+     /// Largest timestamp known to this entry: the maximum of `t_discard` and
+     /// of the timestamps of all stored values.
+     fn max_time(&self) -> u64 {
+         self.values
+             .iter()
+             .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts))
+     }
+
+     /// Removes, in place, every value whose timestamp is not strictly greater
+     /// than `t_discard`.
+     fn discard(&mut self) {
+         // Copied out first so the closure does not borrow `self`
+         let t_discard = self.t_discard;
+         self.values.retain(|(t, _)| *t > t_discard);
+     }
+ }
+
+ impl Crdt for K2VItem {
+     /// Merges `other` node by node: entries present on both sides are merged,
+     /// entries that only exist in `other` are copied over.
+     fn merge(&mut self, other: &Self) {
+         for (node, theirs) in other.items.iter() {
+             match self.items.get_mut(node) {
+                 Some(ours) => ours.merge(theirs),
+                 None => {
+                     self.items.insert(*node, theirs.clone());
+                 }
+             }
+         }
+     }
+ }
+
+ impl Crdt for DvvsEntry {
+     /// Takes the larger discard time, drops the values it makes obsolete, then
+     /// appends the values of `other` that are newer than everything this
+     /// entry already knows about.
+     fn merge(&mut self, other: &Self) {
+         self.t_discard = std::cmp::max(self.t_discard, other.t_discard);
+         self.discard();
+
+         // Computed once, before any value of `other` is appended
+         let t_max = self.max_time();
+         let newer = other.values.iter().filter(|(vt, _)| *vt > t_max).cloned();
+         self.values.extend(newer);
+     }
+ }
+
+ impl PartitionKey for K2VItemPartition {
+ /// Hash of the partition: the first 32 bytes of the Blake2b digest of the
+ /// bucket ID bytes followed by the partition key bytes.
+ fn hash(&self) -> Hash {
+ use blake2::{Blake2b, Digest};
+
+ let mut hasher = Blake2b::new();
+ hasher.update(self.bucket_id.as_slice());
+ hasher.update(self.partition_key.as_bytes());
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(&hasher.finalize()[..32]);
+ hash.into()
+ }
+ }
+
+ impl Entry<K2VItemPartition, String> for K2VItem {
+     fn partition_key(&self) -> &K2VItemPartition {
+         &self.partition
+     }
+     fn sort_key(&self) -> &String {
+         &self.sort_key
+     }
+     /// An item is a tombstone when none of its values is an actual value
+     /// (this includes an item that has no values at all).
+     fn is_tombstone(&self) -> bool {
+         self.values()
+             .into_iter()
+             .all(|v| *v == DvvsValue::Deleted)
+     }
+ }
+
+ /// Table schema for K2V items. Holds the index counter and the subscription
+ /// manager that are notified whenever an item is updated.
+ pub struct K2VItemTable {
+ pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
+ pub(crate) subscriptions: Arc<SubscriptionManager>,
+ }
+
+ /// Filter applied by `K2VItemTable::matches_filter`.
+ #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct ItemFilter {
+ // When set, items that are tombstones do not match
+ pub exclude_only_tombstones: bool,
+ // When set, items with fewer than two values do not match
+ pub conflicts_only: bool,
+ }
+
+ impl TableSchema for K2VItemTable {
+ const TABLE_NAME: &'static str = "k2v_item";
+
+ type P = K2VItemPartition;
+ type S = String;
+ type E = K2VItem;
+ type Filter = ItemFilter;
+
+ /// Called with the old and new versions of an item: updates the index
+ /// counters, then notifies subscribers of the new version (if any).
+ /// A counter update error that `db::unabort` hands back is logged and
+ /// otherwise ignored.
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ // This result can be returned by `counter_table.count()` for instance
+ // if messagepack serialization or deserialization fails at some step.
+ // Warn admin but ignore this error for now, that's all we can do.
+ error!(
+ "Unable to update K2V item counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Notify
+ if let Some(new_ent) = new {
+ self.subscriptions.notify(new_ent);
+ }
+
+ Ok(())
+ }
+
+ /// Returns false for items with fewer than two values when
+ /// `conflicts_only` is set, and for tombstones when
+ /// `exclude_only_tombstones` is set; true otherwise.
+ #[allow(clippy::nonminimal_bool)]
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ let v = entry.values();
+ !(filter.conflicts_only && v.len() < 2)
+ && !(filter.exclude_only_tombstones && entry.is_tombstone())
+ }
+ }
+
+ impl CountedItem for K2VItem {
+     const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2";
+
+     // Partition key = bucket id
+     type CP = Uuid;
+     // Sort key = K2V item's partition key
+     type CS = String;
+
+     fn counter_partition_key(&self) -> &Uuid {
+         &self.partition.bucket_id
+     }
+     fn counter_sort_key(&self) -> &String {
+         &self.partition.partition_key
+     }
+
+     /// Counters contributed by this item:
+     /// - `ENTRIES`: 1 unless the item is a tombstone
+     /// - `CONFLICTS`: 1 if the item holds more than one value
+     /// - `VALUES`: number of values that are not deletion markers
+     /// - `BYTES`: total length of those values
+     fn counts(&self) -> Vec<(&'static str, i64)> {
+         let values = self.values();
+
+         // Single pass over the values for both the value count and the byte count
+         let mut n_values = 0i64;
+         let mut n_bytes = 0i64;
+         for value in values.iter() {
+             if let DvvsValue::Value(bytes) = value {
+                 n_values += 1;
+                 n_bytes += bytes.len() as i64;
+             }
+         }
+
+         let n_entries = if self.is_tombstone() { 0 } else { 1 };
+         let n_conflicts = i64::from(values.len() > 1);
+
+         vec![
+             (ENTRIES, n_entries),
+             (CONFLICTS, n_conflicts),
+             (VALUES, n_values),
+             (BYTES, n_bytes),
+         ]
+     }
+ }
+
+ #[cfg(test)]
+ mod tests {
+ use super::*;
+
+ // Merging e2 into a copy of e1 must give exactly e2: the value at time 5
+ // is discarded (e2.t_discard = 5), the value at time 6 is kept, and the
+ // deletion at time 7 is added.
+ #[test]
+ fn test_dvvsentry_merge_simple() {
+ let e1 = DvvsEntry {
+ t_discard: 4,
+ values: vec![
+ (5, DvvsValue::Value(vec![15])),
+ (6, DvvsValue::Value(vec![16])),
+ ],
+ };
+ let e2 = DvvsEntry {
+ t_discard: 5,
+ values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)],
+ };
+
+ let mut e3 = e1.clone();
+ e3.merge(&e2);
+ assert_eq!(e2, e3);
+ }
+ }
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
new file mode 100644
index 00000000..f6a96151
--- /dev/null
+++ b/src/model/k2v/mod.rs
@@ -0,0 +1,6 @@
+pub mod causality;
+
+pub mod item_table;
+
+pub mod poll;
+pub mod rpc;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
new file mode 100644
index 00000000..93105207
--- /dev/null
+++ b/src/model/k2v/poll.rs
@@ -0,0 +1,50 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+ /// Identifies the item a subscription is about: its partition and sort key.
+ #[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+ pub struct PollKey {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+ }
+
+ /// Keeps one broadcast channel per polled item, through which updated
+ /// versions of that item are sent to subscribers.
+ #[derive(Default)]
+ pub struct SubscriptionManager {
+ subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
+ }
+
+ impl SubscriptionManager {
+     /// Creates a manager with no subscriptions.
+     pub fn new() -> Self {
+         Self::default()
+     }
+
+     /// Returns a receiver for the updates of the item designated by `key`,
+     /// creating the broadcast channel (capacity 8) if none exists yet.
+     pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+         let mut subs = self.subscriptions.lock().unwrap();
+         match subs.get(key) {
+             Some(sender) => sender.subscribe(),
+             None => {
+                 let (tx, rx) = broadcast::channel(8);
+                 subs.insert(key.clone(), tx);
+                 rx
+             }
+         }
+     }
+
+     /// Sends a copy of `item` to the subscribers of its key, if a channel
+     /// exists for it.
+     pub fn notify(&self, item: &K2VItem) {
+         let key = PollKey {
+             partition: item.partition.clone(),
+             sort_key: item.sort_key.clone(),
+         };
+         let mut subs = self.subscriptions.lock().unwrap();
+         let send_failed = match subs.get(&key) {
+             Some(sender) => sender.send(item.clone()).is_err(),
+             None => false,
+         };
+         if send_failed {
+             // no more subscribers, remove channel from here
+             // (we will re-create it later if we need to subscribe again)
+             subs.remove(&key);
+         }
+     }
+ }
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
new file mode 100644
index 00000000..a74df277
--- /dev/null
+++ b/src/model/k2v/rpc.rs
@@ -0,0 +1,341 @@
+//! Module that implements RPCs specific to K2V.
+//! This is necessary for insertions into the K2V store,
+//! as they have to be transmitted to one of the nodes responsible
+//! for storing the entry to be processed (the API entry
+//! node does not process the entry directly, as this would
+//! mean the vector clock gets much larger than needed).
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+use serde::{Deserialize, Serialize};
+use tokio::select;
+
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_rpc::system::System;
+use garage_rpc::*;
+
+use garage_table::replication::{TableReplication, TableShardedReplication};
+use garage_table::{PartitionKey, Table};
+
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+use crate::k2v::poll::*;
+
+ /// RPC messages for K2V
+ #[derive(Debug, Serialize, Deserialize)]
+ enum K2VRpc {
+ /// Response to a successful `InsertItem` or `InsertManyItems`
+ Ok,
+ /// Request to insert a single item
+ InsertItem(InsertedItem),
+ /// Request to insert several items at once
+ InsertManyItems(Vec<InsertedItem>),
+ /// Request to wait, for at most `timeout_msec` milliseconds, until the
+ /// item at `key` is newer than `causal_context`
+ PollItem {
+ key: PollKey,
+ causal_context: CausalContext,
+ timeout_msec: u64,
+ },
+ /// Response to `PollItem`: the newer item, or `None` if the delay elapsed
+ PollItemResponse(Option<K2VItem>),
+ }
+
+ /// Payload of an insertion request: where to insert, the causal context
+ /// supplied with the request (if any), and the value or deletion to record.
+ #[derive(Debug, Serialize, Deserialize)]
+ struct InsertedItem {
+ partition: K2VItemPartition,
+ sort_key: String,
+ causal_context: Option<CausalContext>,
+ value: DvvsValue,
+ }
+
+ // Requests and responses share the `K2VRpc` enum; a response may also be an error.
+ impl Rpc for K2VRpc {
+ type Response = Result<K2VRpc, Error>;
+ }
+
+ /// Handler for the RPCs specific to K2V: item insertion (single or batched) and item polling
+ pub struct K2VRpcHandler {
+ system: Arc<System>,
+ item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ endpoint: Arc<Endpoint<K2VRpc, Self>>,
+ subscriptions: Arc<SubscriptionManager>,
+ }
+
+impl K2VRpcHandler {
+ /// Creates the handler and registers it on the `garage_model/k2v/Rpc`
+ /// endpoint of `system.netapp`.
+ pub fn new(
+ system: Arc<System>,
+ item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+ subscriptions: Arc<SubscriptionManager>,
+ ) -> Arc<Self> {
+ let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
+
+ let rpc_handler = Arc::new(Self {
+ system,
+ item_table,
+ endpoint,
+ subscriptions,
+ });
+ // The endpoint needs a reference back to the handler that owns it
+ rpc_handler.endpoint.set_handler(rpc_handler.clone());
+
+ rpc_handler
+ }
+
+ // ---- public interface ----
+
+ /// Inserts a value (or a deletion) for one item.
+ ///
+ /// An `InsertItem` request is sent to the write nodes of the item's
+ /// partition. The call returns once one node has answered (quorum of 1,
+ /// interrupted after quorum).
+ pub async fn insert(
+ &self,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causal_context: Option<CausalContext>,
+ value: DvvsValue,
+ ) -> Result<(), Error> {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key,
+ };
+ let mut who = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&partition.hash());
+ who.sort();
+
+ self.system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &who[..],
+ K2VRpc::InsertItem(InsertedItem {
+ partition,
+ sort_key,
+ causal_context,
+ value,
+ }),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ Ok(())
+ }
+
+ /// Inserts several items of one bucket.
+ ///
+ /// Each element of `items` is (partition key, sort key, causal context,
+ /// value). Items are grouped by their sorted set of write nodes, and one
+ /// `InsertManyItems` request is sent per group (quorum of 1). Returns the
+ /// first error encountered, if any.
+ pub async fn insert_batch(
+ &self,
+ bucket_id: Uuid,
+ items: Vec<(String, String, Option<CausalContext>, DvvsValue)>,
+ ) -> Result<(), Error> {
+ let n_items = items.len();
+
+ // Sorted write node list -> items to send to those nodes
+ let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
+
+ for (partition_key, sort_key, causal_context, value) in items {
+ let partition = K2VItemPartition {
+ bucket_id,
+ partition_key,
+ };
+ let mut who = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&partition.hash());
+ // Sorting makes equal node sets compare equal as map keys
+ who.sort();
+
+ call_list.entry(who).or_default().push(InsertedItem {
+ partition,
+ sort_key,
+ causal_context,
+ value,
+ });
+ }
+
+ debug!(
+ "K2V insert_batch: {} requests to insert {} items",
+ call_list.len(),
+ n_items
+ );
+ let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
+ let resp = self
+ .system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::InsertManyItems(items),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+ Ok::<_, Error>((nodes, resp))
+ });
+
+ // Run all requests concurrently and stop at the first failure
+ let mut resps = call_futures.collect::<FuturesUnordered<_>>();
+ while let Some(resp) = resps.next().await {
+ resp?;
+ }
+
+ Ok(())
+ }
+
+ /// Waits until the given item becomes newer than `causal_context`.
+ ///
+ /// A `PollItem` request is sent to the write nodes of the item's partition,
+ /// with the table's read quorum and no RPC timeout. Returns `Ok(None)` if
+ /// no answer arrives within `timeout_msec` plus the RPC timeout, or if any
+ /// collected response reports a timeout. Otherwise returns the merge of
+ /// the items received.
+ pub async fn poll(
+ &self,
+ bucket_id: Uuid,
+ partition_key: String,
+ sort_key: String,
+ causal_context: CausalContext,
+ timeout_msec: u64,
+ ) -> Result<Option<K2VItem>, Error> {
+ let poll_key = PollKey {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key,
+ },
+ sort_key,
+ };
+ let nodes = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&poll_key.partition.hash());
+
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ // The RPC itself has no timeout: bound the wait here, leaving the remote
+ // nodes `timeout_msec` to answer plus the usual RPC timeout
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
+
+ let mut resp: Option<K2VItem> = None;
+ for v in resps {
+ match v {
+ K2VRpc::PollItemResponse(Some(x)) => {
+ if let Some(y) = &mut resp {
+ y.merge(&x);
+ } else {
+ resp = Some(x);
+ }
+ }
+ // One node timing out makes the whole poll a timeout
+ K2VRpc::PollItemResponse(None) => {
+ return Ok(None);
+ }
+ v => return Err(Error::unexpected_rpc_message(v)),
+ }
+ }
+
+ Ok(resp)
+ }
+
+ // ---- internal handlers ----
+
+ /// Handles `InsertItem`: applies the insertion locally, then inserts the
+ /// resulting item (if one is returned) through the item table.
+ async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
+     // Propagate to rest of network
+     if let Some(updated) = self.local_insert(item)? {
+         self.item_table.insert(&updated).await?;
+     }
+
+     Ok(K2VRpc::Ok)
+ }
+
+ /// Handles `InsertManyItems`: applies every insertion locally, in order,
+ /// stopping at the first error, then inserts all resulting items through
+ /// the item table in one call.
+ async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
+     let updated_vec = items
+         .iter()
+         .filter_map(|item| self.local_insert(item).transpose())
+         .collect::<Result<Vec<_>, Error>>()?;
+
+     if updated_vec.is_empty() {
+         return Ok(K2VRpc::Ok);
+     }
+
+     // Propagate to rest of network
+     self.item_table.insert_many(&updated_vec).await?;
+
+     Ok(K2VRpc::Ok)
+ }
+
+ /// Applies an insertion to the local copy of the item, starting from an
+ /// empty `K2VItem` when none is stored yet, and returns whatever
+ /// `update_entry_with` returns.
+ /// NOTE(review): presumably `Some(item)` when the stored entry changed —
+ /// confirm against `update_entry_with`.
+ fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ let tree_key = self
+ .item_table
+ .data
+ .tree_key(&item.partition, &item.sort_key);
+
+ self.item_table
+ .data
+ .update_entry_with(&tree_key[..], |ent| {
+ let mut ent = ent.unwrap_or_else(|| {
+ K2VItem::new(
+ item.partition.bucket_id,
+ item.partition.partition_key.clone(),
+ item.sort_key.clone(),
+ )
+ });
+ // The value is recorded under this node's ID
+ ent.update(self.system.id, &item.causal_context, item.value.clone());
+ ent
+ })
+ }
+
+ /// Waits until the item at `key` is newer than `ct`, and returns it.
+ ///
+ /// The stored item (or an empty one if none is stored) is checked first;
+ /// afterwards each update received from the subscription is checked in turn.
+ /// There is no timeout here: the caller is expected to bound the wait.
+ async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ // Subscribing happens before the read below — presumably so that an
+ // update landing in between is not missed
+ let mut chan = self.subscriptions.subscribe(key);
+
+ let mut value = self
+ .item_table
+ .data
+ .read_entry(&key.partition, &key.sort_key)?
+ .map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
+ .transpose()?
+ .unwrap_or_else(|| {
+ K2VItem::new(
+ key.partition.bucket_id,
+ key.partition.partition_key.clone(),
+ key.sort_key.clone(),
+ )
+ });
+
+ while !value.causal_context().is_newer_than(ct) {
+ value = chan.recv().await?;
+ }
+
+ Ok(value)
+ }
+}
+
+ #[async_trait]
+ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
+ /// Dispatches an incoming K2V RPC to its handler. Messages that are not
+ /// requests (`Ok`, `PollItemResponse`) are rejected as unexpected.
+ async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
+ match message {
+ K2VRpc::InsertItem(item) => self.handle_insert(item).await,
+ K2VRpc::InsertManyItems(items) => self.handle_insert_many(&items[..]).await,
+ K2VRpc::PollItem {
+ key,
+ causal_context,
+ timeout_msec,
+ } => {
+ // Answer with `None` if nothing newer shows up within the delay
+ let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+ select! {
+ ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ _ = delay => Ok(K2VRpc::PollItemResponse(None)),
+ }
+ }
+ m => Err(Error::unexpected_rpc_message(m)),
+ }
+ }
+ }
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 330e83f0..9d2fc783 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -6,10 +6,10 @@ use garage_util::data::*;
use crate::permission::BucketKeyPerm;
-use garage_model_050::key_table as old;
+use crate::prev::v051::key_table as old;
/// An api key
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
/// The id of the key (immutable), used as partition key
pub key_id: String,
@@ -19,7 +19,7 @@ pub struct Key {
}
/// Configuration for a key
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct KeyParams {
/// The secret_key associated (immutable)
pub secret_key: String,
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 05a4cdc7..4f20ea46 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -1,14 +1,20 @@
#[macro_use]
extern crate tracing;
+// For migration from previous versions
+pub(crate) mod prev;
+
pub mod permission;
-pub mod block_ref_table;
+pub mod index_counter;
+
pub mod bucket_alias_table;
pub mod bucket_table;
pub mod key_table;
-pub mod object_table;
-pub mod version_table;
+
+#[cfg(feature = "k2v")]
+pub mod k2v;
+pub mod s3;
pub mod garage;
pub mod helper;
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 7e61957a..cd6ad26a 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -5,7 +5,7 @@ use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
-use garage_model_050::bucket_table as old_bucket;
+use crate::prev::v051::bucket_table as old_bucket;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
@@ -25,11 +25,15 @@ impl Migrate {
.open_tree("bucket:table")
.map_err(GarageError::from)?;
- for res in tree.iter() {
+ let mut old_buckets = vec![];
+ for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?;
+ old_buckets.push(bucket);
+ }
+ for bucket in old_buckets {
if let old_bucket::BucketState::Present(p) = bucket.state.get() {
self.migrate_buckets050_do_bucket(&bucket, p).await?;
}
@@ -73,6 +77,7 @@ impl Migrate {
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
+ quotas: Lww::new(Default::default()),
}),
})
.await?;
diff --git a/src/model/prev/mod.rs b/src/model/prev/mod.rs
new file mode 100644
index 00000000..68bb1502
--- /dev/null
+++ b/src/model/prev/mod.rs
@@ -0,0 +1 @@
+pub(crate) mod v051;
diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs
new file mode 100644
index 00000000..628a49dd
--- /dev/null
+++ b/src/model/prev/v051/bucket_table.rs
@@ -0,0 +1,63 @@
+use serde::{Deserialize, Serialize};
+
+use garage_table::crdt::Crdt;
+use garage_table::*;
+
+use super::key_table::PermissionSet;
+
+ /// A bucket is a collection of objects
+ ///
+ /// Its parameters are not directly accessible as:
+ /// - It must be possible to merge parameters, hence the use of a LWW CRDT.
+ /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Bucket {
+ /// Name of the bucket
+ pub name: String,
+ /// State, and configuration if not deleted, of the bucket
+ pub state: crdt::Lww<BucketState>,
+ }
+
+/// State of a bucket
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub enum BucketState {
+ /// The bucket is deleted
+ Deleted,
+ /// The bucket exists
+ Present(BucketParams),
+}
+
+ impl Crdt for BucketState {
+     /// Deletion wins: merging a deleted state makes this one deleted, and a
+     /// deleted state is left untouched by a present one. Two present states
+     /// merge their parameters.
+     fn merge(&mut self, o: &Self) {
+         let other_params = match o {
+             BucketState::Deleted => {
+                 *self = BucketState::Deleted;
+                 return;
+             }
+             BucketState::Present(other_params) => other_params,
+         };
+         if let BucketState::Present(params) = self {
+             params.merge(other_params);
+         }
+     }
+ }
+
+/// Configuration for a bucket
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct BucketParams {
+ /// Map of key with access to the bucket, and what kind of access they give
+ pub authorized_keys: crdt::LwwMap<String, PermissionSet>,
+ /// Is the bucket served as http
+ pub website: crdt::Lww<bool>,
+}
+
+ impl Crdt for BucketParams {
+ /// Merges each field with the corresponding field of `o`.
+ fn merge(&mut self, o: &Self) {
+ self.authorized_keys.merge(&o.authorized_keys);
+ self.website.merge(&o.website);
+ }
+ }
+
+ impl Crdt for Bucket {
+ /// Only the state is merged; the name is left as it is.
+ fn merge(&mut self, other: &Self) {
+ self.state.merge(&other.state);
+ }
+ }
diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs
new file mode 100644
index 00000000..37516b1c
--- /dev/null
+++ b/src/model/prev/v051/key_table.rs
@@ -0,0 +1,50 @@
+use serde::{Deserialize, Serialize};
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+/// An api key
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
+
+ /// The secret_key associated
+ pub secret_key: String,
+
+ /// Name for the key
+ pub name: crdt::Lww<String>,
+
+ /// Is the key deleted
+ pub deleted: crdt::Bool,
+
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
+ // CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
+}
+
+/// Permission given to a key in a bucket
+#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct PermissionSet {
+ /// The key can be used to read the bucket
+ pub allow_read: bool,
+ /// The key can be used to write in the bucket
+ pub allow_write: bool,
+}
+
+ // NOTE(review): presumably `WARN_IF_DIFFERENT` makes the CRDT layer log a
+ // warning when two differing values get merged — confirm against `AutoCrdt`.
+ impl AutoCrdt for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+ }
+
+ impl Crdt for Key {
+     /// Merges the name and the deletion flag, then either clears the bucket
+     /// authorizations (key deleted) or merges them with those of `other`.
+     fn merge(&mut self, other: &Self) {
+         self.name.merge(&other.name);
+         self.deleted.merge(&other.deleted);
+
+         // A deleted key keeps no bucket authorizations
+         if !self.deleted.get() {
+             self.authorized_buckets.merge(&other.authorized_buckets);
+             return;
+         }
+         self.authorized_buckets.clear();
+     }
+ }
diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs
new file mode 100644
index 00000000..7a954752
--- /dev/null
+++ b/src/model/prev/v051/mod.rs
@@ -0,0 +1,4 @@
+pub(crate) mod bucket_table;
+pub(crate) mod key_table;
+pub(crate) mod object_table;
+pub(crate) mod version_table;
diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs
new file mode 100644
index 00000000..e79e5787
--- /dev/null
+++ b/src/model/prev/v051/object_table.rs
@@ -0,0 +1,149 @@
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+
+/// An object
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket: String,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currently stored versions of the object
+ versions: Vec<ObjectVersion>,
+}
+
+impl Object {
+ /// Get a list of currently stored versions of `Object`
+ pub fn versions(&self) -> &[ObjectVersion] {
+ &self.versions[..]
+ }
+}
+
+/// Information about a version of an object
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+}
+
+/// State of an object version
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading(ObjectVersionHeaders),
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The uploaded version contained errors or the upload was explicitly aborted
+ Aborted,
+}
+
+impl Crdt for ObjectVersionState {
+ fn merge(&mut self, other: &Self) {
+ use ObjectVersionState::*;
+ match other {
+ Aborted => {
+ *self = Aborted;
+ }
+ Complete(b) => match self {
+ Aborted => {}
+ Complete(a) => {
+ a.merge(b);
+ }
+ Uploading(_) => {
+ *self = Complete(b.clone());
+ }
+ },
+ Uploading(_) => {}
+ }
+ }
+}
+
+/// Data stored in object version
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
+ DeleteMarker,
+ /// The object is short, it's stored inlined
+ Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
+ FirstBlock(ObjectVersionMeta, Hash),
+}
+
+impl AutoCrdt for ObjectVersionData {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+/// Metadata about the object version
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersionMeta {
+ /// Headers to send to the client
+ pub headers: ObjectVersionHeaders,
+ /// Size of the object
+ pub size: u64,
+ /// etag of the object
+ pub etag: String,
+}
+
+/// Additional headers for an object
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct ObjectVersionHeaders {
+ /// Content type of the object
+ pub content_type: String,
+ /// Any other http headers to send
+ pub other: BTreeMap<String, String>,
+}
+
+impl ObjectVersion {
+ fn cmp_key(&self) -> (u64, Uuid) {
+ (self.timestamp, self.uuid)
+ }
+
+ /// Is the object version completely received
+ pub fn is_complete(&self) -> bool {
+ matches!(self.state, ObjectVersionState::Complete(_))
+ }
+}
+
+impl Crdt for Object {
+ fn merge(&mut self, other: &Self) {
+ // Merge versions from other into here
+ for other_v in other.versions.iter() {
+ match self
+ .versions
+ .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
+ {
+ Ok(i) => {
+ self.versions[i].state.merge(&other_v.state);
+ }
+ Err(i) => {
+ self.versions.insert(i, other_v.clone());
+ }
+ }
+ }
+
+ // Remove versions which are obsolete, i.e. those that come
+ // before the last version which .is_complete().
+ let last_complete = self
+ .versions
+ .iter()
+ .enumerate()
+ .rev()
+ .find(|(_, v)| v.is_complete())
+ .map(|(vi, _)| vi);
+
+ if let Some(last_vi) = last_complete {
+ self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
+ }
+ }
+}
diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs
new file mode 100644
index 00000000..c11c62d5
--- /dev/null
+++ b/src/model/prev/v051/version_table.rs
@@ -0,0 +1,79 @@
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::*;
+
+/// A version of an object
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure out whether
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket: String,
+ /// Key in which the related object is stored
+ pub key: String,
+}
+
+#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+}
+
+impl Ord for VersionBlockKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.offset.cmp(&other.offset))
+ }
+}
+
+impl PartialOrd for VersionBlockKey {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+
+/// Information about a single block
+#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+}
+
+impl AutoCrdt for VersionBlock {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl Crdt for Version {
+ fn merge(&mut self, other: &Self) {
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
+ self.blocks.clear();
+ self.parts_etags.clear();
+ } else {
+ self.blocks.merge(&other.blocks);
+ self.parts_etags.merge(&other.parts_etags);
+ }
+ }
+}
diff --git a/src/model/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index b6945403..c7017409 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::data::*;
use garage_table::crdt::Crdt;
@@ -8,7 +10,7 @@ use garage_table::*;
use garage_block::manager::*;
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
/// Hash (blake2 sum) of the block, used as partition key
pub block: Hash,
@@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
- #[allow(clippy::or_fun_call)]
- let block = &old.as_ref().or(new.as_ref()).unwrap().block;
- let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
- let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ let block = old.or(new).unwrap().block;
+ let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
+ let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
- if let Err(e) = self.block_manager.block_incref(block) {
- warn!("block_incref failed for block {:?}: {}", block, e);
- }
+ self.block_manager.block_incref(tx, block)?;
}
if was_before && !is_after {
- if let Err(e) = self.block_manager.block_decref(block) {
- warn!("block_decref failed for block {:?}: {}", block, e);
- }
+ self.block_manager.block_decref(tx, block)?;
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
new file mode 100644
index 00000000..4e94337d
--- /dev/null
+++ b/src/model/s3/mod.rs
@@ -0,0 +1,3 @@
+pub mod block_ref_table;
+pub mod object_table;
+pub mod version_table;
diff --git a/src/model/object_table.rs b/src/model/s3/object_table.rs
index da53878e..26ff57f6 100644
--- a/src/model/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -9,12 +11,17 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::version_table::*;
+use crate::index_counter::*;
+use crate::s3::version_table::*;
+
+use crate::prev::v051::object_table as old;
-use garage_model_050::object_table as old;
+pub const OBJECTS: &str = "objects";
+pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
+pub const BYTES: &str = "bytes";
/// An object
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid,
@@ -63,7 +70,7 @@ impl Object {
}
/// Informations about a version of an object
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
@@ -74,7 +81,7 @@ pub struct ObjectVersion {
}
/// State of an object version
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
@@ -216,6 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -232,8 +240,26 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ObjectFilter;
- fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.object_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update object counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Spawn a background task that propagates deletions to the version table
let version_table = self.version_table.clone();
+ let old = old.cloned();
+ let new = new.cloned();
+
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
@@ -256,7 +282,8 @@ impl TableSchema for ObjectTable {
}
}
Ok(())
- })
+ });
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
@@ -272,6 +299,49 @@ impl TableSchema for ObjectTable {
}
}
+impl CountedItem for Object {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = nothing
+ type CS = EmptyKey;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+ fn counter_sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let versions = self.versions();
+ let n_objects = if versions.iter().any(|v| v.is_data()) {
+ 1
+ } else {
+ 0
+ };
+ let n_unfinished_uploads = versions
+ .iter()
+ .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
+ .count();
+ let n_bytes = versions
+ .iter()
+ .map(|v| match &v.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
+ | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+ _ => 0,
+ })
+ .sum::<u64>();
+
+ vec![
+ (OBJECTS, n_objects),
+ (UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
+ (BYTES, n_bytes as i64),
+ ]
+ }
+}
+
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)
diff --git a/src/model/version_table.rs b/src/model/s3/version_table.rs
index 839b1f4f..6bc2ecd1 100644
--- a/src/model/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -8,12 +10,12 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-use crate::block_ref_table::*;
+use crate::s3::block_ref_table::*;
-use garage_model_050::version_table as old;
+use crate::prev::v051::version_table as old;
/// A version of an object
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,
@@ -137,8 +139,16 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
let block_ref_table = self.block_ref_table.clone();
+ let old = old.cloned();
+ let new = new.cloned();
+
self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
@@ -157,7 +167,9 @@ impl TableSchema for VersionTable {
}
}
Ok(())
- })
+ });
+
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {