Diffstat (limited to 'src/block')
-rw-r--r--   src/block/Cargo.toml  |  2
-rw-r--r--   src/block/block.rs    |  2
-rw-r--r--   src/block/lib.rs      |  2
-rw-r--r--   src/block/manager.rs  | 41
-rw-r--r--   src/block/metrics.rs  | 17
-rw-r--r--   src/block/resync.rs   | 29
6 files changed, 40 insertions, 53 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 0f6fb5bb..b5763120 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_block"
-version = "0.9.3"
+version = "0.10.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/block/block.rs b/src/block/block.rs
index 504d11f8..bd95680e 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -96,7 +96,7 @@ impl DataBlock {
}
}
-fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
+pub fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?;
diff --git a/src/block/lib.rs b/src/block/lib.rs
index c9ff2845..6c4711ef 100644
--- a/src/block/lib.rs
+++ b/src/block/lib.rs
@@ -9,3 +9,5 @@ mod block;
mod layout;
mod metrics;
mod rc;
+
+pub use block::zstd_encode;
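With `zstd_encode` now public and re-exported from the crate root, other crates can compress a buffer directly. A minimal usage sketch (payload and level are illustrative; Garage derives the level from its configured `compression_level`):

```rust
use std::io::Cursor;

use garage_block::zstd_encode;

fn main() -> std::io::Result<()> {
    let payload = b"example payload";
    // Level 3 is an arbitrary choice for this sketch.
    let compressed = zstd_encode(Cursor::new(&payload[..]), 3)?;
    println!("compressed to {} bytes", compressed.len());
    Ok(())
}
```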
diff --git a/src/block/manager.rs b/src/block/manager.rs
index ef7279e9..eeacf8b9 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -267,8 +267,10 @@ impl BlockManager {
F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self
+ .system
+ .rpc_helper()
+ .block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -308,7 +310,7 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
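The timeout-driven fallback above is a standard `tokio::select!` race: each node's response is raced against the RPC timeout, and a slow node is abandoned in favor of the next one (per the TODO, the slow request is currently dropped rather than kept running). A self-contained sketch with a stand-in `fetch_from`, not Garage's actual RPC API:

```rust
use std::time::Duration;

// Stand-in for the real RPC call; it always fails here so the loop advances.
async fn fetch_from(_node: u64) -> Result<Vec<u8>, ()> {
    Err(())
}

// Try each node in turn; if one doesn't answer within `rpc_timeout`,
// give up on it and move on to the next node.
async fn get_with_fallback(nodes: &[u64], rpc_timeout: Duration) -> Option<Vec<u8>> {
    for node in nodes {
        tokio::select! {
            res = fetch_from(*node) => {
                if let Ok(block) = res {
                    return Some(block);
                }
            }
            _ = tokio::time::sleep(rpc_timeout) => {
                // Too slow: fall through to the next node.
            }
        }
    }
    None
}
```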
@@ -341,26 +343,18 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it, return it as one big Bytes
- pub async fn rpc_get_block(
- &self,
- hash: &Hash,
- order_tag: Option<OrderTag>,
- ) -> Result<Bytes, Error> {
- let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
- Ok(read_stream_to_end(stream).await?.into_bytes())
- }
-
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
+ prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ let who = self.replication.write_sets(&hash);
- let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ let compression_level = self.compression_level.filter(|_| !prevent_compression);
+ let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
let put_block_rpc =
@@ -372,10 +366,10 @@ impl BlockManager {
};
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
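Two things change in `rpc_put_block`: writes now target all of the hash's write sets via `try_write_many_sets`, and the new `prevent_compression` flag is applied with `Option::filter`. A minimal sketch of that filter one-liner (the function name is illustrative):

```rust
// The configured level is kept only when compression is not prevented.
fn effective_level(configured: Option<i32>, prevent_compression: bool) -> Option<i32> {
    configured.filter(|_| !prevent_compression)
}

fn main() {
    assert_eq!(effective_level(Some(3), false), Some(3));
    assert_eq!(effective_level(Some(3), true), None);
    assert_eq!(effective_level(None, false), None);
}
```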
@@ -390,11 +384,6 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
- /// Get number of items in the refcount table
- pub fn rc_fast_len(&self) -> Result<Option<usize>, Error> {
- Ok(self.rc.rc.fast_len()?)
- }
-
/// Send command to start/stop/manage scrub worker
pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> {
let tx = self.tx_scrub_command.load();
@@ -410,7 +399,7 @@ impl BlockManager {
/// List all resync errors
pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
- let mut blocks = Vec::with_capacity(self.resync.errors.len());
+ let mut blocks = Vec::with_capacity(self.resync.errors.len()?);
for ent in self.resync.errors.iter()? {
let (hash, cnt) = ent?;
let cnt = ErrorCounter::decode(&cnt);
@@ -448,7 +437,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -542,7 +531,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 6659df32..8e10afdf 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,7 +1,6 @@
use opentelemetry::{global, metrics::*};
use garage_db as db;
-use garage_db::counted_tree_hack::CountedTree;
/// BlockManagerMetrics references all counters used for metrics
pub struct BlockManagerMetrics {
@@ -29,8 +28,8 @@ impl BlockManagerMetrics {
pub fn new(
compression_level: Option<i32>,
rc_tree: db::Tree,
- resync_queue: CountedTree,
- resync_errors: CountedTree,
+ resync_queue: db::Tree,
+ resync_errors: db::Tree,
) -> Self {
let meter = global::meter("garage_model/block");
Self {
@@ -45,15 +44,17 @@ impl BlockManagerMetrics {
.init(),
_rc_size: meter
.u64_value_observer("block.rc_size", move |observer| {
- if let Ok(Some(v)) = rc_tree.fast_len() {
- observer.observe(v as u64, &[])
+ if let Ok(value) = rc_tree.len() {
+ observer.observe(value as u64, &[])
}
})
.with_description("Number of blocks known to the reference counter")
.init(),
_resync_queue_len: meter
.u64_value_observer("block.resync_queue_length", move |observer| {
- observer.observe(resync_queue.len() as u64, &[])
+ if let Ok(value) = resync_queue.len() {
+ observer.observe(value as u64, &[]);
+ }
})
.with_description(
"Number of block hashes queued for local check and possible resync",
@@ -61,7 +62,9 @@ impl BlockManagerMetrics {
.init(),
_resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| {
- observer.observe(resync_errors.len() as u64, &[])
+ if let Ok(value) = resync_errors.len() {
+ observer.observe(value as u64, &[]);
+ }
})
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
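Since `db::Tree::len()` is fallible where `CountedTree::len()` was not, each observer now guards its observation and silently skips the sample on a db error. A sketch of the pattern, mirroring the calls in the diff (metric name and function are illustrative):

```rust
use opentelemetry::{global, metrics::ValueObserver};

use garage_db as db;

// `tree` is moved into the callback, which runs on every metrics collection.
fn register_len_gauge(tree: db::Tree) -> ValueObserver<u64> {
    let meter = global::meter("garage_model/block");
    meter
        .u64_value_observer("example.tree_length", move |observer| {
            // Only report a sample when the fallible len() succeeds.
            if let Ok(value) = tree.len() {
                observer.observe(value as u64, &[]);
            }
        })
        .with_description("Example gauge backed by a fallible tree length")
        .init()
}
```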
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c1da4a7..48c2cef1 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -15,7 +15,6 @@ use opentelemetry::{
};
use garage_db as db;
-use garage_db::counted_tree_hack::CountedTree;
use garage_util::background::*;
use garage_util::data::*;
@@ -47,9 +46,9 @@ pub(crate) const MAX_RESYNC_WORKERS: usize = 8;
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
- pub(crate) queue: CountedTree,
+ pub(crate) queue: db::Tree,
pub(crate) notify: Arc<Notify>,
- pub(crate) errors: CountedTree,
+ pub(crate) errors: db::Tree,
busy_set: BusySet,
@@ -90,12 +89,10 @@ impl BlockResyncManager {
let queue = db
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let queue = CountedTree::new(queue).expect("Could not count block_local_resync_queue");
let errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
- let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
@@ -110,16 +107,12 @@ impl BlockResyncManager {
/// Get length of resync queue
pub fn queue_len(&self) -> Result<usize, Error> {
- // This currently can't return an error because the CountedTree hack
- // doesn't error on .len(), but this will change when we remove the hack
- // (hopefully someday!)
- Ok(self.queue.len())
+ Ok(self.queue.len()?)
}
/// Get number of blocks that have an error
pub fn errors_len(&self) -> Result<usize, Error> {
- // (see queue_len comment)
- Ok(self.errors.len())
+ Ok(self.errors.len()?)
}
/// Clear the error counter for a block and put it in queue immediately
@@ -180,7 +173,7 @@ impl BlockResyncManager {
// deleted once the garbage collection delay has passed.
//
// Here are some explanations on how the resync queue works.
- // There are two Sled trees that are used to have information
+ // There are two db trees that are used to have information
// about the status of blocks that need to be resynchronized:
//
// - resync.queue: a tree that is ordered first by a timestamp
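The ordering described in the comment (entries sorted first by a timestamp) is typically achieved by prefixing keys with big-endian timestamp bytes so that lexicographic order matches chronological order. A hypothetical sketch of such a key layout, not Garage's exact encoding:

```rust
// Hypothetical layout: 8 big-endian timestamp bytes, then the block hash.
// Big-endian ensures byte-wise ordering follows the timestamp.
fn resync_queue_key(due_at_ms: u64, hash: &[u8; 32]) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 32);
    key.extend_from_slice(&due_at_ms.to_be_bytes());
    key.extend_from_slice(hash);
    key
}
```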
@@ -377,7 +370,7 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.write_nodes(hash);
+ let mut who = manager.replication.storage_nodes(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@@ -385,7 +378,7 @@ impl BlockResyncManager {
let who_needs_resps = manager
.system
- .rpc
+ .rpc_helper()
.call_many(
&manager.endpoint,
&who,
@@ -431,10 +424,10 @@ impl BlockResyncManager {
.with_stream_from_buffer(bytes);
manager
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&manager.endpoint,
- &need_nodes[..],
+ &need_nodes,
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(need_nodes.len()),
@@ -541,9 +534,9 @@ impl Worker for ResyncWorker {
Ok(WorkerState::Idle)
}
Err(e) => {
- // The errors that we have here are only Sled errors
+ // The errors that we have here are only db errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
- // (there is kind of an assumption that Sled won't error on us,
+ // (there is kind of an assumption that the db won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
// Here we just give the error to the worker manager,
// it will print it to the logs and increment a counter