aboutsummaryrefslogtreecommitdiff
path: root/src/table_sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--src/table_sync.rs51
1 files changed, 36 insertions, 15 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 0f3e90d2..6f45969b 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -9,8 +9,8 @@ use futures_util::future::*;
use futures_util::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use tokio::sync::watch;
use tokio::sync::Mutex;
+use tokio::sync::{mpsc, watch};
use crate::data::*;
use crate::error::Error;
@@ -18,9 +18,8 @@ use crate::membership::Ring;
use crate::table::*;
const MAX_DEPTH: usize = 16;
-const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
+const SCAN_INTERVAL: Duration = Duration::from_secs(60);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
-
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10);
pub struct TableSyncer<F: TableSchema, R: TableReplication> {
@@ -91,18 +90,24 @@ where
.collect::<Vec<_>>(),
});
+ let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
let s1 = syncer.clone();
table
.system
.background
- .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
+ .spawn_worker(move |must_exit: watch::Receiver<bool>| {
+ s1.watcher_task(must_exit, busy_rx)
+ })
.await;
let s2 = syncer.clone();
table
.system
.background
- .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
+ .spawn_worker(move |must_exit: watch::Receiver<bool>| {
+ s2.syncer_task(must_exit, busy_tx)
+ })
.await;
syncer
@@ -111,25 +116,20 @@ where
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
+ mut busy_rx: mpsc::UnboundedReceiver<bool>,
) -> Result<(), Error> {
- tokio::time::delay_for(Duration::from_secs(10)).await;
-
- self.todo.lock().await.add_full_scan(&self.table);
- let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+ let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
let s_ring_recv = ring_recv.recv().fuse();
+ let s_busy = busy_rx.recv().fuse();
let s_must_exit = must_exit.recv().fuse();
- pin_mut!(s_ring_recv, s_must_exit);
+ let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
+ pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
select! {
- _ = next_full_scan => {
- next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
- eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
- self.todo.lock().await.add_full_scan(&self.table);
- }
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
eprintln!("({}) Adding ring difference to syncer todo list", self.table.name);
@@ -137,11 +137,29 @@ where
prev_ring = new_ring;
}
}
+ busy_opt = s_busy => {
+ if let Some(busy) = busy_opt {
+ if busy {
+ nothing_to_do_since = None;
+ } else {
+ if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
+ }
+ }
+ }
+ }
must_exit_v = s_must_exit => {
if must_exit_v.unwrap_or(false) {
break;
}
}
+ _ = s_timeout => {
+ if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
+ nothing_to_do_since = None;
+ eprintln!("({}) Adding full scan to syncer todo list", self.table.name);
+ self.todo.lock().await.add_full_scan(&self.table);
+ }
+ }
}
}
Ok(())
@@ -150,9 +168,11 @@ where
async fn syncer_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
+ busy_tx: mpsc::UnboundedSender<bool>,
) -> Result<(), Error> {
while !*must_exit.borrow() {
if let Some(partition) = self.todo.lock().await.pop_task() {
+ busy_tx.send(true)?;
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
@@ -164,6 +184,7 @@ where
);
}
} else {
+ busy_tx.send(false)?;
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}