author     Alex Auvolat <alex@adnab.me>  2020-04-16 17:04:28 +0200
committer  Alex Auvolat <alex@adnab.me>  2020-04-16 17:04:28 +0200
commit     e8d750175de3daff0876b63c9ae4dcbd047be793 (patch)
tree       ac6055924240d1315d3f41912dabe39398138576
parent     f01c1e71b5421d8941d1484e224cd6fd75525651 (diff)
Implement ring comparison algorithm
-rw-r--r--  src/main.rs        71
-rw-r--r--  src/server.rs       9
-rw-r--r--  src/table_sync.rs  75
3 files changed, 142 insertions(+), 13 deletions(-)
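
In summary: the CLI gains a `remove` subcommand that resolves a node by hexadecimal id prefix and requires an explicit `--yes` flag; the server installs an abort-on-panic hook; the `Table::new` call sites in `src/server.rs` are reformatted; and in `src/table_sync.rs`, partitions gain a `retain` flag recording whether the local node replicates the range, while `SyncTodo::add_ring_difference` is implemented to recompute which hash ranges need synchronization when the ring changes (`sync_partition` itself remains a stub that only logs).
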
diff --git a/src/main.rs b/src/main.rs
index 8b124bff..ebf97a29 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -61,6 +61,10 @@ pub enum Command {
/// Configure Garage node
#[structopt(name = "configure")]
Configure(ConfigureOpt),
+
+ /// Remove Garage node from cluster
+ #[structopt(name = "remove")]
+ Remove(RemoveOpt),
}
#[derive(StructOpt, Debug)]
@@ -82,6 +86,16 @@ pub struct ConfigureOpt {
n_tokens: u32,
}
+#[derive(StructOpt, Debug)]
+pub struct RemoveOpt {
+ /// Node to remove (prefix of hexadecimal node id)
+ node_id: String,
+
+ /// If this flag is not given, the node won't be removed
+ #[structopt(long = "yes")]
+ yes: bool,
+}
+
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@@ -102,11 +116,20 @@ async fn main() {
let rpc_cli = RpcClient::new(&tls_config).expect("Could not create RPC client");
let resp = match opt.cmd {
- Command::Server(server_opt) => server::run_server(server_opt.config_file).await,
+ Command::Server(server_opt) => {
+ // Abort on panic (same behavior as in Go)
+ std::panic::set_hook(Box::new(|panic_info| {
+ eprintln!("{}", panic_info.to_string());
+ std::process::abort();
+ }));
+
+ server::run_server(server_opt.config_file).await
+ }
Command::Status => cmd_status(rpc_cli, opt.rpc_host).await,
Command::Configure(configure_opt) => {
cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await
}
+ Command::Remove(remove_opt) => cmd_remove(rpc_cli, opt.rpc_host, remove_opt).await,
};
if let Err(e) = resp {
@@ -224,3 +247,49 @@ async fn cmd_configure(
.await?;
Ok(())
}
+
+async fn cmd_remove(
+ rpc_cli: RpcClient,
+ rpc_host: SocketAddr,
+ args: RemoveOpt,
+) -> Result<(), Error> {
+ let mut config = match rpc_cli
+ .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+ .await?
+ {
+ Message::AdvertiseConfig(cfg) => cfg,
+ resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
+ };
+
+ let mut candidates = vec![];
+ for (key, _) in config.members.iter() {
+ if hex::encode(key).starts_with(&args.node_id) {
+ candidates.push(key.clone());
+ }
+ }
+ if candidates.len() != 1 {
+ return Err(Error::Message(format!(
+ "{} matching nodes",
+ candidates.len()
+ )));
+ }
+
+ if !args.yes {
+ return Err(Error::Message(format!(
+ "Add the flag --yes to really remove {:?} from the cluster",
+ candidates[0]
+ )));
+ }
+
+ config.members.remove(&candidates[0]);
+ config.version += 1;
+
+ rpc_cli
+ .call(
+ &rpc_host,
+ &Message::AdvertiseConfig(config),
+ DEFAULT_TIMEOUT,
+ )
+ .await?;
+ Ok(())
+}
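
The selection rule in `cmd_remove` above is: a hexadecimal node-id prefix must match exactly one known cluster member, otherwise the command is refused. A minimal standalone sketch of that rule, with member ids simplified to raw 32-byte arrays and the `hex` crate assumed (it is already used in the diff):

    /// Resolve a hexadecimal node-id prefix to exactly one member id.
    /// Errors out when the prefix is ambiguous or matches nothing.
    fn resolve_node_prefix(members: &[[u8; 32]], prefix: &str) -> Result<[u8; 32], String> {
        let candidates: Vec<[u8; 32]> = members
            .iter()
            .copied()
            .filter(|id| hex::encode(id).starts_with(prefix))
            .collect();
        match candidates.as_slice() {
            [single] => Ok(*single),
            others => Err(format!("{} matching nodes", others.len())),
        }
    }
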
diff --git a/src/server.rs b/src/server.rs
index af58ded1..78b992f5 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -93,7 +93,8 @@ impl Garage {
&db,
"block_ref".to_string(),
data_rep_param.clone(),
- ).await;
+ )
+ .await;
let version_table = Table::new(
VersionTable {
background: background.clone(),
@@ -103,7 +104,8 @@ impl Garage {
&db,
"version".to_string(),
meta_rep_param.clone(),
- ).await;
+ )
+ .await;
let object_table = Table::new(
ObjectTable {
background: background.clone(),
@@ -113,7 +115,8 @@ impl Garage {
&db,
"object".to_string(),
meta_rep_param.clone(),
- ).await;
+ )
+ .await;
let mut garage = Self {
db,
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 5097c1b0..039dab6d 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,7 +1,7 @@
use rand::Rng;
+use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;
-use std::collections::BTreeSet;
use futures::{pin_mut, select};
use futures_util::future::*;
@@ -10,7 +10,7 @@ use tokio::sync::Mutex;
use crate::data::*;
use crate::error::Error;
-use crate::membership::{Ring, System};
+use crate::membership::Ring;
use crate::table::*;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
@@ -29,6 +29,7 @@ pub struct SyncTodo {
pub struct Partition {
pub begin: Hash,
pub end: Hash,
+ pub retain: bool,
}
impl<F: TableSchema + 'static> TableSyncer<F> {
@@ -124,7 +125,8 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
- unimplemented!()
+ eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition);
+ Ok(())
}
}
@@ -135,14 +137,17 @@ impl SyncTodo {
self.todo.clear();
let ring: Arc<Ring> = table.system.ring.borrow().clone();
+
for i in 0..ring.ring.len() {
let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
let begin = ring.ring[i].location.clone();
+ if i == 0 {
+ self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id);
+ }
+
if i == ring.ring.len() - 1 {
- let end = ring.ring[0].location.clone();
self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
- self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id);
} else {
let end = ring.ring[i + 1].location.clone();
self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
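
The full-scan path above splits the entire hash space at the ring's token positions: the range before the first token and the range after the last token are queued explicitly, so every key lands in exactly one partition, and each partition records in `retain` whether the local node is among its replicas (ranges the node does not own are still queued when it holds data for them, presumably so that data can be handed off). A reduced sketch of the range enumeration, with 32-byte hashes shrunk to `u8` token values for illustration:

    /// Enumerate half-open (begin, end) ranges covering the whole key space,
    /// split at each ring token. Illustrative only: Garage uses 32-byte hashes
    /// and also attaches the replica set of each token to its range.
    fn full_scan_ranges(tokens: &[u8]) -> Vec<(u8, u8)> {
        let mut ranges = vec![];
        for (i, &begin) in tokens.iter().enumerate() {
            if i == 0 {
                ranges.push((0x00, begin)); // everything before the first token
            }
            if i == tokens.len() - 1 {
                ranges.push((begin, 0xff)); // everything after the last token
            } else {
                ranges.push((begin, tokens[i + 1]));
            }
        }
        ranges
    }
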
@@ -158,23 +163,75 @@ impl SyncTodo {
nodes: &[UUID],
my_id: &UUID,
) {
- if !nodes.contains(my_id) {
+ let retain = nodes.contains(my_id);
+ if !retain {
// Check if we have some data to send, otherwise skip
if table
.store
.range(begin.clone()..end.clone())
.next()
.is_none()
- {}
+ {
+ return;
+ }
}
- self.todo.push(Partition { begin, end });
+ self.todo.push(Partition { begin, end, retain });
}
fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
+ let my_id = table.system.id.clone();
+
let old_ring = ring_points(old);
let new_ring = ring_points(new);
- unimplemented!()
+ let both_ring = old_ring.union(&new_ring).cloned().collect::<BTreeSet<_>>();
+
+ let prev_todo_begin = self
+ .todo
+ .iter()
+ .map(|x| x.begin.clone())
+ .collect::<BTreeSet<_>>();
+ let prev_todo_end = self
+ .todo
+ .iter()
+ .map(|x| x.end.clone())
+ .collect::<BTreeSet<_>>();
+ let prev_todo = prev_todo_begin
+ .union(&prev_todo_end)
+ .cloned()
+ .collect::<BTreeSet<_>>();
+
+ let all_points = both_ring.union(&prev_todo).cloned().collect::<Vec<_>>();
+
+ self.todo.sort_by(|x, y| x.begin.cmp(&y.begin));
+ let mut new_todo = vec![];
+ for i in 0..all_points.len() - 1 {
+ let begin = all_points[i].clone();
+ let end = all_points[i + 1].clone();
+ let was_ours = old
+ .walk_ring(&begin, table.param.replication_factor)
+ .contains(&my_id);
+ let is_ours = new
+ .walk_ring(&begin, table.param.replication_factor)
+ .contains(&my_id);
+ let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) {
+ Ok(_) => true,
+ Err(j) => {
+ (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end)
+ || (j < self.todo.len()
+ && self.todo[j].begin < end && begin < self.todo[j].end)
+ }
+ };
+ if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
+ new_todo.push(Partition {
+ begin,
+ end,
+ retain: is_ours,
+ });
+ }
+ }
+
+ self.todo = new_todo;
}
fn pop_task(&mut self) -> Option<Partition> {
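
The ring comparison in `add_ring_difference` collects every boundary point from the old ring, the new ring, and the previously queued partitions, sorts them, and walks adjacent pairs: an interval is kept when it was already queued or when the local node gained or lost it between the two rings, with `retain` set to ownership under the new ring. Folding the previous todo boundaries into the point set means already-queued partitions get re-sliced at the new boundaries instead of being dropped. A reduced sketch of the ownership-change classification, with boundary points simplified to `u32` and ownership abstracted as closures (in the real code it is `ring.walk_ring(&point, replication_factor).contains(&my_id)`); the "was already queued" case is omitted for brevity:

    /// For each interval between consecutive boundary points, keep it if local
    /// ownership differs between the old and the new ring. The returned bool is
    /// the new `retain` value (owned under the new ring).
    fn changed_intervals(
        points: &[u32],
        was_ours: impl Fn(u32) -> bool,
        is_ours: impl Fn(u32) -> bool,
    ) -> Vec<(u32, u32, bool)> {
        points
            .windows(2)
            .filter_map(|w| {
                let (begin, end) = (w[0], w[1]);
                let (was, is) = (was_ours(begin), is_ours(begin));
                if was != is {
                    Some((begin, end, is))
                } else {
                    None
                }
            })
            .collect()
    }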