aboutsummaryrefslogblamecommitdiff
path: root/src/endpoint.rs
blob: ff626d8d205d662538dfa68f634da528f6dbff01 (plain) (tree)
1
2
3
4
5
6
7
8
9





                             
                        
                      
                     

                   
                                                                     


                                                                 

                                                                       
              
                                                  


                   
                                                                              

 





                                                               
                                                                               



                                                                         






















                                                                                           








                                                                       
                                                       


                         
                                       
 
                                 







                                  
                                       


                                                                      
                                                         




                                                           
 




                                         

                                                                      


                                              

                                                                      



                                                                         

                                
                       
                                      



                                   


                                                              
                                                                                                    











                                                                   
                                                                 
                                    
                                                                                                   


                         
 


                                                                                





                                                      
                                                                            
         

 



                                                                     

                                  


                           
                                   
                             
                                                          
                               






                                                                   
                                       




                                                
                                                 
 


                           
                                   
                             
                                                           


                                                      
                                                                                 





                                                                       
                                                                    






                                                                            



                         
                                






                                                 
use std::marker::PhantomData;
use std::sync::Arc;

use arc_swap::ArcSwapOption;
use async_trait::async_trait;

use crate::error::Error;
use crate::message::*;
use crate::netapp::*;
use crate::util::*;

/// This trait should be implemented by an object of your application
/// that can handle a message of type `M`, if it wishes to handle
/// streams attached to the request and/or to send back streams
/// attached to the response..
///
/// The handler object should be in an Arc, see `Endpoint::set_handler`
#[async_trait]
pub trait StreamingEndpointHandler<M>: Send + Sync
where
	M: Message,
{
	async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
}

/// If one simply wants to use an endpoint in a client fashion,
/// without locally serving requests to that endpoint,
/// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle request.
#[async_trait]
impl<M: Message + 'static> EndpointHandler<M> for () {
	async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
		panic!("This endpoint should not have a local handler.");
	}
}

// ----

#[async_trait]
pub trait EndpointHandler<M>: Send + Sync
where
	M: Message,
{
	async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> <M as Message>::Response;
}

#[async_trait]
impl<T, M> StreamingEndpointHandler<M> for T
where
	T: EndpointHandler<M>,
	M: Message + 'static,
{
	async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M> {
		Resp::new(EndpointHandler::handle(self, m.msg(), from).await)
	}
}

// ----

/// This struct represents an endpoint for message of type `M`.
///
/// Creating a new endpoint is done by calling `NetApp::endpoint`.
/// An endpoint is identified primarily by its path, which is specified
/// at creation time.
///
/// An `Endpoint` is used both to send requests to remote nodes,
/// and to specify the handler for such requests on the local node.
/// The type `H` represents the type of the handler object for
/// endpoint messages (see `StreamingEndpointHandler`).
pub struct Endpoint<M, H>
where
	M: Message,
	H: StreamingEndpointHandler<M>,
{
	_phantom: PhantomData<M>,
	netapp: Arc<NetApp>,
	path: String,
	handler: ArcSwapOption<H>,
}

impl<M, H> Endpoint<M, H>
where
	M: Message,
	H: StreamingEndpointHandler<M>,
{
	pub(crate) fn new(netapp: Arc<NetApp>, path: String) -> Self {
		Self {
			_phantom: PhantomData::default(),
			netapp,
			path,
			handler: ArcSwapOption::from(None),
		}
	}

	/// Get the path of this endpoint
	pub fn path(&self) -> &str {
		&self.path
	}

	/// Set the object that is responsible of handling requests to
	/// this endpoint on the local node.
	pub fn set_handler(&self, h: Arc<H>) {
		self.handler.swap(Some(h));
	}

	/// Call this endpoint on a remote node (or on the local node,
	/// for that matter). This function invokes the full version that
	/// allows to attach a streaming body to the request and to
	/// receive such a body attached to the response.
	pub async fn call_streaming<T>(
		&self,
		target: &NodeID,
		req: T,
		prio: RequestPriority,
	) -> Result<Resp<M>, Error>
	where
		T: IntoReq<M>,
	{
		if *target == self.netapp.id {
			match self.handler.load_full() {
				None => Err(Error::NoHandler),
				Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await),
			}
		} else {
			let conn = self
				.netapp
				.client_conns
				.read()
				.unwrap()
				.get(target)
				.cloned();
			match conn {
				None => Err(Error::Message(format!(
					"Not connected: {}",
					hex::encode(&target[..8])
				))),
				Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await,
			}
		}
	}

	/// Call this endpoint on a remote node. This function is the simplified
	/// version that doesn't allow to have streams attached to the request
	/// or the response; see `call_streaming` for the full version.
	pub async fn call(
		&self,
		target: &NodeID,
		req: M,
		prio: RequestPriority,
	) -> Result<<M as Message>::Response, Error> {
		Ok(self.call_streaming(target, req, prio).await?.into_msg())
	}
}

// ---- Internal stuff ----

pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;

#[async_trait]
pub(crate) trait GenericEndpoint {
	async fn handle(
		&self,
		buf: &[u8],
		stream: ByteStream,
		from: NodeID,
	) -> Result<(Vec<u8>, Option<ByteStream>), Error>;
	fn drop_handler(&self);
	fn clone_endpoint(&self) -> DynEndpoint;
}

#[derive(Clone)]
pub(crate) struct EndpointArc<M, H>(pub(crate) Arc<Endpoint<M, H>>)
where
	M: Message,
	H: StreamingEndpointHandler<M>;

#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
	M: Message + 'static,
	H: StreamingEndpointHandler<M> + 'static,
{
	async fn handle(
		&self,
		buf: &[u8],
		stream: ByteStream,
		from: NodeID,
	) -> Result<(Vec<u8>, Option<ByteStream>), Error> {
		match self.0.handler.load_full() {
			None => Err(Error::NoHandler),
			Some(h) => {
				let req = rmp_serde::decode::from_read_ref(buf)?;
				let req = Req {
					_phantom: Default::default(),
					msg: Arc::new(req),
					msg_ser: None,
					body: BodyData::Stream(stream),
				};
				let res = h.handle(req, from).await;
				let Resp {
					msg,
					body,
					_phantom,
				} = res;
				let res_bytes = rmp_to_vec_all_named(&msg)?;
				Ok((res_bytes, body.into_stream()))
			}
		}
	}

	fn drop_handler(&self) {
		self.0.handler.swap(None);
	}

	fn clone_endpoint(&self) -> DynEndpoint {
		Box::new(Self(self.0.clone()))
	}
}