From 4abfb75509f216f4d62bc8b18b22eb680eefe2d9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 17 Apr 2020 19:16:08 +0200 Subject: Implement sending blocks to nodes that need them --- src/block.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- src/proto.rs | 2 ++ src/rpc_server.rs | 5 +++++ 3 files changed, 67 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/block.rs b/src/block.rs index e209dab6..cd570bda 100644 --- a/src/block.rs +++ b/src/block.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; +use futures::future::*; use futures::stream::*; use tokio::fs; use tokio::prelude::*; @@ -16,6 +17,8 @@ use crate::proto::*; use crate::rpc_client::*; use crate::server::Garage; +const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); + pub struct BlockManager { pub data_dir: PathBuf, pub rc: sled::Tree, @@ -102,6 +105,22 @@ impl BlockManager { })) } + pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { + let needed = self + .rc + .get(hash.as_ref())? 
+ .map(|x| u64_from_bytes(x.as_ref()) > 0) + .unwrap_or(false); + if needed { + let mut path = self.data_dir.clone(); + path.push(hex::encode(hash.as_ref())); + let exists = fs::metadata(&path).await.is_ok(); + Ok(!exists) + } else { + Ok(false) + } + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -191,7 +210,47 @@ impl BlockManager { .await?; let needed_by_others = !active_refs.is_empty(); if needed_by_others { - // TODO check they have it and send it if not + let ring = garage.system.ring.borrow().clone(); + let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor); + let msg = Message::NeedBlockQuery(hash.clone()); + let who_needs_fut = who + .iter() + .map(|to| rpc_call(garage.system.clone(), to, &msg, NEED_BLOCK_QUERY_TIMEOUT)); + let who_needs = join_all(who_needs_fut).await; + + let mut need_nodes = vec![]; + let mut errors = 0; + for (node, needed) in who.into_iter().zip(who_needs.iter()) { + match needed { + Ok(Message::NeedBlockReply(true)) => { + need_nodes.push(node); + } + Err(_) => { + errors += 1; + } + _ => (), + } + } + + if errors > (garage.system.config.data_replication_factor - 1) / 2 { + return Err(Error::Message(format!( + "Should delete block, but not enough nodes confirm that they have it." 
+ ))); + } + + if need_nodes.len() > 0 { + let put_block_message = self.read_block(hash).await?; + for resp in rpc_call_many( + garage.system.clone(), + &need_nodes[..], + put_block_message, + BLOCK_RW_TIMEOUT, + ) + .await + { + resp?; + } + } } fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; diff --git a/src/proto.rs b/src/proto.rs index 7d8d3899..cf7ed1cc 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -20,6 +20,8 @@ pub enum Message { GetBlock(Hash), PutBlock(PutBlockMessage), + NeedBlockQuery(Hash), + NeedBlockReply(bool), TableRPC(String, #[serde(with = "serde_bytes")] Vec<u8>), } diff --git a/src/rpc_server.rs b/src/rpc_server.rs index c473a32d..3410ab97 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -66,6 +66,11 @@ async fn handler( tokio::spawn(write_fut).await? } Message::GetBlock(h) => garage.block_manager.read_block(&h).await, + Message::NeedBlockQuery(h) => garage + .block_manager + .need_block(&h) + .await + .map(Message::NeedBlockReply), Message::TableRPC(table, msg) => { // Same trick for table RPCs than for PutBlock -- cgit v1.2.3