author     Mendes <mendes.oulamara@pm.me>    2022-10-04 18:14:49 +0200
committer  Mendes <mendes.oulamara@pm.me>    2022-10-04 18:14:49 +0200
commit     829f815a897b04986559910bbcbf53625adcdf20
tree       6db3c27cff2aded754a641d1f2b05c83be701267    /src/table/sync.rs
parent     99f96b9564c9c841dc6c56f1255a6e70ff884d46
parent     a096ced35562bd0a8877a1ee2f755be1edafe343
Merge remote-tracking branch 'origin/main' into optimal-layout
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--    src/table/sync.rs    233
1 file changed, 110 insertions(+), 123 deletions(-)
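
For context when reading the new SyncWorker code in the diff below: the refactor ports the sync loop onto the Worker trait from garage_util::background. That trait's definition is not part of this diff; the sketch below is only a rough reconstruction of its shape, inferred from the `impl Worker for SyncWorker` further down, so the exact bounds, any default methods, and any WorkerState variants beyond Busy/Idle/Done are assumptions.

// Approximate shape of garage_util::background::Worker, inferred from the
// impl in this diff; not the authoritative definition.
#[async_trait]
pub trait Worker: Send + 'static {
    fn name(&self) -> String;

    // Optional human-readable progress string (e.g. "N partitions remaining").
    fn info(&self) -> Option<String> {
        None
    }

    // Perform one unit of work; Busy means "call me again soon",
    // Idle means "nothing queued right now".
    async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;

    // Wait until there is something to do (or until exit is requested).
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
}

pub enum WorkerState {
    Busy,
    Idle,
    Done,
}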
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 08069ad0..9d79d856 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,17 +1,17 @@
use std::collections::VecDeque;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
-use futures::select;
-use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use tokio::select;
use tokio::sync::{mpsc, watch};
+use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -24,8 +24,6 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
@@ -34,7 +32,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- todo: Mutex<SyncTodo>,
+ add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -52,10 +50,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@@ -80,118 +74,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let todo = SyncTodo { todo: vec![] };
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
- todo: Mutex::new(todo),
+ add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- system.background.spawn_worker(
- format!("table sync watcher for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- system.background.spawn_worker(
- format!("table syncer for {}", F::TABLE_NAME),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::sleep(Duration::from_secs(20)).await;
- s3.add_full_sync();
+ system.background.spawn_worker(SyncWorker {
+ syncer: syncer.clone(),
+ ring_recv: system.ring.clone(),
+ ring: system.ring.borrow().clone(),
+ add_full_sync_rx,
+ todo: vec![],
+ next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) {
- let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- select! {
- _ = ring_recv.changed().fuse() => {
- let new_ring = ring_recv.borrow();
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- prev_ring = new_ring.clone();
- }
- }
- busy_opt = busy_rx.recv().fuse() => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- _ = must_exit.changed().fuse() => {},
- _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
- self.add_full_sync();
- }
- }
- }
- }
- }
-
pub fn add_full_sync(&self) {
- self.todo
- .lock()
- .unwrap()
- .add_full_sync(&self.data, &self.system);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true).unwrap();
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- F::TABLE_NAME,
- partition,
- e
- );
- }
- } else {
- busy_tx.send(false).unwrap();
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
+ if self.add_full_sync_tx.send(()).is_err() {
+ error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
+ // ----
+
async fn sync_partition(
- self: Arc<Self>,
+ self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -258,9 +174,9 @@ where
while !*must_exit.borrow() {
let mut items = Vec::new();
- for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
+ for item in self.data.store.range(begin.to_vec()..end.to_vec())? {
let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value))));
if items.len() >= 1024 {
break;
@@ -329,9 +245,7 @@ where
&self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_quorum(nodes.len())
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await?;
@@ -392,8 +306,7 @@ where
&self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@@ -432,11 +345,11 @@ where
// Just send that item directly
if let Some(val) = self.data.store.get(&ik[..])? {
if blake2sum(&val[..]) != ivhash {
- warn!("({}) Hashes differ between stored value and Merkle tree, key: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
+ debug!("({}) Hashes differ between stored value and Merkle tree, key: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
}
todo_items.push(val.to_vec());
} else {
- warn!("({}) Item from Merkle tree not found in store: {:?} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, ik);
+ debug!("({}) Item from Merkle tree not found in store: {} (if your server is very busy, don't worry, this happens when the Merkle tree can't be updated fast enough)", F::TABLE_NAME, hex::encode(ik));
}
}
MerkleNode::Intermediate(l) => {
@@ -449,8 +362,7 @@ where
&self.endpoint,
who,
SyncRpc::GetNode(key.clone()),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?
{
@@ -526,8 +438,7 @@ where
&self.endpoint,
who,
SyncRpc::Items(values),
- RequestStrategy::with_priority(PRIO_BACKGROUND)
- .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
if let SyncRpc::Ok = rpc_resp {
@@ -577,12 +488,22 @@ where
}
}
-impl SyncTodo {
- fn add_full_sync<F: TableSchema, R: TableReplication>(
- &mut self,
- data: &TableData<F, R>,
- system: &System,
- ) {
+// -------- Sync Worker ---------
+
+struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+ syncer: Arc<TableSyncer<F, R>>,
+ ring_recv: watch::Receiver<Arc<Ring>>,
+ ring: Arc<Ring>,
+ add_full_sync_rx: mpsc::UnboundedReceiver<()>,
+ todo: Vec<TodoPartition>,
+ next_full_sync: Instant,
+}
+
+impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+ fn add_full_sync(&mut self) {
+ let system = &self.syncer.system;
+ let data = &self.syncer.data;
+
let my_id = system.id;
self.todo.clear();
@@ -603,8 +524,16 @@ impl SyncTodo {
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if data.store.range(begin..end).next().is_none() {
- continue;
+ match data.store.range(begin..end) {
+ Ok(mut iter) => {
+ if iter.next().is_none() {
+ continue;
+ }
+ }
+ Err(e) => {
+ warn!("DB error in add_full_sync: {}", e);
+ continue;
+ }
}
}
@@ -615,6 +544,8 @@ impl SyncTodo {
retain,
});
}
+
+ self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@@ -633,6 +564,62 @@ impl SyncTodo {
}
}
+#[async_trait]
+impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+ fn name(&self) -> String {
+ format!("{} sync", F::TABLE_NAME)
+ }
+
+ fn info(&self) -> Option<String> {
+ let l = self.todo.len();
+ if l > 0 {
+ Some(format!("{} partitions remaining", l))
+ } else {
+ None
+ }
+ }
+
+ async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ if let Some(partition) = self.pop_task() {
+ self.syncer.sync_partition(&partition, must_exit).await?;
+ Ok(WorkerState::Busy)
+ } else {
+ Ok(WorkerState::Idle)
+ }
+ }
+
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
+ if *must_exit.borrow() {
+ return WorkerState::Done;
+ }
+ select! {
+ s = self.add_full_sync_rx.recv() => {
+ if let Some(()) = s {
+ self.add_full_sync();
+ }
+ },
+ _ = self.ring_recv.changed() => {
+ let new_ring = self.ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &self.ring) {
+ self.ring = new_ring.clone();
+ drop(new_ring);
+ debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
+ self.add_full_sync();
+ }
+ },
+ _ = tokio::time::sleep_until(self.next_full_sync.into()) => {
+ self.add_full_sync();
+ }
+ }
+ match self.todo.is_empty() {
+ false => WorkerState::Busy,
+ true => WorkerState::Idle,
+ }
+ }
+}
+
+// ---- UTIL ----
+
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}
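
As a standalone illustration of the wake-up pattern this merge introduces (callers request a full sync by sending () over an unbounded channel, and the worker's wait_for_work() selects over that channel, a ring watch, and a periodic timer), here is a minimal self-contained sketch. It is not Garage code: DemoWorker and the u64 stand-in for Arc<Ring> are purely illustrative, and it only assumes the tokio crate with its "full" feature set.

use std::time::{Duration, Instant};
use tokio::select;
use tokio::sync::{mpsc, watch};

struct DemoWorker {
    add_full_sync_rx: mpsc::UnboundedReceiver<()>,
    ring_recv: watch::Receiver<u64>, // stands in for watch::Receiver<Arc<Ring>>
    ring: u64,
    todo: Vec<u64>, // stands in for Vec<TodoPartition>
    next_full_sync: Instant,
}

impl DemoWorker {
    fn add_full_sync(&mut self) {
        // In Garage this enumerates the partitions to re-sync; here we just queue a token.
        self.todo.push(self.ring);
        self.next_full_sync = Instant::now() + Duration::from_secs(10 * 60);
    }

    // Same structure as SyncWorker::wait_for_work in the diff above.
    async fn wait_for_work(&mut self) {
        select! {
            Some(()) = self.add_full_sync_rx.recv() => self.add_full_sync(),
            Ok(()) = self.ring_recv.changed() => {
                let new_ring = *self.ring_recv.borrow();
                if new_ring != self.ring {
                    self.ring = new_ring;
                    self.add_full_sync();
                }
            }
            _ = tokio::time::sleep_until(self.next_full_sync.into()) => self.add_full_sync(),
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let (_ring_tx, ring_rx) = watch::channel(0u64);
    let mut worker = DemoWorker {
        add_full_sync_rx: rx,
        ring_recv: ring_rx,
        ring: 0,
        todo: vec![],
        next_full_sync: Instant::now() + Duration::from_secs(20),
    };
    // Equivalent of TableSyncer::add_full_sync(): just wake the worker up.
    tx.send(()).unwrap();
    worker.wait_for_work().await;
    assert_eq!(worker.todo.len(), 1);
}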