aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-08 20:03:30 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-08 20:03:30 +0200
commit17e111139308ba995fb782cbd1af555920cbbb81 (patch)
tree2262f3b46bb84a21a5e7df59c19c2ca8fa444415
parent03e811bbbfca5a2467bb24ce1500c74661234947 (diff)
downloadgarage-17e111139308ba995fb782cbd1af555920cbbb81.tar.gz
garage-17e111139308ba995fb782cbd1af555920cbbb81.zip
First iteration of bucket object counters
-rw-r--r--Cargo.lock7
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/admin.rs21
-rw-r--r--src/garage/cli/cmd.rs8
-rw-r--r--src/garage/cli/structs.rs3
-rw-r--r--src/garage/cli/util.rs26
-rw-r--r--src/garage/repair/offline.rs5
-rw-r--r--src/model/garage.rs10
-rw-r--r--src/model/s3/object_table.rs58
9 files changed, 131 insertions, 8 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 11aa070d..ecdf8a57 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -388,6 +388,12 @@ dependencies = [
]
[[package]]
+name = "bytesize"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70"
+
+[[package]]
name = "cc"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -948,6 +954,7 @@ dependencies = [
"aws-sdk-s3",
"base64",
"bytes 1.1.0",
+ "bytesize",
"chrono",
"futures",
"futures-util",
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index eb643160..640e6975 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" }
garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
+bytesize = "1.1"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index afe7fe7a..31305b51 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -39,7 +39,11 @@ pub enum AdminRpc {
// Replies
Ok(String),
BucketList(Vec<Bucket>),
- BucketInfo(Bucket, HashMap<String, Key>),
+ BucketInfo {
+ bucket: Bucket,
+ relevant_keys: HashMap<String, Key>,
+ counters: HashMap<String, i64>,
+ },
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
}
@@ -104,6 +108,15 @@ impl AdminRpcHandler {
.get_existing_bucket(bucket_id)
.await?;
+ let counters = self
+ .garage
+ .object_counter_table
+ .table
+ .get(&EmptyKey, &bucket_id)
+ .await?
+ .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+ .unwrap_or_default();
+
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
@@ -139,7 +152,11 @@ impl AdminRpcHandler {
}
}
- Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
+ Ok(AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ })
}
#[allow(clippy::ptr_arg)]
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index b2dd8f14..3a0bd956 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -169,8 +169,12 @@ pub async fn cmd_admin(
AdminRpc::BucketList(bl) => {
print_bucket_list(bl);
}
- AdminRpc::BucketInfo(bucket, rk) => {
- print_bucket_info(&bucket, &rk);
+ AdminRpc::BucketInfo {
+ bucket,
+ relevant_keys,
+ counters,
+ } => {
+ print_bucket_info(&bucket, &relevant_keys, &counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index 575ac857..cdaa09be 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -425,6 +425,9 @@ pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[structopt(name = "k2v_item_counters")]
K2VItemCounters,
+ /// Repair object counters
+ #[structopt(name = "object_counters")]
+ ObjectCounters,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 6d73be3a..23c669b9 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -7,6 +7,7 @@ use garage_util::formater::format_table;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -121,7 +122,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
}
}
-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+ bucket: &Bucket,
+ relevant_keys: &HashMap<String, Key>,
+ counters: &HashMap<String, i64>,
+) {
let key_name = |k| {
relevant_keys
.get(k)
@@ -133,6 +138,25 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
+ let size =
+ bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+ println!(
+ "Size: {} ({})",
+ size.to_string_as(true),
+ size.to_string_as(false)
+ );
+ println!(
+ "Objects: {}",
+ counters.get(OBJECTS).cloned().unwrap_or_default()
+ );
+ println!(
+ "Unfinished multipart uploads: {}",
+ counters
+ .get(UNFINISHED_UPLOADS)
+ .cloned()
+ .unwrap_or_default()
+ );
+
println!("Website access: {}", p.website_config.get().is_some());
println!("\nGlobal aliases:");
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
index 853bfdf3..ef56cc5c 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/repair/offline.rs
@@ -38,6 +38,11 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
#[cfg(not(feature = "k2v"))]
error!("K2V not enabled in this build.");
}
+ OfflineRepairWhat::ObjectCounters => {
+ garage
+ .object_counter_table
+ .offline_recount_all(&garage.object_table)?;
+ }
}
info!("Repair operation finished, shutting down Garage internals...");
diff --git a/src/model/garage.rs b/src/model/garage.rs
index eed9445c..06ef25d1 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -23,11 +23,10 @@ use crate::s3::version_table::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::helper;
+use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::index_counter::*;
-#[cfg(feature = "k2v")]
use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
@@ -53,6 +52,8 @@ pub struct Garage {
/// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+ /// Counting table containing object counters
+ pub object_counter_table: Arc<IndexCounter<Object>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
@@ -205,12 +206,16 @@ impl Garage {
&db,
);
+ info!("Initialize object counter table...");
+ let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
background: background.clone(),
version_table: version_table.clone(),
+ object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
system.clone(),
@@ -232,6 +237,7 @@ impl Garage {
bucket_alias_table,
key_table,
object_table,
+ object_counter_table,
version_table,
block_ref_table,
#[cfg(feature = "k2v")]
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 62f5d8d9..027acea0 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -11,10 +11,15 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
+use crate::index_counter::*;
use crate::s3::version_table::*;
use garage_model_050::object_table as old;
+pub const OBJECTS: &str = "objects";
+pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
+pub const BYTES: &str = "bytes";
+
/// An object
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
@@ -218,6 +223,7 @@ impl Crdt for Object {
pub struct ObjectTable {
pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+ pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -236,10 +242,20 @@ impl TableSchema for ObjectTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.object_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update object counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Spawn threads that propagates deletions to version table
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -283,6 +299,46 @@ impl TableSchema for ObjectTable {
}
}
+impl CountedItem for Object {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
+
+ // Partition key = nothing
+ type CP = EmptyKey;
+ // Sort key = bucket id
+ type CS = Uuid;
+
+ fn counter_partition_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn counter_sort_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let n_objects = if self.is_tombstone() { 0 } else { 1 };
+
+ let versions = self.versions();
+ let n_unfinished_uploads = versions
+ .iter()
+ .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
+ .count();
+ let n_bytes = versions
+ .iter()
+ .map(|v| match &v.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
+ | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+ _ => 0,
+ })
+ .sum::<u64>();
+
+ vec![
+ (OBJECTS, n_objects),
+ (UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
+ (BYTES, n_bytes as i64),
+ ]
+ }
+}
+
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it)