use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use crate::data::*;
use crate::error::Error;
use crate::rpc_client::*;
use crate::rpc_server::*;
use crate::server::Garage;
use crate::table::*;
use crate::*;
use crate::block_ref_table::*;
use crate::bucket_table::*;
use crate::version_table::*;
/// Timeout passed to each admin RPC call made through the RPC client.
pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
/// Path identifying the admin RPC endpoint; used both when creating the RPC
/// client and when registering the server-side handler.
pub const ADMIN_RPC_PATH: &str = "_admin";
/// Messages exchanged over the admin RPC endpoint.
///
/// The first two variants are requests; the remaining ones are replies. A
/// reply variant received as a request is rejected by the registered handler
/// with a `BadRequest` error.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; reordering the
/// variants may change the encoded form seen by other nodes — confirm before
/// touching the order.
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRPC {
/// Request: run a bucket administration command.
BucketOperation(BucketOperation),
/// Request: launch a repair with the given options.
LaunchRepair(RepairOpt),
// Replies
/// Reply: the operation succeeded; carries a human-readable message.
Ok(String),
/// Reply: a list of bucket names.
BucketList(Vec<String>),
/// Reply: the stored entry of a single bucket.
BucketInfo(Bucket),
}
// Presumably required so that `AdminRPC` can be used with `RpcClient` and
// `RpcServer::add_handler` — confirm against the `RpcMessage` definition.
impl RpcMessage for AdminRPC {}
/// Server-side handler for `AdminRPC` requests (bucket commands and repairs).
pub struct AdminRpcHandler {
/// Shared node state: gives access to the tables, the system handle and the
/// block manager used by the handler methods.
garage: Arc<Garage>,
/// Client used to send `AdminRPC` messages to the nodes of the ring.
rpc_client: Arc<RpcClient<AdminRPC>>,
}
impl AdminRpcHandler {
/// Builds a handler bound to `garage` and returns it wrapped in an `Arc`.
///
/// The RPC client is obtained from a clone of `garage.system`, for the
/// `ADMIN_RPC_PATH` endpoint.
pub fn new(garage: Arc<Garage>) -> Arc<Self> {
    let system = garage.system.clone();
    let rpc_client = system.rpc_client::<AdminRPC>(ADMIN_RPC_PATH);
    Arc::new(Self {
        garage,
        rpc_client,
    })
}
pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
rpc_server.add_handler::<AdminRPC, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| {
let self2 = self.clone();
async move {
match msg {
AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
_ => Err(Error::BadRequest(format!("Invalid RPC"))),
}
}
});
}
async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRPC, Error> {
match cmd {
BucketOperation::List => {
let bucket_names = self
.garage
.bucket_table
.get_range(&EmptyKey, None, Some(()), 10000)
.await?
.iter()
.map(|b| b.name.to_string())
.collect::<Vec<_>>();
Ok(AdminRPC::BucketList(bucket_names))
}
BucketOperation::Info(query) => {
let bucket = self
.garage
.bucket_table
.get(&EmptyKey, &query.name)
.await?
.filter(|b| !b.deleted);
match bucket {
Some(b) => Ok(AdminRPC::BucketInfo(b)),
None => Err(Error::BadRequest(format!(
"Bucket {} not found",
query.name
))),
}
}
BucketOperation::Create(query) => {
let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
if bucket.as_ref().filter(|b| !b.deleted).is_some() {
return Err(Error::BadRequest(format!(
"Bucket {} already exists",
query.name
)));
}
let new_time = match bucket {
Some(b) => std::cmp::max(b.timestamp + 1, now_msec()),
None => now_msec(),
};
self.garage
.bucket_table
.insert(&Bucket {
name: query.name.clone(),
timestamp: new_time,
deleted: false,
authorized_keys: vec![],
})
.await?;
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
}
BucketOperation::Delete(query) => {
let bucket = match self
.garage
.bucket_table
.get(&EmptyKey, &query.name)
.await?
.filter(|b| !b.deleted)
{
None => {
return Err(Error::BadRequest(format!(
"Bucket {} does not exist",
query.name
)));
}
Some(b) => b,
};
let objects = self
.garage
.object_table
.get_range(&query.name, None, Some(()), 10)
.await?;
if !objects.is_empty() {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name
)));
}
if !query.yes {
return Err(Error::BadRequest(format!(
"Add --yes flag to really perform this operation"
)));
}
self.garage
.bucket_table
.insert(&Bucket {
name: query.name.clone(),
timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
deleted: true,
authorized_keys: vec![],
})
.await?;
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
}
_ => {
// TODO
Err(Error::Message(format!("Not implemented")))
}
}
}
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> {
if !opt.yes {
return Err(Error::BadRequest(format!(
"Please provide the --yes flag to initiate repair operations."
)));
}
if opt.all_nodes {
let mut opt_to_send = opt.clone();
opt_to_send.all_nodes = false;
let mut failures = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.config.members.keys() {
if self
.rpc_client
.call(
*node,
AdminRPC::LaunchRepair(opt_to_send.clone()),
ADMIN_RPC_TIMEOUT,
)
.await
.is_err()
{
failures.push(node.clone());
}
}
if failures.is_empty() {
Ok(AdminRPC::Ok(format!("Repair launched on all nodes")))
} else {
Err(Error::Message(format!(
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
failures
)))
}
} else {
let self2 = self.clone();
self.garage
.system
.background
.spawn_worker("Repair worker".into(), move |must_exit| async move {
self2.repair_worker(opt, must_exit).await
})
.await;
Ok(AdminRPC::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
)))
}
}
async fn repair_worker(
self: Arc<Self>,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
if todo(RepairWhat::Tables) {
info!("Launching a full sync of tables");
self.garage
.bucket_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
self.garage
.object_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
self.garage
.version_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
self.garage
.block_ref_table
.syncer
.load_full()
.unwrap()
.add_full_scan()
.await;
}
// TODO: wait for full sync to finish before proceeding to the rest?
if todo(RepairWhat::Versions) {
info!("Repairing the versions table");
self.repair_versions(&must_exit).await?;
}
if todo(RepairWhat::BlockRefs) {
info!("Repairing the block refs table");
self.repair_block_ref(&must_exit).await?;
}
if opt.what.is_none() {
info!("Repairing the RC");
self.repair_rc(&must_exit).await?;
}
if todo(RepairWhat::Blocks) {
info!("Repairing the stored blocks");
self.garage
.block_manager
.repair_data_store(&must_exit)
.await?;
}
Ok(())
}
/// Iterates over the version table's store (via `get_gt`, starting from the
/// empty key) and marks as deleted every version its object no longer lists.
///
/// For each entry not already flagged `deleted`, the object at
/// `(version.bucket, version.key)` is fetched from the object table. If the
/// object is missing, or none of its versions has the same uuid, an entry
/// with the same uuid/bucket/key, `deleted: true` and no blocks is inserted
/// into the version table.
///
/// `must_exit` is polled once per scanned entry, including entries that are
/// skipped because they are already deleted. The scan stops early, still
/// returning `Ok(())`, as soon as it reads `true`.
///
/// # Errors
/// Propagates the first error from the store, from decoding an entry, or from
/// a table read/insert.
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
    let mut pos = vec![];
    while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? {
        // Poll the exit flag before any per-entry work: checking only after
        // processing would let a long run of already-deleted entries (which
        // `continue` below) ignore the exit request.
        if *must_exit.borrow() {
            break;
        }

        pos = item_key.to_vec();

        let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
        if version.deleted {
            continue;
        }

        let object = self
            .garage
            .object_table
            .get(&version.bucket, &version.key)
            .await?;
        let version_exists = match object {
            Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid),
            None => {
                warn!(
                    "Repair versions: object for version {:?} not found",
                    version
                );
                false
            }
        };

        if !version_exists {
            info!("Repair versions: marking version as deleted: {:?}", version);
            self.garage
                .version_table
                .insert(&Version {
                    uuid: version.uuid,
                    deleted: true,
                    blocks: vec![],
                    bucket: version.bucket,
                    key: version.key,
                })
                .await?;
        }
    }
    Ok(())
}
/// Iterates over the block ref table's store (via `get_gt`, starting from the
/// empty key) and marks as deleted every block ref whose version is gone.
///
/// For each entry not already flagged `deleted`, the version it points to is
/// fetched from the version table. If that version is missing or flagged
/// `deleted`, an entry with the same block/version and `deleted: true` is
/// inserted into the block ref table.
///
/// `must_exit` is polled once per scanned entry, including entries that are
/// skipped because they are already deleted. The scan stops early, still
/// returning `Ok(())`, as soon as it reads `true`.
///
/// # Errors
/// Propagates the first error from the store, from decoding an entry, or from
/// a table read/insert.
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
    let mut pos = vec![];
    while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? {
        // Poll the exit flag before any per-entry work: checking only after
        // processing would let a long run of already-deleted entries (which
        // `continue` below) ignore the exit request.
        if *must_exit.borrow() {
            break;
        }

        pos = item_key.to_vec();

        let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
        if block_ref.deleted {
            continue;
        }

        let version = self
            .garage
            .version_table
            .get(&block_ref.version, &EmptyKey)
            .await?;
        let ref_exists = match version {
            Some(v) => !v.deleted,
            None => {
                warn!(
                    "Block ref repair: version for block ref {:?} not found",
                    block_ref
                );
                false
            }
        };

        if !ref_exists {
            info!(
                "Repair block ref: marking block_ref as deleted: {:?}",
                block_ref
            );
            self.garage
                .block_ref_table
                .insert(&BlockRef {
                    block: block_ref.block,
                    version: block_ref.version,
                    deleted: true,
                })
                .await?;
        }
    }
    Ok(())
}
/// Placeholder for the "RC" repair step (presumably block reference counts —
/// confirm).
///
/// Not implemented yet: logs a warning and returns `Ok(())`. `_must_exit` is
/// accepted for signature parity with the other repair steps but is unused.
async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// TODO
warn!("repair_rc: not implemented");
Ok(())
}
}