aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-11 23:00:26 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-11 23:00:26 +0200
commit5dd59e437d5af84dfa2cf5dcc2c15807b971002d (patch)
tree2debd795082e810e384f765f84217d47cd158ba3
parentdcf58499a4f529e1033de65b4fca8d45458d60d2 (diff)
downloadgarage-5dd59e437d5af84dfa2cf5dcc2c15807b971002d.tar.gz
garage-5dd59e437d5af84dfa2cf5dcc2c15807b971002d.zip
Local refcounting of blocks
-rw-r--r--src/block.rs122
-rw-r--r--src/block_ref_table.rs16
-rw-r--r--src/rpc_server.rs5
-rw-r--r--src/server.rs11
4 files changed, 116 insertions, 38 deletions
diff --git a/src/block.rs b/src/block.rs
index 99a0121f..e78ddd78 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -1,49 +1,113 @@
use std::path::PathBuf;
-use std::sync::Arc;
use tokio::fs;
use tokio::prelude::*;
+use tokio::sync::Mutex;
+use futures_util::future::*;
use crate::data::*;
use crate::error::Error;
use crate::proto::*;
-use crate::server::Garage;
+use crate::background::*;
-fn block_dir(garage: &Garage, hash: &Hash) -> PathBuf {
- let mut path = garage.system.config.data_dir.clone();
- path.push(hex::encode(&hash.as_slice()[0..1]));
- path.push(hex::encode(&hash.as_slice()[1..2]));
- path
+
+pub struct BlockManager {
+ pub data_dir: PathBuf,
+ pub rc: sled::Tree,
+ pub lock: Mutex<()>,
}
-pub async fn write_block(garage: Arc<Garage>, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
- garage.fs_lock.lock().await;
+impl BlockManager {
+ pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
+ let rc = db.open_tree("block_local_rc")
+ .expect("Unable to open block_local_rc tree");
+ rc.set_merge_operator(rc_merge);
+ Self{
+ rc,
+ data_dir,
+ lock: Mutex::new(()),
+ }
+ }
+
+ pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ let _lock = self.lock.lock().await;
+
+ let mut path = self.block_dir(hash);
+ fs::create_dir_all(&path).await?;
- let mut path = block_dir(&garage, hash);
- fs::create_dir_all(&path).await?;
+ path.push(hex::encode(hash));
+ if fs::metadata(&path).await.is_ok() {
+ return Ok(Message::Ok);
+ }
- path.push(hex::encode(hash));
- if fs::metadata(&path).await.is_ok() {
- return Ok(Message::Ok);
+ let mut f = fs::File::create(path).await?;
+ f.write_all(data).await?;
+ drop(f);
+
+ Ok(Message::Ok)
}
- let mut f = fs::File::create(path).await?;
- f.write_all(data).await?;
- drop(f);
+ pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ let mut path = self.block_dir(hash);
+ path.push(hex::encode(hash));
- Ok(Message::Ok)
-}
+ let mut f = fs::File::open(path).await?;
+ let mut data = vec![];
+ f.read_to_end(&mut data).await?;
+
+ Ok(Message::PutBlock(PutBlockMessage {
+ hash: hash.clone(),
+ data,
+ }))
+ }
-pub async fn read_block(garage: Arc<Garage>, hash: &Hash) -> Result<Message, Error> {
- let mut path = block_dir(&garage, hash);
- path.push(hex::encode(hash));
+ fn block_dir(&self, hash: &Hash) -> PathBuf {
+ let mut path = self.data_dir.clone();
+ path.push(hex::encode(&hash.as_slice()[0..1]));
+ path.push(hex::encode(&hash.as_slice()[1..2]));
+ path
+ }
- let mut f = fs::File::open(path).await?;
- let mut data = vec![];
- f.read_to_end(&mut data).await?;
+ pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
+ self.rc.merge(&hash, vec![1])?;
+ Ok(())
+ }
- Ok(Message::PutBlock(PutBlockMessage {
- hash: hash.clone(),
- data,
- }))
+ pub fn block_decref(&self, hash: &Hash, background: &BackgroundRunner) -> Result<(), Error> {
+ match self.rc.merge(&hash, vec![0])? {
+ None => {
+ let mut path = self.block_dir(hash);
+ path.push(hex::encode(hash));
+ background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
+ Ok(())
+ }
+ Some(_) => Ok(())
+ }
+ }
+}
+
+fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
+ let old = old.map(|x| {
+ assert!(x.len() == 8);
+ let mut x8 = [0u8; 8];
+ x8.copy_from_slice(x);
+ u64::from_be_bytes(x8)
+ }).unwrap_or(0);
+ assert!(new.len() == 1);
+ let new = match new[0] {
+ 0 => {
+ if old > 0 {
+ old - 1
+ } else {
+ 0
+ }
+ }
+ 1 => old + 1,
+ _ => unreachable!(),
+ };
+ if new == 0 {
+ None
+ } else {
+ Some(u64::to_be_bytes(new).to_vec())
+ }
}
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index b4bff937..e2310f74 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -45,7 +45,19 @@ impl TableFormat for BlockRefTable {
type E = BlockRef;
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
- //unimplemented!()
- // TODO
+ let garage = self.garage.read().await.as_ref().cloned().unwrap();
+
+ let was_before = old.map(|x| !x.deleted).unwrap_or(false);
+ let is_after = !new.deleted;
+ if is_after && !was_before {
+ if let Err(e) = garage.block_manager.block_incref(&new.block) {
+ eprintln!("Failed to incref block {:?}: {}", &new.block, e);
+ }
+ }
+ if was_before && !is_after {
+ if let Err(e) = garage.block_manager.block_decref(&new.block, &garage.background) {
+ eprintln!("Failed to decref or delete block {:?}: {}", &new.block, e);
+ }
+ }
}
}
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 98798614..08fa909d 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -9,7 +9,6 @@ use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use serde::Serialize;
-use crate::block::*;
use crate::data::rmp_to_vec_all_named;
use crate::error::Error;
use crate::proto::Message;
@@ -65,8 +64,8 @@ async fn handler(
Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
- Message::PutBlock(m) => write_block(garage, &m.hash, &m.data).await,
- Message::GetBlock(h) => read_block(garage, &h).await,
+ Message::PutBlock(m) => garage.block_manager.write_block(&m.hash, &m.data).await,
+ Message::GetBlock(h) => garage.block_manager.read_block(&h).await,
Message::TableRPC(table, msg) => {
if let Some(rpc_handler) = garage.table_rpc_handlers.get(table) {
diff --git a/src/server.rs b/src/server.rs
index e1f6dc80..8b2bd0c3 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,4 +1,4 @@
-use futures_util::future::FutureExt;
+pub use futures_util::future::FutureExt;
use serde::Deserialize;
use std::collections::HashMap;
use std::io::{Read, Write};
@@ -6,11 +6,12 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::watch;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::RwLock;
use crate::api_server;
use crate::background::*;
use crate::data::*;
+use crate::block::*;
use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
@@ -40,7 +41,7 @@ pub struct Config {
pub struct Garage {
pub db: sled::Db,
pub system: Arc<System>,
- pub fs_lock: Mutex<()>,
+ pub block_manager: BlockManager,
pub background: Arc<BackgroundRunner>,
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
@@ -57,6 +58,8 @@ impl Garage {
db: sled::Db,
background: Arc<BackgroundRunner>,
) -> Arc<Self> {
+ let block_manager = BlockManager::new(&db, config.data_dir.clone());
+
let system = Arc::new(System::new(config, id, background.clone()));
let meta_rep_param = TableReplicationParams {
@@ -105,7 +108,7 @@ impl Garage {
let mut garage = Self {
db,
system: system.clone(),
- fs_lock: Mutex::new(()),
+ block_manager,
background,
table_rpc_handlers: HashMap::new(),
object_table,