diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 1 | ||||
-rw-r--r-- | src/table/gc.rs | 13 | ||||
-rw-r--r-- | src/table/merkle.rs | 12 | ||||
-rw-r--r-- | src/table/sync.rs | 43 | ||||
-rw-r--r-- | src/table/table.rs | 22 |
5 files changed, 54 insertions, 37 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 38c6b41c..a8d9b5e6 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" } opentelemetry = "0.17" async-trait = "0.1.7" +arc-swap = "1.0" bytes = "1.0" hex = "0.4" hexdump = "0.1" diff --git a/src/table/gc.rs b/src/table/gc.rs index cfdc9d2d..c83c2050 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -54,24 +54,27 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { + pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> { let endpoint = system .netapp .endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME)); let gc = Arc::new(Self { - system: system.clone(), + system, data, endpoint, }); - gc.endpoint.set_handler(gc.clone()); - system.background.spawn_worker(GcWorker::new(gc.clone())); - gc } + pub(crate) fn spawn_workers(self: &Arc<Self>) { + self.system + .background + .spawn_worker(GcWorker::new(self.clone())); + } + async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index bcf9f9d7..0fe7d2cb 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -70,17 +70,17 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> { + pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - let ret = Arc::new(Self { + Arc::new(Self { data, empty_node_hash, - }); - - background.spawn_worker(MerkleWorker(ret.clone())); + }) + } - ret + pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) { + background.spawn_worker(MerkleWorker(self.clone())); } fn updater_loop_iter(&self) -> Result<WorkerState, Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index af7aa640..7008a383 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures_util::stream::*; use opentelemetry::KeyValue; @@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch}; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::{Error, OkOrMessage}; use garage_rpc::ring::*; use garage_rpc::system::System; @@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - add_full_sync_tx: mpsc::UnboundedSender<()>, + add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -65,7 +66,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch( + pub(crate) fn new( system: Arc<System>, data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, @@ -74,34 +75,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); - let syncer = Arc::new(Self { - system: system.clone(), + system, data, merkle, - add_full_sync_tx, + add_full_sync_tx: ArcSwapOption::new(None), endpoint, }); - syncer.endpoint.set_handler(syncer.clone()); - system.background.spawn_worker(SyncWorker { - syncer: syncer.clone(), - ring_recv: system.ring.clone(), - ring: system.ring.borrow().clone(), + syncer + } + + pub(crate) fn spawn_workers(self: &Arc<Self>) { + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); + self.add_full_sync_tx + .store(Some(Arc::new(add_full_sync_tx))); + + self.system.background.spawn_worker(SyncWorker { + syncer: self.clone(), + ring_recv: self.system.ring.clone(), + ring: self.system.ring.borrow().clone(), add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), }); - - syncer } - pub fn add_full_sync(&self) { - if self.add_full_sync_tx.send(()).is_err() { - error!("({}) Could not add full sync", F::TABLE_NAME); - } + pub fn add_full_sync(&self) -> Result<(), Error> { + let tx = self.add_full_sync_tx.load(); + let tx = tx + .as_ref() + .ok_or_message("table sync worker is not running")?; + tx.send(()).ok_or_message("send error")?; + Ok(()) } // ---- diff --git a/src/table/table.rs b/src/table/table.rs index c8e0576e..cb200ef2 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -36,6 +36,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, + gc: Arc<TableGc<F, R>>, endpoint: Arc<Endpoint<TableRpc<F>, Self>>, } @@ -76,29 +77,34 @@ where let data = TableData::new(system.clone(), instance, replication, db); - let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); + let merkle_updater = MerkleUpdater::new(data.clone()); - let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); - TableGc::launch(system.clone(), data.clone()); + let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); + let gc = TableGc::new(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, + gc, syncer, endpoint, }); - table - .system - .background - .spawn_worker(InsertQueueWorker(table.clone())); - table.endpoint.set_handler(table.clone()); table } + pub fn spawn_workers(self: &Arc<Self>) { + self.merkle_updater.spawn_workers(&self.system.background); + self.syncer.spawn_workers(); + self.gc.spawn_workers(); + self.system + .background + .spawn_worker(InsertQueueWorker(self.clone())); + } + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert", F::TABLE_NAME)); |