aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r-- src/table/sync.rs 632
1 files changed, 632 insertions, 0 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
new file mode 100644
index 00000000..9c37c286
--- /dev/null
+++ b/src/table/sync.rs
@@ -0,0 +1,632 @@
+use std::collections::VecDeque;
+use std::convert::TryInto;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::future::join_all;
+use futures::{pin_mut, select};
+use futures_util::future::*;
+use futures_util::stream::*;
+use rand::Rng;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+use tokio::sync::{mpsc, watch};
+
+use garage_rpc::ring::Ring;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use crate::data::*;
+use crate::merkle::*;
+use crate::replication::*;
+use crate::*;
+
+// Timeout passed to every RPC call made from this file (root checksum
+// exchange, Merkle node requests and item updates).
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+// Do anti-entropy every 10 minutes: once the syncer task has reported being
+// idle for this long, the watcher task queues a new full sync.
+const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
+
+// Synchronizes one table with the other nodes. It owns a list of partitions
+// waiting to be processed; `launch` starts a watcher worker that refills this
+// list and a syncer worker that drains it.
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+ // Local table data; this file reads its `name`, `store` and `merkle_updater`
+ data: Arc<TableData<F>>,
+ // Shared handles; this file reads its `system`, `replication` and `rpc_client`
+ aux: Arc<TableAux<F, R>>,
+
+ // Partitions still waiting to be processed by the syncer task
+ todo: Mutex<SyncTodo>,
+}
+
+// Root checksum of a partition range: for every partition whose Merkle root
+// node is not empty, the partition together with the hash of that root node.
+type RootCk = Vec<(MerklePartition, Hash)>;
+
+// A contiguous range of Merkle partitions.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct PartitionRange {
+ // first partition of the range (included)
+ begin: MerklePartition,
+ // if end is None, go all the way to partition 0xFFFF included;
+ // otherwise the range stops just before `end` (`end` itself is excluded)
+ end: Option<MerklePartition>,
+}
+
+// Messages exchanged between two nodes while synchronizing a table.
+#[derive(Serialize, Deserialize)]
+pub(crate) enum SyncRPC {
+ // Request: hash of the sender's root checksum list for the given range
+ RootCkHash(PartitionRange, Hash),
+ // Reply to RootCkHash when the hashes differ: the receiver's full list
+ RootCkList(PartitionRange, RootCk),
+ // Reply to RootCkHash when both hashes are equal
+ CkNoDifference,
+ // Request: read the Merkle node stored under this key
+ GetNode(MerkleNodeKey),
+ // Reply to GetNode: the requested key and the node found under it
+ Node(MerkleNodeKey, MerkleNode),
+ // Never constructed in this file; `handle_rpc` rejects it as unexpected
+ Items(Vec<Arc<ByteBuf>>),
+}
+
+// Work list of the syncer: partitions that still have to be processed.
+// Rebuilt from scratch by `add_full_sync`, consumed in random order by `pop_task`.
+struct SyncTodo {
+ todo: Vec<TodoPartition>,
+}
+
+// One unit of work for the syncer task.
+#[derive(Debug, Clone)]
+struct TodoPartition {
+ // Partitions covered by this task
+ range: PartitionRange,
+
+ // Are we a node that stores this partition or not?
+ // If true the range is synced with the other nodes, if false it is offloaded.
+ retain: bool,
+}
+
+impl<F, R> TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ /// Builds the syncer for a table and starts its background activity:
+ /// a watcher worker, a syncer worker, and a one-shot task that queues a
+ /// first full sync 20 seconds after launch.
+ ///
+ /// Returns the shared handle to the new syncer.
+ pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> {
+     let syncer = Arc::new(Self {
+         data: Arc::clone(&data),
+         aux: Arc::clone(&aux),
+         todo: Mutex::new(SyncTodo { todo: Vec::new() }),
+     });
+
+     // The syncer worker reports through this channel whether it is busy,
+     // the watcher worker listens to it.
+     let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
+     let for_watcher = Arc::clone(&syncer);
+     aux.system.background.spawn_worker(
+         format!("table sync watcher for {}", data.name),
+         move |must_exit: watch::Receiver<bool>| for_watcher.watcher_task(must_exit, busy_rx),
+     );
+
+     let for_syncer = Arc::clone(&syncer);
+     aux.system.background.spawn_worker(
+         format!("table syncer for {}", data.name),
+         move |must_exit: watch::Receiver<bool>| for_syncer.syncer_task(must_exit, busy_tx),
+     );
+
+     let for_first_sync = Arc::clone(&syncer);
+     tokio::spawn(async move {
+         tokio::time::delay_for(Duration::from_secs(20)).await;
+         for_first_sync.add_full_sync();
+     });
+
+     syncer
+ }
+
+ // Watcher worker: decides when a full sync must be queued.
+ // A full sync is added whenever the ring watch channel delivers a value,
+ // and whenever the syncer task has been idle for ANTI_ENTROPY_INTERVAL.
+ // Runs until `must_exit` becomes true, then returns Ok(()).
+ async fn watcher_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ mut busy_rx: mpsc::UnboundedReceiver<bool>,
+ ) -> Result<(), Error> {
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
+ // Instant since which the syncer has had nothing to do (None while it is busy,
+ // or right after an anti-entropy sync has been queued)
+ let mut nothing_to_do_since = Some(Instant::now());
+
+ while !*must_exit.borrow() {
+ // Fresh futures are built at every iteration; the 1 second timeout
+ // guarantees that the idle check below runs periodically.
+ let s_ring_recv = ring_recv.recv().fuse();
+ let s_busy = busy_rx.recv().fuse();
+ let s_must_exit = must_exit.recv().fuse();
+ let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
+ pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
+
+ select! {
+ new_ring_r = s_ring_recv => {
+ // The ring channel produced a value: redo a full sync
+ if new_ring_r.is_some() {
+ debug!("({}) Adding ring difference to syncer todo list", self.data.name);
+ self.add_full_sync();
+ }
+ }
+ busy_opt = s_busy => {
+ // Status report from the syncer task: true = working, false = idle
+ if let Some(busy) = busy_opt {
+ if busy {
+ nothing_to_do_since = None;
+ } else {
+ // Only record the beginning of an idle period, do not move it forward
+ if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
+ }
+ }
+ }
+ }
+ must_exit_v = s_must_exit => {
+ if must_exit_v.unwrap_or(false) {
+ break;
+ }
+ }
+ _ = s_timeout => {
+ // Idle for long enough: queue an anti-entropy full sync and reset the timer
+ if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
+ nothing_to_do_since = None;
+ debug!("({}) Adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Replaces the content of the todo list with a full sync computed from
+ /// this table's data and the current ring.
+ ///
+ /// Panics if the todo mutex is poisoned.
+ pub fn add_full_sync(&self) {
+     let mut todo = self.todo.lock().unwrap();
+     todo.add_full_sync(&self.data, &self.aux);
+ }
+
+ /// Syncer worker: takes partitions out of the todo list one at a time and
+ /// processes them until `must_exit` becomes true.
+ ///
+ /// Before each partition it reports `true` on `busy_tx`; when the list is
+ /// empty it reports `false` and sleeps for one second. A failed partition is
+ /// only logged. The function itself fails only if `busy_tx` cannot send.
+ async fn syncer_task(
+     self: Arc<Self>,
+     mut must_exit: watch::Receiver<bool>,
+     busy_tx: mpsc::UnboundedSender<bool>,
+ ) -> Result<(), Error> {
+     while !*must_exit.borrow() {
+         // Keep this as its own statement: the mutex guard is a temporary that
+         // is released here, before any await point below.
+         let next = self.todo.lock().unwrap().pop_task();
+
+         let partition = match next {
+             Some(partition) => partition,
+             None => {
+                 busy_tx.send(false)?;
+                 tokio::time::delay_for(Duration::from_secs(1)).await;
+                 continue;
+             }
+         };
+
+         busy_tx.send(true)?;
+         let outcome = self
+             .clone()
+             .sync_partition(&partition, &mut must_exit)
+             .await;
+         match outcome {
+             Ok(()) => (),
+             Err(e) => {
+                 warn!(
+                     "({}) Error while syncing {:?}: {}",
+                     self.data.name, partition, e
+                 );
+             }
+         }
+     }
+     Ok(())
+ }
+
+ /// Processes one entry of the todo list.
+ ///
+ /// If we do not retain the partition, its items are offloaded. Otherwise
+ /// the partition is synced concurrently with every other write node, and
+ /// an error is returned when more than `max_write_errors()` of those syncs
+ /// failed.
+ async fn sync_partition(
+     self: Arc<Self>,
+     partition: &TodoPartition,
+     must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+     if !partition.retain {
+         let begin = hash_of_merkle_partition(partition.range.begin);
+         let end = hash_of_merkle_partition_opt(partition.range.end);
+         self.offload_partition(&begin, &end, must_exit).await?;
+         return Ok(());
+     }
+
+     let my_id = self.aux.system.id;
+     let partition_hash = hash_of_merkle_partition(partition.range.begin);
+
+     // Every write node of this partition except ourselves
+     let nodes = self
+         .aux
+         .replication
+         .write_nodes(&partition_hash, &self.aux.system)
+         .into_iter()
+         .filter(|candidate| *candidate != my_id)
+         .collect::<Vec<_>>();
+
+     debug!(
+         "({}) Syncing {:?} with {:?}...",
+         self.data.name, partition, nodes
+     );
+
+     // One sync per peer, all running concurrently
+     let mut pending = nodes
+         .iter()
+         .map(|peer| {
+             self.clone()
+                 .do_sync_with(partition.clone(), *peer, must_exit.clone())
+         })
+         .collect::<FuturesUnordered<_>>();
+
+     let mut n_errors = 0;
+     while let Some(outcome) = pending.next().await {
+         match outcome {
+             Ok(_) => (),
+             Err(e) => {
+                 n_errors += 1;
+                 warn!("({}) Sync error: {}", self.data.name, e);
+             }
+         }
+     }
+
+     if n_errors <= self.aux.replication.max_write_errors() {
+         return Ok(());
+     }
+     Err(Error::Message(format!(
+         "Sync failed with too many nodes (should have been: {:?}).",
+         nodes
+     )))
+ }
+
+ // Offload partition: we are not supposed to store this partition, so its
+ // items are pushed to the nodes that do store it and then deleted locally.
+ // Nothing is checked on the remote side beforehand: items are simply sent
+ // in batches of at most 1024, since offloading should be a rare event.
+ // The process stops with an error as soon as one batch cannot be handed
+ // over, and stops silently if we turn out to be a write node ourselves
+ // or if `must_exit` becomes true.
+ async fn offload_partition(
+     self: &Arc<Self>,
+     begin: &Hash,
+     end: &Hash,
+     must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+     let mut counter: usize = 0;
+
+     while !*must_exit.borrow() {
+         // Read the next batch of (key, value) pairs still present locally
+         let mut items = Vec::new();
+         for entry in self
+             .data
+             .store
+             .range(begin.to_vec()..end.to_vec())
+             .take(1024)
+         {
+             let (key, value) = entry?;
+             items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+         }
+
+         if items.is_empty() {
+             break;
+         }
+
+         let nodes = self
+             .aux
+             .replication
+             .write_nodes(begin, &self.aux.system)
+             .into_iter()
+             .collect::<Vec<_>>();
+         if nodes.contains(&self.aux.system.id) {
+             warn!("Interrupting offload as partitions seem to have changed");
+             break;
+         }
+
+         counter += 1;
+         debug!(
+             "Offloading {} items from {:?}..{:?} ({})",
+             items.len(),
+             begin,
+             end,
+             counter
+         );
+         self.offload_items(&items, &nodes[..]).await?;
+     }
+
+     Ok(())
+ }
+
+ /// Sends the values of `items` to every node of `nodes` in a single
+ /// `Update` message, then deletes the items locally.
+ ///
+ /// Nothing is deleted unless all the RPCs succeeded; the first failed RPC
+ /// is returned as the error. An item is deleted only if `delete_if_equal`
+ /// accepts it for the value that was sent; the others are counted and
+ /// reported in a debug message.
+ async fn offload_items(
+     self: &Arc<Self>,
+     items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
+     nodes: &[UUID],
+ ) -> Result<(), Error> {
+     let values: Vec<_> = items.iter().map(|(_key, value)| Arc::clone(value)).collect();
+     let update_msg = Arc::new(TableRPC::<F>::Update(values));
+
+     let calls = nodes.iter().map(|to| {
+         self.aux
+             .rpc_client
+             .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
+     });
+     for response in join_all(calls).await {
+         response?;
+     }
+
+     // All remote nodes have written those items, now we can delete them locally
+     let mut not_removed = 0;
+     for (k, v) in items.iter() {
+         if self.data.delete_if_equal(&k[..], &v[..])? {
+             continue;
+         }
+         not_removed += 1;
+     }
+
+     if not_removed > 0 {
+         debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
+     }
+
+     Ok(())
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
+
+ /// Computes the root checksum list of `range`: one `(partition, hash)`
+ /// entry for every partition of the range whose Merkle root node (empty
+ /// prefix) is not `MerkleNode::Empty`, in increasing partition order.
+ ///
+ /// Fails if a node cannot be read or hashed.
+ fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
+     let first = u16::from_be_bytes(range.begin);
+     // `end` is excluded from the range; no `end` means up to 0xFFFF included
+     let last = match range.end {
+         Some(end) => u16::from_be_bytes(end) - 1,
+         None => 0xFFFF,
+     };
+
+     let mut root_ck = vec![];
+     for index in first..=last {
+         let key = MerkleNodeKey {
+             partition: index.to_be_bytes(),
+             prefix: vec![],
+         };
+         let node = self.data.merkle_updater.read_node(&key)?;
+         if let MerkleNode::Empty = node {
+             continue;
+         }
+         root_ck.push((key.partition, hash_of(&node)?));
+     }
+     Ok(root_ck)
+ }
+
+ // Driver side of the synchronization of one partition range with node `who`.
+ //
+ // 1. Compare root checksums with the peer; stop if there is no difference.
+ // 2. Walk down the local Merkle tree from every root that differs, asking
+ //    the peer for its version of each intermediate node and descending only
+ //    into the children that differ.
+ // 3. Push to the peer the local items found at the leaves reached this way.
+ //
+ // Returns an error on RPC failure, on an unexpected reply, or if a local
+ // node or item cannot be read. The walk stops early when `must_exit` is true.
+ async fn do_sync_with(
+ self: Arc<Self>,
+ partition: TodoPartition,
+ who: UUID,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let root_ck = self.get_root_ck(partition.range)?;
+ let root_ck_hash = hash_of(&root_ck)?;
+
+ // Send only the hash of our root checksum list; the peer is expected to
+ // answer either that there is no difference, or with its own full list
+ let root_resp = self
+ .aux
+ .rpc_client
+ .call(
+ who,
+ TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?;
+
+ // Queue of Merkle nodes to examine, seeded with the root node of every
+ // partition that is present on one side only or whose hashes differ
+ let mut todo = match root_resp {
+ TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => {
+ debug!(
+ "({}) Sync {:?} with {:?}: no difference",
+ self.data.name, partition, who
+ );
+ return Ok(());
+ }
+ TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => {
+ let join = join_ordered(&root_ck[..], &their_root_ck[..]);
+ let mut todo = VecDeque::new();
+ for (p, v1, v2) in join.iter() {
+ let diff = match (v1, v2) {
+ (Some(_), None) | (None, Some(_)) => true,
+ (Some(a), Some(b)) => a != b,
+ _ => false,
+ };
+ if diff {
+ todo.push_back(MerkleNodeKey {
+ partition: **p,
+ prefix: vec![],
+ });
+ }
+ }
+ debug!(
+ "({}) Sync {:?} with {:?}: todo.len() = {}",
+ self.data.name,
+ partition,
+ who,
+ todo.len()
+ );
+ todo
+ }
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to RootCkHash RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+
+ // Batch of items waiting to be sent; each entry is the stored value of an
+ // item as read from the local store (not its key)
+ let mut todo_items = vec![];
+
+ // Nodes are examined in FIFO order
+ while !todo.is_empty() && !*must_exit.borrow() {
+ let key = todo.pop_front().unwrap();
+ let node = self.data.merkle_updater.read_node(&key)?;
+
+ match node {
+ MerkleNode::Empty => {
+ // They have items we don't have.
+ // We don't request those items from them, they will send them.
+ // We only bother with pushing items that differ
+ }
+ MerkleNode::Leaf(ik, _) => {
+ // Just send that item directly
+ if let Some(val) = self.data.store.get(ik)? {
+ todo_items.push(val.to_vec());
+ }
+ }
+ MerkleNode::Intermediate(l) => {
+ // Fetch the peer's node stored under the same key
+ let remote_node = match self
+ .aux
+ .rpc_client
+ .call(
+ who,
+ TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?
+ {
+ TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node,
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to GetNode RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+ // If the peer has no intermediate node here, compare against an
+ // empty child list so that all of our children count as differing
+ let int_l2 = match remote_node {
+ MerkleNode::Intermediate(l2) => l2,
+ _ => vec![],
+ };
+
+ // Descend into every child that exists on one side only or differs
+ let join = join_ordered(&l[..], &int_l2[..]);
+ for (p, v1, v2) in join.into_iter() {
+ let diff = match (v1, v2) {
+ (Some(_), None) | (None, Some(_)) => true,
+ (Some(a), Some(b)) => a != b,
+ _ => false,
+ };
+ if diff {
+ todo.push_back(key.add_byte(*p));
+ }
+ }
+ }
+ }
+
+ // Flush the batch once it holds 256 items or more
+ if todo_items.len() >= 256 {
+ self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
+ .await?;
+ }
+ }
+
+ // Send whatever is left in the last batch
+ if !todo_items.is_empty() {
+ self.send_items(who, todo_items).await?;
+ }
+
+ Ok(())
+ }
+
+ /// Sends a batch of items to node `who` in a single `Update` RPC.
+ ///
+ /// `item_list` contains the stored *values* of the items, exactly as the
+ /// caller read them from the local store. They are forwarded as they are:
+ /// looking them up in the store again would treat values as keys, find
+ /// nothing, and send an empty update.
+ ///
+ /// Returns an error if the RPC fails or if the peer answers with anything
+ /// other than `Ok`.
+ async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
+     info!(
+         "({}) Sending {} items to {:?}",
+         self.data.name,
+         item_list.len(),
+         who
+     );
+
+     // Wrap each value without copying it
+     let values = item_list
+         .into_iter()
+         .map(|value| Arc::new(ByteBuf::from(value)))
+         .collect::<Vec<_>>();
+
+     let rpc_resp = self
+         .aux
+         .rpc_client
+         .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
+         .await?;
+     if let TableRPC::<F>::Ok = rpc_resp {
+         Ok(())
+     } else {
+         Err(Error::Message(format!(
+             "Unexpected response to RPC Update: {}",
+             debug_serialize(&rpc_resp)
+         )))
+     }
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
+
+ /// Receiver side of the synchronization: answers a request sent by the
+ /// driver of a sync.
+ ///
+ /// * `RootCkHash`: replies `CkNoDifference` if our root checksum list for
+ ///   the range hashes to the received value, our full `RootCkList` otherwise.
+ /// * `GetNode`: replies with the local Merkle node stored under that key.
+ /// * anything else is rejected with an error.
+ pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+     match message {
+         SyncRPC::RootCkHash(range, their_hash) => {
+             let root_ck = self.get_root_ck(*range)?;
+             let our_hash = hash_of(&root_ck)?;
+             let reply = if our_hash == *their_hash {
+                 SyncRPC::CkNoDifference
+             } else {
+                 SyncRPC::RootCkList(*range, root_ck)
+             };
+             Ok(reply)
+         }
+         SyncRPC::GetNode(key) => {
+             let node = self.data.merkle_updater.read_node(key)?;
+             Ok(SyncRPC::Node(key.clone(), node))
+         }
+         _ => Err(Error::Message(String::from("Unexpected sync RPC"))),
+     }
+ }
+}
+
+impl SyncTodo {
+ /// Rebuilds the todo list from scratch for the current ring.
+ ///
+ /// One entry is created per interval between two consecutive split points
+ /// (the last interval has no end). Intervals that we are not a replication
+ /// node for are kept only if the local store holds data in them.
+ ///
+ /// Panics if a split point has a non-zero byte after its first two bytes.
+ fn add_full_sync<F: TableSchema, R: TableReplication>(
+     &mut self,
+     data: &TableData<F>,
+     aux: &TableAux<F, R>,
+ ) {
+     let my_id = aux.system.id;
+
+     self.todo.clear();
+
+     let ring = aux.system.ring.borrow().clone();
+     let split_points = aux.replication.split_points(&ring);
+
+     for (i, b) in split_points.iter().enumerate() {
+         // A split point must be aligned on a partition boundary
+         assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
+         let begin: MerklePartition = b.as_slice()[..2].try_into().unwrap();
+
+         let end: Option<MerklePartition> = match split_points.get(i + 1) {
+             Some(e) => {
+                 assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
+                 Some(e.as_slice()[..2].try_into().unwrap())
+             }
+             None => None,
+         };
+
+         let begin_hash = hash_of_merkle_partition(begin);
+         let end_hash = hash_of_merkle_partition_opt(end);
+
+         let retain = aux
+             .replication
+             .replication_nodes(&begin_hash, &ring)
+             .contains(&my_id);
+
+         // Not ours and nothing stored locally: there is nothing to offload
+         if !retain && data.store.range(begin_hash..end_hash).next().is_none() {
+             continue;
+         }
+
+         self.todo.push(TodoPartition {
+             range: PartitionRange { begin, end },
+             retain,
+         });
+     }
+ }
+
+ /// Removes and returns a task picked uniformly at random from the todo
+ /// list, or `None` if the list is empty. The order of the remaining
+ /// tasks is not preserved: the last one takes the place of the removed one.
+ fn pop_task(&mut self) -> Option<TodoPartition> {
+     let len = self.todo.len();
+     if len == 0 {
+         return None;
+     }
+
+     let i = rand::thread_rng().gen_range::<usize, _, _>(0, len);
+     Some(self.todo.swap_remove(i))
+ }
+}
+
+/// Hashes a serializable value: `x` is encoded with `rmp_to_vec_all_named`
+/// and the resulting bytes are passed to `blake2sum`.
+///
+/// Fails if the serialization fails.
+fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
+    let serialized = rmp_to_vec_all_named(x)?;
+    Ok(blake2sum(&serialized[..]))
+}
+
+/// Full outer join of two slices of `(key, value)` pairs.
+///
+/// Both slices must be sorted by strictly increasing key. The result has one
+/// entry per distinct key, in increasing key order, holding a reference to
+/// the key and to the value found on each side (`None` for a side that does
+/// not have the key).
+///
+/// A key that is present only in `y` is reported with the key taken from
+/// `y`; indexing `x` for it would be out of bounds once `x` is exhausted.
+fn join_ordered<'a, K: Ord + Eq, V1, V2>(
+    x: &'a [(K, V1)],
+    y: &'a [(K, V2)],
+) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
+    let mut ret = Vec::with_capacity(x.len().max(y.len()));
+    let mut i = 0;
+    let mut j = 0;
+    loop {
+        match (x.get(i), y.get(j)) {
+            (None, None) => break,
+            // Only one side has entries left
+            (Some((k, v1)), None) => {
+                ret.push((k, Some(v1), None));
+                i += 1;
+            }
+            (None, Some((k, v2))) => {
+                ret.push((k, None, Some(v2)));
+                j += 1;
+            }
+            // Both sides have entries left: emit the smaller key first
+            (Some((k1, v1)), Some((k2, v2))) => match k1.cmp(k2) {
+                std::cmp::Ordering::Equal => {
+                    ret.push((k1, Some(v1), Some(v2)));
+                    i += 1;
+                    j += 1;
+                }
+                std::cmp::Ordering::Less => {
+                    ret.push((k1, Some(v1), None));
+                    i += 1;
+                }
+                std::cmp::Ordering::Greater => {
+                    ret.push((k2, None, Some(v2)));
+                    j += 1;
+                }
+            },
+        }
+    }
+    ret
+}