aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml1
-rw-r--r--src/table/gc.rs13
-rw-r--r--src/table/merkle.rs12
-rw-r--r--src/table/sync.rs43
-rw-r--r--src/table/table.rs22
5 files changed, 54 insertions, 37 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 38c6b41c..a8d9b5e6 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
+arc-swap = "1.0"
bytes = "1.0"
hex = "0.4"
hexdump = "0.1"
diff --git a/src/table/gc.rs b/src/table/gc.rs
index cfdc9d2d..c83c2050 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -54,24 +54,27 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
- system: system.clone(),
+ system,
data,
endpoint,
});
-
gc.endpoint.set_handler(gc.clone());
- system.background.spawn_worker(GcWorker::new(gc.clone()));
-
gc
}
+ pub(crate) fn spawn_workers(self: &Arc<Self>) {
+ self.system
+ .background
+ .spawn_worker(GcWorker::new(self.clone()));
+ }
+
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index bcf9f9d7..0fe7d2cb 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -70,17 +70,17 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
- let ret = Arc::new(Self {
+ Arc::new(Self {
data,
empty_node_hash,
- });
-
- background.spawn_worker(MerkleWorker(ret.clone()));
+ })
+ }
- ret
+ pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+ background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index af7aa640..7008a383 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
@@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
use garage_rpc::system::System;
@@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- add_full_sync_tx: mpsc::UnboundedSender<()>,
+ add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -65,7 +66,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(
+ pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -74,34 +75,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
-
let syncer = Arc::new(Self {
- system: system.clone(),
+ system,
data,
merkle,
- add_full_sync_tx,
+ add_full_sync_tx: ArcSwapOption::new(None),
endpoint,
});
-
syncer.endpoint.set_handler(syncer.clone());
- system.background.spawn_worker(SyncWorker {
- syncer: syncer.clone(),
- ring_recv: system.ring.clone(),
- ring: system.ring.borrow().clone(),
+ syncer
+ }
+
+ pub(crate) fn spawn_workers(self: &Arc<Self>) {
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
+ self.add_full_sync_tx
+ .store(Some(Arc::new(add_full_sync_tx)));
+
+ self.system.background.spawn_worker(SyncWorker {
+ syncer: self.clone(),
+ ring_recv: self.system.ring.clone(),
+ ring: self.system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
-
- syncer
}
- pub fn add_full_sync(&self) {
- if self.add_full_sync_tx.send(()).is_err() {
- error!("({}) Could not add full sync", F::TABLE_NAME);
- }
+ pub fn add_full_sync(&self) -> Result<(), Error> {
+ let tx = self.add_full_sync_tx.load();
+ let tx = tx
+ .as_ref()
+ .ok_or_message("table sync worker is not running")?;
+ tx.send(()).ok_or_message("send error")?;
+ Ok(())
}
// ----
diff --git a/src/table/table.rs b/src/table/table.rs
index c8e0576e..cb200ef2 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -36,6 +36,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@@ -76,29 +77,34 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
- let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+ let merkle_updater = MerkleUpdater::new(data.clone());
- let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
- TableGc::launch(system.clone(), data.clone());
+ let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+ let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
+ gc,
syncer,
endpoint,
});
- table
- .system
- .background
- .spawn_worker(InsertQueueWorker(table.clone()));
-
table.endpoint.set_handler(table.clone());
table
}
+ pub fn spawn_workers(self: &Arc<Self>) {
+ self.merkle_updater.spawn_workers(&self.system.background);
+ self.syncer.spawn_workers();
+ self.gc.spawn_workers();
+ self.system
+ .background
+ .spawn_worker(InsertQueueWorker(self.clone()));
+ }
+
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));