aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/bucket_table.rs19
-rw-r--r--src/model/garage.rs67
-rw-r--r--src/model/index_counter.rs250
-rw-r--r--src/model/k2v/counter_table.rs20
-rw-r--r--src/model/k2v/item_table.rs102
-rw-r--r--src/model/k2v/mod.rs1
-rw-r--r--src/model/migrate.rs1
-rw-r--r--src/model/s3/object_table.rs61
8 files changed, 392 insertions, 129 deletions
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 7c7b9f30..130eb6a6 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::Crdt;
+use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -44,6 +44,9 @@ pub struct BucketParams {
pub website_config: crdt::Lww<Option<WebsiteConfig>>,
/// CORS rules
pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Bucket quotas
+ #[serde(default)]
+ pub quotas: crdt::Lww<BucketQuotas>,
}
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@@ -62,6 +65,18 @@ pub struct CorsRule {
pub expose_headers: Vec<String>,
}
+#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct BucketQuotas {
+ /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
+ pub max_size: Option<u64>,
+ /// Maximum number of non-deleted objects in the bucket
+ pub max_objects: Option<u64>,
+}
+
+impl AutoCrdt for BucketQuotas {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
pub fn new() -> Self {
@@ -72,6 +87,7 @@ impl BucketParams {
local_aliases: crdt::LwwMap::new(),
website_config: crdt::Lww::new(None),
cors_config: crdt::Lww::new(None),
+ quotas: crdt::Lww::new(BucketQuotas::default()),
}
}
}
@@ -86,6 +102,7 @@ impl Crdt for BucketParams {
self.website_config.merge(&o.website_config);
self.cors_config.merge(&o.cors_config);
+ self.quotas.merge(&o.quotas);
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 280f3dc7..15769a17 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -6,6 +6,7 @@ use garage_db as db;
use garage_util::background::*;
use garage_util::config::*;
+use garage_util::error::Error;
use garage_rpc::system::System;
@@ -22,12 +23,11 @@ use crate::s3::version_table::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
+use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::index_counter::*;
-#[cfg(feature = "k2v")]
-use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -52,6 +52,8 @@ pub struct Garage {
/// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ /// Counting table containing object counters
+ pub object_counter_table: Arc<IndexCounter<Object>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
@@ -66,14 +68,57 @@ pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
/// Indexing table containing K2V item counters
- pub counter_table: Arc<IndexCounter<K2VCounterTable>>,
+ pub counter_table: Arc<IndexCounter<K2VItem>>,
/// K2V RPC handler
pub rpc: Arc<K2VRpcHandler>,
}
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ info!("Opening database...");
+ let mut db_path = config.metadata_dir.clone();
+ std::fs::create_dir_all(&db_path).expect("Unable to create Garage meta data directory");
+ let db = match config.db_engine.as_str() {
+ "sled" => {
+ db_path.push("db");
+ info!("Opening Sled database at: {}", db_path.display());
+ let db = db::sled_adapter::sled::Config::default()
+ .path(&db_path)
+ .cache_capacity(config.sled_cache_capacity)
+ .flush_every_ms(Some(config.sled_flush_every_ms))
+ .open()
+ .expect("Unable to open sled DB");
+ db::sled_adapter::SledDb::init(db)
+ }
+ "sqlite" | "sqlite3" | "rusqlite" => {
+ db_path.push("db.sqlite");
+ info!("Opening Sqlite database at: {}", db_path.display());
+ let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
+ .expect("Unable to open sqlite DB");
+ db::sqlite_adapter::SqliteDb::init(db)
+ }
+ "lmdb" | "heed" => {
+ db_path.push("db.lmdb");
+ info!("Opening LMDB database at: {}", db_path.display());
+ std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
+ let map_size = garage_db::lmdb_adapter::recommended_map_size();
+
+ let db = db::lmdb_adapter::heed::EnvOpenOptions::new()
+ .max_dbs(100)
+ .map_size(map_size)
+ .open(&db_path)
+ .expect("Unable to open LMDB DB");
+ db::lmdb_adapter::LmdbDb::init(db)
+ }
+ e => {
+ return Err(Error::Message(format!(
+ "Unsupported DB engine: {} (options: sled, sqlite, lmdb)",
+ e
+ )));
+ }
+ };
+
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -155,12 +200,16 @@ impl Garage {
&db,
);
+ info!("Initialize object counter table...");
+ let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
+ object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
system.clone(),
@@ -171,9 +220,8 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
- info!("Initialize Garage...");
-
- Arc::new(Self {
+ // -- done --
+ Ok(Arc::new(Self {
config,
db,
background,
@@ -183,11 +231,12 @@ impl Garage {
bucket_alias_table,
key_table,
object_table,
+ object_counter_table,
version_table,
block_ref_table,
#[cfg(feature = "k2v")]
k2v,
- })
+ }))
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2602d5d9..36e8172b 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,3 +1,4 @@
+use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -12,30 +13,36 @@ use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::*;
use garage_table::crdt::*;
-use garage_table::replication::TableShardedReplication;
+use garage_table::replication::*;
use garage_table::*;
-pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
- const NAME: &'static str;
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+ const COUNTER_TABLE_NAME: &'static str;
+
+ type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ fn counter_partition_key(&self) -> &Self::CP;
+ fn counter_sort_key(&self) -> &Self::CS;
+ fn counts(&self) -> Vec<(&'static str, i64)>;
}
/// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CounterSchema> {
- pub pk: T::P,
- pub sk: T::S,
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
pub values: BTreeMap<String, CounterValue>,
}
-impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
- fn partition_key(&self) -> &T::P {
+impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
+ fn partition_key(&self) -> &T::CP {
&self.pk
}
- fn sort_key(&self) -> &T::S {
+ fn sort_key(&self) -> &T::CS {
&self.sk
}
fn is_tombstone(&self) -> bool {
@@ -45,7 +52,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
}
}
-impl<T: CounterSchema> CounterEntry<T> {
+impl<T: CountedItem> CounterEntry<T> {
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
let nodes = &ring.layout.node_id_vec[..];
self.filtered_values_with_nodes(nodes)
@@ -78,7 +85,7 @@ pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
-impl<T: CounterSchema> Crdt for CounterEntry<T> {
+impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
if let Some(e) = self.values.get_mut(name) {
@@ -104,15 +111,15 @@ impl Crdt for CounterValue {
}
}
-pub struct CounterTable<T: CounterSchema> {
+pub struct CounterTable<T: CountedItem> {
_phantom_t: PhantomData<T>,
}
-impl<T: CounterSchema> TableSchema for CounterTable<T> {
- const TABLE_NAME: &'static str = T::NAME;
+impl<T: CountedItem> TableSchema for CounterTable<T> {
+ const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
- type P = T::P;
- type S = T::S;
+ type P = T::CP;
+ type S = T::CS;
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
@@ -131,14 +138,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
// ----
-pub struct IndexCounter<T: CounterSchema> {
+pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
-impl<T: CounterSchema> IndexCounter<T> {
+impl<T: CountedItem> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
@@ -151,7 +158,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this = Arc::new(Self {
this_node: system.id,
local_counter: db
- .open_tree(format!("local_counter:{}", T::NAME))
+ .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
propagate_tx,
table: Table::new(
@@ -166,7 +173,7 @@ impl<T: CounterSchema> IndexCounter<T> {
let this2 = this.clone();
background.spawn_worker(
- format!("{} index counter propagator", T::NAME),
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
this
@@ -175,24 +182,45 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(
&self,
tx: &mut db::Transaction,
- pk: &T::P,
- sk: &T::S,
- counts: &[(&str, i64)],
+ old: Option<&T>,
+ new: Option<&T>,
) -> db::TxResult<(), Error> {
+ let pk = old
+ .map(|e| e.counter_partition_key())
+ .unwrap_or_else(|| new.unwrap().counter_partition_key());
+ let sk = old
+ .map(|e| e.counter_sort_key())
+ .unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+ // calculate counter differences
+ let mut counts = HashMap::new();
+ for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) -= v;
+ }
+ for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) += v;
+ }
+
+ // update local counter table
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?,
+ Some(old_bytes) => {
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?
+ }
None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
values: BTreeMap::new(),
},
};
+ let now = now_msec();
for (s, inc) in counts.iter() {
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
+ ent.0 = std::cmp::max(ent.0 + 1, now);
ent.1 += *inc;
}
@@ -213,7 +241,7 @@ impl<T: CounterSchema> IndexCounter<T> {
async fn propagate_loop(
self: Arc<Self>,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
@@ -236,7 +264,7 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Some((pk, sk, counters)) = ent {
let tree_key = self.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry::<T>(self.this_node, pk, sk);
+ let dist_entry = counters.into_counter_entry(self.this_node);
match buf.entry(tree_key) {
hash_map::Entry::Vacant(e) => {
e.insert(dist_entry);
@@ -255,10 +283,10 @@ impl<T: CounterSchema> IndexCounter<T> {
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
@@ -272,23 +300,155 @@ impl<T: CounterSchema> IndexCounter<T> {
}
}
}
+
+ pub fn offline_recount_all<TS, TR>(
+ &self,
+ counted_table: &Arc<Table<TS, TR>>,
+ ) -> Result<(), Error>
+ where
+ TS: TableSchema<E = T>,
+ TR: TableReplication,
+ {
+ let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
+ let entry_k = self
+ .table
+ .data
+ .tree_key(entry.partition_key(), entry.sort_key());
+ self.table
+ .data
+ .update_entry_with(&entry_k, |ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&entry);
+ ent
+ }
+ None => entry.clone(),
+ })?;
+ Ok(())
+ };
+
+ // 1. Set all old local counters to zero
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in self.local_counter.range((low_bound, Bound::Unbounded))? {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
+ for (local_counter_k, local_counter) in batch {
+ let mut local_counter =
+ rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+
+ for (_, tv) in local_counter.values.iter_mut() {
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 = 0;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_k, &local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(local_counter_k);
+ }
+ }
+
+ // 2. Recount all table entries
+ let now = now_msec();
+ let mut next_start: Option<Vec<u8>> = None;
+ loop {
+ let low_bound = match next_start.take() {
+ Some(v) => Bound::Excluded(v),
+ None => Bound::Unbounded,
+ };
+ let mut batch = vec![];
+ for item in counted_table
+ .data
+ .store
+ .range((low_bound, Bound::Unbounded))?
+ {
+ batch.push(item?);
+ if batch.len() > 1000 {
+ break;
+ }
+ }
+
+ if batch.is_empty() {
+ break;
+ }
+
+ info!("counting entries... ({})", hex::encode(&batch[0].0));
+ for (counted_entry_k, counted_entry) in batch {
+ let counted_entry = counted_table.data.decode_entry(&counted_entry)?;
+
+ let pk = counted_entry.counter_partition_key();
+ let sk = counted_entry.counter_sort_key();
+ let counts = counted_entry.counts();
+
+ let local_counter_key = self.table.data.tree_key(pk, sk);
+ let mut local_counter = match self.local_counter.get(&local_counter_key)? {
+ Some(old_bytes) => {
+ let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
+ &old_bytes,
+ )?;
+ assert!(ent.pk == *pk);
+ assert!(ent.sk == *sk);
+ ent
+ }
+ None => LocalCounterEntry {
+ pk: pk.clone(),
+ sk: sk.clone(),
+ values: BTreeMap::new(),
+ },
+ };
+ for (s, v) in counts.iter() {
+ let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0));
+ tv.0 = std::cmp::max(tv.0 + 1, now);
+ tv.1 += v;
+ }
+
+ let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ self.local_counter
+ .insert(&local_counter_key, local_counter_bytes)?;
+
+ let counter_entry = local_counter.into_counter_entry(self.this_node);
+ save_counter_entry(counter_entry)?;
+
+ next_start = Some(counted_entry_k);
+ }
+ }
+
+ // Done
+ Ok(())
+ }
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry {
+struct LocalCounterEntry<T: CountedItem> {
+ pk: T::CP,
+ sk: T::CS,
values: BTreeMap<String, (u64, i64)>,
}
-impl LocalCounterEntry {
- fn into_counter_entry<T: CounterSchema>(
- self,
- this_node: Uuid,
- pk: T::P,
- sk: T::S,
- ) -> CounterEntry<T> {
+impl<T: CountedItem> LocalCounterEntry<T> {
+ fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry {
- pk,
- sk,
+ pk: self.pk,
+ sk: self.sk,
values: self
.values
.into_iter()
diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs
deleted file mode 100644
index 4856eb2b..00000000
--- a/src/model/k2v/counter_table.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-use garage_util::data::*;
-
-use crate::index_counter::*;
-
-pub const ENTRIES: &str = "entries";
-pub const CONFLICTS: &str = "conflicts";
-pub const VALUES: &str = "values";
-pub const BYTES: &str = "bytes";
-
-#[derive(PartialEq, Clone)]
-pub struct K2VCounterTable;
-
-impl CounterSchema for K2VCounterTable {
- const NAME: &'static str = "k2v_index_counter";
-
- // Partition key = bucket id
- type P = Uuid;
- // Sort key = K2V item's partition key
- type S = String;
-}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 991fe66d..baa1db4b 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -10,9 +10,13 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::counter_table::*;
use crate::k2v::poll::*;
+pub const ENTRIES: &str = "entries";
+pub const CONFLICTS: &str = "conflicts";
+pub const VALUES: &str = "values";
+pub const BYTES: &str = "bytes";
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
@@ -112,27 +116,6 @@ impl K2VItem {
ent.discard();
}
}
-
- // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
- fn stats(&self) -> (i64, i64, i64, i64) {
- let values = self.values();
-
- let n_entries = if self.is_tombstone() { 0 } else { 1 };
- let n_conflicts = if values.len() > 1 { 1 } else { 0 };
- let n_values = values
- .iter()
- .filter(|v| matches!(v, DvvsValue::Value(_)))
- .count() as i64;
- let n_bytes = values
- .iter()
- .map(|v| match v {
- DvvsValue::Deleted => 0,
- DvvsValue::Value(v) => v.len() as i64,
- })
- .sum();
-
- (n_entries, n_conflicts, n_values, n_bytes)
- }
}
impl DvvsEntry {
@@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
}
pub struct K2VItemTable {
- pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
+ pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
pub(crate) subscriptions: Arc<SubscriptionManager>,
}
@@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable {
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
// 1. Count
- let (old_entries, old_conflicts, old_values, old_bytes) = match old {
- None => (0, 0, 0, 0),
- Some(e) => e.stats(),
- };
- let (new_entries, new_conflicts, new_values, new_bytes) = match new {
- None => (0, 0, 0, 0),
- Some(e) => e.stats(),
- };
-
- let count_pk = old
- .map(|e| e.partition.bucket_id)
- .unwrap_or_else(|| new.unwrap().partition.bucket_id);
- let count_sk = old
- .map(|e| &e.partition.partition_key)
- .unwrap_or_else(|| &new.unwrap().partition.partition_key);
-
- let counter_res = self.counter_table.count(
- tx,
- &count_pk,
- count_sk,
- &[
- (ENTRIES, new_entries - old_entries),
- (CONFLICTS, new_conflicts - old_conflicts),
- (VALUES, new_values - old_values),
- (BYTES, new_bytes - old_bytes),
- ],
- );
+ let counter_res = self.counter_table.count(tx, old, new);
if let Err(e) = db::unabort(counter_res)? {
// This result can be returned by `counter_table.count()` for instance
// if messagepack serialization or deserialization fails at some step.
// Warn admin but ignore this error for now, that's all we can do.
error!(
- "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
- count_pk, count_sk, e
+ "Unable to update K2V item counter: {}. Index values will be wrong!",
+ e
);
}
@@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable {
}
}
+impl CountedItem for K2VItem {
+ const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter_v2";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = K2V item's partition key
+ type CS = String;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.partition.bucket_id
+ }
+ fn counter_sort_key(&self) -> &String {
+ &self.partition.partition_key
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let values = self.values();
+
+ let n_entries = if self.is_tombstone() { 0 } else { 1 };
+ let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+ let n_values = values
+ .iter()
+ .filter(|v| matches!(v, DvvsValue::Value(_)))
+ .count() as i64;
+ let n_bytes = values
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => 0,
+ DvvsValue::Value(v) => v.len() as i64,
+ })
+ .sum();
+
+ vec![
+ (ENTRIES, n_entries),
+ (CONFLICTS, n_conflicts),
+ (VALUES, n_values),
+ (BYTES, n_bytes),
+ ]
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index 664172a6..f6a96151 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,5 @@
pub mod causality;
-pub mod counter_table;
pub mod item_table;
pub mod poll;
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 25acb4b0..5fc67069 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -77,6 +77,7 @@ impl Migrate {
local_aliases: LwwMap::new(),
website_config: Lww::new(website),
cors_config: Lww::new(None),
+ quotas: Lww::new(Default::default()),
}),
})
.await?;
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 62f5d8d9..23cce1d3 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -11,10 +11,15 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
+use crate::index_counter::*;
use crate::s3::version_table::*;
use garage_model_050::object_table as old;
+pub const OBJECTS: &str = "objects";
+pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
+pub const BYTES: &str = "bytes";
+
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
@@ -218,6 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -236,10 +242,20 @@ impl TableSchema for ObjectTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.object_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update object counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Spawn threads that propagates deletions to version table
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -283,6 +299,49 @@ impl TableSchema for ObjectTable {
}
}
+impl CountedItem for Object {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
+
+ // Partition key = nothing
+ type CP = EmptyKey;
+ // Sort key = bucket id
+ type CS = Uuid;
+
+ fn counter_partition_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn counter_sort_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let versions = self.versions();
+ let n_objects = if versions.iter().any(|v| v.is_data()) {
+ 0
+ } else {
+ 1
+ };
+ let n_unfinished_uploads = versions
+ .iter()
+ .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
+ .count();
+ let n_bytes = versions
+ .iter()
+ .map(|v| match &v.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
+ | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+ _ => 0,
+ })
+ .sum::<u64>();
+
+ vec![
+ (OBJECTS, n_objects),
+ (UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
+ (BYTES, n_bytes as i64),
+ ]
+ }
+}
+
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)