aboutsummaryrefslogtreecommitdiff
path: root/src/model/s3/object_table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-08 20:03:30 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-08 20:03:30 +0200
commit17e111139308ba995fb782cbd1af555920cbbb81 (patch)
tree2262f3b46bb84a21a5e7df59c19c2ca8fa444415 /src/model/s3/object_table.rs
parent03e811bbbfca5a2467bb24ce1500c74661234947 (diff)
downloadgarage-17e111139308ba995fb782cbd1af555920cbbb81.tar.gz
garage-17e111139308ba995fb782cbd1af555920cbbb81.zip
First iteration of bucket object counters
Diffstat (limited to 'src/model/s3/object_table.rs')
-rw-r--r--src/model/s3/object_table.rs58
1 files changed, 57 insertions, 1 deletions
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 62f5d8d9..027acea0 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -11,10 +11,15 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
+use crate::index_counter::*;
use crate::s3::version_table::*;
use garage_model_050::object_table as old;
+pub const OBJECTS: &str = "objects";
+pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
+pub const BYTES: &str = "bytes";
+
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
@@ -218,6 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -236,10 +242,20 @@ impl TableSchema for ObjectTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.object_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update object counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Spawn threads that propagates deletions to version table
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -283,6 +299,46 @@ impl TableSchema for ObjectTable {
}
}
+impl CountedItem for Object {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
+
+ // Partition key = nothing
+ type CP = EmptyKey;
+ // Sort key = bucket id
+ type CS = Uuid;
+
+ fn counter_partition_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn counter_sort_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let n_objects = if self.is_tombstone() { 0 } else { 1 };
+
+ let versions = self.versions();
+ let n_unfinished_uploads = versions
+ .iter()
+ .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
+ .count();
+ let n_bytes = versions
+ .iter()
+ .map(|v| match &v.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
+ | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+ _ => 0,
+ })
+ .sum::<u64>();
+
+ vec![
+ (OBJECTS, n_objects),
+ (UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
+ (BYTES, n_bytes as i64),
+ ]
+ }
+}
+
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)