aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/data.rs14
-rw-r--r--src/error.rs2
-rw-r--r--src/membership.rs9
-rw-r--r--src/object_table.rs3
-rw-r--r--src/rpc_client.rs2
-rw-r--r--src/rpc_server.rs7
-rw-r--r--src/server.rs2
-rw-r--r--src/table.rs11
8 files changed, 41 insertions, 9 deletions
diff --git a/src/data.rs b/src/data.rs
index 14846fe2..91b21b02 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -93,6 +93,20 @@ pub fn now_msec() -> u64 {
.as_millis() as u64
}
+// RMP serialization with names of fields and variants
+
+pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where T: Serialize + ?Sized
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+
+}
+
// Network management
#[derive(Clone, Debug, Serialize, Deserialize)]
diff --git a/src/error.rs b/src/error.rs
index 578f73e9..9cbbd4f7 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -23,6 +23,8 @@ pub enum Error {
RMPEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]
RMPDecode(#[error(source)] rmp_serde::decode::Error),
+ #[error(display = "JSON error: {}", _0)]
+ JSON(#[error(source)] serde_json::error::Error),
#[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error),
diff --git a/src/membership.rs b/src/membership.rs
index e1eeae41..314495e9 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -45,6 +45,7 @@ pub struct NodeStatus {
pub remaining_ping_attempts: usize,
}
+#[derive(Debug)]
pub struct RingEntry {
pub location: Hash,
pub node: UUID,
@@ -109,6 +110,12 @@ impl Members {
new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location));
self.ring = new_ring;
self.n_datacenters = datacenters.len();
+
+ eprintln!("RING: --");
+ for e in self.ring.iter() {
+ eprintln!("{:?}", e);
+ }
+ eprintln!("END --");
}
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
@@ -200,7 +207,7 @@ impl System {
path.push("network_config");
let members = self.members.read().await;
- let data = rmp_serde::encode::to_vec_named(&members.config)
+ let data = rmp_to_vec_all_named(&members.config)
.expect("Error while encoding network config");
drop(members);
diff --git a/src/object_table.rs b/src/object_table.rs
index 37c02225..626c00b2 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -83,6 +83,7 @@ impl TableFormat for ObjectTable {
type E = Object;
async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
- unimplemented!()
+ //unimplemented!()
+ // TODO
}
}
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 7995cdfa..9c9726b1 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -106,7 +106,7 @@ impl RpcClient {
let req = Request::builder()
.method(Method::POST)
.uri(uri)
- .body(Body::from(rmp_serde::encode::to_vec_named(msg)?))?;
+ .body(Body::from(rmp_to_vec_all_named(msg)?))?;
let resp_fut = self.client.request(req);
let resp = tokio::time::timeout(timeout, resp_fut).await??;
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 0ac26141..d3bc174d 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -8,6 +8,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use futures::future::Future;
use crate::error::Error;
+use crate::data::rmp_to_vec_all_named;
use crate::proto::Message;
use crate::server::Garage;
@@ -28,7 +29,7 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, Message>(whole_body.into_buf())?;
- eprintln!("RPC from {}: {:?}", addr, msg);
+ eprintln!("RPC from {}: {}", addr, serde_json::to_string(&msg)?);
let sys = garage.system.clone();
let resp = err_to_msg(match &msg {
@@ -49,8 +50,10 @@ async fn handler(garage: Arc<Garage>, req: Request<Body>, addr: SocketAddr) -> R
_ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))),
});
+ eprintln!("reply to {}: {}", addr, serde_json::to_string(&resp)?);
+
Ok(Response::new(Body::from(
- rmp_serde::encode::to_vec_named(&resp)?
+ rmp_to_vec_all_named(&resp)?
)))
}
diff --git a/src/server.rs b/src/server.rs
index c41ee9b7..cc7e5dce 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -136,7 +136,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
.expect("Unable to read config file");
let mut db_path = config.metadata_dir.clone();
- db_path.push("garage_metadata");
+ db_path.push("db");
let db = sled::open(db_path)
.expect("Unable to open DB");
diff --git a/src/table.rs b/src/table.rs
index 55ae9229..6a72c8bc 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -44,7 +44,7 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
let msg = rmp_serde::decode::from_read_ref::<_, TableRPC<F>>(rpc)?;
let rep = self.table.handle(msg).await?;
- Ok(rmp_serde::encode::to_vec_named(&rep)?)
+ Ok(rmp_to_vec_all_named(&rep)?)
}
}
@@ -129,6 +129,7 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = e.partition_key().hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
+ eprintln!("insert who: {:?}", who);
let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
@@ -142,6 +143,7 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = partition_key.hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
+ eprintln!("get who: {:?}", who);
let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self.rpc_try_call_many(&who[..],
@@ -169,7 +171,9 @@ impl<F: TableFormat + 'static> Table<F> {
}
async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> {
- let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?;
+ eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?);
+
+ let rpc_bytes = rmp_to_vec_all_named(rpc)?;
let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
let resps = rpc_try_call_many(self.system.clone(),
@@ -188,6 +192,7 @@ impl<F: TableFormat + 'static> Table<F> {
}
return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp)))
}
+ eprintln!("Table RPC responses: {}", serde_json::to_string(&resps_vals)?);
Ok(resps_vals)
}
@@ -228,7 +233,7 @@ impl<F: TableFormat + 'static> Table<F> {
None => None
};
- let new_bytes = rmp_serde::encode::to_vec_named(&entry)?;
+ let new_bytes = rmp_to_vec_all_named(&entry)?;
self.store.insert(&tree_key, new_bytes)?;
self.instance.updated(old_val.as_ref(), &entry).await;