aboutsummaryrefslogtreecommitdiff
path: root/src/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block.rs')
-rw-r--r--src/block.rs143
1 files changed, 92 insertions, 51 deletions
diff --git a/src/block.rs b/src/block.rs
index 6add24b7..879cff2c 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -5,6 +5,7 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::*;
use futures::stream::*;
+use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::prelude::*;
use tokio::sync::{watch, Mutex};
@@ -15,22 +16,40 @@ use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
use crate::rpc_client::*;
+use crate::rpc_server::*;
use crate::server::Garage;
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Message {
+ Ok,
+ GetBlock(Hash),
+ PutBlock(PutBlockMessage),
+ NeedBlockQuery(Hash),
+ NeedBlockReply(bool),
+}
+
+impl RpcMessage for Message {}
+
pub struct BlockManager {
pub data_dir: PathBuf,
pub rc: sled::Tree,
pub resync_queue: sled::Tree,
pub lock: Mutex<()>,
pub system: Arc<System>,
+ rpc_client: Arc<RpcClient<Message>>,
pub garage: ArcSwapOption<Garage>,
}
impl BlockManager {
- pub fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> {
+ pub fn new(
+ db: &sled::Db,
+ data_dir: PathBuf,
+ system: Arc<System>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
@@ -40,14 +59,38 @@ impl BlockManager {
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- Arc::new(Self {
+ let rpc_path = "block_manager";
+ let rpc_client = system.rpc_client::<Message>(rpc_path);
+
+ let block_manager = Arc::new(Self {
rc,
resync_queue,
data_dir,
lock: Mutex::new(()),
system,
+ rpc_client,
garage: ArcSwapOption::from(None),
- })
+ });
+ block_manager
+ .clone()
+ .register_handler(rpc_server, rpc_path.into());
+ block_manager
+ }
+
+ fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
+ let self2 = self.clone();
+ async move {
+ match msg {
+ Message::PutBlock(m) => self2.write_block(&m.hash, &m.data).await,
+ Message::GetBlock(h) => self2.read_block(&h).await,
+ Message::NeedBlockQuery(h) => {
+ self2.need_block(&h).await.map(Message::NeedBlockReply)
+ }
+ _ => Err(Error::Message(format!("Invalid RPC"))),
+ }
+ }
+ });
}
pub async fn spawn_background_worker(self: Arc<Self>) {
@@ -214,10 +257,11 @@ impl BlockManager {
if needed_by_others {
let ring = garage.system.ring.borrow().clone();
let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor);
- let msg = Message::NeedBlockQuery(hash.clone());
- let who_needs_fut = who
- .iter()
- .map(|to| rpc_call(garage.system.clone(), to, &msg, NEED_BLOCK_QUERY_TIMEOUT));
+ let msg = Arc::new(Message::NeedBlockQuery(hash.clone()));
+ let who_needs_fut = who.iter().map(|to| {
+ self.rpc_client
+ .call(to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+ });
let who_needs = join_all(who_needs_fut).await;
let mut need_nodes = vec![];
@@ -242,13 +286,10 @@ impl BlockManager {
if need_nodes.len() > 0 {
let put_block_message = self.read_block(hash).await?;
- let put_responses = rpc_call_many(
- garage.system.clone(),
- &need_nodes[..],
- put_block_message,
- BLOCK_RW_TIMEOUT,
- )
- .await;
+ let put_responses = self
+ .rpc_client
+ .call_many(&need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT)
+ .await;
for resp in put_responses {
resp?;
}
@@ -262,12 +303,48 @@ impl BlockManager {
// TODO find a way to not do this if they are sending it to us
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
// between the RC being incremented and this part being called.
- let block_data = rpc_get_block(&self.system, &hash).await?;
+ let block_data = self.rpc_get_block(&hash).await?;
self.write_block(hash, &block_data[..]).await?;
}
Ok(())
}
+
+ pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
+ let msg = Arc::new(Message::GetBlock(hash.clone()));
+ let mut resp_stream = who
+ .iter()
+ .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT))
+ .collect::<FuturesUnordered<_>>();
+
+ while let Some(resp) = resp_stream.next().await {
+ if let Ok(Message::PutBlock(msg)) = resp {
+ if data::hash(&msg.data[..]) == *hash {
+ return Ok(msg.data);
+ }
+ }
+ }
+ Err(Error::Message(format!(
+ "Unable to read block {:?}: no valid blocks returned",
+ hash
+ )))
+ }
+
+ pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ let ring = self.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
+ self.rpc_client
+ .try_call_many(
+ &who[..],
+ Message::PutBlock(PutBlockMessage { hash, data }),
+ (self.system.config.data_replication_factor + 1) / 2,
+ BLOCK_RW_TIMEOUT,
+ )
+ .await?;
+ Ok(())
+ }
}
fn u64_from_bytes(bytes: &[u8]) -> u64 {
@@ -297,39 +374,3 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
Some(u64::to_be_bytes(new).to_vec())
}
}
-
-pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> {
- let ring = system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, system.config.data_replication_factor);
- let msg = Message::GetBlock(hash.clone());
- let mut resp_stream = who
- .iter()
- .map(|to| rpc_call(system.clone(), to, &msg, BLOCK_RW_TIMEOUT))
- .collect::<FuturesUnordered<_>>();
-
- while let Some(resp) = resp_stream.next().await {
- if let Ok(Message::PutBlock(msg)) = resp {
- if data::hash(&msg.data[..]) == *hash {
- return Ok(msg.data);
- }
- }
- }
- Err(Error::Message(format!(
- "Unable to read block {:?}: no valid blocks returned",
- hash
- )))
-}
-
-pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let ring = system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, system.config.data_replication_factor);
- rpc_try_call_many(
- system.clone(),
- &who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- (system.config.data_replication_factor + 1) / 2,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
- Ok(())
-}