aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/table/gc.rs80
-rw-r--r--src/table/merkle.rs67
-rw-r--r--src/table/sync.rs187
-rw-r--r--src/util/background/job_worker.rs5
-rw-r--r--src/util/background/mod.rs20
-rw-r--r--src/util/background/worker.rs91
6 files changed, 248 insertions, 202 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index e7fbbcb0..36124c2f 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
-use futures::select;
-use futures_util::future::*;
use tokio::sync::watch;
use garage_db::counted_tree_hack::CountedTree;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -69,35 +68,11 @@ where
gc.endpoint.set_handler(gc.clone());
- let gc1 = gc.clone();
- system.background.spawn_worker(
- format!("GC loop for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
- );
+ system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.gc_loop_iter().await {
- Ok(None) => {
- // Stuff was done, loop immediately
- }
- Ok(Some(wait_delay)) => {
- // Nothing was done, wait specified delay.
- select! {
- _ = tokio::time::sleep(wait_delay).fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
- }
- }
- }
- }
-
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -328,6 +303,57 @@ where
}
}
+struct GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ gc: Arc<TableGc<F, R>>,
+ wait_delay: Duration,
+}
+
+impl<F, R> GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn new(gc: Arc<TableGc<F, R>>) -> Self {
+ Self {
+ gc,
+ wait_delay: Duration::from_secs(0),
+ }
+ }
+}
+
+#[async_trait]
+impl<F, R> Worker for GcWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("Table GC: {}", F::TABLE_NAME)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ match self.gc.gc_loop_iter().await? {
+ None => Ok(WorkerStatus::Busy),
+ Some(delay) => {
+ self.wait_delay = delay;
+ Ok(WorkerStatus::Idle)
+ }
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ tokio::time::sleep(self.wait_delay).await;
+ WorkerStatus::Busy
+ }
+}
+
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 7685b193..d4d2717f 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,14 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
-use futures::select;
-use futures_util::future::*;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -78,43 +77,17 @@ where
empty_node_hash,
});
- let ret2 = ret.clone();
- background.spawn_worker(
- format!("Merkle tree updater for {}", F::TABLE_NAME),
- |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
- );
+ background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
- async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
- while !*must_exit.borrow() {
- match self.updater_loop_iter() {
- Ok(true) => (),
- Ok(false) => {
- select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => {},
- _ = must_exit.changed().fuse() => {},
- }
- }
- Err(e) => {
- warn!(
- "({}) Error while updating Merkle tree item: {}",
- F::TABLE_NAME,
- e
- );
- tokio::time::sleep(Duration::from_secs(10)).await;
- }
- }
- }
- }
-
- fn updater_loop_iter(&self) -> Result<bool, Error> {
+ fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
- Ok(true)
+ Ok(WorkerStatus::Busy)
} else {
- Ok(false)
+ Ok(WorkerStatus::Idle)
}
}
@@ -325,6 +298,34 @@ where
}
}
+struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for MerkleWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("Merkle tree updater: {}", F::TABLE_NAME)
+ }
+
+ async fn work(
+ &mut self,
+ _must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<WorkerStatus, Error> {
+ self.0.updater_loop_iter()
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ WorkerStatus::Busy
+ }
+}
+
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 4c83e991..be081d96 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,17 +1,17 @@
use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
-use futures::select;
-use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- todo: Mutex<SyncTodo>,
+ add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -52,10 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@@ -80,118 +76,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let todo = SyncTodo { todo: vec![] };
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
- todo: Mutex::new(todo),
+ add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- system.background.spawn_worker(
- format!("table sync watcher for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- system.background.spawn_worker(
- format!("table syncer for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(20)).await;
- s3.add_full_sync();
+ system.background.spawn_worker(SyncWorker {
+ syncer: syncer.clone(),
+ ring_recv: system.ring.clone(),
+ ring: system.ring.borrow().clone(),
+ add_full_sync_rx,
+ todo: vec![],
+ next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) {
- let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- select! {
- _ = ring_recv.changed().fuse() => {
- let new_ring = ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- prev_ring = new_ring.clone();
- }
- }
- busy_opt = busy_rx.recv().fuse() => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- _ = must_exit.changed().fuse() => {},
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
- }
- }
- }
- }
-
pub fn add_full_sync(&self) {
- self.todo
- .lock()
- .unwrap()
- .add_full_sync(&self.data, &self.system);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true).unwrap();
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- F::TABLE_NAME,
- partition,
- e
- );
- }
- } else {
- busy_tx.send(false).unwrap();
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
+ if self.add_full_sync_tx.send(()).is_err() {
+ error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
+ // ----
+
async fn sync_partition(
- self: Arc<Self>,
+ self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -577,12 +495,22 @@ where
}
}
-impl SyncTodo {
- fn add_full_sync<F: TableSchema, R: TableReplication>(
- &mut self,
- data: &TableData<F, R>,
- system: &System,
- ) {
+// -------- Sync Worker ---------
+
+struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+ syncer: Arc<TableSyncer<F, R>>,
+ ring_recv: watch::Receiver<Arc<Ring>>,
+ ring: Arc<Ring>,
+ add_full_sync_rx: mpsc::UnboundedReceiver<()>,
+ todo: Vec<TodoPartition>,
+ next_full_sync: Instant,
+}
+
+impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+ fn add_full_sync(&mut self) {
+ let system = &self.syncer.system;
+ let data = &self.syncer.data;
+
let my_id = system.id;
self.todo.clear();
@@ -623,6 +551,8 @@ impl SyncTodo {
retain,
});
}
+
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@@ -641,6 +571,51 @@ impl SyncTodo {
}
}
+#[async_trait]
+impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+ fn name(&self) -> String {
+ format!("Table sync worker for {}", F::TABLE_NAME)
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
+ if let Some(partition) = self.pop_task() {
+ self.syncer.sync_partition(&partition, must_exit).await?;
+ Ok(WorkerStatus::Busy)
+ } else {
+ Ok(WorkerStatus::Idle)
+ }
+ }
+
+ async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ select! {
+ s = self.add_full_sync_rx.recv() => match s {
+ Some(()) => {
+ self.add_full_sync();
+ }
+ None => (),
+ },
+ _ = self.ring_recv.changed() => {
+ let new_ring = self.ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &self.ring) {
+ self.ring = new_ring.clone();
+ drop(new_ring);
+ debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ self.add_full_sync();
+ }
+ },
+ _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => {
+ self.add_full_sync();
+ }
+ }
+ match self.todo.is_empty() {
+ false => WorkerStatus::Busy,
+ true => WorkerStatus::Idle,
+ }
+ }
+}
+
+// ---- UTIL ----
+
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
index 8cc660f8..fcdac582 100644
--- a/src/util/background/job_worker.rs
+++ b/src/util/background/job_worker.rs
@@ -34,16 +34,15 @@ impl Worker for JobWorker {
}
}
- async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus {
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
loop {
match self.job_chan.lock().await.recv().await {
Some((job, cancellable)) => {
if cancellable && *must_exit.borrow() {
- // skip job
continue;
}
self.next_job = Some(job);
- return WorkerStatus::Busy
+ return WorkerStatus::Busy;
}
None => return WorkerStatus::Done,
}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 97d25784..c06e2225 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -10,7 +10,8 @@ use std::sync::Arc;
use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
-use worker::{Worker, WorkerProcessor};
+use worker::WorkerProcessor;
+pub use worker::{Worker, WorkerStatus};
pub(crate) type JobOutput = Result<(), Error>;
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
@@ -30,9 +31,7 @@ impl BackgroundRunner {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let await_all_done =
- tokio::spawn(
- async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
- );
+ tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await });
let (send_job, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out));
@@ -40,11 +39,14 @@ impl BackgroundRunner {
for i in 0..n_runners {
let queue_out = queue_out.clone();
- send_worker.send(Box::new(job_worker::JobWorker {
- index: i,
- job_chan: queue_out.clone(),
- next_job: None,
- })).ok().unwrap();
+ send_worker
+ .send(Box::new(job_worker::JobWorker {
+ index: i,
+ job_chan: queue_out.clone(),
+ next_job: None,
+ }))
+ .ok()
+ .unwrap();
}
let bgrunner = Arc::new(Self {
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index a173902c..92f7990c 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,16 +1,16 @@
use std::time::{Duration, Instant};
-use tracing::*;
use async_trait::async_trait;
use futures::future::*;
-use tokio::select;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use tracing::*;
use crate::error::Error;
-#[derive(PartialEq, Copy, Clone)]
+#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerStatus {
Busy,
Idle,
@@ -20,8 +20,20 @@ pub enum WorkerStatus {
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;
+
+ /// Work: do a basic unit of work, if one is available (otherwise, should return
+ /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the
+ /// middle of processing, it will only be interrupted at the last minute when Garage is trying
+ /// to exit and this hasn't returned yet. This function may return an error to indicate that
+ /// its unit of work could not be processed due to an error: the error will be logged and
+ /// .work() will be called again immediately.
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
- async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
+
+ /// Wait for work: await for some task to become available. This future can be interrupted in
+ /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
+ /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
+ /// it to check if we are exiting.
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
}
pub(crate) struct WorkerProcessor {
@@ -58,10 +70,12 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
+ let stop_signal_worker = self.stop_signal.clone();
workers.push(async move {
let mut worker = WorkerHandler {
task_id,
stop_signal,
+ stop_signal_worker,
worker: new_worker,
status: WorkerStatus::Busy,
};
@@ -91,15 +105,22 @@ impl WorkerProcessor {
let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
while let Some(mut worker) = workers.next().await {
- if worker.status == WorkerStatus::Busy
- || (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
- {
- workers.push(async move {
- worker.step().await;
- worker
- }.boxed());
+ if worker.status == WorkerStatus::Done {
+ info!(
+ "Worker {} (TID {}) exited",
+ worker.worker.name(),
+ worker.task_id
+ );
+ } else if Instant::now() > drain_half_time {
+ warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status);
} else {
- info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
+ workers.push(
+ async move {
+ worker.step().await;
+ worker
+ }
+ .boxed(),
+ );
}
}
};
@@ -109,7 +130,7 @@ impl WorkerProcessor {
info!("All workers exited in time \\o/");
}
_ = tokio::time::sleep(Duration::from_secs(9)) => {
- warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
+ error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
}
@@ -119,27 +140,49 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
+ stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
status: WorkerStatus,
}
impl WorkerHandler {
- async fn step(&mut self) {
+ async fn step(&mut self) {
match self.status {
- WorkerStatus::Busy => {
- match self.worker.work(&mut self.stop_signal).await {
- Ok(s) => {
- self.status = s;
+ WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
+ Ok(s) => {
+ self.status = s;
+ }
+ Err(e) => {
+ error!(
+ "Error in worker {} (TID {}): {}",
+ self.worker.name(),
+ self.task_id,
+ e
+ );
+ }
+ },
+ WorkerStatus::Idle => {
+ if *self.stop_signal.borrow() {
+ select! {
+ new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => {
+ self.status = new_st;
+ }
+ _ = tokio::time::sleep(Duration::from_secs(1)) => {
+ // stay in Idle state
+ }
}
- Err(e) => {
- error!("Error in worker {}: {}", self.worker.name(), e);
+ } else {
+ select! {
+ new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => {
+ self.status = new_st;
+ }
+ _ = self.stop_signal.changed() => {
+ // stay in Idle state
+ }
}
}
}
- WorkerStatus::Idle => {
- self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
- }
- WorkerStatus::Done => unreachable!()
+ WorkerStatus::Done => unreachable!(),
}
}
}