diff options
-rw-r--r-- | src/table/gc.rs | 80 | ||||
-rw-r--r-- | src/table/merkle.rs | 67 | ||||
-rw-r--r-- | src/table/sync.rs | 187 | ||||
-rw-r--r-- | src/util/background/job_worker.rs | 5 | ||||
-rw-r--r-- | src/util/background/mod.rs | 20 | ||||
-rw-r--r-- | src/util/background/worker.rs | 91 |
6 files changed, 248 insertions, 202 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index e7fbbcb0..36124c2f 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -69,35 +68,11 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); @@ -328,6 +303,57 @@ where } } +struct GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc<TableGc<F, R>>, + wait_delay: Duration, +} + +impl<F, R> GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc<TableGc<F, R>>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl<F, R> Worker for GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Table GC: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerStatus::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerStatus::Idle) + } + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + tokio::time::sleep(self.wait_delay).await; + WorkerStatus::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7685b193..d4d2717f 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -78,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.updater_loop_iter() { - Ok(true) => (), - Ok(false) => { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - } - - fn updater_loop_iter(&self) -> Result<bool, Error> { + fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> { if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; - Ok(true) + Ok(WorkerStatus::Busy) } else { - Ok(false) + Ok(WorkerStatus::Idle) } } @@ -325,6 +298,34 @@ where } } +struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl<F, R> Worker for MerkleWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Merkle tree updater: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + self.0.updater_loop_iter() + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerStatus::Busy + } +} + impl MerkleNodeKey { fn encode(&self) -> Vec<u8> { let mut ret = Vec::with_capacity(2 + self.prefix.len()); diff --git a/src/table/sync.rs b/src/table/sync.rs index 4c83e991..be081d96 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,17 +1,17 @@ use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use futures::select; -use futures_util::future::*; use futures_util::stream::*; use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use tokio::select; use tokio::sync::{mpsc, watch}; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - todo: Mutex<SyncTodo>, + add_full_sync_tx: mpsc::UnboundedSender<()>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -52,10 +52,6 @@ impl Rpc for SyncRpc { type Response = Result<SyncRpc, Error>; } -struct SyncTodo { - todo: Vec<TodoPartition>, -} - #[derive(Debug, Clone)] struct TodoPartition { partition: Partition, @@ -80,118 +76,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let todo = SyncTodo { todo: vec![] }; + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); let syncer = Arc::new(Self { system: system.clone(), data, merkle, - todo: Mutex::new(todo), + add_full_sync_tx, endpoint, }); syncer.endpoint.set_handler(syncer.clone()); - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - system.background.spawn_worker( - format!("table sync watcher for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - system.background.spawn_worker( - format!("table syncer for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(20)).await; - s3.add_full_sync(); + system.background.spawn_worker(SyncWorker { + syncer: syncer.clone(), + ring_recv: system.ring.clone(), + ring: system.ring.borrow().clone(), + add_full_sync_rx, + todo: vec![], + next_full_sync: Instant::now() + Duration::from_secs(20), }); syncer } - async fn watcher_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) { - let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - select! { - _ = ring_recv.changed().fuse() => { - let new_ring = ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - prev_ring = new_ring.clone(); - } - } - busy_opt = busy_rx.recv().fuse() => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - _ = must_exit.changed().fuse() => {}, - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - } - } - } - } - } - pub fn add_full_sync(&self) { - self.todo - .lock() - .unwrap() - .add_full_sync(&self.data, &self.system); - } - - async fn syncer_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - busy_tx: mpsc::UnboundedSender<bool>, - ) { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true).unwrap(); - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - F::TABLE_NAME, - partition, - e - ); - } - } else { - busy_tx.send(false).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - } + if self.add_full_sync_tx.send(()).is_err() { + error!("({}) Could not add full sync", F::TABLE_NAME); } } + // ---- + async fn sync_partition( - self: Arc<Self>, + self: &Arc<Self>, partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { @@ -577,12 +495,22 @@ where } } -impl SyncTodo { - fn add_full_sync<F: TableSchema, R: TableReplication>( - &mut self, - data: &TableData<F, R>, - system: &System, - ) { +// -------- Sync Worker --------- + +struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { + syncer: Arc<TableSyncer<F, R>>, + ring_recv: watch::Receiver<Arc<Ring>>, + ring: Arc<Ring>, + add_full_sync_rx: mpsc::UnboundedReceiver<()>, + todo: Vec<TodoPartition>, + next_full_sync: Instant, +} + +impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { + fn add_full_sync(&mut self) { + let system = &self.syncer.system; + let data = &self.syncer.data; + let my_id = system.id; self.todo.clear(); @@ -623,6 +551,8 @@ impl SyncTodo { retain, }); } + + self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } fn pop_task(&mut self) -> Option<TodoPartition> { @@ -641,6 +571,51 @@ impl SyncTodo { } } +#[async_trait] +impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { + fn name(&self) -> String { + format!("Table sync worker for {}", F::TABLE_NAME) + } + + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { + if let Some(partition) = self.pop_task() { + self.syncer.sync_partition(&partition, must_exit).await?; + Ok(WorkerStatus::Busy) + } else { + Ok(WorkerStatus::Idle) + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + select! { + s = self.add_full_sync_rx.recv() => match s { + Some(()) => { + self.add_full_sync(); + } + None => (), + }, + _ = self.ring_recv.changed() => { + let new_ring = self.ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &self.ring) { + self.ring = new_ring.clone(); + drop(new_ring); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + self.add_full_sync(); + } + }, + _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => { + self.add_full_sync(); + } + } + match self.todo.is_empty() { + false => WorkerStatus::Busy, + true => WorkerStatus::Idle, + } + } +} + +// ---- UTIL ---- + fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs index 8cc660f8..fcdac582 100644 --- a/src/util/background/job_worker.rs +++ b/src/util/background/job_worker.rs @@ -34,16 +34,15 @@ impl Worker for JobWorker { } } - async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { loop { match self.job_chan.lock().await.recv().await { Some((job, cancellable)) => { if cancellable && *must_exit.borrow() { - // skip job continue; } self.next_job = Some(job); - return WorkerStatus::Busy + return WorkerStatus::Busy; } None => return WorkerStatus::Done, } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 97d25784..c06e2225 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -10,7 +10,8 @@ use std::sync::Arc; use tokio::sync::{mpsc, watch, Mutex}; use crate::error::Error; -use worker::{Worker, WorkerProcessor}; +use worker::WorkerProcessor; +pub use worker::{Worker, WorkerStatus}; pub(crate) type JobOutput = Result<(), Error>; pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; @@ -30,9 +31,7 @@ impl BackgroundRunner { let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>(); let await_all_done = - tokio::spawn( - async move { WorkerProcessor::new(worker_out, stop_signal).run().await }, - ); + tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await }); let (send_job, queue_out) = mpsc::unbounded_channel(); let queue_out = Arc::new(Mutex::new(queue_out)); @@ -40,11 +39,14 @@ impl BackgroundRunner { for i in 0..n_runners { let queue_out = queue_out.clone(); - send_worker.send(Box::new(job_worker::JobWorker { - index: i, - job_chan: queue_out.clone(), - next_job: None, - })).ok().unwrap(); + send_worker + .send(Box::new(job_worker::JobWorker { + index: i, + job_chan: queue_out.clone(), + next_job: None, + })) + .ok() + .unwrap(); } let bgrunner = Arc::new(Self { diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index a173902c..92f7990c 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,16 +1,16 @@ use std::time::{Duration, Instant}; -use tracing::*; use async_trait::async_trait; use futures::future::*; -use tokio::select; use futures::stream::FuturesUnordered; use futures::StreamExt; +use tokio::select; use tokio::sync::{mpsc, watch}; +use tracing::*; use crate::error::Error; -#[derive(PartialEq, Copy, Clone)] +#[derive(PartialEq, Copy, Clone, Debug)] pub enum WorkerStatus { Busy, Idle, @@ -20,8 +20,20 @@ pub enum WorkerStatus { #[async_trait] pub trait Worker: Send { fn name(&self) -> String; + + /// Work: do a basic unit of work, if one is available (otherwise, should return + /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the + /// middle of processing, it will only be interrupted at the last minute when Garage is trying + /// to exit and this hasn't returned yet. This function may return an error to indicate that + /// its unit of work could not be processed due to an error: the error will be logged and + /// .work() will be called again immediately. async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; - async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus; + + /// Wait for work: await for some task to become available. This future can be interrupted in + /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we + /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows + /// it to check if we are exiting. + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus; } pub(crate) struct WorkerProcessor { @@ -58,10 +70,12 @@ impl WorkerProcessor { let task_id = next_task_id; next_task_id += 1; let stop_signal = self.stop_signal.clone(); + let stop_signal_worker = self.stop_signal.clone(); workers.push(async move { let mut worker = WorkerHandler { task_id, stop_signal, + stop_signal_worker, worker: new_worker, status: WorkerStatus::Busy, }; @@ -91,15 +105,22 @@ impl WorkerProcessor { let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { while let Some(mut worker) = workers.next().await { - if worker.status == WorkerStatus::Busy - || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time) - { - workers.push(async move { - worker.step().await; - worker - }.boxed()); + if worker.status == WorkerStatus::Done { + info!( + "Worker {} (TID {}) exited", + worker.worker.name(), + worker.task_id + ); + } else if Instant::now() > drain_half_time { + warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status); } else { - info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); + workers.push( + async move { + worker.step().await; + worker + } + .boxed(), + ); } } }; @@ -109,7 +130,7 @@ impl WorkerProcessor { info!("All workers exited in time \\o/"); } _ = tokio::time::sleep(Duration::from_secs(9)) => { - warn!("Some workers could not exit in time, we are cancelling some things in the middle."); + error!("Some workers could not exit in time, we are cancelling some things in the middle"); } } } @@ -119,27 +140,49 @@ impl WorkerProcessor { struct WorkerHandler { task_id: usize, stop_signal: watch::Receiver<bool>, + stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, status: WorkerStatus, } impl WorkerHandler { - async fn step(&mut self) { + async fn step(&mut self) { match self.status { - WorkerStatus::Busy => { - match self.worker.work(&mut self.stop_signal).await { - Ok(s) => { - self.status = s; + WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { + Ok(s) => { + self.status = s; + } + Err(e) => { + error!( + "Error in worker {} (TID {}): {}", + self.worker.name(), + self.task_id, + e + ); + } + }, + WorkerStatus::Idle => { + if *self.stop_signal.borrow() { + select! { + new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => { + self.status = new_st; + } + _ = tokio::time::sleep(Duration::from_secs(1)) => { + // stay in Idle state + } } - Err(e) => { - error!("Error in worker {}: {}", self.worker.name(), e); + } else { + select! { + new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => { + self.status = new_st; + } + _ = self.stop_signal.changed() => { + // stay in Idle state + } } } } - WorkerStatus::Idle => { - self.status = self.worker.wait_for_work(&mut self.stop_signal).await; - } - WorkerStatus::Done => unreachable!() + WorkerStatus::Done => unreachable!(), } } } |