diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/gc.rs | 80 | ||||
-rw-r--r-- | src/table/merkle.rs | 67 | ||||
-rw-r--r-- | src/table/sync.rs | 187 |
3 files changed, 168 insertions, 166 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index e7fbbcb0..36124c2f 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use futures::future::join_all; -use futures::select; -use futures_util::future::*; use tokio::sync::watch; use garage_db::counted_tree_hack::CountedTree; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; @@ -69,35 +68,11 @@ where gc.endpoint.set_handler(gc.clone()); - let gc1 = gc.clone(); - system.background.spawn_worker( - format!("GC loop for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit), - ); + system.background.spawn_worker(GcWorker::new(gc.clone())); gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.gc_loop_iter().await { - Ok(None) => { - // Stuff was done, loop immediately - } - Ok(Some(wait_delay)) => { - // Nothing was done, wait specified delay. - select! { - _ = tokio::time::sleep(wait_delay).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!("({}) Error doing GC: {}", F::TABLE_NAME, e); - } - } - } - } - async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { let now = now_msec(); @@ -328,6 +303,57 @@ where } } +struct GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + gc: Arc<TableGc<F, R>>, + wait_delay: Duration, +} + +impl<F, R> GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn new(gc: Arc<TableGc<F, R>>) -> Self { + Self { + gc, + wait_delay: Duration::from_secs(0), + } + } +} + +#[async_trait] +impl<F, R> Worker for GcWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Table GC: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + match self.gc.gc_loop_iter().await? { + None => Ok(WorkerStatus::Busy), + Some(delay) => { + self.wait_delay = delay; + Ok(WorkerStatus::Idle) + } + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + tokio::time::sleep(self.wait_delay).await; + WorkerStatus::Busy + } +} + /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 7685b193..d4d2717f 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,14 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; use tokio::sync::watch; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -78,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - match self.updater_loop_iter() { - Ok(true) => (), - Ok(false) => { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } - Err(e) => { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } - } - - fn updater_loop_iter(&self) -> Result<bool, Error> { + fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> { if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; - Ok(true) + Ok(WorkerStatus::Busy) } else { - Ok(false) + Ok(WorkerStatus::Idle) } } @@ -325,6 +298,34 @@ where } } +struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl<F, R> Worker for MerkleWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("Merkle tree updater: {}", F::TABLE_NAME) + } + + async fn work( + &mut self, + _must_exit: &mut watch::Receiver<bool>, + ) -> Result<WorkerStatus, Error> { + self.0.updater_loop_iter() + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerStatus::Busy + } +} + impl MerkleNodeKey { fn encode(&self) -> Vec<u8> { let mut ret = Vec::with_capacity(2 + self.prefix.len()); diff --git a/src/table/sync.rs b/src/table/sync.rs index 4c83e991..be081d96 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,17 +1,17 @@ use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use futures::select; -use futures_util::future::*; use futures_util::stream::*; use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use tokio::select; use tokio::sync::{mpsc, watch}; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - todo: Mutex<SyncTodo>, + add_full_sync_tx: mpsc::UnboundedSender<()>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -52,10 +52,6 @@ impl Rpc for SyncRpc { type Response = Result<SyncRpc, Error>; } -struct SyncTodo { - todo: Vec<TodoPartition>, -} - #[derive(Debug, Clone)] struct TodoPartition { partition: Partition, @@ -80,118 +76,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let todo = SyncTodo { todo: vec![] }; + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); let syncer = Arc::new(Self { system: system.clone(), data, merkle, - todo: Mutex::new(todo), + add_full_sync_tx, endpoint, }); syncer.endpoint.set_handler(syncer.clone()); - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - system.background.spawn_worker( - format!("table sync watcher for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - system.background.spawn_worker( - format!("table syncer for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(20)).await; - s3.add_full_sync(); + system.background.spawn_worker(SyncWorker { + syncer: syncer.clone(), + ring_recv: system.ring.clone(), + ring: system.ring.borrow().clone(), + add_full_sync_rx, + todo: vec![], + next_full_sync: Instant::now() + Duration::from_secs(20), }); syncer } - async fn watcher_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) { - let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - select! { - _ = ring_recv.changed().fuse() => { - let new_ring = ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - prev_ring = new_ring.clone(); - } - } - busy_opt = busy_rx.recv().fuse() => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - _ = must_exit.changed().fuse() => {}, - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - } - } - } - } - } - pub fn add_full_sync(&self) { - self.todo - .lock() - .unwrap() - .add_full_sync(&self.data, &self.system); - } - - async fn syncer_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - busy_tx: mpsc::UnboundedSender<bool>, - ) { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true).unwrap(); - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - F::TABLE_NAME, - partition, - e - ); - } - } else { - busy_tx.send(false).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - } + if self.add_full_sync_tx.send(()).is_err() { + error!("({}) Could not add full sync", F::TABLE_NAME); } } + // ---- + async fn sync_partition( - self: Arc<Self>, + self: &Arc<Self>, partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { @@ -577,12 +495,22 @@ where } } -impl SyncTodo { - fn add_full_sync<F: TableSchema, R: TableReplication>( - &mut self, - data: &TableData<F, R>, - system: &System, - ) { +// -------- Sync Worker --------- + +struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { + syncer: Arc<TableSyncer<F, R>>, + ring_recv: watch::Receiver<Arc<Ring>>, + ring: Arc<Ring>, + add_full_sync_rx: mpsc::UnboundedReceiver<()>, + todo: Vec<TodoPartition>, + next_full_sync: Instant, +} + +impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { + fn add_full_sync(&mut self) { + let system = &self.syncer.system; + let data = &self.syncer.data; + let my_id = system.id; self.todo.clear(); @@ -623,6 +551,8 @@ impl SyncTodo { retain, }); } + + self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } fn pop_task(&mut self) -> Option<TodoPartition> { @@ -641,6 +571,51 @@ impl SyncTodo { } } +#[async_trait] +impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { + fn name(&self) -> String { + format!("Table sync worker for {}", F::TABLE_NAME) + } + + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { + if let Some(partition) = self.pop_task() { + self.syncer.sync_partition(&partition, must_exit).await?; + Ok(WorkerStatus::Busy) + } else { + Ok(WorkerStatus::Idle) + } + } + + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + select! { + s = self.add_full_sync_rx.recv() => match s { + Some(()) => { + self.add_full_sync(); + } + None => (), + }, + _ = self.ring_recv.changed() => { + let new_ring = self.ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &self.ring) { + self.ring = new_ring.clone(); + drop(new_ring); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + self.add_full_sync(); + } + }, + _ = tokio::time::sleep(self.next_full_sync - Instant::now()) => { + self.add_full_sync(); + } + } + match self.todo.is_empty() { + false => WorkerStatus::Busy, + true => WorkerStatus::Idle, + } + } +} + +// ---- UTIL ---- + fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } |