From 5ea24254a91c3794ceb69e68c940b13f5447f40c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Feb 2024 12:55:41 +0100 Subject: [import-netapp] import Netapp code into Garage codebase --- src/net/endpoint.rs | 201 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 src/net/endpoint.rs (limited to 'src/net/endpoint.rs') diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs new file mode 100644 index 00000000..3cafafeb --- /dev/null +++ b/src/net/endpoint.rs @@ -0,0 +1,201 @@ +use std::marker::PhantomData; +use std::sync::Arc; + +use arc_swap::ArcSwapOption; +use async_trait::async_trait; + +use crate::error::Error; +use crate::message::*; +use crate::netapp::*; + +/// This trait should be implemented by an object of your application +/// that can handle a message of type `M`, if it wishes to handle +/// streams attached to the request and/or to send back streams +/// attached to the response.. +/// +/// The handler object should be in an Arc, see `Endpoint::set_handler` +#[async_trait] +pub trait StreamingEndpointHandler: Send + Sync +where + M: Message, +{ + async fn handle(self: &Arc, m: Req, from: NodeID) -> Resp; +} + +/// If one simply wants to use an endpoint in a client fashion, +/// without locally serving requests to that endpoint, +/// use the unit type `()` as the handler type: +/// it will panic if it is ever made to handle request. +#[async_trait] +impl EndpointHandler for () { + async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { + panic!("This endpoint should not have a local handler."); + } +} + +// ---- + +/// This trait should be implemented by an object of your application +/// that can handle a message of type `M`, in the cases where it doesn't +/// care about attached stream in the request nor in the response. +#[async_trait] +pub trait EndpointHandler: Send + Sync +where + M: Message, +{ + async fn handle(self: &Arc, m: &M, from: NodeID) -> M::Response; +} + +#[async_trait] +impl StreamingEndpointHandler for T +where + T: EndpointHandler, + M: Message, +{ + async fn handle(self: &Arc, mut m: Req, from: NodeID) -> Resp { + // Immediately drop stream to ignore all data that comes in, + // instead of buffering it indefinitely + drop(m.take_stream()); + Resp::new(EndpointHandler::handle(self, m.msg(), from).await) + } +} + +// ---- + +/// This struct represents an endpoint for message of type `M`. +/// +/// Creating a new endpoint is done by calling `NetApp::endpoint`. +/// An endpoint is identified primarily by its path, which is specified +/// at creation time. +/// +/// An `Endpoint` is used both to send requests to remote nodes, +/// and to specify the handler for such requests on the local node. +/// The type `H` represents the type of the handler object for +/// endpoint messages (see `StreamingEndpointHandler`). +pub struct Endpoint +where + M: Message, + H: StreamingEndpointHandler, +{ + _phantom: PhantomData, + netapp: Arc, + path: String, + handler: ArcSwapOption, +} + +impl Endpoint +where + M: Message, + H: StreamingEndpointHandler, +{ + pub(crate) fn new(netapp: Arc, path: String) -> Self { + Self { + _phantom: PhantomData::default(), + netapp, + path, + handler: ArcSwapOption::from(None), + } + } + + /// Get the path of this endpoint + pub fn path(&self) -> &str { + &self.path + } + + /// Set the object that is responsible of handling requests to + /// this endpoint on the local node. + pub fn set_handler(&self, h: Arc) { + self.handler.swap(Some(h)); + } + + /// Call this endpoint on a remote node (or on the local node, + /// for that matter). This function invokes the full version that + /// allows to attach a stream to the request and to + /// receive such a stream attached to the response. + pub async fn call_streaming( + &self, + target: &NodeID, + req: T, + prio: RequestPriority, + ) -> Result, Error> + where + T: IntoReq, + { + if *target == self.netapp.id { + match self.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await), + } + } else { + let conn = self + .netapp + .client_conns + .read() + .unwrap() + .get(target) + .cloned(); + match conn { + None => Err(Error::Message(format!( + "Not connected: {}", + hex::encode(&target[..8]) + ))), + Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await, + } + } + } + + /// Call this endpoint on a remote node. This function is the simplified + /// version that doesn't allow to have streams attached to the request + /// or the response; see `call_streaming` for the full version. + pub async fn call( + &self, + target: &NodeID, + req: M, + prio: RequestPriority, + ) -> Result<::Response, Error> { + Ok(self.call_streaming(target, req, prio).await?.into_msg()) + } +} + +// ---- Internal stuff ---- + +pub(crate) type DynEndpoint = Box; + +#[async_trait] +pub(crate) trait GenericEndpoint { + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result; + fn drop_handler(&self); + fn clone_endpoint(&self) -> DynEndpoint; +} + +#[derive(Clone)] +pub(crate) struct EndpointArc(pub(crate) Arc>) +where + M: Message, + H: StreamingEndpointHandler; + +#[async_trait] +impl GenericEndpoint for EndpointArc +where + M: Message, + H: StreamingEndpointHandler + 'static, +{ + async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result { + match self.0.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => { + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) + } + } + } + + fn drop_handler(&self) { + self.0.handler.swap(None); + } + + fn clone_endpoint(&self) -> DynEndpoint { + Box::new(Self(self.0.clone())) + } +} -- cgit v1.2.3