aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs14
-rw-r--r--src/membership.rs4
-rw-r--r--src/rpc_server.rs16
-rw-r--r--src/table_sync.rs25
4 files changed, 44 insertions, 15 deletions
diff --git a/src/main.rs b/src/main.rs
index 9b75740d..01972928 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -111,6 +111,10 @@ pub struct ConfigureNodeOpt {
/// Number of tokens
n_tokens: u32,
+
+ /// Optionnal node tag
+ #[structopt(long = "tag", default_value = "")]
+ tag: String,
}
#[derive(StructOpt, Debug)]
@@ -266,8 +270,8 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
for adv in status.iter() {
if let Some(cfg) = config.members.get(&adv.id) {
println!(
- "{:?}\t{}\t{}\t{}\t{}",
- adv.id, adv.state_info.hostname, adv.addr, cfg.datacenter, cfg.n_tokens
+ "{:?}\t{}\t{}\t{}\t{}\t{}",
+ adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.n_tokens
);
}
}
@@ -281,7 +285,10 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
println!("\nFailed nodes:");
for (id, cfg) in config.members.iter() {
if !status.iter().any(|x| x.id == *id) {
- println!("{:?}\t{}\t{}", id, cfg.datacenter, cfg.n_tokens);
+ println!(
+ "{:?}\t{}\t{}\t{}",
+ id, cfg.tag, cfg.datacenter, cfg.n_tokens
+ );
}
}
}
@@ -340,6 +347,7 @@ async fn cmd_configure(
NetworkConfigEntry {
datacenter: args.datacenter,
n_tokens: args.n_tokens,
+ tag: args.tag,
},
);
config.version += 1;
diff --git a/src/membership.rs b/src/membership.rs
index 6f6dd47d..89b0fd67 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -69,6 +69,7 @@ pub struct NetworkConfig {
pub struct NetworkConfigEntry {
pub datacenter: String,
pub n_tokens: u32,
+ pub tag: String,
}
pub struct System {
@@ -248,7 +249,8 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
- let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])?;
+ let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
+ .expect("Unable to parse network configuration file (has version format changed?).");
Ok(net_config)
}
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 4541e4da..938eb512 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1,8 +1,8 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
-use std::time::Instant;
use std::sync::Arc;
+use std::time::Instant;
use bytes::IntoBuf;
use futures::future::Future;
@@ -51,7 +51,11 @@ where
match handler(msg, sockaddr).await {
Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
- trace!("]RPC:{},ok ({} ms)", name, (Instant::now()-begin_time).as_millis());
+ trace!(
+ "]RPC:{},ok ({} ms)",
+ name,
+ (Instant::now() - begin_time).as_millis()
+ );
Ok(Response::new(Body::from(resp_bytes)))
}
Err(e) => {
@@ -59,7 +63,13 @@ where
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = e.http_status_code();
- warn!("RPC error ({}): {} ({} ms), request: {}", name, e, (Instant::now()-begin_time).as_millis(), req_str);
+ warn!(
+ "RPC error ({}): {} ({} ms), request: {}",
+ name,
+ e,
+ (Instant::now() - begin_time).as_millis(),
+ req_str
+ );
Ok(err_response)
}
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index cb6d87aa..26c5bed8 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -198,19 +198,25 @@ where
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
- debug!("({}) Preparing to sync {:?}...", self.table.name, partition);
- let root_cks = self
- .root_checksum(&partition.begin, &partition.end, must_exit)
- .await?;
-
let my_id = self.table.system.id.clone();
let nodes = self
.table
.replication
- .write_nodes(&partition.begin, &self.table.system);
+ .write_nodes(&partition.begin, &self.table.system)
+ .into_iter()
+ .filter(|node| *node != my_id)
+ .collect::<Vec<_>>();
+
+ debug!(
+ "({}) Preparing to sync {:?} with {:?}...",
+ self.table.name, partition, nodes
+ );
+ let root_cks = self
+ .root_checksum(&partition.begin, &partition.end, must_exit)
+ .await?;
+
let mut sync_futures = nodes
.iter()
- .filter(|node| **node != my_id)
.map(|node| {
self.clone().do_sync_with(
partition.clone(),
@@ -230,7 +236,10 @@ where
}
}
if n_errors > self.table.replication.max_write_errors() {
- return Err(Error::Message(format!("Sync failed with too many nodes.")));
+ return Err(Error::Message(format!(
+ "Sync failed with too many nodes (should have been: {:?}).",
+ nodes
+ )));
}
if !partition.retain {