diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-11 23:00:26 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-11 23:00:26 +0200 |
commit | 5dd59e437d5af84dfa2cf5dcc2c15807b971002d (patch) | |
tree | 2debd795082e810e384f765f84217d47cd158ba3 /src/block.rs | |
parent | dcf58499a4f529e1033de65b4fca8d45458d60d2 (diff) | |
download | garage-5dd59e437d5af84dfa2cf5dcc2c15807b971002d.tar.gz garage-5dd59e437d5af84dfa2cf5dcc2c15807b971002d.zip |
Local refcounting of blocks
Diffstat (limited to 'src/block.rs')
-rw-r--r-- | src/block.rs | 122 |
1 files changed, 93 insertions, 29 deletions
diff --git a/src/block.rs b/src/block.rs index 99a0121f..e78ddd78 100644 --- a/src/block.rs +++ b/src/block.rs @@ -1,49 +1,113 @@ use std::path::PathBuf; -use std::sync::Arc; use tokio::fs; use tokio::prelude::*; +use tokio::sync::Mutex; +use futures_util::future::*; use crate::data::*; use crate::error::Error; use crate::proto::*; -use crate::server::Garage; +use crate::background::*; -fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf { - let mut path = garage.system.config.data_dir.clone(); - path.push(hex::encode(&hash.as_slice()[0..1])); - path.push(hex::encode(&hash.as_slice()[1..2])); - path + +pub struct BlockManager { + pub data_dir: PathBuf, + pub rc: sled::Tree, + pub lock: Mutex<()>, } -pub async fn write_block(garage: Arc<Garage>, hash: &Hash, data: &[u8]) -> Result<Message, Error> { - garage.fs_lock.lock().await; +impl BlockManager { + pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self { + let rc = db.open_tree("block_local_rc") + .expect("Unable to open block_local_rc tree"); + rc.set_merge_operator(rc_merge); + Self{ + rc, + data_dir, + lock: Mutex::new(()), + } + } + + pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + let _lock = self.lock.lock().await; + + let mut path = self.block_dir(hash); + fs::create_dir_all(&path).await?; - let mut path = block_dir(&garage, hash); - fs::create_dir_all(&path).await?; + path.push(hex::encode(hash)); + if fs::metadata(&path).await.is_ok() { + return Ok(Message::Ok); + } - path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + let mut f = fs::File::create(path).await?; + f.write_all(data).await?; + drop(f); + + Ok(Message::Ok) } - let mut f = fs::File::create(path).await?; - f.write_all(data).await?; - drop(f); + pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + let mut path = self.block_dir(hash); + path.push(hex::encode(hash)); - Ok(Message::Ok) -} + let mut f = fs::File::open(path).await?; + let mut data = vec![]; + f.read_to_end(&mut data).await?; + + Ok(Message::PutBlock(PutBlockMessage { + hash: hash.clone(), + data, + })) + } -pub async fn read_block(garage: Arc<Garage>, hash: &Hash) -> Result<Message, Error> { - let mut path = block_dir(&garage, hash); - path.push(hex::encode(hash)); + fn block_dir(&self, hash: &Hash) -> PathBuf { + let mut path = self.data_dir.clone(); + path.push(hex::encode(&hash.as_slice()[0..1])); + path.push(hex::encode(&hash.as_slice()[1..2])); + path + } - let mut f = fs::File::open(path).await?; - let mut data = vec![]; - f.read_to_end(&mut data).await?; + pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> { + self.rc.merge(&hash, vec![1])?; + Ok(()) + } - Ok(Message::PutBlock(PutBlockMessage { - hash: hash.clone(), - data, - })) + pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> { + match self.rc.merge(&hash, vec![0])? { + None => { + let mut path = self.block_dir(hash); + path.push(hex::encode(hash)); + background.spawn(tokio::fs::remove_file(path).map_err(Into::into)); + Ok(()) + } + Some(_) => Ok(()) + } + } +} + +fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> { + let old = old.map(|x| { + assert!(x.len() == 8); + let mut x8 = [0u8; 8]; + x8.copy_from_slice(x); + u64::from_be_bytes(x8) + }).unwrap_or(0); + assert!(new.len() == 1); + let new = match new[0] { + 0 => { + if old > 0 { + old - 1 + } else { + 0 + } + } + 1 => old + 1, + _ => unreachable!(), + }; + if new == 0 { + None + } else { + Some(u64::to_be_bytes(new).to_vec()) + } } |