aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/Cargo.toml1
-rw-r--r--src/api/s3_put.rs2
-rw-r--r--src/block/Cargo.toml38
-rw-r--r--src/block/lib.rs6
-rw-r--r--src/block/manager.rs (renamed from src/model/block.rs)36
-rw-r--r--src/block/metrics.rs (renamed from src/model/block_metrics.rs)0
-rw-r--r--src/garage/server.rs3
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/block_ref_table.rs2
-rw-r--r--src/model/garage.rs13
-rw-r--r--src/model/lib.rs3
11 files changed, 64 insertions, 41 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index d61a986f..5e96b081 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_model = { version = "0.7.0", path = "../model" }
garage_table = { version = "0.7.0", path = "../table" }
+garage_block = { version = "0.7.0", path = "../block" }
garage_util = { version = "0.7.0", path = "../util" }
base64 = "0.13"
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 08e490ae..ed0bf00b 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -13,7 +13,7 @@ use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
-use garage_model::block::INLINE_THRESHOLD;
+use garage_block::manager::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
new file mode 100644
index 00000000..9cba69ee
--- /dev/null
+++ b/src/block/Cargo.toml
@@ -0,0 +1,38 @@
+[package]
+name = "garage_block"
+version = "0.7.0"
+authors = ["Alex Auvolat <alex@adnab.me>"]
+edition = "2018"
+license = "AGPL-3.0"
+description = "Block manager for the Garage object store"
+repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
+readme = "../../README.md"
+
+[lib]
+path = "lib.rs"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+garage_rpc = { version = "0.7.0", path = "../rpc" }
+garage_util = { version = "0.7.0", path = "../util" }
+garage_table = { version = "0.7.0", path = "../table" }
+
+opentelemetry = "0.17"
+
+async-trait = "0.1.7"
+bytes = "1.0"
+hex = "0.4"
+tracing = "0.1.30"
+rand = "0.8"
+zstd = { version = "0.9", default-features = false }
+
+sled = "0.34"
+
+rmp-serde = "0.15"
+serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
+serde_bytes = "0.11"
+
+futures = "0.3"
+futures-util = "0.3"
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/block/lib.rs b/src/block/lib.rs
new file mode 100644
index 00000000..47ff402d
--- /dev/null
+++ b/src/block/lib.rs
@@ -0,0 +1,6 @@
+#[macro_use]
+extern crate tracing;
+
+pub mod manager;
+
+mod metrics;
diff --git a/src/model/block.rs b/src/block/manager.rs
index a41daa64..f047e1d3 100644
--- a/src/model/block.rs
+++ b/src/block/manager.rs
@@ -3,7 +3,6 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
-use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
@@ -31,9 +30,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
-use crate::block_metrics::*;
-use crate::block_ref_table::*;
-use crate::garage::Garage;
+use crate::metrics::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@@ -151,6 +148,8 @@ pub struct BlockManager {
pub replication: TableShardedReplication,
/// Directory in which block are stored
pub data_dir: PathBuf,
+ /// Zstd compression level
+ compression_level: Option<i32>,
mutation_lock: Mutex<BlockManagerLocked>,
@@ -162,7 +161,6 @@ pub struct BlockManager {
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
- pub(crate) garage: ArcSwapOption<Garage>,
metrics: BlockManagerMetrics,
}
@@ -176,6 +174,7 @@ impl BlockManager {
pub fn new(
db: &sled::Db,
data_dir: PathBuf,
+ compression_level: Option<i32>,
replication: TableShardedReplication,
system: Arc<System>,
) -> Arc<Self> {
@@ -204,6 +203,7 @@ impl BlockManager {
let block_manager = Arc::new(Self {
replication,
data_dir,
+ compression_level,
mutation_lock: Mutex::new(manager_locked),
rc,
resync_queue,
@@ -211,7 +211,6 @@ impl BlockManager {
resync_errors,
system,
endpoint,
- garage: ArcSwapOption::from(None),
metrics,
});
block_manager.endpoint.set_handler(block_manager.clone());
@@ -257,14 +256,7 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- let compression_level = self
- .garage
- .load()
- .as_ref()
- .unwrap()
- .config
- .compression_level;
- let data = DataBlock::from_buffer(data, compression_level);
+ let data = DataBlock::from_buffer(data, self.compression_level);
self.system
.rpc
.try_call_many(
@@ -286,18 +278,10 @@ impl BlockManager {
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
- let garage = self.garage.load_full().unwrap();
- let mut last_hash = None;
- for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() {
- let (_k, v_bytes) = entry?;
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
- if Some(&block_ref.block) == last_hash.as_ref() {
- continue;
- }
- if !block_ref.deleted.get() {
- last_hash = Some(block_ref.block);
- self.put_to_resync(&block_ref.block, Duration::from_secs(0))?;
- }
+ for (i, entry) in self.rc.iter().enumerate() {
+ let (hash, _) = entry?;
+ let hash = Hash::try_from(&hash[..]).unwrap();
+ self.put_to_resync(&hash, Duration::from_secs(0))?;
if i & 0xFF == 0 && *must_exit.borrow() {
return Ok(());
}
diff --git a/src/model/block_metrics.rs b/src/block/metrics.rs
index f0f541a3..f0f541a3 100644
--- a/src/model/block_metrics.rs
+++ b/src/block/metrics.rs
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 37c3535e..58c9e782 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -98,8 +98,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Await for netapp RPC system to end
run_system.await?;
- // Break last reference cycles so that stuff can terminate properly
- garage.break_reference_cycles();
+ // Drop all references so that stuff can terminate properly
drop(garage);
// Await for all background tasks to end
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 9d9cdb25..e542b4df 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
+garage_block = { version = "0.7.0", path = "../block" }
garage_util = { version = "0.7.0", path = "../util" }
garage_model_050 = { package = "garage_model", version = "0.5.1" }
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index 7dc973d5..0ab3d7e8 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -6,7 +6,7 @@ use garage_util::data::*;
use garage_table::crdt::Crdt;
use garage_table::*;
-use crate::block::*;
+use garage_block::manager::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 78b4433a..3f2605f1 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -11,8 +11,8 @@ use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
+use garage_block::manager::*;
-use crate::block::*;
use crate::block_ref_table::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
@@ -87,7 +87,10 @@ impl Garage {
info!("Initialize block manager...");
let block_manager =
- BlockManager::new(&db, config.data_dir.clone(), data_rep_param, system.clone());
+ BlockManager::new(&db,
+ config.data_dir.clone(),
+ config.compression_level,
+ data_rep_param, system.clone());
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
@@ -151,17 +154,11 @@ impl Garage {
});
info!("Start block manager background thread...");
- garage.block_manager.garage.swap(Some(garage.clone()));
garage.block_manager.clone().spawn_background_worker();
garage
}
- /// Use this for shutdown
- pub fn break_reference_cycles(&self) {
- self.block_manager.garage.swap(None);
- }
-
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
diff --git a/src/model/lib.rs b/src/model/lib.rs
index 6da86fc6..05a4cdc7 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -10,9 +10,6 @@ pub mod key_table;
pub mod object_table;
pub mod version_table;
-pub mod block;
-mod block_metrics;
-
pub mod garage;
pub mod helper;
pub mod migrate;