diff options
author | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
---|---|---|
committer | Mendes <mendes.oulamara@pm.me> | 2022-10-04 18:14:49 +0200 |
commit | 829f815a897b04986559910bbcbf53625adcdf20 (patch) | |
tree | 6db3c27cff2aded754a641d1f2b05c83be701267 /src/table/sync.rs | |
parent | 99f96b9564c9c841dc6c56f1255a6e70ff884d46 (diff) | |
parent | a096ced35562bd0a8877a1ee2f755be1edafe343 (diff) | |
download | garage-829f815a897b04986559910bbcbf53625adcdf20.tar.gz garage-829f815a897b04986559910bbcbf53625adcdf20.zip |
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- | src/table/sync.rs | 233 |
1 files changed, 110 insertions, 123 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs index 08069ad0..9d79d856 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,17 +1,17 @@ use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; -use futures::select; -use futures_util::future::*; use futures_util::stream::*; use opentelemetry::KeyValue; use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use tokio::select; use tokio::sync::{mpsc, watch}; +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -24,8 +24,6 @@ use crate::merkle::*; use crate::replication::*; use crate::*; -const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); - // Do anti-entropy every 10 minutes const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60); @@ -34,7 +32,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> data: Arc<TableData<F, R>>, merkle: Arc<MerkleUpdater<F, R>>, - todo: Mutex<SyncTodo>, + add_full_sync_tx: mpsc::UnboundedSender<()>, endpoint: Arc<Endpoint<SyncRpc, Self>>, } @@ -52,10 +50,6 @@ impl Rpc for SyncRpc { type Response = Result<SyncRpc, Error>; } -struct SyncTodo { - todo: Vec<TodoPartition>, -} - #[derive(Debug, Clone)] struct TodoPartition { partition: Partition, @@ -80,118 +74,40 @@ where .netapp .endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME)); - let todo = SyncTodo { todo: vec![] }; + let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel(); let syncer = Arc::new(Self { system: system.clone(), data, merkle, - todo: Mutex::new(todo), + add_full_sync_tx, endpoint, }); syncer.endpoint.set_handler(syncer.clone()); - let (busy_tx, busy_rx) = mpsc::unbounded_channel(); - - let s1 = syncer.clone(); - system.background.spawn_worker( - format!("table sync watcher for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ); - - let s2 = syncer.clone(); - system.background.spawn_worker( - format!("table syncer for {}", F::TABLE_NAME), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ); - - let s3 = syncer.clone(); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(20)).await; - s3.add_full_sync(); + system.background.spawn_worker(SyncWorker { + syncer: syncer.clone(), + ring_recv: system.ring.clone(), + ring: system.ring.borrow().clone(), + add_full_sync_rx, + todo: vec![], + next_full_sync: Instant::now() + Duration::from_secs(20), }); syncer } - async fn watcher_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) { - let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone(); - let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone(); - let mut nothing_to_do_since = Some(Instant::now()); - - while !*must_exit.borrow() { - select! { - _ = ring_recv.changed().fuse() => { - let new_ring = ring_recv.borrow(); - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - prev_ring = new_ring.clone(); - } - } - busy_opt = busy_rx.recv().fuse() => { - if let Some(busy) = busy_opt { - if busy { - nothing_to_do_since = None; - } else if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } - } - } - _ = must_exit.changed().fuse() => {}, - _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { - if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { - nothing_to_do_since = None; - debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME); - self.add_full_sync(); - } - } - } - } - } - pub fn add_full_sync(&self) { - self.todo - .lock() - .unwrap() - .add_full_sync(&self.data, &self.system); - } - - async fn syncer_task( - self: Arc<Self>, - mut must_exit: watch::Receiver<bool>, - busy_tx: mpsc::UnboundedSender<bool>, - ) { - while !*must_exit.borrow() { - let task = self.todo.lock().unwrap().pop_task(); - if let Some(partition) = task { - busy_tx.send(true).unwrap(); - let res = self - .clone() - .sync_partition(&partition, &mut must_exit) - .await; - if let Err(e) = res { - warn!( - "({}) Error while syncing {:?}: {}", - F::TABLE_NAME, - partition, - e - ); - } - } else { - busy_tx.send(false).unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - } + if self.add_full_sync_tx.send(()).is_err() { + error!("({}) Could not add full sync", F::TABLE_NAME); } } + // ---- + async fn sync_partition( - self: Arc<Self>, + self: &Arc<Self>, partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { @@ -258,9 +174,9 @@ where while !*must_exit.borrow() { let mut items = Vec::new(); - for item in self.data.store.range(begin.to_vec()..end.to_vec()) { + for item in self.data.store.range(begin.to_vec()..end.to_vec())? { let (key, value) = item?; - items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref())))); + items.push((key.to_vec(), Arc::new(ByteBuf::from(value)))); if items.len() >= 1024 { break; @@ -329,9 +245,7 @@ where &self.endpoint, nodes, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_quorum(nodes.len()) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()), ) .await?; @@ -392,8 +306,7 @@ where &self.endpoint, who, SyncRpc::RootCkHash(partition.partition, root_ck_hash), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; @@ -432,11 +345,11 @@ where // Just send that item directly if let Some(val) = self.data.store.get(&ik[..])? { if blake2sum(&val[..]) != ivhash { - warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); + debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik)); } todo_items.push(val.to_vec()); } else { - warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik); + debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik)); } } MerkleNode::Intermediate(l) => { @@ -449,8 +362,7 @@ where &self.endpoint, who, SyncRpc::GetNode(key.clone()), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await? { @@ -526,8 +438,7 @@ where &self.endpoint, who, SyncRpc::Items(values), - RequestStrategy::with_priority(PRIO_BACKGROUND) - .with_timeout(TABLE_SYNC_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_BACKGROUND), ) .await?; if let SyncRpc::Ok = rpc_resp { @@ -577,12 +488,22 @@ where } } -impl SyncTodo { - fn add_full_sync<F: TableSchema, R: TableReplication>( - &mut self, - data: &TableData<F, R>, - system: &System, - ) { +// -------- Sync Worker --------- + +struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> { + syncer: Arc<TableSyncer<F, R>>, + ring_recv: watch::Receiver<Arc<Ring>>, + ring: Arc<Ring>, + add_full_sync_rx: mpsc::UnboundedReceiver<()>, + todo: Vec<TodoPartition>, + next_full_sync: Instant, +} + +impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> { + fn add_full_sync(&mut self) { + let system = &self.syncer.system; + let data = &self.syncer.data; + let my_id = system.id; self.todo.clear(); @@ -603,8 +524,16 @@ impl SyncTodo { let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin..end).next().is_none() { - continue; + match data.store.range(begin..end) { + Ok(mut iter) => { + if iter.next().is_none() { + continue; + } + } + Err(e) => { + warn!("DB error in add_full_sync: {}", e); + continue; + } } } @@ -615,6 +544,8 @@ impl SyncTodo { retain, }); } + + self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL; } fn pop_task(&mut self) -> Option<TodoPartition> { @@ -633,6 +564,62 @@ impl SyncTodo { } } +#[async_trait] +impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> { + fn name(&self) -> String { + format!("{} sync", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.todo.len(); + if l > 0 { + Some(format!("{} partitions remaining", l)) + } else { + None + } + } + + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + if let Some(partition) = self.pop_task() { + self.syncer.sync_partition(&partition, must_exit).await?; + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) + } + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + select! { + s = self.add_full_sync_rx.recv() => { + if let Some(()) = s { + self.add_full_sync(); + } + }, + _ = self.ring_recv.changed() => { + let new_ring = self.ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &self.ring) { + self.ring = new_ring.clone(); + drop(new_ring); + debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + self.add_full_sync(); + } + }, + _ = tokio::time::sleep_until(self.next_full_sync.into()) => { + self.add_full_sync(); + } + } + match self.todo.is_empty() { + false => WorkerState::Busy, + true => WorkerState::Idle, + } + } +} + +// ---- UTIL ---- + fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) } |