aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/block.rs61
-rw-r--r--src/proto.rs2
-rw-r--r--src/rpc_server.rs5
3 files changed, 67 insertions, 1 deletions
diff --git a/src/block.rs b/src/block.rs
index e209dab6..cd570bda 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
+use futures::future::*;
use futures::stream::*;
use tokio::fs;
use tokio::prelude::*;
@@ -16,6 +17,8 @@ use crate::proto::*;
use crate::rpc_client::*;
use crate::server::Garage;
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+
pub struct BlockManager {
pub data_dir: PathBuf,
pub rc: sled::Tree,
@@ -102,6 +105,22 @@ impl BlockManager {
}))
}
+ pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
+ let needed = self
+ .rc
+ .get(hash.as_ref())?
+ .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .unwrap_or(false);
+ if needed {
+ let mut path = self.data_dir.clone();
+ path.push(hex::encode(hash.as_ref()));
+ let exists = fs::metadata(&path).await.is_ok();
+ Ok(!exists)
+ } else {
+ Ok(false)
+ }
+ }
+
fn block_dir(&self, hash: &Hash) -> PathBuf {
let mut path = self.data_dir.clone();
path.push(hex::encode(&hash.as_slice()[0..1]));
@@ -191,7 +210,47 @@ impl BlockManager {
.await?;
let needed_by_others = !active_refs.is_empty();
if needed_by_others {
- // TODO check they have it and send it if not
+ let ring = garage.system.ring.borrow().clone();
+ let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor);
+ let msg = Message::NeedBlockQuery(hash.clone());
+ let who_needs_fut = who
+ .iter()
+ .map(|to| rpc_call(garage.system.clone(), to, &msg, NEED_BLOCK_QUERY_TIMEOUT));
+ let who_needs = join_all(who_needs_fut).await;
+
+ let mut need_nodes = vec![];
+ let mut errors = 0;
+ for (node, needed) in who.into_iter().zip(who_needs.iter()) {
+ match needed {
+ Ok(Message::NeedBlockReply(true)) => {
+ need_nodes.push(node);
+ }
+ Err(_) => {
+ errors += 1;
+ }
+ _ => (),
+ }
+ }
+
+ if errors > (garage.system.config.data_replication_factor - 1) / 2 {
+ return Err(Error::Message(format!(
+ "Should delete block, but not enough nodes confirm that they have it."
+ )));
+ }
+
+ if need_nodes.len() > 0 {
+ let put_block_message = self.read_block(hash).await?;
+ for resp in rpc_call_many(
+ garage.system.clone(),
+ &need_nodes[..],
+ put_block_message,
+ BLOCK_RW_TIMEOUT,
+ )
+ .await
+ {
+ resp?;
+ }
+ }
}
fs::remove_file(path).await?;
self.resync_queue.remove(&hash)?;
diff --git a/src/proto.rs b/src/proto.rs
index 7d8d3899..cf7ed1cc 100644
--- a/src/proto.rs
+++ b/src/proto.rs
@@ -20,6 +20,8 @@ pub enum Message {
GetBlock(Hash),
PutBlock(PutBlockMessage),
+ NeedBlockQuery(Hash),
+ NeedBlockReply(bool),
TableRPC(String, #[serde(with = "serde_bytes")] Vec<u8>),
}
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index c473a32d..3410ab97 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -66,6 +66,11 @@ async fn handler(
tokio::spawn(write_fut).await?
}
Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
+ Message::NeedBlockQuery(h) => garage
+ .block_manager
+ .need_block(&h)
+ .await
+ .map(Message::NeedBlockReply),
Message::TableRPC(table, msg) => {
// Same trick for table RPCs than for PutBlock