aboutsummaryrefslogtreecommitdiff
path: root/src/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-17 15:36:16 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-17 15:40:13 +0200
commite41ce4d81528388f043c1c5e6608df45347ea70d (patch)
treeee25f06b6f7da356c53d5f0a8fc8ec9e81d4bb23 /src/block.rs
parent867646093b24a9bb7e4b24a7f2248615c6e03fde (diff)
downloadgarage-e41ce4d81528388f043c1c5e6608df45347ea70d.tar.gz
garage-e41ce4d81528388f043c1c5e6608df45347ea70d.zip
Implement getting missing blocks when RC increases
Issue: RC increases also when the block ref entry is first put by the actual client. At that point the client is probably already sending us the block content, so we don't need to do a get... We should add a delay before the task is added or find something to do.
Diffstat (limited to 'src/block.rs')
-rw-r--r--src/block.rs163
1 files changed, 140 insertions, 23 deletions
diff --git a/src/block.rs b/src/block.rs
index e50dab38..e898ad19 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -1,32 +1,52 @@
use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Duration;
-use futures_util::future::*;
+use futures::stream::*;
use tokio::fs;
use tokio::prelude::*;
-use tokio::sync::Mutex;
+use tokio::sync::{watch, Mutex};
-use crate::background::*;
+use crate::data;
use crate::data::*;
use crate::error::Error;
+use crate::membership::System;
use crate::proto::*;
+use crate::rpc_client::*;
pub struct BlockManager {
pub data_dir: PathBuf,
pub rc: sled::Tree,
+ pub resync_queue: sled::Tree,
pub lock: Mutex<()>,
+ pub system: Arc<System>,
}
impl BlockManager {
- pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
+ pub async fn new(db: &sled::Db, data_dir: PathBuf, system: Arc<System>) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
.expect("Unable to open block_local_rc tree");
rc.set_merge_operator(rc_merge);
- Self {
+
+ let resync_queue = db
+ .open_tree("block_local_resync_queue")
+ .expect("Unable to open block_local_resync_queue tree");
+
+ let block_manager = Arc::new(Self {
rc,
+ resync_queue,
data_dir,
lock: Mutex::new(()),
- }
+ system,
+ });
+ let bm2 = block_manager.clone();
+ block_manager
+ .system
+ .background
+ .spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
+ .await;
+ block_manager
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
@@ -51,9 +71,18 @@ impl BlockManager {
let mut path = self.block_dir(hash);
path.push(hex::encode(hash));
- let mut f = fs::File::open(path).await?;
+ let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
+ drop(f);
+
+ if data::hash(&data[..]) != *hash {
+ let _lock = self.lock.lock().await;
+ eprintln!("Block {:?} is corrupted. Deleting and resyncing.", hash);
+ fs::remove_file(path).await?;
+ self.resync_queue.insert(hash.to_vec(), vec![1u8])?;
+ return Err(Error::CorruptData(hash.clone()));
+ }
Ok(Message::PutBlock(PutBlockMessage {
hash: hash.clone(),
@@ -73,28 +102,74 @@ impl BlockManager {
Ok(())
}
- pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> {
- match self.rc.merge(&hash, vec![0])? {
- None => {
- let mut path = self.block_dir(hash);
- path.push(hex::encode(hash));
- background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
- Ok(())
+ pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
+ if self.rc.merge(&hash, vec![0])?.is_none() {
+ self.resync_queue.insert(hash.to_vec(), vec![1u8])?;
+ }
+ Ok(())
+ }
+
+ async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ if let Some((hash_bytes, _v)) = self.resync_queue.get_gt(&[])? {
+ let mut hash = [0u8; 32];
+ hash.copy_from_slice(hash_bytes.as_ref());
+ let hash = Hash::from(hash);
+
+ match self.resync_iter(&hash).await {
+ Ok(_) => {
+ self.resync_queue.remove(&hash_bytes)?;
+ }
+ Err(e) => {
+ eprintln!(
+ "Failed to resync hash {:?}, leaving it in queue: {}",
+ hash, e
+ );
+ }
+ }
+ } else {
+ tokio::time::delay_for(Duration::from_secs(1)).await;
}
- Some(_) => Ok(()),
}
+ Ok(())
}
+
+ async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ let mut path = self.data_dir.clone();
+ path.push(hex::encode(hash.as_ref()));
+
+ let exists = fs::metadata(&path).await.is_ok();
+ let needed = self
+ .rc
+ .get(hash.as_ref())?
+ .map(|x| u64_from_bytes(x.as_ref()) > 0)
+ .unwrap_or(false);
+
+ if exists && !needed {
+ // TODO: verify that other nodes that might need it have it ?
+ fs::remove_file(path).await?;
+ self.resync_queue.remove(&hash)?;
+ }
+
+ if needed && !exists {
+ // TODO find a way to not do this if they are sending it to us
+ let block_data = rpc_get_block(&self.system, &hash).await?;
+ self.write_block(hash, &block_data[..]).await?;
+ }
+
+ Ok(())
+ }
+}
+
+fn u64_from_bytes(bytes: &[u8]) -> u64 {
+ assert!(bytes.len() == 8);
+ let mut x8 = [0u8; 8];
+ x8.copy_from_slice(bytes);
+ u64::from_be_bytes(x8)
}
fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
- let old = old
- .map(|x| {
- assert!(x.len() == 8);
- let mut x8 = [0u8; 8];
- x8.copy_from_slice(x);
- u64::from_be_bytes(x8)
- })
- .unwrap_or(0);
+ let old = old.map(u64_from_bytes).unwrap_or(0);
assert!(new.len() == 1);
let new = match new[0] {
0 => {
@@ -113,3 +188,45 @@ fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
Some(u64::to_be_bytes(new).to_vec())
}
}
+
+pub async fn rpc_get_block(system: &Arc<System>, hash: &Hash) -> Result<Vec<u8>, Error> {
+ let who = system
+ .ring
+ .borrow()
+ .clone()
+ .walk_ring(&hash, system.config.data_replication_factor);
+ let msg = Message::GetBlock(hash.clone());
+ let mut resp_stream = who
+ .iter()
+ .map(|to| rpc_call(system.clone(), to, &msg, BLOCK_RW_TIMEOUT))
+ .collect::<FuturesUnordered<_>>();
+
+ while let Some(resp) = resp_stream.next().await {
+ if let Ok(Message::PutBlock(msg)) = resp {
+ if data::hash(&msg.data[..]) == *hash {
+ return Ok(msg.data);
+ }
+ }
+ }
+ Err(Error::Message(format!(
+ "Unable to read block {:?}: no valid blocks returned",
+ hash
+ )))
+}
+
+pub async fn rpc_put_block(system: &Arc<System>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ let who = system
+ .ring
+ .borrow()
+ .clone()
+ .walk_ring(&hash, system.config.data_replication_factor);
+ rpc_try_call_many(
+ system.clone(),
+ &who[..],
+ Message::PutBlock(PutBlockMessage { hash, data }),
+ (system.config.data_replication_factor + 1) / 2,
+ BLOCK_RW_TIMEOUT,
+ )
+ .await?;
+ Ok(())
+}