aboutsummaryrefslogtreecommitdiff
path: root/src/rpc_client.rs
blob: 7587782eccee61e8bb9ad24454a9a31d2d5cbc64 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use bytes::IntoBuf;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use hyper::client::Client;
use hyper::{Body, Method, Request, StatusCode};

use crate::data::*;
use crate::error::Error;
use crate::membership::System;
use crate::proto::Message;

pub async fn rpc_call_many(
	sys: Arc<System>,
	to: &[UUID],
	msg: &Message,
	timeout: Duration,
) -> Vec<Result<Message, Error>> {
	let mut resp_stream = to
		.iter()
		.map(|to| rpc_call(sys.clone(), to, msg, timeout))
		.collect::<FuturesUnordered<_>>();

	let mut results = vec![];
	while let Some(resp) = resp_stream.next().await {
		results.push(resp);
	}
	results
}

pub async fn rpc_try_call_many(
	sys: Arc<System>,
	to: &[UUID],
	msg: &Message,
	stop_after: usize,
	timeout: Duration,
) -> Result<Vec<Message>, Error> {
	let mut resp_stream = to
		.iter()
		.map(|to| rpc_call(sys.clone(), to, msg, timeout))
		.collect::<FuturesUnordered<_>>();

	let mut results = vec![];
	let mut errors = vec![];

	while let Some(resp) = resp_stream.next().await {
		match resp {
			Ok(msg) => {
				results.push(msg);
				if results.len() >= stop_after {
					break;
				}
			}
			Err(e) => {
				errors.push(e);
			}
		}
	}

	if results.len() >= stop_after {
		Ok(results)
	} else {
		let mut msg = "Too many failures:".to_string();
		for e in errors {
			msg += &format!("\n{}", e);
		}
		Err(Error::Message(msg))
	}
}

pub async fn rpc_call(
	sys: Arc<System>,
	to: &UUID,
	msg: &Message,
	timeout: Duration,
) -> Result<Message, Error> {
	let addr = {
		let members = sys.members.read().await;
		match members.status.get(to) {
			Some(status) => status.addr.clone(),
			None => return Err(Error::Message(format!("Peer ID not found"))),
		}
	};
	sys.rpc_client.call(&addr, msg, timeout).await
}

pub struct RpcClient {
	pub client: Client<hyper::client::HttpConnector, hyper::Body>,
}

impl RpcClient {
	pub fn new() -> Self {
		RpcClient {
			client: Client::new(),
		}
	}

	pub async fn call(
		&self,
		to_addr: &SocketAddr,
		msg: &Message,
		timeout: Duration,
	) -> Result<Message, Error> {
		let uri = format!("http://{}/rpc", to_addr);
		let req = Request::builder()
			.method(Method::POST)
			.uri(uri)
			.body(Body::from(rmp_to_vec_all_named(msg)?))?;

		let resp_fut = self.client.request(req).fuse();
		let resp = tokio::time::timeout(timeout, resp_fut).await??;

		if resp.status() == StatusCode::OK {
			let body = hyper::body::to_bytes(resp.into_body()).await?;
			let msg = rmp_serde::decode::from_read::<_, Message>(body.into_buf())?;
			match msg {
				Message::Error(e) => Err(Error::RPCError(e)),
				x => Ok(x),
			}
		} else {
			Err(Error::RPCError(format!("Status code {}", resp.status())))
		}
	}
}