diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 51 |
1 files changed, 36 insertions, 15 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs index 0f3e90d2..6f45969b 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -9,8 +9,8 @@ use futures_util::future::*; use futures_util::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use tokio::sync::watch; use tokio::sync::Mutex; +use tokio::sync::{mpsc, watch}; use crate::data::*; use crate::error::Error; @@ -18,9 +18,8 @@ use crate::membership::Ring; use crate::table::*; const MAX_DEPTH: usize = 16; -const SCAN_INTERVAL: Duration = Duration::from_secs(3600); +const SCAN_INTERVAL: Duration = Duration::from_secs(60); const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); - const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(10); pub struct TableSyncer<F: TableSchema, R: TableReplication> { @@ -91,18 +90,24 @@ where .collect::<Vec<_>>(), }); + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); + let s1 = syncer.clone(); table .system .background - .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit)) + .spawn_worker(move |must_exit: watch::Receiver<bool>| { + s1.watcher_task(must_exit, busy_rx) + }) .await; let s2 = syncer.clone(); table .system .background - .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit)) + .spawn_worker(move |must_exit: watch::Receiver<bool>| { + s2.syncer_task(must_exit, busy_tx) + }) .await; syncer @@ -111,25 +116,20 @@ where async fn watcher_task( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, + mut busy_rx: mpsc::UnboundedReceiver<bool>, ) -> Result<(), Error> { - tokio::time::delay_for(Duration::from_secs(10)).await; - - self.todo.lock().await.add_full_scan(&self.table); - let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); + let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { let s_ring_recv = ring_recv.recv().fuse(); + let s_busy = busy_rx.recv().fuse(); let s_must_exit = must_exit.recv().fuse(); - pin_mut!(s_ring_recv, s_must_exit); + let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); + pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); select! { - _ = next_full_scan => { - next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); - eprintln!("({}) Adding full scan to syncer todo list", self.table.name); - self.todo.lock().await.add_full_scan(&self.table); - } new_ring_r = s_ring_recv => { if let Some(new_ring) = new_ring_r { eprintln!("({}) Adding ring difference to syncer todo list", self.table.name); @@ -137,11 +137,29 @@ where prev_ring = new_ring; } } + busy_opt = s_busy => { + if let Some(busy) = busy_opt { + if busy { + nothing_to_do_since = None; + } else { + if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); + } + } + } + } must_exit_v = s_must_exit => { if must_exit_v.unwrap_or(false) { break; } } + _ = s_timeout => { + if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { + nothing_to_do_since = None; + eprintln!("({}) Adding full scan to syncer todo list", self.table.name); + self.todo.lock().await.add_full_scan(&self.table); + } + } } } Ok(()) @@ -150,9 +168,11 @@ where async fn syncer_task( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, + busy_tx: mpsc::UnboundedSender<bool>, ) -> Result<(), Error> { while !*must_exit.borrow() { if let Some(partition) = self.todo.lock().await.pop_task() { + busy_tx.send(true)?; let res = self .clone() .sync_partition(&partition, &mut must_exit) @@ -164,6 +184,7 @@ where ); } } else { + busy_tx.send(false)?; tokio::time::delay_for(Duration::from_secs(1)).await; } } |