aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock18
-rw-r--r--Cargo.toml1
-rw-r--r--src/data.rs14
-rw-r--r--src/error.rs2
-rw-r--r--src/membership.rs9
-rw-r--r--src/object_table.rs3
-rw-r--r--src/rpc_client.rs2
-rw-r--r--src/rpc_server.rs7
-rw-r--r--src/server.rs2
-rw-r--r--src/table.rs11
10 files changed, 60 insertions, 9 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 58e4046b..e76866fc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -296,6 +296,7 @@ dependencies = [
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)",
"structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -754,6 +755,11 @@ dependencies = [
]
[[package]]
+name = "ryu"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -785,6 +791,16 @@ dependencies = [
]
[[package]]
+name = "serde_json"
+version = "1.0.51"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "ryu 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "sha2"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1139,10 +1155,12 @@ dependencies = [
"checksum rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f"
"checksum rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4c1ee98f14fe8b8e9c5ea13d25da7b2a1796169202c57a09d7288de90d56222b"
"checksum rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6"
+"checksum ryu 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76"
"checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
"checksum serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399"
"checksum serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "325a073952621257820e7a3469f55ba4726d8b28657e7e36653d1c36dc2c84ae"
"checksum serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c"
+"checksum serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)" = "da07b57ee2623368351e9a0488bb0b261322a15a6e0ae53e243cbdc0f4208da9"
"checksum sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "27044adfd2e1f077f649f59deb9490d3941d674002f7d062870a60ebe9bd47a0"
"checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
diff --git a/Cargo.toml b/Cargo.toml
index f9c15725..a8505cbc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,3 +28,4 @@ hex = "0.3"
sha2 = "0.8"
async-trait = "0.1.30"
reduce = "0.1.2"
+serde_json = "1.0"
diff --git a/src/data.rs b/src/data.rs
index 14846fe2..91b21b02 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -93,6 +93,20 @@ pub fn now_msec() -> u64 {
.as_millis() as u64
}
+// RMP serialization with names of fields and variants
+
+pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where T: Serialize + ?Sized
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+
+}
+
// Network management
#[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/src/error.rs b/src/error.rs
index 578f73e9..9cbbd4f7 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -23,6 +23,8 @@ pub enum Error {
RMPEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]
RMPDecode(#[error(source)] rmp_serde::decode::Error),
+ #[error(display = "JSON error: {}", _0)]
+ JSON(#[error(source)] serde_json::error::Error),
#[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error),
diff --git a/src/membership.rs b/src/membership.rs
index e1eeae41..314495e9 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -45,6 +45,7 @@ pub struct NodeStatus {
pub remaining_ping_attempts: usize,
}
+#[derive(Debug)]
pub struct RingEntry {
pub location: Hash,
pub node: UUID,
@@ -109,6 +110,12 @@ impl Members {
new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
self.ring = new_ring;
self.n_datacenters = datacenters.len();
+
+ eprintln!("RING: --");
+ for e in self.ring.iter() {
+ eprintln!("{:?}", e);
+ }
+ eprintln!("END --");
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
@@ -200,7 +207,7 @@ impl System {
path.push("network_config");
let members = self.members.read().await;
- let data = rmp_serde::encode::to_vec_named(&members.config)
+ let data = rmp_to_vec_all_named(&members.config)
.expect("Error while encoding network config");
drop(members);
diff --git a/src/object_table.rs b/src/object_table.rs
index 37c02225..626c00b2 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -83,6 +83,7 @@ impl TableFormat for ObjectTable {
type E = Object;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
- unimplemented!()
+ //unimplemented!()
+ // TODO
}
}
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 7995cdfa..9c9726b1 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -106,7 +106,7 @@ impl RpcClient {
let req = Request::builder()
.method(Method::POST)
.uri(uri)
- .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?;
+ .body(Body::from(rmp_to_vec_all_named(msg)?))?;
let resp_fut = self.client.request(req);
let resp = tokio::time::timeout(timeout, resp_fut).await??;
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 0ac26141..d3bc174d 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -8,6 +8,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use futures::future::Future;
use crate::error::Error;
+use crate::data::rmp_to_vec_all_named;
use crate::proto::Message;
use crate::server::Garage;
@@ -28,7 +29,7 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
- eprintln!("RPC from {}: {:?}", addr, msg);
+ eprintln!("RPC from {}: {}", addr, serde_json::to_string(&msg)?);
let sys = garage.system.clone();
let resp = err_to_msg(match &msg {
@@ -49,8 +50,10 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});
+ eprintln!("reply to {}: {}", addr, serde_json::to_string(&resp)?);
+
Ok(Response::new(Body::from(
- rmp_serde::encode::to_vec_named(&resp)?
+ rmp_to_vec_all_named(&resp)?
)))
}
diff --git a/src/server.rs b/src/server.rs
index c41ee9b7..cc7e5dce 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -136,7 +136,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.expect("Unable to read config file");
let mut db_path = config.metadata_dir.clone();
- db_path.push("garage_metadata");
+ db_path.push("db");
let db = sled::open(db_path)
.expect("Unable to open DB");
diff --git a/src/table.rs b/src/table.rs
index 55ae9229..6a72c8bc 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -44,7 +44,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?;
let rep = self.table.handle(msg).await?;
- Ok(rmp_serde::encode::to_vec_named(&rep)?)
+ Ok(rmp_to_vec_all_named(&rep)?)
}
}
@@ -129,6 +129,7 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = e.partition_key().hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
+ eprintln!("insert who: {:?}", who);
let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
@@ -142,6 +143,7 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = partition_key.hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
+ eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self.rpc_try_call_many(&who[..],
@@ -169,7 +171,9 @@ impl<F: TableFormat + 'static> Table<F> {
}
async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> {
- let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?;
+ eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?);
+
+ let rpc_bytes = rmp_to_vec_all_named(rpc)?;
let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
let resps = rpc_try_call_many(self.system.clone(),
@@ -188,6 +192,7 @@ impl<F: TableFormat + 'static> Table<F> {
}
return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
}
+ eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?);
Ok(resps_vals)
}
@@ -228,7 +233,7 @@ impl<F: TableFormat + 'static> Table<F> {
None => None
};
- let new_bytes = rmp_serde::encode::to_vec_named(&entry)?;
+ let new_bytes = rmp_to_vec_all_named(&entry)?;
self.store.insert(&tree_key, new_bytes)?;
self.instance.updated(old_val.as_ref(), &entry).await;