diff options
Diffstat (limited to 'src/admin_rpc.rs')
-rw-r--r-- | src/admin_rpc.rs | 210 |
1 files changed, 178 insertions, 32 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index e4afc689..4e5c56df 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -12,6 +12,7 @@ use crate::rpc::rpc_client::*; use crate::rpc::rpc_server::*; use crate::store::bucket_table::*; +use crate::store::key_table::*; use crate::store::repair::Repair; use crate::*; @@ -29,6 +30,8 @@ pub enum AdminRPC { Ok(String), BucketList(Vec<String>), BucketInfo(Bucket), + KeyList(Vec<(String, String)>), + KeyInfo(Key), } impl RpcMessage for AdminRPC {} @@ -72,19 +75,8 @@ impl AdminRpcHandler { Ok(AdminRPC::BucketList(bucket_names)) } BucketOperation::Info(query) => { - let bucket = self - .garage - .bucket_table - .get(&EmptyKey, &query.name) - .await? - .filter(|b| !b.deleted); - match bucket { - Some(b) => Ok(AdminRPC::BucketInfo(b)), - None => Err(Error::BadRequest(format!( - "Bucket {} not found", - query.name - ))), - } + let bucket = self.get_existing_bucket(&query.name).await?; + Ok(AdminRPC::BucketInfo(bucket)) } BucketOperation::Create(query) => { let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; @@ -105,21 +97,7 @@ impl AdminRpcHandler { Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { - let bucket = match self - .garage - .bucket_table - .get(&EmptyKey, &query.name) - .await? - .filter(|b| !b.deleted) - { - None => { - return Err(Error::BadRequest(format!( - "Bucket {} does not exist", - query.name - ))); - } - Some(b) => b, - }; + let bucket = self.get_existing_bucket(&query.name).await?; let objects = self .garage .object_table @@ -136,6 +114,17 @@ impl AdminRpcHandler { "Add --yes flag to really perform this operation" ))); } + // --- done checking, now commit --- + for ak in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { + if !key.deleted { + self.update_key_bucket(key, &bucket.name, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + } + } self.garage .bucket_table .insert(&Bucket::new( @@ -147,15 +136,172 @@ impl AdminRpcHandler { .await?; Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } - _ => { - // TODO - Err(Error::Message(format!("Not implemented"))) + BucketOperation::Allow(query) => { + let key = self.get_existing_key(&query.key_id).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = query.read || key.allow_read(&query.bucket); + let allow_write = query.write || key.allow_write(&query.bucket); + self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + .await?; + Ok(AdminRPC::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &query.key_id, &query.bucket, allow_read, allow_write + ))) + } + BucketOperation::Deny(query) => { + let key = self.get_existing_key(&query.key_id).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = !query.read && key.allow_read(&query.bucket); + let allow_write = !query.write && key.allow_write(&query.bucket); + self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + .await?; + Ok(AdminRPC::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &query.key_id, &query.bucket, allow_read, allow_write + ))) } } } async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRPC, Error> { - Err(Error::Message(format!("Not implemented"))) + match cmd { + KeyOperation::List => { + let key_ids = self + .garage + .key_table + .get_range(&EmptyKey, None, Some(()), 10000) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.name.to_string())) + .collect::<Vec<_>>(); + Ok(AdminRPC::KeyList(key_ids)) + } + KeyOperation::Info(query) => { + let key = self.get_existing_key(&query.key_id).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::New(query) => { + let key = Key::new(query.name, vec![]); + self.garage.key_table.insert(&key).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::Rename(query) => { + let mut key = self.get_existing_key(&query.key_id).await?; + key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); + key.name = query.new_name; + self.garage.key_table.insert(&key).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::Delete(query) => { + let key = self.get_existing_key(&query.key_id).await?; + if !query.yes { + return Err(Error::BadRequest(format!( + "Add --yes flag to really perform this operation" + ))); + } + // --- done checking, now commit --- + for ab in key.authorized_buckets().iter() { + if let Some(bucket) = + self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? + { + if !bucket.deleted { + self.update_bucket_key(bucket, &key.key_id, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + } + } + let del_key = Key::delete(key.key_id); + self.garage.key_table.insert(&del_key).await?; + Ok(AdminRPC::Ok(format!( + "Key {} was deleted successfully.", + query.key_id + ))) + } + } + } + + async fn get_existing_bucket(&self, bucket: &String) -> Result<Bucket, Error> { + self.garage + .bucket_table + .get(&EmptyKey, bucket) + .await? + .filter(|b| !b.deleted) + .map(Ok) + .unwrap_or(Err(Error::BadRequest(format!( + "Bucket {} does not exist", + bucket + )))) + } + + async fn get_existing_key(&self, id: &String) -> Result<Key, Error> { + self.garage + .key_table + .get(&EmptyKey, id) + .await? + .filter(|k| !k.deleted) + .map(Ok) + .unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) + } + + async fn update_bucket_key( + &self, + mut bucket: Bucket, + key_id: &String, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let timestamp = match bucket + .authorized_keys() + .iter() + .find(|x| x.key_id == *key_id) + { + None => now_msec(), + Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), + }; + bucket.clear_keys(); + bucket + .add_key(AllowedKey { + key_id: key_id.clone(), + timestamp, + allow_read, + allow_write, + }) + .unwrap(); + self.garage.bucket_table.insert(&bucket).await?; + Ok(()) + } + + async fn update_key_bucket( + &self, + mut key: Key, + bucket: &String, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let timestamp = match key + .authorized_buckets() + .iter() + .find(|x| x.bucket == *bucket) + { + None => now_msec(), + Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), + }; + key.clear_buckets(); + key.add_bucket(AllowedBucket { + bucket: bucket.clone(), + timestamp, + allow_read, + allow_write, + }) + .unwrap(); + self.garage.key_table.insert(&key).await?; + Ok(()) } async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> { |