aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/Cargo.toml3
-rw-r--r--src/block/manager.rs127
-rw-r--r--src/block/metrics.rs4
-rw-r--r--src/block/rc.rs49
4 files changed, 127 insertions, 56 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 9cba69ee..80346aca 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.7.0", path = "../table" }
@@ -27,8 +28,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 9b2d9cad..32ba0431 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,5 @@
+use core::ops::Bound;
+
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -17,10 +19,12 @@ use opentelemetry::{
Context, KeyValue,
};
+use garage_db as db;
+use garage_db::counted_tree_hack::CountedTree;
+
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
-use garage_util::sled_counter::SledCountedTree;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -91,9 +95,9 @@ pub struct BlockManager {
rc: BlockRc,
- resync_queue: SledCountedTree,
+ resync_queue: CountedTree,
resync_notify: Notify,
- resync_errors: SledCountedTree,
+ resync_errors: CountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -108,7 +112,7 @@ struct BlockManagerLocked();
impl BlockManager {
pub fn new(
- db: &sled::Db,
+ db: &db::Db,
data_dir: PathBuf,
compression_level: Option<i32>,
background_tranquility: u32,
@@ -123,12 +127,14 @@ impl BlockManager {
let resync_queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let resync_queue = SledCountedTree::new(resync_queue);
+ let resync_queue =
+ CountedTree::new(resync_queue).expect("Could not count block_local_resync_queue");
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
- let resync_errors = SledCountedTree::new(resync_errors);
+ let resync_errors =
+ CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
let endpoint = system
.netapp
@@ -219,11 +225,44 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
- for (i, entry) in self.rc.rc.iter().enumerate() {
- let (hash, _) = entry?;
- let hash = Hash::try_from(&hash[..]).unwrap();
- self.put_to_resync(&hash, Duration::from_secs(0))?;
- if i & 0xFF == 0 && *must_exit.borrow() {
+ let mut next_start: Option<Hash> = None;
+ loop {
+ // We have to do this complicated two-step process where we first read a bunch
+ // of hashes from the RC table, and then insert them in the to-resync queue,
+ // because of SQLite. Basically, as long as we have an iterator on a DB table,
+ // we can't do anything else on the DB. The naive approach (which we had previously)
+ // of just iterating on the RC table and inserting items one to one in the resync
+ // queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
+ // This is mostly because the Rust bindings for SQLite assume a worst-case scenario
+ // where SQLite is not compiled in thread-safe mode, so we have to wrap everything
+ // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
+ let mut batch_of_hashes = vec![];
+ let start_bound = match next_start.as_ref() {
+ None => Bound::Unbounded,
+ Some(x) => Bound::Excluded(x.as_slice()),
+ };
+ for entry in self
+ .rc
+ .rc
+ .range::<&[u8], _>((start_bound, Bound::Unbounded))?
+ {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ batch_of_hashes.push(hash);
+ if batch_of_hashes.len() >= 1000 {
+ break;
+ }
+ }
+ if batch_of_hashes.is_empty() {
+ break;
+ }
+
+ for hash in batch_of_hashes.into_iter() {
+ self.put_to_resync(&hash, Duration::from_secs(0))?;
+ next_start = Some(hash)
+ }
+
+ if *must_exit.borrow() {
return Ok(());
}
}
@@ -264,46 +303,69 @@ impl BlockManager {
}
/// Get lenght of resync queue
- pub fn resync_queue_len(&self) -> usize {
- self.resync_queue.len()
+ pub fn resync_queue_len(&self) -> Result<usize, Error> {
+ // This currently can't return an error because the CountedTree hack
+ // doesn't error on .len(), but this will change when we remove the hack
+ // (hopefully someday!)
+ Ok(self.resync_queue.len())
}
/// Get number of blocks that have an error
- pub fn resync_errors_len(&self) -> usize {
- self.resync_errors.len()
+ pub fn resync_errors_len(&self) -> Result<usize, Error> {
+ // (see resync_queue_len comment)
+ Ok(self.resync_errors.len())
}
/// Get number of items in the refcount table
- pub fn rc_len(&self) -> usize {
- self.rc.rc.len()
+ pub fn rc_len(&self) -> Result<usize, Error> {
+ Ok(self.rc.rc.len()?)
}
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
/// required, but not known
- pub fn block_incref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.block_incref(hash)? {
+ pub fn block_incref(
+ self: &Arc<Self>,
+ tx: &mut db::Transaction,
+ hash: Hash,
+ ) -> db::TxOpResult<()> {
+ if self.rc.block_incref(tx, &hash)? {
// When the reference counter is incremented, there is
// normally a node that is responsible for sending us the
// data of the block. However that operation may fail,
// so in all cases we add the block here to the todo list
// to check later that it arrived correctly, and if not
// we will fecth it from someone.
- self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
+ let this = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = this.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
+ error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+ }
+ });
}
Ok(())
}
/// Decrement the number of time a block is used
- pub fn block_decref(&self, hash: &Hash) -> Result<(), Error> {
- if self.rc.block_decref(hash)? {
+ pub fn block_decref(
+ self: &Arc<Self>,
+ tx: &mut db::Transaction,
+ hash: Hash,
+ ) -> db::TxOpResult<()> {
+ if self.rc.block_decref(tx, &hash)? {
// When the RC is decremented, it might drop to zero,
// indicating that we don't need the block.
// There is a delay before we garbage collect it;
// make sure that it is handled in the resync loop
// after that delay has passed.
- self.put_to_resync(hash, BLOCK_GC_DELAY + Duration::from_secs(10))?;
+ let this = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = this.put_to_resync(&hash, BLOCK_GC_DELAY + Duration::from_secs(10))
+ {
+ error!("Block {:?} could not be put in resync queue: {}.", hash, e);
+ }
+ });
}
Ok(())
}
@@ -503,12 +565,12 @@ impl BlockManager {
});
}
- fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> {
+ fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
- fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> {
+ fn put_to_resync_at(&self, hash: &Hash, when: u64) -> db::Result<()> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@@ -547,13 +609,8 @@ impl BlockManager {
// - Ok(true) -> a block was processed (successfully or not)
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
- async fn resync_iter(
- &self,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<bool, sled::Error> {
- if let Some(first_pair_res) = self.resync_queue.iter().next() {
- let (time_bytes, hash_bytes) = first_pair_res?;
-
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
+ if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@@ -561,7 +618,7 @@ impl BlockManager {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
- let ec = ErrorCounter::decode(ec);
+ let ec = ErrorCounter::decode(&ec);
if now < ec.next_try() {
// if next retry after an error is not yet,
// don't do resync and return early, but still
@@ -602,7 +659,7 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.resync_errors.get(hash.as_slice())? {
- Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
+ Some(ec) => ErrorCounter::decode(&ec).add1(now + 1),
None => ErrorCounter::new(now + 1),
};
@@ -966,7 +1023,7 @@ impl ErrorCounter {
}
}
- fn decode(data: sled::IVec) -> Self {
+ fn decode(data: &[u8]) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index f0f541a3..477add66 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,6 +1,6 @@
use opentelemetry::{global, metrics::*};
-use garage_util::sled_counter::SledCountedTree;
+use garage_db::counted_tree_hack::CountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
@@ -23,7 +23,7 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
- pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
+ pub fn new(resync_queue: CountedTree, resync_errors: CountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
diff --git a/src/block/rc.rs b/src/block/rc.rs
index ec3ea44e..ce6defad 100644
--- a/src/block/rc.rs
+++ b/src/block/rc.rs
@@ -1,5 +1,7 @@
use std::convert::TryInto;
+use garage_db as db;
+
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -7,31 +9,41 @@ use garage_util::time::*;
use crate::manager::BLOCK_GC_DELAY;
pub struct BlockRc {
- pub(crate) rc: sled::Tree,
+ pub(crate) rc: db::Tree,
}
impl BlockRc {
- pub(crate) fn new(rc: sled::Tree) -> Self {
+ pub(crate) fn new(rc: db::Tree) -> Self {
Self { rc }
}
/// Increment the reference counter associated to a hash.
/// Returns true if the RC goes from zero to nonzero.
- pub(crate) fn block_incref(&self, hash: &Hash) -> Result<bool, Error> {
- let old_rc = self
- .rc
- .fetch_and_update(&hash, |old| RcEntry::parse_opt(old).increment().serialize())?;
- let old_rc = RcEntry::parse_opt(old_rc);
+ pub(crate) fn block_incref(
+ &self,
+ tx: &mut db::Transaction,
+ hash: &Hash,
+ ) -> db::TxOpResult<bool> {
+ let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ match old_rc.increment().serialize() {
+ Some(x) => tx.insert(&self.rc, &hash, x)?,
+ None => unreachable!(),
+ };
Ok(old_rc.is_zero())
}
/// Decrement the reference counter associated to a hash.
/// Returns true if the RC is now zero.
- pub(crate) fn block_decref(&self, hash: &Hash) -> Result<bool, Error> {
- let new_rc = self
- .rc
- .update_and_fetch(&hash, |old| RcEntry::parse_opt(old).decrement().serialize())?;
- let new_rc = RcEntry::parse_opt(new_rc);
+ pub(crate) fn block_decref(
+ &self,
+ tx: &mut db::Transaction,
+ hash: &Hash,
+ ) -> db::TxOpResult<bool> {
+ let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement();
+ match new_rc.serialize() {
+ Some(x) => tx.insert(&self.rc, &hash, x)?,
+ None => tx.remove(&self.rc, &hash)?,
+ };
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
}
@@ -44,12 +56,15 @@ impl BlockRc {
/// deletion time has passed
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
- self.rc.update_and_fetch(&hash, |rcval| {
- let updated = match RcEntry::parse_opt(rcval) {
- RcEntry::Deletable { at_time } if now > at_time => RcEntry::Absent,
- v => v,
+ self.rc.db().transaction(|mut tx| {
+ let rcval = RcEntry::parse_opt(tx.get(&self.rc, &hash)?);
+ match rcval {
+ RcEntry::Deletable { at_time } if now > at_time => {
+ tx.remove(&self.rc, &hash)?;
+ }
+ _ => (),
};
- updated.serialize()
+ tx.commit(())
})?;
Ok(())
}