diff options
Diffstat (limited to 'src/garage/admin_rpc.rs')
-rw-r--r-- | src/garage/admin_rpc.rs | 189 |
1 files changed, 166 insertions, 23 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index e1981e3a..df00fcaf 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::fmt::Write; use std::sync::Arc; use serde::{Deserialize, Serialize}; @@ -5,6 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::error::Error; use garage_table::crdt::CRDT; +use garage_table::replication::*; use garage_table::*; use garage_rpc::rpc_client::*; @@ -14,6 +17,7 @@ use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; +use crate::cli::*; use crate::repair::Repair; use crate::*; @@ -25,6 +29,7 @@ pub enum AdminRPC { BucketOperation(BucketOperation), KeyOperation(KeyOperation), LaunchRepair(RepairOpt), + Stats(StatsOpt), // Replies Ok(String), @@ -55,6 +60,7 @@ impl AdminRpcHandler { AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, + AdminRPC::Stats(opt) => self2.handle_stats(opt).await, _ => Err(Error::BadRPC(format!("Invalid RPC"))), } } @@ -116,7 +122,7 @@ impl AdminRpcHandler { for (key_id, _, _) in bucket.authorized_keys() { if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { if !key.deleted.get() { - self.update_key_bucket(key, &bucket.name, false, false) + self.update_key_bucket(&key, &bucket.name, false, false) .await?; } } else { @@ -128,31 +134,31 @@ impl AdminRpcHandler { Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } BucketOperation::Allow(query) => { - let key = self.get_existing_key(&query.key_id).await?; + let key = self.get_existing_key(&query.key_pattern).await?; let bucket = self.get_existing_bucket(&query.bucket).await?; let allow_read = query.read || key.allow_read(&query.bucket); let allow_write = query.write || key.allow_write(&query.bucket); - self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) .await?; - self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) .await?; Ok(AdminRPC::Ok(format!( "New permissions for {} on {}: read {}, write {}.", - &query.key_id, &query.bucket, allow_read, allow_write + &key.key_id, &query.bucket, allow_read, allow_write ))) } BucketOperation::Deny(query) => { - let key = self.get_existing_key(&query.key_id).await?; + let key = self.get_existing_key(&query.key_pattern).await?; let bucket = self.get_existing_bucket(&query.bucket).await?; let allow_read = !query.read && key.allow_read(&query.bucket); let allow_write = !query.write && key.allow_write(&query.bucket); - self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) .await?; - self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) .await?; Ok(AdminRPC::Ok(format!( "New permissions for {} on {}: read {}, write {}.", - &query.key_id, &query.bucket, allow_read, allow_write + &key.key_id, &query.bucket, allow_read, allow_write ))) } BucketOperation::Website(query) => { @@ -187,7 +193,12 @@ impl AdminRpcHandler { let key_ids = self .garage .key_table - .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + ) .await? .iter() .map(|k| (k.key_id.to_string(), k.name.get().clone())) @@ -195,7 +206,7 @@ impl AdminRpcHandler { Ok(AdminRPC::KeyList(key_ids)) } KeyOperation::Info(query) => { - let key = self.get_existing_key(&query.key_id).await?; + let key = self.get_existing_key(&query.key_pattern).await?; Ok(AdminRPC::KeyInfo(key)) } KeyOperation::New(query) => { @@ -204,13 +215,13 @@ impl AdminRpcHandler { Ok(AdminRPC::KeyInfo(key)) } KeyOperation::Rename(query) => { - let mut key = self.get_existing_key(&query.key_id).await?; + let mut key = self.get_existing_key(&query.key_pattern).await?; key.name.update(query.new_name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } KeyOperation::Delete(query) => { - let key = self.get_existing_key(&query.key_id).await?; + let key = self.get_existing_key(&query.key_pattern).await?; if !query.yes { return Err(Error::BadRPC(format!( "Add --yes flag to really perform this operation" @@ -227,13 +238,24 @@ impl AdminRpcHandler { return Err(Error::Message(format!("Bucket not found: {}", ab_name))); } } - let del_key = Key::delete(key.key_id); + let del_key = Key::delete(key.key_id.to_string()); self.garage.key_table.insert(&del_key).await?; Ok(AdminRPC::Ok(format!( "Key {} was deleted successfully.", - query.key_id + key.key_id ))) } + KeyOperation::Import(query) => { + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id) + .await?; + if prev_key.is_some() { + return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); + self.garage.key_table.insert(&imported_key).await?; + Ok(AdminRPC::KeyInfo(imported_key)) + + } } } @@ -250,14 +272,28 @@ impl AdminRpcHandler { )))) } - async fn get_existing_key(&self, id: &String) -> Result<Key, Error> { - self.garage + async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> { + let candidates = self + .garage .key_table - .get(&EmptyKey, id) + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Matches(pattern.to_string())), + 10, + ) .await? + .into_iter() .filter(|k| !k.deleted.get()) - .map(Ok) - .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id)))) + .collect::<Vec<_>>(); + if candidates.len() != 1 { + Err(Error::Message(format!( + "{} matching keys", + candidates.len() + ))) + } else { + Ok(candidates.into_iter().next().unwrap()) + } } /// Update **bucket table** to inform of the new linked key @@ -290,11 +326,12 @@ impl AdminRpcHandler { /// Update **key table** to inform of the new linked bucket async fn update_key_bucket( &self, - mut key: Key, + key: &Key, bucket: &String, allow_read: bool, allow_write: bool, ) -> Result<(), Error> { + let mut key = key.clone(); let old_map = key.authorized_buckets.take_and_clear(); key.authorized_buckets.merge(&old_map.update_mutator( bucket.clone(), @@ -350,12 +387,118 @@ impl AdminRpcHandler { .background .spawn_worker("Repair worker".into(), move |must_exit| async move { repair.repair_worker(opt, must_exit).await - }) - .await; + }); Ok(AdminRPC::Ok(format!( "Repair launched on {:?}", self.garage.system.id ))) } } + + async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRPC, Error> { + if opt.all_nodes { + let mut ret = String::new(); + let ring = self.garage.system.ring.borrow().clone(); + + for node in ring.config.members.keys() { + let mut opt = opt.clone(); + opt.all_nodes = false; + + writeln!(&mut ret, "\n======================").unwrap(); + writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); + match self + .rpc_client + .call(*node, AdminRPC::Stats(opt), ADMIN_RPC_TIMEOUT) + .await + { + Ok(AdminRPC::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), + Ok(x) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), + Err(e) => writeln!(&mut ret, "Error: {}", e).unwrap(), + } + } + Ok(AdminRPC::Ok(ret)) + } else { + Ok(AdminRPC::Ok(self.gather_stats_local(opt)?)) + } + } + + fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> { + let mut ret = String::new(); + writeln!( + &mut ret, + "\nGarage version: {}", + git_version::git_version!() + ) + .unwrap(); + + // Gather ring statistics + let ring = self.garage.system.ring.borrow().clone(); + let mut ring_nodes = HashMap::new(); + for r in ring.ring.iter() { + for n in r.nodes.iter() { + if !ring_nodes.contains_key(n) { + ring_nodes.insert(*n, 0usize); + } + *ring_nodes.get_mut(n).unwrap() += 1; + } + } + writeln!(&mut ret, "\nRing nodes & partition count:").unwrap(); + for (n, c) in ring_nodes.iter() { + writeln!(&mut ret, " {:?} {}", n, c).unwrap(); + } + + self.gather_table_stats(&mut ret, &self.garage.bucket_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.key_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.object_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.version_table, &opt)?; + self.gather_table_stats(&mut ret, &self.garage.block_ref_table, &opt)?; + + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + if opt.detailed { + writeln!( + &mut ret, + " number of blocks: {}", + self.garage.block_manager.rc_len() + ) + .unwrap(); + } + writeln!( + &mut ret, + " resync queue length: {}", + self.garage.block_manager.resync_queue_len() + ) + .unwrap(); + + Ok(ret) + } + + fn gather_table_stats<F, R>( + &self, + to: &mut String, + t: &Arc<Table<F, R>>, + opt: &StatsOpt, + ) -> Result<(), Error> + where + F: TableSchema + 'static, + R: TableReplication + 'static, + { + writeln!(to, "\nTable stats for {}", t.data.name).unwrap(); + if opt.detailed { + writeln!(to, " number of items: {}", t.data.store.len()).unwrap(); + writeln!( + to, + " Merkle tree size: {}", + t.merkle_updater.merkle_tree_len() + ) + .unwrap(); + } + writeln!( + to, + " Merkle updater todo queue length: {}", + t.merkle_updater.todo_len() + ) + .unwrap(); + writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); + Ok(()) + } } |