aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--src/block/manager.rs31
-rw-r--r--src/garage/admin.rs4
-rw-r--r--src/garage/repair/online.rs15
-rw-r--r--src/garage/server.rs3
-rw-r--r--src/model/garage.rs21
-rw-r--r--src/model/index_counter.rs4
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/gc.rs13
-rw-r--r--src/table/merkle.rs12
-rw-r--r--src/table/sync.rs43
-rw-r--r--src/table/table.rs22
12 files changed, 112 insertions, 58 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 40dac806..a8751d17 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1243,6 +1243,7 @@ dependencies = [
name = "garage_table"
version = "0.8.0"
dependencies = [
+ "arc-swap",
"async-trait",
"bytes",
"futures",
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 28523a93..ffb9de9a 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -87,7 +88,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
- tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@@ -126,8 +127,6 @@ impl BlockManager {
let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
- let (scrub_tx, scrub_rx) = mpsc::channel(1);
-
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -138,21 +137,26 @@ impl BlockManager {
system,
endpoint,
metrics,
- tx_scrub_command: scrub_tx,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager
+ }
+
+ pub fn spawn_workers(self: &Arc<Self>) {
// Spawn a bunch of resync workers
for index in 0..MAX_RESYNC_WORKERS {
- let worker = ResyncWorker::new(index, block_manager.clone());
- block_manager.system.background.spawn_worker(worker);
+ let worker = ResyncWorker::new(index, self.clone());
+ self.system.background.spawn_worker(worker);
}
// Spawn scrub worker
- let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx);
- block_manager.system.background.spawn_worker(scrub_worker);
-
- block_manager
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ self.system
+ .background
+ .spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -325,8 +329,11 @@ impl BlockManager {
}
/// Send command to start/stop/manager scrub worker
- pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
- let _ = self.tx_scrub_command.send(cmd).await;
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> {
+ let tx = self.tx_scrub_command.load();
+ let tx = tx.as_ref().ok_or_message("scrub worker is not running")?;
+ tx.send(cmd).await.ok_or_message("send error")?;
+ Ok(())
}
/// Get the reference count of a block
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 1ca3698a..96d838d5 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -759,7 +759,7 @@ impl AdminRpcHandler {
)))
}
} else {
- launch_online_repair(self.garage.clone(), opt).await;
+ launch_online_repair(self.garage.clone(), opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@@ -944,7 +944,7 @@ impl AdminRpcHandler {
self.garage
.block_manager
.send_scrub_command(scrub_command)
- .await;
+ .await?;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
}
WorkerSetCmd::ResyncWorkerCount { worker_count } => {
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 42221c2a..2a8e6298 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -15,15 +15,15 @@ use garage_util::error::Error;
use crate::*;
-pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
- garage.bucket_table.syncer.add_full_sync();
- garage.object_table.syncer.add_full_sync();
- garage.version_table.syncer.add_full_sync();
- garage.block_ref_table.syncer.add_full_sync();
- garage.key_table.syncer.add_full_sync();
+ garage.bucket_table.syncer.add_full_sync()?;
+ garage.object_table.syncer.add_full_sync()?;
+ garage.version_table.syncer.add_full_sync()?;
+ garage.block_ref_table.syncer.add_full_sync()?;
+ garage.key_table.syncer.add_full_sync()?;
}
RepairWhat::Versions => {
info!("Repairing the versions table");
@@ -56,9 +56,10 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
}
};
info!("Sending command to scrub worker: {:?}", cmd);
- garage.block_manager.send_scrub_command(cmd).await;
+ garage.block_manager.send_scrub_command(cmd).await?;
}
}
+ Ok(())
}
// ----
diff --git a/src/garage/server.rs b/src/garage/server.rs
index d4099a97..8e29f6ec 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -42,6 +42,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), background)?;
+ info!("Spawning Garage workers...");
+ garage.spawn_workers();
+
if config.admin.trace_sink.is_some() {
info!("Initialize tracing...");
diff --git a/src/model/garage.rs b/src/model/garage.rs
index e34d034f..9ae6af82 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -273,6 +273,22 @@ impl Garage {
}))
}
+ pub fn spawn_workers(&self) {
+ self.block_manager.spawn_workers();
+
+ self.bucket_table.spawn_workers();
+ self.bucket_alias_table.spawn_workers();
+ self.key_table.spawn_workers();
+
+ self.object_table.spawn_workers();
+ self.object_counter_table.spawn_workers();
+ self.version_table.spawn_workers();
+ self.block_ref_table.spawn_workers();
+
+ #[cfg(feature = "k2v")]
+ self.k2v.spawn_workers();
+ }
+
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
@@ -307,4 +323,9 @@ impl GarageK2V {
rpc,
}
}
+
+ pub fn spawn_workers(&self) {
+ self.item_table.spawn_workers();
+ self.counter_table.spawn_workers();
+ }
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 9c8e00c2..d907e947 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -164,6 +164,10 @@ impl<T: CountedItem> IndexCounter<T> {
})
}
+ pub fn spawn_workers(&self) {
+ self.table.spawn_workers();
+ }
+
pub fn count(
&self,
tx: &mut db::Transaction,
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 38c6b41c..a8d9b5e6 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
+arc-swap = "1.0"
bytes = "1.0"
hex = "0.4"
hexdump = "0.1"
diff --git a/src/table/gc.rs b/src/table/gc.rs
index cfdc9d2d..c83c2050 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -54,24 +54,27 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
- system: system.clone(),
+ system,
data,
endpoint,
});
-
gc.endpoint.set_handler(gc.clone());
- system.background.spawn_worker(GcWorker::new(gc.clone()));
-
gc
}
+ pub(crate) fn spawn_workers(self: &Arc<Self>) {
+ self.system
+ .background
+ .spawn_worker(GcWorker::new(self.clone()));
+ }
+
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index bcf9f9d7..0fe7d2cb 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -70,17 +70,17 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
- let ret = Arc::new(Self {
+ Arc::new(Self {
data,
empty_node_hash,
- });
-
- background.spawn_worker(MerkleWorker(ret.clone()));
+ })
+ }
- ret
+ pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+ background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index af7aa640..7008a383 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
@@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
use garage_rpc::system::System;
@@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- add_full_sync_tx: mpsc::UnboundedSender<()>,
+ add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -65,7 +66,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(
+ pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -74,34 +75,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
-
let syncer = Arc::new(Self {
- system: system.clone(),
+ system,
data,
merkle,
- add_full_sync_tx,
+ add_full_sync_tx: ArcSwapOption::new(None),
endpoint,
});
-
syncer.endpoint.set_handler(syncer.clone());
- system.background.spawn_worker(SyncWorker {
- syncer: syncer.clone(),
- ring_recv: system.ring.clone(),
- ring: system.ring.borrow().clone(),
+ syncer
+ }
+
+ pub(crate) fn spawn_workers(self: &Arc<Self>) {
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
+ self.add_full_sync_tx
+ .store(Some(Arc::new(add_full_sync_tx)));
+
+ self.system.background.spawn_worker(SyncWorker {
+ syncer: self.clone(),
+ ring_recv: self.system.ring.clone(),
+ ring: self.system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
-
- syncer
}
- pub fn add_full_sync(&self) {
- if self.add_full_sync_tx.send(()).is_err() {
- error!("({}) Could not add full sync", F::TABLE_NAME);
- }
+ pub fn add_full_sync(&self) -> Result<(), Error> {
+ let tx = self.add_full_sync_tx.load();
+ let tx = tx
+ .as_ref()
+ .ok_or_message("table sync worker is not running")?;
+ tx.send(()).ok_or_message("send error")?;
+ Ok(())
}
// ----
diff --git a/src/table/table.rs b/src/table/table.rs
index c8e0576e..cb200ef2 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -36,6 +36,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@@ -76,29 +77,34 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
- let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+ let merkle_updater = MerkleUpdater::new(data.clone());
- let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
- TableGc::launch(system.clone(), data.clone());
+ let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+ let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
+ gc,
syncer,
endpoint,
});
- table
- .system
- .background
- .spawn_worker(InsertQueueWorker(table.clone()));
-
table.endpoint.set_handler(table.clone());
table
}
+ pub fn spawn_workers(self: &Arc<Self>) {
+ self.merkle_updater.spawn_workers(&self.system.background);
+ self.syncer.spawn_workers();
+ self.gc.spawn_workers();
+ self.system
+ .background
+ .spawn_worker(InsertQueueWorker(self.clone()));
+ }
+
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));