diff options
author | Alex Auvolat <alex@adnab.me> | 2023-01-04 11:34:43 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-01-04 11:34:43 +0100 |
commit | 570e5e5bbb7a3eac41350db9433e28ed289b97f4 (patch) | |
tree | a7fc299ba180098be5a3bef28a39256870ce697b /src/table/queue.rs | |
parent | 6e44369cbc810b8912ca0f7f5fd293e87f10c851 (diff) | |
parent | 4eb8ca3a528dae2848141f5cc3eb607eb7d40114 (diff) | |
download | garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.tar.gz garage-570e5e5bbb7a3eac41350db9433e28ed289b97f4.zip |
Merge branch 'main' into next
Diffstat (limited to 'src/table/queue.rs')
-rw-r--r-- | src/table/queue.rs | 77 |
1 files changed, 77 insertions, 0 deletions
diff --git a/src/table/queue.rs b/src/table/queue.rs new file mode 100644 index 00000000..0857209b --- /dev/null +++ b/src/table/queue.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use tokio::select; +use tokio::sync::watch; + +use garage_util::background::*; +use garage_util::error::Error; + +use crate::replication::*; +use crate::schema::*; +use crate::table::*; + +const BATCH_SIZE: usize = 100; + +pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>) +where + F: TableSchema, + R: TableReplication; + +#[async_trait] +impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> { + fn name(&self) -> String { + format!("{} queue", F::TABLE_NAME) + } + + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64), + ..Default::default() + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let mut kv_pairs = vec![]; + let mut values = vec![]; + + for entry_kv in self.0.data.insert_queue.iter()? { + let (k, v) = entry_kv?; + + values.push(self.0.data.decode_entry(&v)?); + kv_pairs.push((k, v)); + + if kv_pairs.len() > BATCH_SIZE { + break; + } + } + + if kv_pairs.is_empty() { + return Ok(WorkerState::Idle); + } + + self.0.insert_many(values).await?; + + self.0.data.insert_queue.db().transaction(|mut tx| { + for (k, v) in kv_pairs.iter() { + if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? { + if &v2 == v { + tx.remove(&self.0.data.insert_queue, k)?; + } + } + } + Ok(()) + })?; + + Ok(WorkerState::Busy) + } + + async fn wait_for_work(&mut self) -> WorkerState { + select! { + _ = tokio::time::sleep(Duration::from_secs(600)) => (), + _ = self.0.data.insert_queue_notify.notified() => (), + } + WorkerState::Busy + } +} |