diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 1 | ||||
-rw-r--r-- | src/api/s3_put.rs | 2 | ||||
-rw-r--r-- | src/block/Cargo.toml | 38 | ||||
-rw-r--r-- | src/block/lib.rs | 6 | ||||
-rw-r--r-- | src/block/manager.rs (renamed from src/model/block.rs) | 36 | ||||
-rw-r--r-- | src/block/metrics.rs (renamed from src/model/block_metrics.rs) | 0 | ||||
-rw-r--r-- | src/garage/server.rs | 3 | ||||
-rw-r--r-- | src/model/Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/block_ref_table.rs | 2 | ||||
-rw-r--r-- | src/model/garage.rs | 13 | ||||
-rw-r--r-- | src/model/lib.rs | 3 |
11 files changed, 64 insertions, 41 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index d61a986f..5e96b081 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -16,6 +16,7 @@ path = "lib.rs" [dependencies] garage_model = { version = "0.7.0", path = "../model" } garage_table = { version = "0.7.0", path = "../table" } +garage_block = { version = "0.7.0", path = "../block" } garage_util = { version = "0.7.0", path = "../util" } base64 = "0.13" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 08e490ae..ed0bf00b 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -13,7 +13,7 @@ use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; -use garage_model::block::INLINE_THRESHOLD; +use garage_block::manager::INLINE_THRESHOLD; use garage_model::block_ref_table::*; use garage_model::garage::Garage; use garage_model::object_table::*; diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml new file mode 100644 index 00000000..9cba69ee --- /dev/null +++ b/src/block/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "garage_block" +version = "0.7.0" +authors = ["Alex Auvolat <alex@adnab.me>"] +edition = "2018" +license = "AGPL-3.0" +description = "Block manager for the Garage object store" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +garage_rpc = { version = "0.7.0", path = "../rpc" } +garage_util = { version = "0.7.0", path = "../util" } +garage_table = { version = "0.7.0", path = "../table" } + +opentelemetry = "0.17" + +async-trait = "0.1.7" +bytes = "1.0" +hex = "0.4" +tracing = "0.1.30" +rand = "0.8" +zstd = { version = "0.9", default-features = false } + +sled = "0.34" + +rmp-serde = "0.15" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde_bytes = "0.11" + +futures = "0.3" +futures-util = "0.3" +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/block/lib.rs b/src/block/lib.rs new file mode 100644 index 00000000..47ff402d --- /dev/null +++ b/src/block/lib.rs @@ -0,0 +1,6 @@ +#[macro_use] +extern crate tracing; + +pub mod manager; + +mod metrics; diff --git a/src/model/block.rs b/src/block/manager.rs index a41daa64..f047e1d3 100644 --- a/src/model/block.rs +++ b/src/block/manager.rs @@ -3,7 +3,6 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; -use arc_swap::ArcSwapOption; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -31,9 +30,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; -use crate::block_metrics::*; -use crate::block_ref_table::*; -use crate::garage::Garage; +use crate::metrics::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -151,6 +148,8 @@ pub struct BlockManager { pub replication: TableShardedReplication, /// Directory in which block are stored pub data_dir: PathBuf, + /// Zstd compression level + compression_level: Option<i32>, mutation_lock: Mutex<BlockManagerLocked>, @@ -162,7 +161,6 @@ pub struct BlockManager { system: Arc<System>, endpoint: Arc<Endpoint<BlockRpc, Self>>, - pub(crate) garage: ArcSwapOption<Garage>, metrics: BlockManagerMetrics, } @@ -176,6 +174,7 @@ impl BlockManager { pub fn new( db: &sled::Db, data_dir: PathBuf, + compression_level: Option<i32>, replication: TableShardedReplication, system: Arc<System>, ) -> Arc<Self> { @@ -204,6 +203,7 @@ impl BlockManager { let block_manager = Arc::new(Self { replication, data_dir, + compression_level, mutation_lock: Mutex::new(manager_locked), rc, resync_queue, @@ -211,7 +211,6 @@ impl BlockManager { resync_errors, system, endpoint, - garage: ArcSwapOption::from(None), metrics, }); block_manager.endpoint.set_handler(block_manager.clone()); @@ -257,14 +256,7 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let compression_level = self - .garage - .load() - .as_ref() - .unwrap() - .config - .compression_level; - let data = DataBlock::from_buffer(data, compression_level); + let data = DataBlock::from_buffer(data, self.compression_level); self.system .rpc .try_call_many( @@ -286,18 +278,10 @@ impl BlockManager { /// to fix any mismatch between the two. pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> { // 1. Repair blocks from RC table. - let garage = self.garage.load_full().unwrap(); - let mut last_hash = None; - for (i, entry) in garage.block_ref_table.data.store.iter().enumerate() { - let (_k, v_bytes) = entry?; - let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?; - if Some(&block_ref.block) == last_hash.as_ref() { - continue; - } - if !block_ref.deleted.get() { - last_hash = Some(block_ref.block); - self.put_to_resync(&block_ref.block, Duration::from_secs(0))?; - } + for (i, entry) in self.rc.iter().enumerate() { + let (hash, _) = entry?; + let hash = Hash::try_from(&hash[..]).unwrap(); + self.put_to_resync(&hash, Duration::from_secs(0))?; if i & 0xFF == 0 && *must_exit.borrow() { return Ok(()); } diff --git a/src/model/block_metrics.rs b/src/block/metrics.rs index f0f541a3..f0f541a3 100644 --- a/src/model/block_metrics.rs +++ b/src/block/metrics.rs diff --git a/src/garage/server.rs b/src/garage/server.rs index 37c3535e..58c9e782 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -98,8 +98,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Await for netapp RPC system to end run_system.await?; - // Break last reference cycles so that stuff can terminate properly - garage.break_reference_cycles(); + // Drop all references so that stuff can terminate properly drop(garage); // Await for all background tasks to end diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 9d9cdb25..e542b4df 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -16,6 +16,7 @@ path = "lib.rs" [dependencies] garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } +garage_block = { version = "0.7.0", path = "../block" } garage_util = { version = "0.7.0", path = "../util" } garage_model_050 = { package = "garage_model", version = "0.5.1" } diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 7dc973d5..0ab3d7e8 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -6,7 +6,7 @@ use garage_util::data::*; use garage_table::crdt::Crdt; use garage_table::*; -use crate::block::*; +use garage_block::manager::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BlockRef { diff --git a/src/model/garage.rs b/src/model/garage.rs index 78b4433a..3f2605f1 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -11,8 +11,8 @@ use garage_table::replication::ReplicationMode; use garage_table::replication::TableFullReplication; use garage_table::replication::TableShardedReplication; use garage_table::*; +use garage_block::manager::*; -use crate::block::*; use crate::block_ref_table::*; use crate::bucket_alias_table::*; use crate::bucket_table::*; @@ -87,7 +87,10 @@ impl Garage { info!("Initialize block manager..."); let block_manager = - BlockManager::new(&db, config.data_dir.clone(), data_rep_param, system.clone()); + BlockManager::new(&db, + config.data_dir.clone(), + config.compression_level, + data_rep_param, system.clone()); info!("Initialize block_ref_table..."); let block_ref_table = Table::new( @@ -151,17 +154,11 @@ impl Garage { }); info!("Start block manager background thread..."); - garage.block_manager.garage.swap(Some(garage.clone())); garage.block_manager.clone().spawn_background_worker(); garage } - /// Use this for shutdown - pub fn break_reference_cycles(&self) { - self.block_manager.garage.swap(None); - } - pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } diff --git a/src/model/lib.rs b/src/model/lib.rs index 6da86fc6..05a4cdc7 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -10,9 +10,6 @@ pub mod key_table; pub mod object_table; pub mod version_table; -pub mod block; -mod block_metrics; - pub mod garage; pub mod helper; pub mod migrate; |