aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/membership.rs18
-rw-r--r--src/rpc/ring.rs10
-rw-r--r--src/rpc/rpc_client.rs38
-rw-r--r--src/rpc/rpc_server.rs2
5 files changed, 34 insertions, 35 deletions
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index e787833c..96561d0e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -1,4 +1,3 @@
-#![allow(clippy::upper_case_acronyms)]
//! Crate containing rpc related functions and types used in Garage
#[macro_use]
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index ce4029f1..da7dcf8f 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -56,7 +56,7 @@ impl RpcMessage for Message {}
/// A ping, containing informations about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
- id: UUID,
+ id: Uuid,
rpc_port: u16,
status_hash: Hash,
@@ -69,7 +69,7 @@ pub struct PingMessage {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
/// Id of the node this advertisement relates to
- pub id: UUID,
+ pub id: Uuid,
/// IP and port of the node
pub addr: SocketAddr,
@@ -84,7 +84,7 @@ pub struct AdvertisedNode {
/// This node's membership manager
pub struct System {
/// The id of this node
- pub id: UUID,
+ pub id: Uuid,
persist_config: Persister<NetworkConfig>,
persist_status: Persister<Vec<AdvertisedNode>>,
@@ -114,7 +114,7 @@ struct Updaters {
#[derive(Debug, Clone)]
pub struct Status {
/// Mapping of each node id to its known status
- pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+ pub nodes: HashMap<Uuid, Arc<StatusEntry>>,
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
@@ -198,7 +198,7 @@ impl Status {
}
}
-fn gen_node_id(metadata_dir: &Path) -> Result<UUID, Error> {
+fn gen_node_id(metadata_dir: &Path) -> Result<Uuid, Error> {
let mut id_file = metadata_dir.to_path_buf();
id_file.push("node_id");
if id_file.as_path().exists() {
@@ -301,7 +301,7 @@ impl System {
Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
- _ => Err(Error::BadRPC("Unexpected RPC message".to_string())),
+ _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
}
});
@@ -369,7 +369,7 @@ impl System {
});
}
- async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
+ async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<Uuid>)>) {
let ping_msg = self.make_ping();
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
let sys = self.clone();
@@ -640,7 +640,7 @@ impl System {
#[allow(clippy::manual_async_fn)]
fn pull_status(
self: Arc<Self>,
- peer: UUID,
+ peer: Uuid,
) -> impl futures::future::Future<Output = ()> + Send + 'static {
async move {
let resp = self
@@ -653,7 +653,7 @@ impl System {
}
}
- async fn pull_config(self: Arc<Self>, peer: UUID) {
+ async fn pull_config(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc_client
.call(peer, Message::PullConfig, PING_TIMEOUT)
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index d371bb64..0f94d0f6 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -32,7 +32,7 @@ pub const MAX_REPLICATION: usize = 3;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
/// Map of each node's id to it's configuration
- pub members: HashMap<UUID, NetworkConfigEntry>,
+ pub members: HashMap<Uuid, NetworkConfigEntry>,
/// Version of this config
pub version: u64,
}
@@ -73,7 +73,7 @@ pub struct RingEntry {
/// The prefix of the Hash of object which should use this entry
pub location: Hash,
/// The nodes in which a matching object should get stored
- pub nodes: [UUID; MAX_REPLICATION],
+ pub nodes: [Uuid; MAX_REPLICATION],
}
impl Ring {
@@ -92,7 +92,7 @@ impl Ring {
let n_datacenters = datacenters.len();
// Prepare ring
- let mut partitions: Vec<Vec<(&UUID, &NetworkConfigEntry)>> = partitions_idx
+ let mut partitions: Vec<Vec<(&Uuid, &NetworkConfigEntry)>> = partitions_idx
.iter()
.map(|_i| Vec::new())
.collect::<Vec<_>>();
@@ -180,7 +180,7 @@ impl Ring {
let top = (i as u16) << (16 - PARTITION_BITS);
let mut hash = [0u8; 32];
hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
- let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<UUID>>();
+ let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<Uuid>>();
RingEntry {
location: hash.into(),
nodes: nodes.try_into().unwrap(),
@@ -213,7 +213,7 @@ impl Ring {
// TODO rename this function as it no longer walk the ring
/// Walk the ring to find the n servers in which data should be replicated
- pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
+ pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<Uuid> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index f68e4c03..5ed43d44 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -19,7 +19,7 @@ use tokio::sync::{watch, Semaphore};
use garage_util::background::BackgroundRunner;
use garage_util::config::TlsConfig;
use garage_util::data::*;
-use garage_util::error::{Error, RPCError};
+use garage_util::error::{Error, RpcError};
use crate::membership::Status;
use crate::rpc_server::RpcMessage;
@@ -70,7 +70,7 @@ pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
- local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>,
+ local_handler: ArcSwapOption<(Uuid, LocalHandlerFn<M>)>,
rpc_addr_client: RpcAddrClient<M>,
}
@@ -91,7 +91,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Set the local handler, to process RPC to this node without network usage
- pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
+ pub fn set_local_handler<F, Fut>(&self, my_id: Uuid, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
@@ -110,12 +110,12 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Make a RPC call
- pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
+ pub async fn call(&self, to: Uuid, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
/// Make a RPC call from a message stored in an Arc
- pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
+ pub async fn call_arc(&self, to: Uuid, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
if to.borrow() == my_id {
@@ -128,7 +128,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if node_status.is_up() {
node_status
} else {
- return Err(Error::from(RPCError::NodeDown(to)));
+ return Err(Error::from(RpcError::NodeDown(to)));
}
}
None => {
@@ -152,7 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Make a RPC call to multiple servers, returning a Vec containing each result
- pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
+ pub async fn call_many(&self, to: &[Uuid], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
.iter()
@@ -170,7 +170,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
/// strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
- to: &[UUID],
+ to: &[Uuid],
msg: M,
strategy: RequestStrategy,
) -> Result<Vec<M>, Error> {
@@ -222,7 +222,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::from(RPCError::TooManyErrors(errors)))
+ Err(Error::from(RpcError::TooManyErrors(errors)))
}
}
}
@@ -251,7 +251,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<Result<M, Error>, RPCError>
+ ) -> Result<Result<M, Error>, RpcError>
where
MB: Borrow<M>,
{
@@ -268,8 +268,8 @@ pub struct RpcHttpClient {
}
enum ClientMethod {
- HTTP(Client<HttpConnector, hyper::Body>),
- HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
+ Http(Client<HttpConnector, hyper::Body>),
+ Https(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
}
impl RpcHttpClient {
@@ -294,9 +294,9 @@ impl RpcHttpClient {
let connector =
tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage");
- ClientMethod::HTTPS(Client::builder().build(connector))
+ ClientMethod::Https(Client::builder().build(connector))
} else {
- ClientMethod::HTTP(Client::new())
+ ClientMethod::Http(Client::new())
};
Ok(RpcHttpClient {
method,
@@ -311,14 +311,14 @@ impl RpcHttpClient {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<Result<M, Error>, RPCError>
+ ) -> Result<Result<M, Error>, RpcError>
where
MB: Borrow<M>,
M: RpcMessage,
{
let uri = match self.method {
- ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path),
- ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path),
+ ClientMethod::Http(_) => format!("http://{}/{}", to_addr, path),
+ ClientMethod::Https(_) => format!("https://{}/{}", to_addr, path),
};
let req = Request::builder()
@@ -327,8 +327,8 @@ impl RpcHttpClient {
.body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?;
let resp_fut = match &self.method {
- ClientMethod::HTTP(client) => client.request(req).fuse(),
- ClientMethod::HTTPS(client) => client.request(req).fuse(),
+ ClientMethod::Http(client) => client.request(req).fuse(),
+ ClientMethod::Https(client) => client.request(req).fuse(),
};
trace!("({}) Acquiring request_limiter slot...", path);
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 55d97170..81361ab9 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -77,7 +77,7 @@ where
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = match e {
- Error::BadRPC(_) => StatusCode::BAD_REQUEST,
+ Error::BadRpc(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
warn!(