diff options
-rw-r--r-- | src/main.rs | 71 | ||||
-rw-r--r-- | src/server.rs | 9 | ||||
-rw-r--r-- | src/table_sync.rs | 75 |
3 files changed, 142 insertions, 13 deletions
diff --git a/src/main.rs b/src/main.rs index 8b124bff..ebf97a29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,6 +61,10 @@ pub enum Command { /// Configure Garage node #[structopt(name = "configure")] Configure(ConfigureOpt), + + /// Remove Garage node from cluster + #[structopt(name = "remove")] + Remove(RemoveOpt), } #[derive(StructOpt, Debug)] @@ -82,6 +86,16 @@ pub struct ConfigureOpt { n_tokens: u32, } +#[derive(StructOpt, Debug)] +pub struct RemoveOpt { + /// Node to configure (prefix of hexadecimal node id) + node_id: String, + + /// If this flag is not given, the node won't be removed + #[structopt(long = "yes")] + yes: bool, +} + #[tokio::main] async fn main() { let opt = Opt::from_args(); @@ -102,11 +116,20 @@ async fn main() { let rpc_cli = RpcClient::new(&tls_config).expect("Could not create RPC client"); let resp = match opt.cmd { - Command::Server(server_opt) => server::run_server(server_opt.config_file).await, + Command::Server(server_opt) => { + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + eprintln!("{}", panic_info.to_string()); + std::process::abort(); + })); + + server::run_server(server_opt.config_file).await + } Command::Status => cmd_status(rpc_cli, opt.rpc_host).await, Command::Configure(configure_opt) => { cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await } + Command::Remove(remove_opt) => cmd_remove(rpc_cli, opt.rpc_host, remove_opt).await, }; if let Err(e) = resp { @@ -224,3 +247,49 @@ async fn cmd_configure( .await?; Ok(()) } + +async fn cmd_remove( + rpc_cli: RpcClient, + rpc_host: SocketAddr, + args: RemoveOpt, +) -> Result<(), Error> { + let mut config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) + .await? + { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let mut candidates = vec![]; + for (key, _) in config.members.iter() { + if hex::encode(key).starts_with(&args.node_id) { + candidates.push(key.clone()); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); + } + + if !args.yes { + return Err(Error::Message(format!( + "Add the flag --yes to really remove {:?} from the cluster", + candidates[0] + ))); + } + + config.members.remove(&candidates[0]); + config.version += 1; + + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + DEFAULT_TIMEOUT, + ) + .await?; + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index af58ded1..78b992f5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -93,7 +93,8 @@ impl Garage { &db, "block_ref".to_string(), data_rep_param.clone(), - ).await; + ) + .await; let version_table = Table::new( VersionTable { background: background.clone(), @@ -103,7 +104,8 @@ impl Garage { &db, "version".to_string(), meta_rep_param.clone(), - ).await; + ) + .await; let object_table = Table::new( ObjectTable { background: background.clone(), @@ -113,7 +115,8 @@ impl Garage { &db, "object".to_string(), meta_rep_param.clone(), - ).await; + ) + .await; let mut garage = Self { db, diff --git a/src/table_sync.rs b/src/table_sync.rs index 5097c1b0..039dab6d 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,7 +1,7 @@ use rand::Rng; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; -use std::collections::BTreeSet; use futures::{pin_mut, select}; use futures_util::future::*; @@ -10,7 +10,7 @@ use tokio::sync::Mutex; use crate::data::*; use crate::error::Error; -use crate::membership::{Ring, System}; +use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); @@ -29,6 +29,7 @@ pub struct SyncTodo { pub struct Partition { pub begin: Hash, pub end: Hash, + pub retain: bool, } impl<F: TableSchema + 'static> TableSyncer<F> { @@ -124,7 +125,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> { } async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> { - unimplemented!() + eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); + Ok(()) } } @@ -135,14 +137,17 @@ impl SyncTodo { self.todo.clear(); let ring: Arc<Ring> = table.system.ring.borrow().clone(); + for i in 0..ring.ring.len() { let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); let begin = ring.ring[i].location.clone(); + if i == 0 { + self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); + } + if i == ring.ring.len() - 1 { - let end = ring.ring[0].location.clone(); self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); - self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id); } else { let end = ring.ring[i + 1].location.clone(); self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); @@ -158,23 +163,75 @@ impl SyncTodo { nodes: &[UUID], my_id: &UUID, ) { - if !nodes.contains(my_id) { + let retain = nodes.contains(my_id); + if !retain { // Check if we have some data to send, otherwise skip if table .store .range(begin.clone()..end.clone()) .next() .is_none() - {} + { + return; + } } - self.todo.push(Partition { begin, end }); + self.todo.push(Partition { begin, end, retain }); } fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) { + let my_id = table.system.id.clone(); + let old_ring = ring_points(old); let new_ring = ring_points(new); - unimplemented!() + let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>(); + + let prev_todo_begin = self + .todo + .iter() + .map(|x| x.begin.clone()) + .collect::<BTreeSet<_>>(); + let prev_todo_end = self + .todo + .iter() + .map(|x| x.end.clone()) + .collect::<BTreeSet<_>>(); + let prev_todo = prev_todo_begin + .union(&prev_todo_end) + .cloned() + .collect::<BTreeSet<_>>(); + + let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>(); + + self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut new_todo = vec![]; + for i in 0..all_points.len() - 1 { + let begin = all_points[i].clone(); + let end = all_points[i + 1].clone(); + let was_ours = old + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let is_ours = new + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { + Ok(_) => true, + Err(j) => { + (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) + || (j < self.todo.len() + && self.todo[j].begin < end && begin < self.todo[j].end) + } + }; + if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { + new_todo.push(Partition { + begin, + end, + retain: is_ours, + }); + } + } + + self.todo = new_todo; } fn pop_task(&mut self) -> Option<Partition> { |