diff options
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | src/block/manager.rs | 31 | ||||
-rw-r--r-- | src/garage/admin.rs | 4 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 15 | ||||
-rw-r--r-- | src/garage/server.rs | 3 | ||||
-rw-r--r-- | src/model/garage.rs | 21 | ||||
-rw-r--r-- | src/model/index_counter.rs | 4 | ||||
-rw-r--r-- | src/table/Cargo.toml | 1 | ||||
-rw-r--r-- | src/table/gc.rs | 13 | ||||
-rw-r--r-- | src/table/merkle.rs | 12 | ||||
-rw-r--r-- | src/table/sync.rs | 43 | ||||
-rw-r--r-- | src/table/table.rs | 22 |
12 files changed, 112 insertions, 58 deletions
@@ -1243,6 +1243,7 @@ dependencies = [ name = "garage_table" version = "0.8.0" dependencies = [ + "arc-swap", "async-trait", "bytes", "futures", diff --git a/src/block/manager.rs b/src/block/manager.rs index 28523a93..ffb9de9a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; @@ -87,7 +88,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, - tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>, + tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -126,8 +127,6 @@ impl BlockManager { let metrics = BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); - let (scrub_tx, scrub_rx) = mpsc::channel(1); - let block_manager = Arc::new(Self { replication, data_dir, @@ -138,21 +137,26 @@ impl BlockManager { system, endpoint, metrics, - tx_scrub_command: scrub_tx, + tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager + } + + pub fn spawn_workers(self: &Arc<Self>) { // Spawn a bunch of resync workers for index in 0..MAX_RESYNC_WORKERS { - let worker = ResyncWorker::new(index, block_manager.clone()); - block_manager.system.background.spawn_worker(worker); + let worker = ResyncWorker::new(index, self.clone()); + self.system.background.spawn_worker(worker); } // Spawn scrub worker - let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx); - block_manager.system.background.spawn_worker(scrub_worker); - - block_manager + let (scrub_tx, scrub_rx) = mpsc::channel(1); + self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); + self.system + .background + .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); } /// Ask nodes that might have a (possibly compressed) block for it @@ -325,8 +329,11 @@ impl BlockManager { } /// Send command to start/stop/manager scrub worker - pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) { - let _ = self.tx_scrub_command.send(cmd).await; + pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> { + let tx = self.tx_scrub_command.load(); + let tx = tx.as_ref().ok_or_message("scrub worker is not running")?; + tx.send(cmd).await.ok_or_message("send error")?; + Ok(()) } /// Get the reference count of a block diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 1ca3698a..96d838d5 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -759,7 +759,7 @@ impl AdminRpcHandler { ))) } } else { - launch_online_repair(self.garage.clone(), opt).await; + launch_online_repair(self.garage.clone(), opt).await?; Ok(AdminRpc::Ok(format!( "Repair launched on {:?}", self.garage.system.id @@ -944,7 +944,7 @@ impl AdminRpcHandler { self.garage .block_manager .send_scrub_command(scrub_command) - .await; + .await?; Ok(AdminRpc::Ok("Scrub tranquility updated".into())) } WorkerSetCmd::ResyncWorkerCount { worker_count } => { diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 42221c2a..2a8e6298 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -15,15 +15,15 @@ use garage_util::error::Error; use crate::*; -pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { +pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> { match opt.what { RepairWhat::Tables => { info!("Launching a full sync of tables"); - garage.bucket_table.syncer.add_full_sync(); - garage.object_table.syncer.add_full_sync(); - garage.version_table.syncer.add_full_sync(); - garage.block_ref_table.syncer.add_full_sync(); - garage.key_table.syncer.add_full_sync(); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; } RepairWhat::Versions => { info!("Repairing the versions table"); @@ -56,9 +56,10 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) { } }; info!("Sending command to scrub worker: {:?}", cmd); - garage.block_manager.send_scrub_command(cmd).await; + garage.block_manager.send_scrub_command(cmd).await?; } } + Ok(()) } // ---- diff --git a/src/garage/server.rs b/src/garage/server.rs index d4099a97..8e29f6ec 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -42,6 +42,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing Garage main data store..."); let garage = Garage::new(config.clone(), background)?; + info!("Spawning Garage workers..."); + garage.spawn_workers(); + if config.admin.trace_sink.is_some() { info!("Initialize tracing..."); diff --git a/src/model/garage.rs b/src/model/garage.rs index e34d034f..9ae6af82 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -273,6 +273,22 @@ impl Garage { })) } + pub fn spawn_workers(&self) { + self.block_manager.spawn_workers(); + + self.bucket_table.spawn_workers(); + self.bucket_alias_table.spawn_workers(); + self.key_table.spawn_workers(); + + self.object_table.spawn_workers(); + self.object_counter_table.spawn_workers(); + self.version_table.spawn_workers(); + self.block_ref_table.spawn_workers(); + + #[cfg(feature = "k2v")] + self.k2v.spawn_workers(); + } + pub fn bucket_helper(&self) -> helper::bucket::BucketHelper { helper::bucket::BucketHelper(self) } @@ -307,4 +323,9 @@ impl GarageK2V { rpc, } } + + pub fn spawn_workers(&self) { + self.item_table.spawn_workers(); + self.counter_table.spawn_workers(); + } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 9c8e00c2..d907e947 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -164,6 +164,10 @@ impl<T: CountedItem> IndexCounter<T> { }) } + pub fn spawn_workers(&self) { + self.table.spawn_workers(); + } + pub fn count( &self, tx: &mut db::Transaction, diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 38c6b41c..a8d9b5e6 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" } opentelemetry = "0.17" async-trait = "0.1.7" +arc-swap = "1.0" bytes = "1.0" hex = "0.4" hexdump = "0.1" diff --git a/src/table/gc.rs b/src/table/gc.rs index cfdc9d2d..c83c2050 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -54,24 +54,27 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { + pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME)); let gc = Arc::new(Self { - system: system.clone(), + system, data, endpoint, }); - gc.endpoint.set_handler(gc.clone()); - system.background.spawn_worker(GcWorker::new(gc.clone())); - gc } + pub(crate) fn spawn_workers(self: &Arc<Self>) { + self.system + .background + .spawn_worker(GcWorker::new(self.clone())); + } + async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index bcf9f9d7..0fe7d2cb 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -70,17 +70,17 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> { + pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - let ret = Arc::new(Self { + Arc::new(Self { data, empty_node_hash, - }); - - background.spawn_worker(MerkleWorker(ret.clone())); + }) + } - ret + pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) { + background.spawn_worker(MerkleWorker(self.clone())); } fn updater_loop_iter(&self) -> Result<WorkerState, Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index af7aa640..7008a383 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; @@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; use garage_rpc::system::System; @@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - add_full_sync_tx: mpsc::UnboundedSender<()>, + add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -65,7 +66,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( + pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -74,34 +75,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); - let syncer = Arc::new(Self { - system: system.clone(), + system, data, merkle, - add_full_sync_tx, + add_full_sync_tx: ArcSwapOption::new(None), endpoint, }); - syncer.endpoint.set_handler(syncer.clone()); - system.background.spawn_worker(SyncWorker { - syncer: syncer.clone(), - ring_recv: system.ring.clone(), - ring: system.ring.borrow().clone(), + syncer + } + + pub(crate) fn spawn_workers(self: &Arc<Self>) { + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); + self.add_full_sync_tx + .store(Some(Arc::new(add_full_sync_tx))); + + self.system.background.spawn_worker(SyncWorker { + syncer: self.clone(), + ring_recv: self.system.ring.clone(), + ring: self.system.ring.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), }); - - syncer } - pub fn add_full_sync(&self) { - if self.add_full_sync_tx.send(()).is_err() { - error!("({}) Could not add full sync", F::TABLE_NAME); - } + pub fn add_full_sync(&self) -> Result<(), Error> { + let tx = self.add_full_sync_tx.load(); + let tx = tx + .as_ref() + .ok_or_message("table sync worker is not running")?; + tx.send(()).ok_or_message("send error")?; + Ok(()) } // ---- diff --git a/src/table/table.rs b/src/table/table.rs index c8e0576e..cb200ef2 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -36,6 +36,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, + gc: Arc<TableGc<F, R>>, endpoint: Arc<Endpoint<TableRpc<F>, Self>>, } @@ -76,29 +77,34 @@ where let data = TableData::new(system.clone(), instance, replication, db); - let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); + let merkle_updater = MerkleUpdater::new(data.clone()); - let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); - TableGc::launch(system.clone(), data.clone()); + let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); + let gc = TableGc::new(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, + gc, syncer, endpoint, }); - table - .system - .background - .spawn_worker(InsertQueueWorker(table.clone())); - table.endpoint.set_handler(table.clone()); table } + pub fn spawn_workers(self: &Arc<Self>) { + self.merkle_updater.spawn_workers(&self.system.background); + self.syncer.spawn_workers(); + self.gc.spawn_workers(); + self.system + .background + .spawn_worker(InsertQueueWorker(self.clone())); + } + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert", F::TABLE_NAME)); |