diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-24 10:10:01 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-24 10:10:01 +0000 |
commit | d8f5e643bcee95969b59c309809710a38b0661e3 (patch) | |
tree | 9bb179f351f60fc0396db731cb8ca0fe25dde17e /src/garage | |
parent | 51fb3799a153a0db990fc74a37563ec612e20fc2 (diff) | |
download | garage-d8f5e643bcee95969b59c309809710a38b0661e3.tar.gz garage-d8f5e643bcee95969b59c309809710a38b0661e3.zip |
Split code for modular compilation
Diffstat (limited to 'src/garage')
-rw-r--r-- | src/garage/Cargo.toml | 36 | ||||
-rw-r--r-- | src/garage/admin_rpc.rs | 358 | ||||
-rw-r--r-- | src/garage/main.rs | 531 | ||||
-rw-r--r-- | src/garage/repair.rs | 183 | ||||
-rw-r--r-- | src/garage/server.rs | 87 |
5 files changed, 1195 insertions, 0 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml new file mode 100644 index 00000000..08b55c32 --- /dev/null +++ b/src/garage/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "garage" +version = "0.1.0" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" + +[[bin]] +name = "garage" +path = "main.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_util = { path = "../util" } +garage_rpc = { path = "../rpc" } +garage_table = { path = "../table" } +garage_core = { path = "../core" } +garage_api = { path = "../api" } + +bytes = "0.4" +rand = "0.7" +hex = "0.3" +sha2 = "0.8" +log = "0.4" +pretty_env_logger = "0.4" + +sled = "0.31" + +structopt = { version = "0.3", default-features = false } +toml = "0.5" +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } + +futures = "0.3" +futures-util = "0.3" +tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs new file mode 100644 index 00000000..aeaf2682 --- /dev/null +++ b/src/garage/admin_rpc.rs @@ -0,0 +1,358 @@ +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; +use garage_util::error::Error; + +use garage_table::*; + +use garage_rpc::rpc_client::*; +use garage_rpc::rpc_server::*; + +use garage_core::bucket_table::*; +use garage_core::garage::Garage; +use garage_core::key_table::*; + +use crate::repair::Repair; +use crate::*; + +pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30); +pub const ADMIN_RPC_PATH: &str = "_admin"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRPC { + BucketOperation(BucketOperation), + KeyOperation(KeyOperation), + LaunchRepair(RepairOpt), + + // Replies + Ok(String), + BucketList(Vec<String>), + BucketInfo(Bucket), + KeyList(Vec<(String, String)>), + KeyInfo(Key), +} + +impl RpcMessage for AdminRPC {} + +pub struct AdminRpcHandler { + garage: Arc<Garage>, + rpc_client: Arc<RpcClient<AdminRPC>>, +} + +impl AdminRpcHandler { + pub fn new(garage: Arc<Garage>) -> Arc<Self> { + let rpc_client = garage.system.clone().rpc_client::<AdminRPC>(ADMIN_RPC_PATH); + Arc::new(Self { garage, rpc_client }) + } + + pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) { + rpc_server.add_handler::<AdminRPC, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| { + let self2 = self.clone(); + async move { + match msg { + AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRPC::KeyOperation(ko) => self2.handle_key_cmd(ko).await, + AdminRPC::LaunchRepair(opt) => self2.handle_launch_repair(opt).await, + _ => Err(Error::BadRequest(format!("Invalid RPC"))), + } + } + }); + } + + async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRPC, Error> { + match cmd { + BucketOperation::List => { + let bucket_names = self + .garage + .bucket_table + .get_range(&EmptyKey, None, Some(()), 10000) + .await? + .iter() + .map(|b| b.name.to_string()) + .collect::<Vec<_>>(); + Ok(AdminRPC::BucketList(bucket_names)) + } + BucketOperation::Info(query) => { + let bucket = self.get_existing_bucket(&query.name).await?; + Ok(AdminRPC::BucketInfo(bucket)) + } + BucketOperation::Create(query) => { + let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; + if bucket.as_ref().filter(|b| !b.deleted).is_some() { + return Err(Error::BadRequest(format!( + "Bucket {} already exists", + query.name + ))); + } + let new_time = match bucket { + Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), + None => now_msec(), + }; + self.garage + .bucket_table + .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) + .await?; + Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) + } + BucketOperation::Delete(query) => { + let bucket = self.get_existing_bucket(&query.name).await?; + let objects = self + .garage + .object_table + .get_range(&query.name, None, Some(()), 10) + .await?; + if !objects.is_empty() { + return Err(Error::BadRequest(format!( + "Bucket {} is not empty", + query.name + ))); + } + if !query.yes { + return Err(Error::BadRequest(format!( + "Add --yes flag to really perform this operation" + ))); + } + // --- done checking, now commit --- + for ak in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { + if !key.deleted { + self.update_key_bucket(key, &bucket.name, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + } + } + self.garage + .bucket_table + .insert(&Bucket::new( + query.name.clone(), + std::cmp::max(bucket.timestamp + 1, now_msec()), + true, + vec![], + )) + .await?; + Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) + } + BucketOperation::Allow(query) => { + let key = self.get_existing_key(&query.key_id).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = query.read || key.allow_read(&query.bucket); + let allow_write = query.write || key.allow_write(&query.bucket); + self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + .await?; + Ok(AdminRPC::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &query.key_id, &query.bucket, allow_read, allow_write + ))) + } + BucketOperation::Deny(query) => { + let key = self.get_existing_key(&query.key_id).await?; + let bucket = self.get_existing_bucket(&query.bucket).await?; + let allow_read = !query.read && key.allow_read(&query.bucket); + let allow_write = !query.write && key.allow_write(&query.bucket); + self.update_key_bucket(key, &query.bucket, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &query.key_id, allow_read, allow_write) + .await?; + Ok(AdminRPC::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &query.key_id, &query.bucket, allow_read, allow_write + ))) + } + } + } + + async fn handle_key_cmd(&self, cmd: KeyOperation) -> Result<AdminRPC, Error> { + match cmd { + KeyOperation::List => { + let key_ids = self + .garage + .key_table + .get_range(&EmptyKey, None, Some(()), 10000) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.name.to_string())) + .collect::<Vec<_>>(); + Ok(AdminRPC::KeyList(key_ids)) + } + KeyOperation::Info(query) => { + let key = self.get_existing_key(&query.key_id).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::New(query) => { + let key = Key::new(query.name, vec![]); + self.garage.key_table.insert(&key).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::Rename(query) => { + let mut key = self.get_existing_key(&query.key_id).await?; + key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); + key.name = query.new_name; + self.garage.key_table.insert(&key).await?; + Ok(AdminRPC::KeyInfo(key)) + } + KeyOperation::Delete(query) => { + let key = self.get_existing_key(&query.key_id).await?; + if !query.yes { + return Err(Error::BadRequest(format!( + "Add --yes flag to really perform this operation" + ))); + } + // --- done checking, now commit --- + for ab in key.authorized_buckets().iter() { + if let Some(bucket) = + self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? + { + if !bucket.deleted { + self.update_bucket_key(bucket, &key.key_id, false, false) + .await?; + } + } else { + return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + } + } + let del_key = Key::delete(key.key_id); + self.garage.key_table.insert(&del_key).await?; + Ok(AdminRPC::Ok(format!( + "Key {} was deleted successfully.", + query.key_id + ))) + } + } + } + + async fn get_existing_bucket(&self, bucket: &String) -> Result<Bucket, Error> { + self.garage + .bucket_table + .get(&EmptyKey, bucket) + .await? + .filter(|b| !b.deleted) + .map(Ok) + .unwrap_or(Err(Error::BadRequest(format!( + "Bucket {} does not exist", + bucket + )))) + } + + async fn get_existing_key(&self, id: &String) -> Result<Key, Error> { + self.garage + .key_table + .get(&EmptyKey, id) + .await? + .filter(|k| !k.deleted) + .map(Ok) + .unwrap_or(Err(Error::BadRequest(format!("Key {} does not exist", id)))) + } + + async fn update_bucket_key( + &self, + mut bucket: Bucket, + key_id: &String, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let timestamp = match bucket + .authorized_keys() + .iter() + .find(|x| x.key_id == *key_id) + { + None => now_msec(), + Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), + }; + bucket.clear_keys(); + bucket + .add_key(AllowedKey { + key_id: key_id.clone(), + timestamp, + allow_read, + allow_write, + }) + .unwrap(); + self.garage.bucket_table.insert(&bucket).await?; + Ok(()) + } + + async fn update_key_bucket( + &self, + mut key: Key, + bucket: &String, + allow_read: bool, + allow_write: bool, + ) -> Result<(), Error> { + let timestamp = match key + .authorized_buckets() + .iter() + .find(|x| x.bucket == *bucket) + { + None => now_msec(), + Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), + }; + key.clear_buckets(); + key.add_bucket(AllowedBucket { + bucket: bucket.clone(), + timestamp, + allow_read, + allow_write, + }) + .unwrap(); + self.garage.key_table.insert(&key).await?; + Ok(()) + } + + async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRPC, Error> { + if !opt.yes { + return Err(Error::BadRequest(format!( + "Please provide the --yes flag to initiate repair operations." + ))); + } + if opt.all_nodes { + let mut opt_to_send = opt.clone(); + opt_to_send.all_nodes = false; + + let mut failures = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.config.members.keys() { + if self + .rpc_client + .call( + *node, + AdminRPC::LaunchRepair(opt_to_send.clone()), + ADMIN_RPC_TIMEOUT, + ) + .await + .is_err() + { + failures.push(node.clone()); + } + } + if failures.is_empty() { + Ok(AdminRPC::Ok(format!("Repair launched on all nodes"))) + } else { + Err(Error::Message(format!( + "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", + failures + ))) + } + } else { + let repair = Repair { + garage: self.garage.clone(), + }; + self.garage + .system + .background + .spawn_worker("Repair worker".into(), move |must_exit| async move { + repair.repair_worker(opt, must_exit).await + }) + .await; + Ok(AdminRPC::Ok(format!( + "Repair launched on {:?}", + self.garage.system.id + ))) + } + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs new file mode 100644 index 00000000..1185871f --- /dev/null +++ b/src/garage/main.rs @@ -0,0 +1,531 @@ +#![recursion_limit = "1024"] + +#[macro_use] +extern crate log; + +mod admin_rpc; +mod repair; +mod server; + +use std::collections::HashSet; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use structopt::StructOpt; + +use garage_util::config::TlsConfig; +use garage_util::data::*; +use garage_util::error::Error; + +use garage_rpc::membership::*; +use garage_rpc::rpc_client::*; + +use admin_rpc::*; + +#[derive(StructOpt, Debug)] +#[structopt(name = "garage")] +pub struct Opt { + /// RPC connect to this host to execute client operations + #[structopt(short = "h", long = "rpc-host", default_value = "127.0.0.1:3901")] + rpc_host: SocketAddr, + + #[structopt(long = "ca-cert")] + ca_cert: Option<String>, + #[structopt(long = "client-cert")] + client_cert: Option<String>, + #[structopt(long = "client-key")] + client_key: Option<String>, + + #[structopt(subcommand)] + cmd: Command, +} + +#[derive(StructOpt, Debug)] +pub enum Command { + /// Run Garage server + #[structopt(name = "server")] + Server(ServerOpt), + + /// Get network status + #[structopt(name = "status")] + Status, + + /// Garage node operations + #[structopt(name = "node")] + Node(NodeOperation), + + /// Bucket operations + #[structopt(name = "bucket")] + Bucket(BucketOperation), + + /// Key operations + #[structopt(name = "key")] + Key(KeyOperation), + + /// Start repair of node data + #[structopt(name = "repair")] + Repair(RepairOpt), +} + +#[derive(StructOpt, Debug)] +pub struct ServerOpt { + /// Configuration file + #[structopt(short = "c", long = "config", default_value = "./config.toml")] + config_file: PathBuf, +} + +#[derive(StructOpt, Debug)] +pub enum NodeOperation { + /// Configure Garage node + #[structopt(name = "configure")] + Configure(ConfigureNodeOpt), + + /// Remove Garage node from cluster + #[structopt(name = "remove")] + Remove(RemoveNodeOpt), +} + +#[derive(StructOpt, Debug)] +pub struct ConfigureNodeOpt { + /// Node to configure (prefix of hexadecimal node id) + node_id: String, + + /// Location (datacenter) of the node + #[structopt(short = "d", long = "datacenter")] + datacenter: Option<String>, + + /// Number of tokens + #[structopt(short = "n", long = "n-tokens")] + n_tokens: Option<u32>, + + /// Optionnal node tag + #[structopt(short = "t", long = "tag")] + tag: Option<String>, +} + +#[derive(StructOpt, Debug)] +pub struct RemoveNodeOpt { + /// Node to configure (prefix of hexadecimal node id) + node_id: String, + + /// If this flag is not given, the node won't be removed + #[structopt(long = "yes")] + yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum BucketOperation { + /// List buckets + #[structopt(name = "list")] + List, + + /// Get bucket info + #[structopt(name = "info")] + Info(BucketOpt), + + /// Create bucket + #[structopt(name = "create")] + Create(BucketOpt), + + /// Delete bucket + #[structopt(name = "delete")] + Delete(DeleteBucketOpt), + + /// Allow key to read or write to bucket + #[structopt(name = "allow")] + Allow(PermBucketOpt), + + /// Allow key to read or write to bucket + #[structopt(name = "deny")] + Deny(PermBucketOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct BucketOpt { + /// Bucket name + pub name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct DeleteBucketOpt { + /// Bucket name + pub name: String, + + /// If this flag is not given, the bucket won't be deleted + #[structopt(long = "yes")] + pub yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct PermBucketOpt { + /// Access key ID + #[structopt(long = "key")] + pub key_id: String, + + /// Allow/deny read operations + #[structopt(long = "read")] + pub read: bool, + + /// Allow/deny write operations + #[structopt(long = "write")] + pub write: bool, + + /// Bucket name + pub bucket: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub enum KeyOperation { + /// List keys + #[structopt(name = "list")] + List, + + /// Get key info + #[structopt(name = "info")] + Info(KeyOpt), + + /// Create new key + #[structopt(name = "new")] + New(KeyNewOpt), + + /// Rename key + #[structopt(name = "rename")] + Rename(KeyRenameOpt), + + /// Delete key + #[structopt(name = "delete")] + Delete(KeyDeleteOpt), +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyOpt { + /// ID of the key + key_id: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyNewOpt { + /// Name of the key + #[structopt(long = "name", default_value = "Unnamed key")] + name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyRenameOpt { + /// ID of the key + key_id: String, + + /// New name of the key + new_name: String, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug)] +pub struct KeyDeleteOpt { + /// ID of the key + key_id: String, + + /// Confirm deletion + #[structopt(long = "yes")] + yes: bool, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +pub struct RepairOpt { + /// Launch repair operation on all nodes + #[structopt(short = "a", long = "all-nodes")] + pub all_nodes: bool, + + /// Confirm the launch of the repair operation + #[structopt(long = "yes")] + pub yes: bool, + + #[structopt(subcommand)] + pub what: Option<RepairWhat>, +} + +#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +pub enum RepairWhat { + /// Only do a full sync of metadata tables + #[structopt(name = "tables")] + Tables, + /// Only repair (resync/rebalance) the set of stored blocks + #[structopt(name = "blocks")] + Blocks, + /// Only redo the propagation of object deletions to the version table (slow) + #[structopt(name = "versions")] + Versions, + /// Only redo the propagation of version deletions to the block ref table (extremely slow) + #[structopt(name = "block_refs")] + BlockRefs, +} + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let opt = Opt::from_args(); + + let tls_config = match (opt.ca_cert, opt.client_cert, opt.client_key) { + (Some(ca_cert), Some(client_cert), Some(client_key)) => Some(TlsConfig { + ca_cert, + node_cert: client_cert, + node_key: client_key, + }), + (None, None, None) => None, + _ => { + warn!("Missing one of: --ca-cert, --node-cert, --node-key. Not using TLS."); + None + } + }; + + let rpc_http_cli = + Arc::new(RpcHttpClient::new(8, &tls_config).expect("Could not create RPC client")); + let membership_rpc_cli = + RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string()); + let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string()); + + let resp = match opt.cmd { + Command::Server(server_opt) => { + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + error!("{}", panic_info.to_string()); + std::process::abort(); + })); + + server::run_server(server_opt.config_file).await + } + Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await, + Command::Node(NodeOperation::Configure(configure_opt)) => { + cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await + } + Command::Node(NodeOperation::Remove(remove_opt)) => { + cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await + } + Command::Bucket(bo) => { + cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await + } + Command::Key(bo) => { + cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::KeyOperation(bo)).await + } + Command::Repair(ro) => { + cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro)).await + } + }; + + if let Err(e) = resp { + error!("Error: {}", e); + } +} + +async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) + .await?? + { + Message::AdvertiseNodesUp(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + let config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) + .await?? + { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + println!("Healthy nodes:"); + for adv in status.iter().filter(|x| x.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + println!( + "{:?}\t{}\t{}\t[{}]\t{}\t{}", + adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens + ); + } else { + println!( + "{:?}\t{}\t{}\tUNCONFIGURED/REMOVED", + adv.id, adv.state_info.hostname, adv.addr + ); + } + } + + let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); + let failure_case_1 = status.iter().any(|x| !x.is_up); + let failure_case_2 = config + .members + .iter() + .any(|(id, _)| !status_keys.contains(id)); + if failure_case_1 || failure_case_2 { + println!("\nFailed nodes:"); + for adv in status.iter().filter(|x| !x.is_up) { + if let Some(cfg) = config.members.get(&adv.id) { + println!( + "{:?}\t{}\t{}\t[{}]\t{}\t{}\tlast seen: {}s ago", + adv.id, + adv.state_info.hostname, + adv.addr, + cfg.tag, + cfg.datacenter, + cfg.n_tokens, + (now_msec() - adv.last_seen) / 1000, + ); + } + } + for (id, cfg) in config.members.iter() { + if !status.iter().any(|x| x.id == *id) { + println!( + "{:?}\t{}\t{}\t{}\tnever seen", + id, cfg.tag, cfg.datacenter, cfg.n_tokens + ); + } + } + } + + Ok(()) +} + +async fn cmd_configure( + rpc_cli: RpcAddrClient<Message>, + rpc_host: SocketAddr, + args: ConfigureNodeOpt, +) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT) + .await?? + { + Message::AdvertiseNodesUp(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let mut candidates = vec![]; + for adv in status.iter() { + if hex::encode(&adv.id).starts_with(&args.node_id) { + candidates.push(adv.id); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); + } + + let mut config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) + .await?? + { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let new_entry = match config.members.get(&candidates[0]) { + None => NetworkConfigEntry { + datacenter: args + .datacenter + .expect("Please specifiy a datacenter with the -d flag"), + n_tokens: args + .n_tokens + .expect("Please specifiy a number of tokens with the -n flag"), + tag: args.tag.unwrap_or("".to_string()), + }, + Some(old) => NetworkConfigEntry { + datacenter: args.datacenter.unwrap_or(old.datacenter.to_string()), + n_tokens: args.n_tokens.unwrap_or(old.n_tokens), + tag: args.tag.unwrap_or(old.tag.to_string()), + }, + }; + + config.members.insert(candidates[0].clone(), new_entry); + config.version += 1; + + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + ADMIN_RPC_TIMEOUT, + ) + .await??; + Ok(()) +} + +async fn cmd_remove( + rpc_cli: RpcAddrClient<Message>, + rpc_host: SocketAddr, + args: RemoveNodeOpt, +) -> Result<(), Error> { + let mut config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT) + .await?? + { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let mut candidates = vec![]; + for (key, _) in config.members.iter() { + if hex::encode(key).starts_with(&args.node_id) { + candidates.push(*key); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); + } + + if !args.yes { + return Err(Error::Message(format!( + "Add the flag --yes to really remove {:?} from the cluster", + candidates[0] + ))); + } + + config.members.remove(&candidates[0]); + config.version += 1; + + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + ADMIN_RPC_TIMEOUT, + ) + .await??; + Ok(()) +} + +async fn cmd_admin( + rpc_cli: RpcAddrClient<AdminRPC>, + rpc_host: SocketAddr, + args: AdminRPC, +) -> Result<(), Error> { + match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await?? { + AdminRPC::Ok(msg) => { + println!("{}", msg); + } + AdminRPC::BucketList(bl) => { + println!("List of buckets:"); + for bucket in bl { + println!("{}", bucket); + } + } + AdminRPC::BucketInfo(bucket) => { + println!("{:?}", bucket); + } + AdminRPC::KeyList(kl) => { + println!("List of keys:"); + for key in kl { + println!("{}\t{}", key.0, key.1); + } + } + AdminRPC::KeyInfo(key) => { + println!("{:?}", key); + } + r => { + error!("Unexpected response: {:?}", r); + } + } + Ok(()) +} diff --git a/src/garage/repair.rs b/src/garage/repair.rs new file mode 100644 index 00000000..4efb9e84 --- /dev/null +++ b/src/garage/repair.rs @@ -0,0 +1,183 @@ +use std::sync::Arc; + +use tokio::sync::watch; + +use garage_core::block_ref_table::*; +use garage_core::garage::Garage; +use garage_core::version_table::*; +use garage_table::*; +use garage_util::error::Error; + +use crate::*; + +pub struct Repair { + pub garage: Arc<Garage>, +} + +impl Repair { + pub async fn repair_worker( + &self, + opt: RepairOpt, + must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); + + if todo(RepairWhat::Tables) { + info!("Launching a full sync of tables"); + self.garage + .bucket_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .object_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .version_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + self.garage + .block_ref_table + .syncer + .load_full() + .unwrap() + .add_full_scan() + .await; + } + + // TODO: wait for full sync to finish before proceeding to the rest? + + if todo(RepairWhat::Versions) { + info!("Repairing the versions table"); + self.repair_versions(&must_exit).await?; + } + + if todo(RepairWhat::BlockRefs) { + info!("Repairing the block refs table"); + self.repair_block_ref(&must_exit).await?; + } + + if opt.what.is_none() { + info!("Repairing the RC"); + self.repair_rc(&must_exit).await?; + } + + if todo(RepairWhat::Blocks) { + info!("Repairing the stored blocks"); + self.garage + .block_manager + .repair_data_store(&must_exit) + .await?; + } + + Ok(()) + } + + async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.version_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; + if version.deleted { + continue; + } + let object = self + .garage + .object_table + .get(&version.bucket, &version.key) + .await?; + let version_exists = match object { + Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid), + None => { + warn!( + "Repair versions: object for version {:?} not found", + version + ); + false + } + }; + if !version_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + self.garage + .version_table + .insert(&Version::new( + version.uuid, + version.bucket, + version.key, + true, + vec![], + )) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + let mut pos = vec![]; + + while let Some((item_key, item_bytes)) = self.garage.block_ref_table.store.get_gt(&pos)? { + pos = item_key.to_vec(); + + let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; + if block_ref.deleted { + continue; + } + let version = self + .garage + .version_table + .get(&block_ref.version, &EmptyKey) + .await?; + let ref_exists = match version { + Some(v) => !v.deleted, + None => { + warn!( + "Block ref repair: version for block ref {:?} not found", + block_ref + ); + false + } + }; + if !ref_exists { + info!( + "Repair block ref: marking block_ref as deleted: {:?}", + block_ref + ); + self.garage + .block_ref_table + .insert(&BlockRef { + block: block_ref.block, + version: block_ref.version, + deleted: true, + }) + .await?; + } + + if *must_exit.borrow() { + break; + } + } + Ok(()) + } + + async fn repair_rc(&self, _must_exit: &watch::Receiver<bool>) -> Result<(), Error> { + // TODO + warn!("repair_rc: not implemented"); + Ok(()) + } +} diff --git a/src/garage/server.rs b/src/garage/server.rs new file mode 100644 index 00000000..52d03464 --- /dev/null +++ b/src/garage/server.rs @@ -0,0 +1,87 @@ +use std::path::PathBuf; +use std::sync::Arc; + +use futures_util::future::*; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::config::*; +use garage_util::error::Error; + +use garage_api::api_server; +use garage_core::garage::Garage; +use garage_rpc::rpc_server::RpcServer; + +use crate::admin_rpc::*; + +async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> { + // Wait for the CTRL+C signal + tokio::signal::ctrl_c() + .await + .expect("failed to install CTRL+C signal handler"); + info!("Received CTRL+C, shutting down."); + send_cancel.broadcast(true)?; + Ok(()) +} + +async fn wait_from(mut chan: watch::Receiver<bool>) -> () { + while let Some(exit_now) = chan.recv().await { + if exit_now { + return; + } + } +} + +pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { + info!("Loading configuration..."); + let config = read_config(config_file).expect("Unable to read config file"); + + info!("Opening database..."); + let mut db_path = config.metadata_dir.clone(); + db_path.push("db"); + let db = sled::open(db_path).expect("Unable to open DB"); + + info!("Initialize RPC server..."); + let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); + + info!("Initializing background runner..."); + let (send_cancel, watch_cancel) = watch::channel(false); + let background = BackgroundRunner::new(16, watch_cancel.clone()); + + let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; + + info!("Crate admin RPC handler..."); + AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); + + info!("Initializing RPC and API servers..."); + let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); + let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); + + futures::try_join!( + garage + .system + .clone() + .bootstrap(&garage.config.bootstrap_peers[..]) + .map(|rv| { + info!("Bootstrap done"); + Ok(rv) + }), + run_rpc_server.map(|rv| { + info!("RPC server exited"); + rv + }), + api_server.map(|rv| { + info!("API server exited"); + rv + }), + background.run().map(|rv| { + info!("Background runner exited"); + Ok(rv) + }), + shutdown_signal(send_cancel), + )?; + + info!("Cleaning up..."); + + Ok(()) +} |