Diffstat (limited to 'src/model/block.rs')
 -rw-r--r--  src/model/block.rs  417
 1 file changed, 247 insertions(+), 170 deletions(-)
diff --git a/src/model/block.rs b/src/model/block.rs
index 1e04ee58..85468258 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -27,7 +27,7 @@ use crate::garage::Garage;
/// Size under which data will be stored inline in the database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
-pub const BACKGROUND_WORKERS: u64 = 1;
+pub const BACKGROUND_WORKERS: u64 = 2;
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
@@ -70,8 +70,8 @@ pub struct BlockManager {
pub replication: TableShardedReplication,
/// Directory in which blocks are stored
pub data_dir: PathBuf,
- /// Lock to prevent concurrent edition of the directory
- pub data_dir_lock: Mutex<()>,
+
+ mutation_lock: Mutex<BlockManagerLocked>,
rc: sled::Tree,
@@ -83,6 +83,11 @@ pub struct BlockManager {
pub(crate) garage: ArcSwapOption<Garage>,
}
+// This custom struct contains the functions that must only be run
+// while the lock is held. We ensure that this is the case by storing
+// it INSIDE a Mutex.
+struct BlockManagerLocked();
+
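The pattern can be illustrated outside of Garage: the filesystem-mutating methods live on a dedicated type, and the only instance of that type is owned by the Mutex, so the type system itself guarantees that the lock is held whenever they run. A minimal sketch with hypothetical names (`Store`, `StoreLocked`), not Garage's actual API:

```rust
use tokio::sync::Mutex;

// The only way to call methods on StoreLocked is through Store::mutation_lock,
// so those methods may assume exclusive access to the data directory.
struct StoreLocked();

impl StoreLocked {
    async fn write_file(&self, name: &str, data: &[u8]) -> std::io::Result<()> {
        tokio::fs::write(name, data).await
    }
}

struct Store {
    mutation_lock: Mutex<StoreLocked>,
}

impl Store {
    async fn write(&self, name: &str, data: &[u8]) -> std::io::Result<()> {
        self.mutation_lock.lock().await.write_file(name, data).await
    }
}
```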
impl BlockManager {
pub fn new(
db: &sled::Db,
@@ -102,10 +107,12 @@ impl BlockManager {
.netapp
.endpoint("garage_model/block.rs/Rpc".to_string());
+ let manager_locked = BlockManagerLocked();
+
let block_manager = Arc::new(Self {
replication,
data_dir,
- data_dir_lock: Mutex::new(()),
+ mutation_lock: Mutex::new(manager_locked),
rc,
resync_queue,
resync_notify: Notify::new(),
@@ -118,38 +125,144 @@ impl BlockManager {
block_manager
}
- pub fn spawn_background_worker(self: Arc<Self>) {
- // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
- // launches only one worker with current value of BACKGROUND_WORKERS
- for i in 0..BACKGROUND_WORKERS {
- let bm2 = self.clone();
- let background = self.system.background.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
- background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
- bm2.resync_loop(must_exit)
- });
- });
+ // ---- Public interface ----
+
+ /// Ask nodes that might have a block for it
+ pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
+ let who = self.replication.read_nodes(&hash);
+ let resps = self
+ .system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &who[..],
+ BlockRpc::GetBlock(*hash),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
+ .with_timeout(BLOCK_RW_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ for resp in resps {
+ if let BlockRpc::PutBlock(msg) = resp {
+ return Ok(msg.data);
+ }
}
+ Err(Error::Message(format!(
+ "Unable to read block {:?}: no valid blocks returned",
+ hash
+ )))
}
- /// Write a block to disk
- async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> {
- let _lock = self.data_dir_lock.lock().await;
+ /// Send block to nodes that should have it
+ pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
+ let who = self.replication.write_nodes(&hash);
+ self.system
+ .rpc
+ .try_call_many(
+ &self.endpoint,
+ &who[..],
+ BlockRpc::PutBlock(PutBlockMessage { hash, data }),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.replication.write_quorum())
+ .with_timeout(BLOCK_RW_TIMEOUT),
+ )
+ .await?;
+ Ok(())
+ }
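A caller typically hashes the data first and uses that hash as the key for both calls. A hypothetical round-trip helper (not part of this patch), assuming the `blake2sum` function already used elsewhere in this file:

```rust
// Store a block on its write nodes, then read it back from any node holding it.
async fn store_and_reload(mgr: &BlockManager, data: Vec<u8>) -> Result<Vec<u8>, Error> {
    let hash = blake2sum(&data[..]);       // content-addressed key
    mgr.rpc_put_block(hash, data).await?;  // waits for the replication write quorum
    mgr.rpc_get_block(&hash).await         // quorum of 1, first valid answer wins
}
```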
- let mut path = self.block_dir(hash);
- fs::create_dir_all(&path).await?;
+ /// Launch the repair procedure on the data store
+ ///
+ /// This will list all blocks locally present, as well as those
+ /// that are required because of refcount > 0, and will try
+ /// to fix any mismatch between the two.
+ pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
+ // 1. Repair blocks from RC table
+ let garage = self.garage.load_full().unwrap();
+ let mut last_hash = None;
+ for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() {
+ let (_k, v_bytes) = entry?;
+ let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
+ if Some(&block_ref.block) == last_hash.as_ref() {
+ continue;
+ }
+ if !block_ref.deleted.get() {
+ last_hash = Some(block_ref.block);
+ self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
+ }
+ if i & 0xFF == 0 && *must_exit.borrow() {
+ return Ok(());
+ }
+ }
- path.push(hex::encode(hash));
- if fs::metadata(&path).await.is_ok() {
- return Ok(BlockRpc::Ok);
+ // 2. Repair blocks actually on disk
+ self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
+ .await?;
+
+ Ok(())
+ }
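A hypothetical way to drive this repair directly, assuming only a tokio watch channel for the shutdown flag:

```rust
// Sketch only: the repair checks `must_exit` every 256 table entries,
// so flipping the sender to `true` makes it return early.
async fn run_full_repair(mgr: &BlockManager) -> Result<(), Error> {
    let (_stop, must_exit) = tokio::sync::watch::channel(false);
    mgr.repair_data_store(&must_exit).await
}
```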
+
+ /// Get length of the resync queue
+ pub fn resync_queue_len(&self) -> usize {
+ self.resync_queue.len()
+ }
+
+ /// Get number of items in the refcount table
+ pub fn rc_len(&self) -> usize {
+ self.rc.len()
+ }
+
+ // ---- Managing the reference counter ----
+
+ /// Increment the number of times a block is used; if the block was not previously
+ /// needed, it is queued for resynchronization so that this node fetches it if missing
+ pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
+ let old_rc = self.rc.fetch_and_update(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ Some(u64::to_be_bytes(old_v + 1).to_vec())
+ })?;
+ let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
+ if old_rc == 0 {
+ self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
}
+ Ok(())
+ }
- let mut f = fs::File::create(path).await?;
- f.write_all(data).await?;
- drop(f);
+ /// Decrement the number of times a block is used
+ pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
+ let new_rc = self.rc.update_and_fetch(&hash, |old| {
+ let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
+ if old_v > 1 {
+ Some(u64::to_be_bytes(old_v - 1).to_vec())
+ } else {
+ None
+ }
+ })?;
+ if new_rc.is_none() {
+ self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
+ }
+ Ok(())
+ }
- Ok(BlockRpc::Ok)
+ /// Read a block's reference count
+ pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
+ Ok(self
+ .rc
+ .get(hash.as_ref())?
+ .map(u64_from_be_bytes)
+ .unwrap_or(0))
+ }
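The two sled closures above implement a small state machine over an optional big-endian u64. Written as plain functions (illustrative only, not part of the patch), the transitions are:

```rust
// Each function returns the new stored value and whether an action must be queued.
fn incref_transition(old: Option<u64>) -> (Option<u64>, bool) {
    let old_v = old.unwrap_or(0);
    (Some(old_v + 1), old_v == 0) // 0 -> 1: block newly needed, queue a resync
}

fn decref_transition(old: Option<u64>) -> (Option<u64>, bool) {
    let old_v = old.unwrap_or(0);
    if old_v > 1 {
        (Some(old_v - 1), false)
    } else {
        (None, true) // entry removed entirely: block unneeded, queue it for GC
    }
}
```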
+
+ // ---- Reading and writing blocks locally ----
+
+ /// Write a block to disk
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> {
+ self.mutation_lock
+ .lock()
+ .await
+ .write_block(hash, data, self)
+ .await
}
/// Read block from disk, verifying its integrity
@@ -169,15 +282,11 @@ impl BlockManager {
drop(f);
if blake2sum(&data[..]) != *hash {
- let _lock = self.data_dir_lock.lock().await;
- warn!(
- "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
- hash
- );
- let mut path2 = path.clone();
- path2.set_extension(".corrupted");
- fs::rename(path, path2).await?;
- self.put_to_resync(&hash, Duration::from_millis(0))?;
+ self.mutation_lock
+ .lock()
+ .await
+ .move_block_to_corrupted(hash, self)
+ .await?;
return Err(Error::CorruptData(*hash));
}
@@ -186,60 +295,44 @@ impl BlockManager {
/// Check if this node should have a block, but doesn't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
- let needed = self
- .rc
- .get(hash.as_ref())?
- .map(|x| u64_from_be_bytes(x) > 0)
- .unwrap_or(false);
- if needed {
- let path = self.block_path(hash);
- let exists = fs::metadata(&path).await.is_ok();
- Ok(!exists)
- } else {
- Ok(false)
- }
+ let BlockStatus { exists, needed } = self
+ .mutation_lock
+ .lock()
+ .await
+ .check_block_status(hash, self)
+ .await?;
+ Ok(needed && !exists)
}
+ /// Utility: gives the path of the directory in which a block should be found
fn block_dir(&self, hash: &Hash) -> PathBuf {
let mut path = self.data_dir.clone();
path.push(hex::encode(&hash.as_slice()[0..1]));
path.push(hex::encode(&hash.as_slice()[1..2]));
path
}
+
+ /// Utility: gives the full path where a block should be found
fn block_path(&self, hash: &Hash) -> PathBuf {
let mut path = self.block_dir(hash);
path.push(hex::encode(hash.as_ref()));
path
}
- /// Increment the number of time a block is used, putting it to resynchronization if it is
- /// required, but not known
- pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- let old_rc = self.rc.fetch_and_update(&hash, |old| {
- let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
- Some(u64::to_be_bytes(old_v + 1).to_vec())
- })?;
- let old_rc = old_rc.map(u64_from_be_bytes).unwrap_or(0);
- if old_rc == 0 {
- self.put_to_resync(&hash, BLOCK_RW_TIMEOUT)?;
- }
- Ok(())
- }
+ // ---- Resync loop ----
- /// Decrement the number of time a block is used
- pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- let new_rc = self.rc.update_and_fetch(&hash, |old| {
- let old_v = old.map(u64_from_be_bytes).unwrap_or(0);
- if old_v > 1 {
- Some(u64::to_be_bytes(old_v - 1).to_vec())
- } else {
- None
- }
- })?;
- if new_rc.is_none() {
- self.put_to_resync(&hash, BLOCK_GC_TIMEOUT)?;
+ pub fn spawn_background_worker(self: Arc<Self>) {
+ // Launch BACKGROUND_WORKERS simultaneous workers for background resync loop preprocessing
+ for i in 0..BACKGROUND_WORKERS {
+ let bm2 = self.clone();
+ let background = self.system.background.clone();
+ tokio::spawn(async move {
+ tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
+ background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
+ bm2.resync_loop(must_exit)
+ });
+ });
}
- Ok(())
}
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
@@ -296,16 +389,12 @@ impl BlockManager {
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
- let lock = self.data_dir_lock.lock().await;
-
- let path = self.block_path(hash);
-
- let exists = fs::metadata(&path).await.is_ok();
- let needed = self
- .rc
- .get(hash.as_ref())?
- .map(|x| u64_from_be_bytes(x) > 0)
- .unwrap_or(false);
+ let BlockStatus { exists, needed } = self
+ .mutation_lock
+ .lock()
+ .await
+ .check_block_status(hash, self)
+ .await?;
if exists != needed {
info!(
@@ -378,12 +467,14 @@ impl BlockManager {
who.len()
);
- fs::remove_file(path).await?;
+ self.mutation_lock
+ .lock()
+ .await
+ .delete_if_unneeded(hash, self)
+ .await?;
}
if needed && !exists {
- drop(lock);
-
// TODO find a way to not do this if they are sending it to us
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
// between the RC being incremented and this part being called.
@@ -394,79 +485,6 @@ impl BlockManager {
Ok(())
}
- /// Ask nodes that might have a block for it
- pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
- let who = self.replication.read_nodes(&hash);
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &who[..],
- BlockRpc::GetBlock(*hash),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .with_timeout(BLOCK_RW_TIMEOUT)
- .interrupt_after_quorum(true),
- )
- .await?;
-
- for resp in resps {
- if let BlockRpc::PutBlock(msg) = resp {
- return Ok(msg.data);
- }
- }
- Err(Error::Message(format!(
- "Unable to read block {:?}: no valid blocks returned",
- hash
- )))
- }
-
- /// Send block to nodes that should have it
- pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
- self.system
- .rpc
- .try_call_many(
- &self.endpoint,
- &who[..],
- BlockRpc::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.replication.write_quorum())
- .with_timeout(BLOCK_RW_TIMEOUT),
- )
- .await?;
- Ok(())
- }
-
- pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
- // 1. Repair blocks from RC table
- let garage = self.garage.load_full().unwrap();
- let mut last_hash = None;
- let mut i = 0usize;
- for entry in garage.block_ref_table.data.store.iter() {
- let (_k, v_bytes) = entry?;
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
- if Some(&block_ref.block) == last_hash.as_ref() {
- continue;
- }
- if !block_ref.deleted.get() {
- last_hash = Some(block_ref.block);
- self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
- }
- i += 1;
- if i & 0xFF == 0 && *must_exit.borrow() {
- return Ok(());
- }
- }
-
- // 2. Repair blocks actually on disk
- self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
- .await?;
-
- Ok(())
- }
-
fn repair_aux_read_dir_rec<'a>(
&'a self,
path: &'a Path,
@@ -511,15 +529,6 @@ impl BlockManager {
}
.boxed()
}
-
- /// Get lenght of resync queue
- pub fn resync_queue_len(&self) -> usize {
- self.resync_queue.len()
- }
-
- pub fn rc_len(&self) -> usize {
- self.rc.len()
- }
}
#[async_trait]
@@ -538,6 +547,74 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
+struct BlockStatus {
+ exists: bool,
+ needed: bool,
+}
+
+impl BlockManagerLocked {
+ async fn check_block_status(
+ &self,
+ hash: &Hash,
+ mgr: &BlockManager,
+ ) -> Result<BlockStatus, Error> {
+ let path = mgr.block_path(hash);
+
+ let exists = fs::metadata(&path).await.is_ok();
+ let needed = mgr.get_block_rc(hash)? > 0;
+
+ Ok(BlockStatus { exists, needed })
+ }
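Taken together, these two booleans drive every local decision in the resync loop: `exists && !needed` means the block may be offered to other nodes and then deleted locally, `needed && !exists` means it must be fetched from other nodes, and in the remaining two cases there is nothing to do.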
+
+ async fn write_block(
+ &self,
+ hash: &Hash,
+ data: &[u8],
+ mgr: &BlockManager,
+ ) -> Result<BlockRpc, Error> {
+ let mut path = mgr.block_dir(hash);
+ fs::create_dir_all(&path).await?;
+
+ path.push(hex::encode(hash));
+ if fs::metadata(&path).await.is_ok() {
+ return Ok(BlockRpc::Ok);
+ }
+
+ let mut path2 = path.clone();
+ path2.set_extension("tmp");
+ let mut f = fs::File::create(&path2).await?;
+ f.write_all(data).await?;
+ drop(f);
+
+ fs::rename(path2, path).await?;
+
+ Ok(BlockRpc::Ok)
+ }
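The temporary-file-plus-rename dance makes the write crash-safe: the final rename is atomic on POSIX filesystems, so a reader either sees a complete block or no block at all, and a crash leaves at worst a stray `.tmp` file. The same idea as a standalone helper (a hypothetical sketch, not Garage's API):

```rust
use std::path::Path;
use tokio::{fs, io::AsyncWriteExt};

async fn atomic_write(path: &Path, data: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    let mut f = fs::File::create(&tmp).await?;
    f.write_all(data).await?;
    f.sync_all().await?;           // make sure the bytes hit disk before the rename
    fs::rename(&tmp, path).await?; // atomically replace / reveal the block file
    Ok(())
}
```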
+
+ async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
+ warn!(
+ "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
+ hash
+ );
+ let path = mgr.block_path(hash);
+ let mut path2 = path.clone();
+ path2.set_extension(".corrupted");
+ fs::rename(path, path2).await?;
+ mgr.put_to_resync(&hash, Duration::from_millis(0))?;
+ Ok(())
+ }
+
+ async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
+ let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
+
+ if exists && !needed {
+ let path = mgr.block_path(hash);
+ fs::remove_file(path).await?;
+ }
+ Ok(())
+ }
+}
+
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];