diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-08 20:03:30 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-08 20:03:30 +0200 |
commit | 17e111139308ba995fb782cbd1af555920cbbb81 (patch) | |
tree | 2262f3b46bb84a21a5e7df59c19c2ca8fa444415 /src/model | |
parent | 03e811bbbfca5a2467bb24ce1500c74661234947 (diff) | |
download | garage-17e111139308ba995fb782cbd1af555920cbbb81.tar.gz garage-17e111139308ba995fb782cbd1af555920cbbb81.zip |
First iteration of bucket object counters
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/garage.rs | 10 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 58 |
2 files changed, 65 insertions, 3 deletions
diff --git a/src/model/garage.rs b/src/model/garage.rs index eed9445c..06ef25d1 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -23,11 +23,10 @@ use crate::s3::version_table::*; use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; +use crate::index_counter::*; use crate::key_table::*; #[cfg(feature = "k2v")] -use crate::index_counter::*; -#[cfg(feature = "k2v")] use crate::k2v::{item_table::*, poll::*, rpc::*}; /// An entire Garage full of data @@ -53,6 +52,8 @@ pub struct Garage { /// Table containing S3 objects pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, + /// Counting table containing object counters + pub object_counter_table: Arc<IndexCounter<Object>>, /// Table containing S3 object versions pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, /// Table containing S3 block references (not blocks themselves) @@ -205,12 +206,16 @@ impl Garage { &db, ); + info!("Initialize object counter table..."); + let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + info!("Initialize object_table..."); #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), + object_counter_table: object_counter_table.clone(), }, meta_rep_param.clone(), system.clone(), @@ -232,6 +237,7 @@ impl Garage { bucket_alias_table, key_table, object_table, + object_counter_table, version_table, block_ref_table, #[cfg(feature = "k2v")] diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 62f5d8d9..027acea0 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -11,10 +11,15 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::index_counter::*; use crate::s3::version_table::*; use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; + /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -218,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { pub background: Arc<BackgroundRunner>, pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub object_counter_table: Arc<IndexCounter<Object>>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -236,10 +242,20 @@ impl TableSchema for ObjectTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagates deletions to version table let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -283,6 +299,46 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = nothing + type CP = EmptyKey; + // Sort key = bucket id + type CS = Uuid; + + fn counter_partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn counter_sort_key(&self) -> &Uuid { + &self.bucket_id + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let n_objects = if self.is_tombstone() { 0 } else { 1 }; + + let versions = self.versions(); + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::<u64>(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) |