diff options
Diffstat (limited to 'src/block.rs')
-rw-r--r-- | src/block.rs | 143 |
1 files changed, 92 insertions, 51 deletions
diff --git a/src/block.rs b/src/block.rs index 6add24b7..879cff2c 100644 --- a/src/block.rs +++ b/src/block.rs @@ -5,6 +5,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use futures::future::*; use futures::stream::*; +use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::prelude::*; use tokio::sync::{watch, Mutex}; @@ -15,22 +16,40 @@ use crate::error::Error; use crate::membership::System; use crate::proto::*; use crate::rpc_client::*; +use crate::rpc_server::*; use crate::server::Garage; const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); +#[derive(Debug, Serialize, Deserialize)] +pub enum Message { + Ok, + GetBlock(Hash), + PutBlock(PutBlockMessage), + NeedBlockQuery(Hash), + NeedBlockReply(bool), +} + +impl RpcMessage for Message {} + pub struct BlockManager { pub data_dir: PathBuf, pub rc: sled::Tree, pub resync_queue: sled::Tree, pub lock: Mutex<()>, pub system: Arc<System>, + rpc_client: Arc<RpcClient<Message>>, pub garage: ArcSwapOption<Garage>, } impl BlockManager { - pub fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> { + pub fn new( + db: &sled::Db, + data_dir: PathBuf, + system: Arc<System>, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { let rc = db .open_tree("block_local_rc") .expect("Unable to open block_local_rc tree"); @@ -40,14 +59,38 @@ impl BlockManager { .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - Arc::new(Self { + let rpc_path = "block_manager"; + let rpc_client = system.rpc_client::<Message>(rpc_path); + + let block_manager = Arc::new(Self { rc, resync_queue, data_dir, lock: Mutex::new(()), system, + rpc_client, garage: ArcSwapOption::from(None), - }) + }); + block_manager + .clone() + .register_handler(rpc_server, rpc_path.into()); + block_manager + } + + fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { + rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| { + let self2 = self.clone(); + async move { + match msg { + Message::PutBlock(m) => self2.write_block(&m.hash, &m.data).await, + Message::GetBlock(h) => self2.read_block(&h).await, + Message::NeedBlockQuery(h) => { + self2.need_block(&h).await.map(Message::NeedBlockReply) + } + _ => Err(Error::Message(format!("Invalid RPC"))), + } + } + }); } pub async fn spawn_background_worker(self: Arc<Self>) { @@ -214,10 +257,11 @@ impl BlockManager { if needed_by_others { let ring = garage.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor); - let msg = Message::NeedBlockQuery(hash.clone()); - let who_needs_fut = who - .iter() - .map(|to| rpc_call(garage.system.clone(), to, &msg, NEED_BLOCK_QUERY_TIMEOUT)); + let msg = Arc::new(Message::NeedBlockQuery(hash.clone())); + let who_needs_fut = who.iter().map(|to| { + self.rpc_client + .call(to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) + }); let who_needs = join_all(who_needs_fut).await; let mut need_nodes = vec![]; @@ -242,13 +286,10 @@ impl BlockManager { if need_nodes.len() > 0 { let put_block_message = self.read_block(hash).await?; - let put_responses = rpc_call_many( - garage.system.clone(), - &need_nodes[..], - put_block_message, - BLOCK_RW_TIMEOUT, - ) - .await; + let put_responses = self + .rpc_client + .call_many(&need_nodes[..], put_block_message, BLOCK_RW_TIMEOUT) + .await; for resp in put_responses { resp?; } @@ -262,12 +303,48 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. - let block_data = rpc_get_block(&self.system, &hash).await?; + let block_data = self.rpc_get_block(&hash).await?; self.write_block(hash, &block_data[..]).await?; } Ok(()) } + + pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); + let msg = Arc::new(Message::GetBlock(hash.clone())); + let mut resp_stream = who + .iter() + .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT)) + .collect::<FuturesUnordered<_>>(); + + while let Some(resp) = resp_stream.next().await { + if let Ok(Message::PutBlock(msg)) = resp { + if data::hash(&msg.data[..]) == *hash { + return Ok(msg.data); + } + } + } + Err(Error::Message(format!( + "Unable to read block {:?}: no valid blocks returned", + hash + ))) + } + + pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + let ring = self.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); + self.rpc_client + .try_call_many( + &who[..], + Message::PutBlock(PutBlockMessage { hash, data }), + (self.system.config.data_replication_factor + 1) / 2, + BLOCK_RW_TIMEOUT, + ) + .await?; + Ok(()) + } } fn u64_from_bytes(bytes: &[u8]) -> u64 { @@ -297,39 +374,3 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { Some(u64::to_be_bytes(new).to_vec()) } } - -pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> { - let ring = system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, system.config.data_replication_factor); - let msg = Message::GetBlock(hash.clone()); - let mut resp_stream = who - .iter() - .map(|to| rpc_call(system.clone(), to, &msg, BLOCK_RW_TIMEOUT)) - .collect::<FuturesUnordered<_>>(); - - while let Some(resp) = resp_stream.next().await { - if let Ok(Message::PutBlock(msg)) = resp { - if data::hash(&msg.data[..]) == *hash { - return Ok(msg.data); - } - } - } - Err(Error::Message(format!( - "Unable to read block {:?}: no valid blocks returned", - hash - ))) -} - -pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { - let ring = system.ring.borrow().clone(); - let who = ring.walk_ring(&hash, system.config.data_replication_factor); - rpc_try_call_many( - system.clone(), - &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), - (system.config.data_replication_factor + 1) / 2, - BLOCK_RW_TIMEOUT, - ) - .await?; - Ok(()) -} |