diff options
Diffstat (limited to 'src/model/s3')
-rw-r--r-- | src/model/s3/object_table.rs | 58 |
1 files changed, 57 insertions, 1 deletions
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 62f5d8d9..027acea0 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -11,10 +11,15 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::index_counter::*; use crate::s3::version_table::*; use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; + /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -218,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { pub background: Arc<BackgroundRunner>, pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub object_counter_table: Arc<IndexCounter<Object>>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -236,10 +242,20 @@ impl TableSchema for ObjectTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagates deletions to version table let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -283,6 +299,46 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = nothing + type CP = EmptyKey; + // Sort key = bucket id + type CS = Uuid; + + fn counter_partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn counter_sort_key(&self) -> &Uuid { + &self.bucket_id + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let n_objects = if self.is_tombstone() { 0 } else { 1 }; + + let versions = self.versions(); + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::<u64>(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it) |