aboutsummaryrefslogtreecommitdiff
path: root/src/table/queue.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/queue.rs')
-rw-r--r--src/table/queue.rs84
1 files changed, 84 insertions, 0 deletions
diff --git a/src/table/queue.rs b/src/table/queue.rs
new file mode 100644
index 00000000..3671ea7d
--- /dev/null
+++ b/src/table/queue.rs
@@ -0,0 +1,84 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::select;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::Error;
+
+use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
+
+const BATCH_SIZE: usize = 100;
+
+pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static;
+
+#[async_trait]
+impl<F, R> Worker for InsertQueueWorker<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ fn name(&self) -> String {
+ format!("{} queue", F::TABLE_NAME)
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let mut kv_pairs = vec![];
+ let mut values = vec![];
+
+ for entry_kv in self.0.data.insert_queue.iter()? {
+ let (k, v) = entry_kv?;
+
+ values.push(self.0.data.decode_entry(&v)?);
+ kv_pairs.push((k, v));
+
+ if kv_pairs.len() > BATCH_SIZE {
+ break;
+ }
+ }
+
+ if kv_pairs.is_empty() {
+ return Ok(WorkerState::Idle);
+ }
+
+ self.0.insert_many(values).await?;
+
+ self.0.data.insert_queue.db().transaction(|mut tx| {
+ for (k, v) in kv_pairs.iter() {
+ if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
+ if &v2 == v {
+ tx.remove(&self.0.data.insert_queue, k)?;
+ }
+ }
+ }
+ Ok(())
+ })?;
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(600)) => (),
+ _ = self.0.data.insert_queue_notify.notified() => (),
+ }
+ WorkerState::Busy
+ }
+}