diff options
-rw-r--r-- | Cargo.lock | 18 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/data.rs | 14 | ||||
-rw-r--r-- | src/error.rs | 2 | ||||
-rw-r--r-- | src/membership.rs | 9 | ||||
-rw-r--r-- | src/object_table.rs | 3 | ||||
-rw-r--r-- | src/rpc_client.rs | 2 | ||||
-rw-r--r-- | src/rpc_server.rs | 7 | ||||
-rw-r--r-- | src/server.rs | 2 | ||||
-rw-r--r-- | src/table.rs | 11 |
10 files changed, 60 insertions, 9 deletions
@@ -296,6 +296,7 @@ dependencies = [ "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", "serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)", "structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)", @@ -754,6 +755,11 @@ dependencies = [ ] [[package]] +name = "ryu" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -785,6 +791,16 @@ dependencies = [ ] [[package]] +name = "serde_json" +version = "1.0.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "itoa 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)", + "ryu 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] name = "sha2" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1139,10 +1155,12 @@ dependencies = [ "checksum rmp 0.8.9 (registry+https://github.com/rust-lang/crates.io-index)" = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f" "checksum rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4c1ee98f14fe8b8e9c5ea13d25da7b2a1796169202c57a09d7288de90d56222b" "checksum rustversion 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6" +"checksum ryu 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "535622e6be132bccd223f4bb2b8ac8d53cda3c7a6394944d3b2b33fb974f9d76" "checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" "checksum serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "36df6ac6412072f67cf767ebbde4133a5b2e88e76dc6187fa7104cd16f783399" "checksum serde_bytes 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "325a073952621257820e7a3469f55ba4726d8b28657e7e36653d1c36dc2c84ae" "checksum serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)" = "9e549e3abf4fb8621bd1609f11dfc9f5e50320802273b12f3811a67e6716ea6c" +"checksum serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)" = "da07b57ee2623368351e9a0488bb0b261322a15a6e0ae53e243cbdc0f4208da9" "checksum sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "27044adfd2e1f077f649f59deb9490d3941d674002f7d062870a60ebe9bd47a0" "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" @@ -28,3 +28,4 @@ hex = "0.3" sha2 = "0.8" async-trait = "0.1.30" reduce = "0.1.2" +serde_json = "1.0" diff --git a/src/data.rs b/src/data.rs index 14846fe2..91b21b02 100644 --- a/src/data.rs +++ b/src/data.rs @@ -93,6 +93,20 @@ pub fn now_msec() -> u64 { .as_millis() as u64 } +// RMP serialization with names of fields and variants + +pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> +where T: Serialize + ?Sized +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) + +} + // Network management #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/error.rs b/src/error.rs index 578f73e9..9cbbd4f7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,8 @@ pub enum Error { RMPEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)] RMPDecode(#[error(source)] rmp_serde::decode::Error), + #[error(display = "JSON error: {}", _0)] + JSON(#[error(source)] serde_json::error::Error), #[error(display = "TOML decode error: {}", _0)] TomlDecode(#[error(source)] toml::de::Error), diff --git a/src/membership.rs b/src/membership.rs index e1eeae41..314495e9 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -45,6 +45,7 @@ pub struct NodeStatus { pub remaining_ping_attempts: usize, } +#[derive(Debug)] pub struct RingEntry { pub location: Hash, pub node: UUID, @@ -109,6 +110,12 @@ impl Members { new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); self.ring = new_ring; self.n_datacenters = datacenters.len(); + + eprintln!("RING: --"); + for e in self.ring.iter() { + eprintln!("{:?}", e); + } + eprintln!("END --"); } pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { @@ -200,7 +207,7 @@ impl System { path.push("network_config"); let members = self.members.read().await; - let data = rmp_serde::encode::to_vec_named(&members.config) + let data = rmp_to_vec_all_named(&members.config) .expect("Error while encoding network config"); drop(members); diff --git a/src/object_table.rs b/src/object_table.rs index 37c02225..626c00b2 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -83,6 +83,7 @@ impl TableFormat for ObjectTable { type E = Object; async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { - unimplemented!() + //unimplemented!() + // TODO } } diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 7995cdfa..9c9726b1 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -106,7 +106,7 @@ impl RpcClient { let req = Request::builder() .method(Method::POST) .uri(uri) - .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?; + .body(Body::from(rmp_to_vec_all_named(msg)?))?; let resp_fut = self.client.request(req); let resp = tokio::time::timeout(timeout, resp_fut).await??; diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 0ac26141..d3bc174d 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -8,6 +8,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use futures::future::Future; use crate::error::Error; +use crate::data::rmp_to_vec_all_named; use crate::proto::Message; use crate::server::Garage; @@ -28,7 +29,7 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - eprintln!("RPC from {}: {:?}", addr, msg); + eprintln!("RPC from {}: {}", addr, serde_json::to_string(&msg)?); let sys = garage.system.clone(); let resp = err_to_msg(match &msg { @@ -49,8 +50,10 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), }); + eprintln!("reply to {}: {}", addr, serde_json::to_string(&resp)?); + Ok(Response::new(Body::from( - rmp_serde::encode::to_vec_named(&resp)? + rmp_to_vec_all_named(&resp)? ))) } diff --git a/src/server.rs b/src/server.rs index c41ee9b7..cc7e5dce 100644 --- a/src/server.rs +++ b/src/server.rs @@ -136,7 +136,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .expect("Unable to read config file"); let mut db_path = config.metadata_dir.clone(); - db_path.push("garage_metadata"); + db_path.push("db"); let db = sled::open(db_path) .expect("Unable to open DB"); diff --git a/src/table.rs b/src/table.rs index 55ae9229..6a72c8bc 100644 --- a/src/table.rs +++ b/src/table.rs @@ -44,7 +44,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> { let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?; let rep = self.table.handle(msg).await?; - Ok(rmp_serde::encode::to_vec_named(&rep)?) + Ok(rmp_to_vec_all_named(&rep)?) } } @@ -129,6 +129,7 @@ impl<F: TableFormat + 'static> Table<F> { let hash = e.partition_key().hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); + eprintln!("insert who: {:?}", who); let rpc = &TableRPC::<F>::Update(vec![e.clone()]); @@ -142,6 +143,7 @@ impl<F: TableFormat + 'static> Table<F> { let hash = partition_key.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); + eprintln!("get who: {:?}", who); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self.rpc_try_call_many(&who[..], @@ -169,7 +171,9 @@ impl<F: TableFormat + 'static> Table<F> { } async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { - let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); + + let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); let resps = rpc_try_call_many(self.system.clone(), @@ -188,6 +192,7 @@ impl<F: TableFormat + 'static> Table<F> { } return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) } + eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?); Ok(resps_vals) } @@ -228,7 +233,7 @@ impl<F: TableFormat + 'static> Table<F> { None => None }; - let new_bytes = rmp_serde::encode::to_vec_named(&entry)?; + let new_bytes = rmp_to_vec_all_named(&entry)?; self.store.insert(&tree_key, new_bytes)?; self.instance.updated(old_val.as_ref(), &entry).await; |