diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/lib.rs | 1 | ||||
-rw-r--r-- | src/rpc/membership.rs | 18 | ||||
-rw-r--r-- | src/rpc/ring.rs | 10 | ||||
-rw-r--r-- | src/rpc/rpc_client.rs | 38 | ||||
-rw-r--r-- | src/rpc/rpc_server.rs | 2 |
5 files changed, 34 insertions, 35 deletions
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index e787833c..96561d0e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,4 +1,3 @@ -#![allow(clippy::upper_case_acronyms)] //! Crate containing rpc related functions and types used in Garage #[macro_use] diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index ce4029f1..da7dcf8f 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -56,7 +56,7 @@ impl RpcMessage for Message {} /// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { - id: UUID, + id: Uuid, rpc_port: u16, status_hash: Hash, @@ -69,7 +69,7 @@ pub struct PingMessage { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { /// Id of the node this advertisement relates to - pub id: UUID, + pub id: Uuid, /// IP and port of the node pub addr: SocketAddr, @@ -84,7 +84,7 @@ pub struct AdvertisedNode { /// This node's membership manager pub struct System { /// The id of this node - pub id: UUID, + pub id: Uuid, persist_config: Persister<NetworkConfig>, persist_status: Persister<Vec<AdvertisedNode>>, @@ -114,7 +114,7 @@ struct Updaters { #[derive(Debug, Clone)] pub struct Status { /// Mapping of each node id to its known status - pub nodes: HashMap<UUID, Arc<StatusEntry>>, + pub nodes: HashMap<Uuid, Arc<StatusEntry>>, /// Hash of `nodes`, used to detect when nodes have different views of the cluster pub hash: Hash, } @@ -198,7 +198,7 @@ impl Status { } } -fn gen_node_id(metadata_dir: &Path) -> Result<UUID, Error> { +fn gen_node_id(metadata_dir: &Path) -> Result<Uuid, Error> { let mut id_file = metadata_dir.to_path_buf(); id_file.push("node_id"); if id_file.as_path().exists() { @@ -301,7 +301,7 @@ impl System { Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await, Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await, - _ => Err(Error::BadRPC("Unexpected RPC message".to_string())), + _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), } } }); @@ -369,7 +369,7 @@ impl System { }); } - async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { + async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<Uuid>)>) { let ping_msg = self.make_ping(); let ping_resps = join_all(peers.iter().map(|(addr, id_option)| { let sys = self.clone(); @@ -640,7 +640,7 @@ impl System { #[allow(clippy::manual_async_fn)] fn pull_status( self: Arc<Self>, - peer: UUID, + peer: Uuid, ) -> impl futures::future::Future<Output = ()> + Send + 'static { async move { let resp = self @@ -653,7 +653,7 @@ impl System { } } - async fn pull_config(self: Arc<Self>, peer: UUID) { + async fn pull_config(self: Arc<Self>, peer: Uuid) { let resp = self .rpc_client .call(peer, Message::PullConfig, PING_TIMEOUT) diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index d371bb64..0f94d0f6 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -32,7 +32,7 @@ pub const MAX_REPLICATION: usize = 3; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { /// Map of each node's id to it's configuration - pub members: HashMap<UUID, NetworkConfigEntry>, + pub members: HashMap<Uuid, NetworkConfigEntry>, /// Version of this config pub version: u64, } @@ -73,7 +73,7 @@ pub struct RingEntry { /// The prefix of the Hash of object which should use this entry pub location: Hash, /// The nodes in which a matching object should get stored - pub nodes: [UUID; MAX_REPLICATION], + pub nodes: [Uuid; MAX_REPLICATION], } impl Ring { @@ -92,7 +92,7 @@ impl Ring { let n_datacenters = datacenters.len(); // Prepare ring - let mut partitions: Vec<Vec<(&UUID, &NetworkConfigEntry)>> = partitions_idx + let mut partitions: Vec<Vec<(&Uuid, &NetworkConfigEntry)>> = partitions_idx .iter() .map(|_i| Vec::new()) .collect::<Vec<_>>(); @@ -180,7 +180,7 @@ impl Ring { let top = (i as u16) << (16 - PARTITION_BITS); let mut hash = [0u8; 32]; hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]); - let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<UUID>>(); + let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<Uuid>>(); RingEntry { location: hash.into(), nodes: nodes.try_into().unwrap(), @@ -213,7 +213,7 @@ impl Ring { // TODO rename this function as it no longer walk the ring /// Walk the ring to find the n servers in which data should be replicated - pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<Uuid> { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); return vec![]; diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index f68e4c03..5ed43d44 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -19,7 +19,7 @@ use tokio::sync::{watch, Semaphore}; use garage_util::background::BackgroundRunner; use garage_util::config::TlsConfig; use garage_util::data::*; -use garage_util::error::{Error, RPCError}; +use garage_util::error::{Error, RpcError}; use crate::membership::Status; use crate::rpc_server::RpcMessage; @@ -70,7 +70,7 @@ pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, - local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>, + local_handler: ArcSwapOption<(Uuid, LocalHandlerFn<M>)>, rpc_addr_client: RpcAddrClient<M>, } @@ -91,7 +91,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } /// Set the local handler, to process RPC to this node without network usage - pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) + pub fn set_local_handler<F, Fut>(&self, my_id: Uuid, handler: F) where F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, Fut: Future<Output = Result<M, Error>> + Send + 'static, @@ -110,12 +110,12 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } /// Make a RPC call - pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { + pub async fn call(&self, to: Uuid, msg: M, timeout: Duration) -> Result<M, Error> { self.call_arc(to, Arc::new(msg), timeout).await } /// Make a RPC call from a message stored in an Arc - pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { + pub async fn call_arc(&self, to: Uuid, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); if to.borrow() == my_id { @@ -128,7 +128,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { if node_status.is_up() { node_status } else { - return Err(Error::from(RPCError::NodeDown(to))); + return Err(Error::from(RpcError::NodeDown(to))); } } None => { @@ -152,7 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } /// Make a RPC call to multiple servers, returning a Vec containing each result - pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { + pub async fn call_many(&self, to: &[Uuid], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { let msg = Arc::new(msg); let mut resp_stream = to .iter() @@ -170,7 +170,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { /// strategy could not be respected due to too many errors pub async fn try_call_many( self: &Arc<Self>, - to: &[UUID], + to: &[Uuid], msg: M, strategy: RequestStrategy, ) -> Result<Vec<M>, Error> { @@ -222,7 +222,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { Ok(results) } else { let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); - Err(Error::from(RPCError::TooManyErrors(errors))) + Err(Error::from(RpcError::TooManyErrors(errors))) } } } @@ -251,7 +251,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { to_addr: &SocketAddr, msg: MB, timeout: Duration, - ) -> Result<Result<M, Error>, RPCError> + ) -> Result<Result<M, Error>, RpcError> where MB: Borrow<M>, { @@ -268,8 +268,8 @@ pub struct RpcHttpClient { } enum ClientMethod { - HTTP(Client<HttpConnector, hyper::Body>), - HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>), + Http(Client<HttpConnector, hyper::Body>), + Https(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>), } impl RpcHttpClient { @@ -294,9 +294,9 @@ impl RpcHttpClient { let connector = tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage"); - ClientMethod::HTTPS(Client::builder().build(connector)) + ClientMethod::Https(Client::builder().build(connector)) } else { - ClientMethod::HTTP(Client::new()) + ClientMethod::Http(Client::new()) }; Ok(RpcHttpClient { method, @@ -311,14 +311,14 @@ impl RpcHttpClient { to_addr: &SocketAddr, msg: MB, timeout: Duration, - ) -> Result<Result<M, Error>, RPCError> + ) -> Result<Result<M, Error>, RpcError> where MB: Borrow<M>, M: RpcMessage, { let uri = match self.method { - ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path), - ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path), + ClientMethod::Http(_) => format!("http://{}/{}", to_addr, path), + ClientMethod::Https(_) => format!("https://{}/{}", to_addr, path), }; let req = Request::builder() @@ -327,8 +327,8 @@ impl RpcHttpClient { .body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?; let resp_fut = match &self.method { - ClientMethod::HTTP(client) => client.request(req).fuse(), - ClientMethod::HTTPS(client) => client.request(req).fuse(), + ClientMethod::Http(client) => client.request(req).fuse(), + ClientMethod::Https(client) => client.request(req).fuse(), }; trace!("({}) Acquiring request_limiter slot...", path); diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 55d97170..81361ab9 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -77,7 +77,7 @@ where let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?; let mut err_response = Response::new(Body::from(rep_bytes)); *err_response.status_mut() = match e { - Error::BadRPC(_) => StatusCode::BAD_REQUEST, + Error::BadRpc(_) => StatusCode::BAD_REQUEST, _ => StatusCode::INTERNAL_SERVER_ERROR, }; warn!( |