diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/data.rs | 14 | ||||
-rw-r--r-- | src/error.rs | 2 | ||||
-rw-r--r-- | src/membership.rs | 9 | ||||
-rw-r--r-- | src/object_table.rs | 3 | ||||
-rw-r--r-- | src/rpc_client.rs | 2 | ||||
-rw-r--r-- | src/rpc_server.rs | 7 | ||||
-rw-r--r-- | src/server.rs | 2 | ||||
-rw-r--r-- | src/table.rs | 11 |
8 files changed, 41 insertions, 9 deletions
diff --git a/src/data.rs b/src/data.rs index 14846fe2..91b21b02 100644 --- a/src/data.rs +++ b/src/data.rs @@ -93,6 +93,20 @@ pub fn now_msec() -> u64 { .as_millis() as u64 } +// RMP serialization with names of fields and variants + +pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> +where T: Serialize + ?Sized +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) + +} + // Network management #[derive(Clone, Debug, Serialize, Deserialize)] diff --git a/src/error.rs b/src/error.rs index 578f73e9..9cbbd4f7 100644 --- a/src/error.rs +++ b/src/error.rs @@ -23,6 +23,8 @@ pub enum Error { RMPEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)] RMPDecode(#[error(source)] rmp_serde::decode::Error), + #[error(display = "JSON error: {}", _0)] + JSON(#[error(source)] serde_json::error::Error), #[error(display = "TOML decode error: {}", _0)] TomlDecode(#[error(source)] toml::de::Error), diff --git a/src/membership.rs b/src/membership.rs index e1eeae41..314495e9 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -45,6 +45,7 @@ pub struct NodeStatus { pub remaining_ping_attempts: usize, } +#[derive(Debug)] pub struct RingEntry { pub location: Hash, pub node: UUID, @@ -109,6 +110,12 @@ impl Members { new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); self.ring = new_ring; self.n_datacenters = datacenters.len(); + + eprintln!("RING: --"); + for e in self.ring.iter() { + eprintln!("{:?}", e); + } + eprintln!("END --"); } pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { @@ -200,7 +207,7 @@ impl System { path.push("network_config"); let members = self.members.read().await; - let data = rmp_serde::encode::to_vec_named(&members.config) + let data = rmp_to_vec_all_named(&members.config) .expect("Error while encoding network config"); drop(members); diff --git a/src/object_table.rs b/src/object_table.rs index 37c02225..626c00b2 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -83,6 +83,7 @@ impl TableFormat for ObjectTable { type E = Object; async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { - unimplemented!() + //unimplemented!() + // TODO } } diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 7995cdfa..9c9726b1 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -106,7 +106,7 @@ impl RpcClient { let req = Request::builder() .method(Method::POST) .uri(uri) - .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?; + .body(Body::from(rmp_to_vec_all_named(msg)?))?; let resp_fut = self.client.request(req); let resp = tokio::time::timeout(timeout, resp_fut).await??; diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 0ac26141..d3bc174d 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -8,6 +8,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode}; use futures::future::Future; use crate::error::Error; +use crate::data::rmp_to_vec_all_named; use crate::proto::Message; use crate::server::Garage; @@ -28,7 +29,7 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?; - eprintln!("RPC from {}: {:?}", addr, msg); + eprintln!("RPC from {}: {}", addr, serde_json::to_string(&msg)?); let sys = garage.system.clone(); let resp = err_to_msg(match &msg { @@ -49,8 +50,10 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), }); + eprintln!("reply to {}: {}", addr, serde_json::to_string(&resp)?); + Ok(Response::new(Body::from( - rmp_serde::encode::to_vec_named(&resp)? + rmp_to_vec_all_named(&resp)? ))) } diff --git a/src/server.rs b/src/server.rs index c41ee9b7..cc7e5dce 100644 --- a/src/server.rs +++ b/src/server.rs @@ -136,7 +136,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { .expect("Unable to read config file"); let mut db_path = config.metadata_dir.clone(); - db_path.push("garage_metadata"); + db_path.push("db"); let db = sled::open(db_path) .expect("Unable to open DB"); diff --git a/src/table.rs b/src/table.rs index 55ae9229..6a72c8bc 100644 --- a/src/table.rs +++ b/src/table.rs @@ -44,7 +44,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> { let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?; let rep = self.table.handle(msg).await?; - Ok(rmp_serde::encode::to_vec_named(&rep)?) + Ok(rmp_to_vec_all_named(&rep)?) } } @@ -129,6 +129,7 @@ impl<F: TableFormat + 'static> Table<F> { let hash = e.partition_key().hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); + eprintln!("insert who: {:?}", who); let rpc = &TableRPC::<F>::Update(vec![e.clone()]); @@ -142,6 +143,7 @@ impl<F: TableFormat + 'static> Table<F> { let hash = partition_key.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); + eprintln!("get who: {:?}", who); let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self.rpc_try_call_many(&who[..], @@ -169,7 +171,9 @@ impl<F: TableFormat + 'static> Table<F> { } async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { - let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); + + let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); let resps = rpc_try_call_many(self.system.clone(), @@ -188,6 +192,7 @@ impl<F: TableFormat + 'static> Table<F> { } return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) } + eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?); Ok(resps_vals) } @@ -228,7 +233,7 @@ impl<F: TableFormat + 'static> Table<F> { None => None }; - let new_bytes = rmp_serde::encode::to_vec_named(&entry)?; + let new_bytes = rmp_to_vec_all_named(&entry)?; self.store.insert(&tree_key, new_bytes)?; self.instance.updated(old_val.as_ref(), &entry).await; |