From 83c8467e23c1f531ae233766d5dc7244afe57f08 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 11:58:06 +0100 Subject: Proper queueing for delayed inserts, now backed to disk --- src/table/queue.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 src/table/queue.rs (limited to 'src/table/queue.rs') diff --git a/src/table/queue.rs b/src/table/queue.rs new file mode 100644 index 00000000..3671ea7d --- /dev/null +++ b/src/table/queue.rs @@ -0,0 +1,84 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::select; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::Error; + +use crate::replication::*; +use crate::schema::*; +use crate::table::*; + +const BATCH_SIZE: usize = 100; + +pub(crate) struct InsertQueueWorker(pub(crate) Arc>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; + +#[async_trait] +impl Worker for InsertQueueWorker +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} queue", F::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + let mut kv_pairs = vec![]; + let mut values = vec![]; + + for entry_kv in self.0.data.insert_queue.iter()? { + let (k, v) = entry_kv?; + + values.push(self.0.data.decode_entry(&v)?); + kv_pairs.push((k, v)); + + if kv_pairs.len() > BATCH_SIZE { + break; + } + } + + if kv_pairs.is_empty() { + return Ok(WorkerState::Idle); + } + + self.0.insert_many(values).await?; + + self.0.data.insert_queue.db().transaction(|mut tx| { + for (k, v) in kv_pairs.iter() { + if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { + if &v2 == v { + tx.remove(&self.0.data.insert_queue, k)?; + } + } + } + Ok(()) + })?; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + select! { + _ = tokio::time::sleep(Duration::from_secs(600)) => (), + _ = self.0.data.insert_queue_notify.notified() => (), + } + WorkerState::Busy + } +} -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/table/queue.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'src/table/queue.rs') diff --git a/src/table/queue.rs b/src/table/queue.rs index 3671ea7d..860f20d3 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -71,10 +71,7 @@ where Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(600)) => (), _ = self.0.data.insert_queue_notify.notified() => (), -- cgit v1.2.3 From 426d8784dac0e39879af52d980887d3692fc907c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 3 Jan 2023 15:08:37 +0100 Subject: cleanup --- src/table/queue.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'src/table/queue.rs') diff --git a/src/table/queue.rs b/src/table/queue.rs index 860f20d3..0857209b 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -16,15 +16,11 @@ const BATCH_SIZE: usize = 100; pub(crate) struct InsertQueueWorker(pub(crate) Arc>) where - F: TableSchema + 'static, - R: TableReplication + 'static; + F: TableSchema, + R: TableReplication; #[async_trait] -impl Worker for InsertQueueWorker -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl Worker for InsertQueueWorker { fn name(&self) -> String { format!("{} queue", F::TABLE_NAME) } -- cgit v1.2.3