diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-24 10:10:01 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-24 10:10:01 +0000 |
commit | d8f5e643bcee95969b59c309809710a38b0661e3 (patch) | |
tree | 9bb179f351f60fc0396db731cb8ca0fe25dde17e /src/admin_rpc.rs | |
parent | 51fb3799a153a0db990fc74a37563ec612e20fc2 (diff) | |
download | garage-d8f5e643bcee95969b59c309809710a38b0661e3.tar.gz garage-d8f5e643bcee95969b59c309809710a38b0661e3.zip |
Split code for modular compilation
Diffstat (limited to 'src/admin_rpc.rs')
-rw-r--r-- | src/admin_rpc.rs | 358 |
1 files changed, 0 insertions, 358 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs deleted file mode 100644 index 4e5c56df..00000000 --- a/src/admin_rpc.rs +++ /dev/null @@ -1,358 +0,0 @@ -use std::sync::Arc; - -use serde::{Deserialize, Serialize}; - -use crate::data::*; -use crate::error::Error; -use crate::server::Garage; - -use crate::table::*; - -use crate::rpc::rpc_client::*; -use crate::rpc::rpc_server::*; - -use crate::store::bucket_table::*; -use crate::store::key_table::*; -use crate::store::repair::Repair; - -use crate::*; - -pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub const ADMIN_RPC_PATH: &str = "_admin"; - -#[derive(Debug, Serialize, Deserialize)] -pub enum AdminRPC { - BucketOperation(BucketOperation), - KeyOperation(KeyOperation), - LaunchRepair(RepairOpt), - - // Replies - Ok(String), - BucketList(Vec<String>), - BucketInfo(Bucket), - KeyList(Vec<(String, String)>), - KeyInfo(Key), -} - -impl RpcMessage for AdminRPC {} - -pub struct AdminRpcHandler { - garage: Arc<Garage>, - rpc_client: Arc<RpcClient<AdminRPC>>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc<Garage>) -> Arc<Self> { - let rpc_client = garage.system.clone().rpc_client::<AdminRPC>(ADMIN_RPC_PATH); - Arc::new(Self { garage, rpc_client }) - } - - pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) { - rpc_server.add_handler::<AdminRPC, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { - let self2 = self.clone(); - async move { - match msg { - AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, - AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, - AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, - _ => Err(Error::BadRequest(format!("Invalid RPC"))), - } - } - }); - } - - async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRPC, Error> { - match cmd { - BucketOperation::List => { - let bucket_names = self - .garage - .bucket_table - .get_range(&EmptyKey, None, Some(()), 10000) - .await? - .iter() - .map(|b| b.name.to_string()) - .collect::<Vec<_>>(); - Ok(AdminRPC::BucketList(bucket_names)) - } - BucketOperation::Info(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - Ok(AdminRPC::BucketInfo(bucket)) - } - BucketOperation::Create(query) => { - let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; - if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRequest(format!( - "Bucket {} already exists", - query.name - ))); - } - let new_time = match bucket { - Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), - None => now_msec(), - }; - self.garage - .bucket_table - .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) - .await?; - Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) - } - BucketOperation::Delete(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - let objects = self - .garage - .object_table - .get_range(&query.name, None, Some(()), 10) - .await?; - if !objects.is_empty() { - return Err(Error::BadRequest(format!( - "Bucket {} is not empty", - query.name - ))); - } - if !query.yes { - return Err(Error::BadRequest(format!( - "Add --yes flag to really perform this operation" - ))); - } - // --- done checking, now commit --- - for ak in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { - if !key.deleted { - self.update_key_bucket(key, &bucket.name, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Key not found: {}", ak.key_id))); - } - } - self.garage - .bucket_table - .insert(&Bucket::new( - query.name.clone(), - std::cmp::max(bucket.timestamp + 1, now_msec()), - true, - vec![], - )) - .await?; - Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) - } - BucketOperation::Allow(query) => { - let key = self.get_existing_key(&query.key_id).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = query.read || key.allow_read(&query.bucket); - let allow_write = query.write || key.allow_write(&query.bucket); - self.update_key_bucket(key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) - .await?; - Ok(AdminRPC::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &query.key_id, &query.bucket, allow_read, allow_write - ))) - } - BucketOperation::Deny(query) => { - let key = self.get_existing_key(&query.key_id).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = !query.read && key.allow_read(&query.bucket); - let allow_write = !query.write && key.allow_write(&query.bucket); - self.update_key_bucket(key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) - .await?; - Ok(AdminRPC::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &query.key_id, &query.bucket, allow_read, allow_write - ))) - } - } - } - - async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRPC, Error> { - match cmd { - KeyOperation::List => { - let key_ids = self - .garage - .key_table - .get_range(&EmptyKey, None, Some(()), 10000) - .await? - .iter() - .map(|k| (k.key_id.to_string(), k.name.to_string())) - .collect::<Vec<_>>(); - Ok(AdminRPC::KeyList(key_ids)) - } - KeyOperation::Info(query) => { - let key = self.get_existing_key(&query.key_id).await?; - Ok(AdminRPC::KeyInfo(key)) - } - KeyOperation::New(query) => { - let key = Key::new(query.name, vec![]); - self.garage.key_table.insert(&key).await?; - Ok(AdminRPC::KeyInfo(key)) - } - KeyOperation::Rename(query) => { - let mut key = self.get_existing_key(&query.key_id).await?; - key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); - key.name = query.new_name; - self.garage.key_table.insert(&key).await?; - Ok(AdminRPC::KeyInfo(key)) - } - KeyOperation::Delete(query) => { - let key = self.get_existing_key(&query.key_id).await?; - if !query.yes { - return Err(Error::BadRequest(format!( - "Add --yes flag to really perform this operation" - ))); - } - // --- done checking, now commit --- - for ab in key.authorized_buckets().iter() { - if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? - { - if !bucket.deleted { - self.update_bucket_key(bucket, &key.key_id, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); - } - } - let del_key = Key::delete(key.key_id); - self.garage.key_table.insert(&del_key).await?; - Ok(AdminRPC::Ok(format!( - "Key {} was deleted successfully.", - query.key_id - ))) - } - } - } - - async fn get_existing_bucket(&self, bucket: &String) -> Result<Bucket, Error> { - self.garage - .bucket_table - .get(&EmptyKey, bucket) - .await? - .filter(|b| !b.deleted) - .map(Ok) - .unwrap_or(Err(Error::BadRequest(format!( - "Bucket {} does not exist", - bucket - )))) - } - - async fn get_existing_key(&self, id: &String) -> Result<Key, Error> { - self.garage - .key_table - .get(&EmptyKey, id) - .await? - .filter(|k| !k.deleted) - .map(Ok) - .unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) - } - - async fn update_bucket_key( - &self, - mut bucket: Bucket, - key_id: &String, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - let timestamp = match bucket - .authorized_keys() - .iter() - .find(|x| x.key_id == *key_id) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - bucket.clear_keys(); - bucket - .add_key(AllowedKey { - key_id: key_id.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.bucket_table.insert(&bucket).await?; - Ok(()) - } - - async fn update_key_bucket( - &self, - mut key: Key, - bucket: &String, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - let timestamp = match key - .authorized_buckets() - .iter() - .find(|x| x.bucket == *bucket) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - key.clear_buckets(); - key.add_bucket(AllowedBucket { - bucket: bucket.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.key_table.insert(&key).await?; - Ok(()) - } - - async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> { - if !opt.yes { - return Err(Error::BadRequest(format!( - "Please provide the --yes flag to initiate repair operations." - ))); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let ring = self.garage.system.ring.borrow().clone(); - for node in ring.config.members.keys() { - if self - .rpc_client - .call( - *node, - AdminRPC::LaunchRepair(opt_to_send.clone()), - ADMIN_RPC_TIMEOUT, - ) - .await - .is_err() - { - failures.push(node.clone()); - } - } - if failures.is_empty() { - Ok(AdminRPC::Ok(format!("Repair launched on all nodes"))) - } else { - Err(Error::Message(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - let repair = Repair { - garage: self.garage.clone(), - }; - self.garage - .system - .background - .spawn_worker("Repair worker".into(), move |must_exit| async move { - repair.repair_worker(opt, must_exit).await - }) - .await; - Ok(AdminRPC::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } -} |