aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/s3_delete.rs2
-rw-r--r--src/garage/admin_rpc.rs10
-rw-r--r--src/model/block.rs23
-rw-r--r--src/table/data.rs6
-rw-r--r--src/table/merkle.rs12
5 files changed, 37 insertions, 16 deletions
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index bb42d90c..6abbfc48 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -48,7 +48,7 @@ async fn handle_delete_internal(
key.into(),
vec![ObjectVersion {
uuid: version_uuid,
- timestamp: now_msec(),
+ timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index 07c1b582..40674e75 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -432,13 +432,13 @@ impl AdminRpcHandler {
writeln!(
&mut ret,
" number of blocks: {}",
- self.garage.block_manager.rc.len()
+ self.garage.block_manager.rc_len()
)
.unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
- self.garage.block_manager.resync_queue.len()
+ self.garage.block_manager.resync_queue_len()
)
.unwrap();
@@ -460,19 +460,19 @@ impl AdminRpcHandler {
writeln!(
to,
" Merkle updater todo queue length: {}",
- t.data.merkle_updater.todo.len()
+ t.data.merkle_updater.todo_len()
)
.unwrap();
writeln!(
to,
" Merkle tree size: {}",
- t.data.merkle_updater.merkle_tree.len()
+ t.data.merkle_updater.merkle_tree_len()
)
.unwrap();
writeln!(
to,
" GC todo queue length: {}",
- t.data.gc_todo.len()
+ t.data.gc_todo_len()
)
.unwrap();
Ok(())
diff --git a/src/model/block.rs b/src/model/block.rs
index 8523474a..9fe6c76b 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -27,6 +27,8 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
+pub const BACKGROUND_WORKERS: u64 = 1;
+
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
@@ -56,14 +58,14 @@ pub struct BlockManager {
pub data_dir: PathBuf,
pub data_dir_lock: Mutex<()>,
- pub rc: sled::Tree,
+ rc: sled::Tree,
- pub resync_queue: sled::Tree,
- pub resync_notify: Notify,
+ resync_queue: sled::Tree,
+ resync_notify: Notify,
- pub system: Arc<System>,
+ system: Arc<System>,
rpc_client: Arc<RpcClient<Message>>,
- pub garage: ArcSwapOption<Garage>,
+ pub(crate) garage: ArcSwapOption<Garage>,
}
impl BlockManager {
@@ -128,7 +130,7 @@ impl BlockManager {
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing
- for i in 0..2u64 {
+ for i in 0..BACKGROUND_WORKERS {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
@@ -373,7 +375,6 @@ impl BlockManager {
);
fs::remove_file(path).await?;
- self.resync_queue.remove(&hash)?;
}
if needed && !exists {
@@ -494,6 +495,14 @@ impl BlockManager {
}
.boxed()
}
+
+ pub fn resync_queue_len(&self) -> usize {
+ self.resync_queue.len()
+ }
+
+ pub fn rc_len(&self) -> usize {
+ self.rc.len()
+ }
}
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
diff --git a/src/table/data.rs b/src/table/data.rs
index a491f877..0a7b2cec 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -18,7 +18,7 @@ pub struct TableData<F: TableSchema> {
pub instance: F,
pub store: sled::Tree,
- pub gc_todo: sled::Tree,
+ pub(crate) gc_todo: sled::Tree,
pub merkle_updater: Arc<MerkleUpdater>,
}
@@ -239,4 +239,8 @@ where
},
}
}
+
+ pub fn gc_todo_len(&self) -> usize {
+ self.gc_todo.len()
+ }
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index f60be8a8..aefb5169 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -40,13 +40,13 @@ pub struct MerkleUpdater {
// - key = the key of an item in the main table, ie hash(partition_key)+sort_key
// - value = the hash of the full serialized item, if present,
// or an empty vec if item is absent (deleted)
- pub todo: sled::Tree,
+ pub(crate) todo: sled::Tree,
pub(crate) todo_notify: Notify,
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
- pub merkle_tree: sled::Tree,
+ pub(crate) merkle_tree: sled::Tree,
empty_node_hash: Hash,
}
@@ -311,6 +311,14 @@ impl MerkleUpdater {
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
}
}
+
+ pub fn merkle_tree_len(&self) -> usize {
+ self.merkle_tree.len()
+ }
+
+ pub fn todo_len(&self) -> usize {
+ self.todo.len()
+ }
}
impl MerkleNodeKey {