aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs15
-rw-r--r--src/table/lib.rs4
-rw-r--r--src/table/merkle.rs107
-rw-r--r--src/table/replication/fullcopy.rs1
-rw-r--r--src/table/replication/sharded.rs6
-rw-r--r--src/table/sync.rs632
-rw-r--r--src/table/table.rs50
-rw-r--r--src/table/table_sync.rs898
8 files changed, 752 insertions, 961 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index fa89fc27..6217bf6d 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -1,16 +1,16 @@
use std::sync::Arc;
use log::warn;
-use sled::Transactional;
use serde_bytes::ByteBuf;
+use sled::Transactional;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
-use garage_util::background::BackgroundRunner;
-use crate::schema::*;
-use crate::merkle::*;
use crate::crdt::CRDT;
+use crate::merkle::*;
+use crate::schema::*;
pub struct TableData<F: TableSchema> {
pub name: String,
@@ -20,7 +20,10 @@ pub struct TableData<F: TableSchema> {
pub(crate) merkle_updater: Arc<MerkleUpdater>,
}
-impl<F> TableData<F> where F: TableSchema {
+impl<F> TableData<F>
+where
+ F: TableSchema,
+{
pub fn new(
name: String,
instance: F,
@@ -45,7 +48,7 @@ impl<F> TableData<F> where F: TableSchema {
merkle_tree_store,
);
- Arc::new(Self{
+ Arc::new(Self {
name,
instance,
store,
diff --git a/src/table/lib.rs b/src/table/lib.rs
index bb249a56..18c29c35 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -7,11 +7,11 @@ pub mod crdt;
pub mod schema;
pub mod util;
+pub mod data;
pub mod merkle;
pub mod replication;
-pub mod data;
+pub mod sync;
pub mod table;
-pub mod table_sync;
pub use schema::*;
pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index ef197dc8..92c18e09 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -15,6 +15,19 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+pub type MerklePartition = [u8; 2];
+
+pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
+ let mut partition_pos = [0u8; 32];
+ partition_pos[0..2].copy_from_slice(&p[..]);
+ partition_pos.into()
+}
+
+pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
+ p.map(hash_of_merkle_partition)
+ .unwrap_or([0xFFu8; 32].into())
+}
+
// This modules partitions the data in 2**16 partitions, based on the top
// 16 bits (two bytes) of item's partition keys' hashes.
// It builds one Merkle tree for each of these 2**16 partitions.
@@ -37,10 +50,10 @@ pub(crate) struct MerkleUpdater {
empty_node_hash: Hash,
}
-#[derive(Clone)]
+#[derive(Clone, Serialize, Deserialize)]
pub struct MerkleNodeKey {
// partition: first 16 bits (two bytes) of the partition_key's hash
- pub partition: [u8; 2],
+ pub partition: MerklePartition,
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
pub prefix: Vec<u8>,
@@ -214,8 +227,11 @@ impl MerkleUpdater {
// insertion and replace current node by an intermediary node
let (pos1, h1) = {
let key2 = key.next_key(blake2sum(&exlf_key[..]));
- let subhash =
- self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?;
+ let subhash = self.put_node_txn(
+ tx,
+ &key2,
+ &MerkleNode::Leaf(exlf_key, exlf_hash),
+ )?;
(key2.prefix[i], subhash)
};
let (pos2, h2) = {
@@ -280,14 +296,11 @@ impl MerkleUpdater {
}
// Access a node in the Merkle tree, used by the sync protocol
- pub(crate) fn read_node(
- &self,
- k: &MerkleNodeKey,
- ) -> Result<MerkleNode, Error> {
+ pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> {
let ent = self.merkle_tree.get(k.encode())?;
match ent {
None => Ok(MerkleNode::Empty),
- Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?)
+ Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
}
}
}
@@ -339,31 +352,77 @@ fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) {
#[test]
fn test_intermediate_aux() {
let mut v = vec![];
-
+
intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
- assert!(v == vec![(12u8, [12u8; 32].into())]);
-
+ assert_eq!(v, vec![(12u8, [12u8; 32].into())]);
+
intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
- assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
-
+ assert_eq!(
+ v,
+ vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]
+ );
+
intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
- assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
-
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (12u8, [12u8; 32].into()),
+ (42u8, [42u8; 32].into())
+ ]
+ );
+
intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
- assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
-
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (12u8, [8u8; 32].into()),
+ (42u8, [42u8; 32].into())
+ ]
+ );
+
intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
- assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [6u8; 32].into()),
+ (12u8, [8u8; 32].into()),
+ (42u8, [42u8; 32].into())
+ ]
+ );
intermediate_rm_child(&mut v, 42u8);
- assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [6u8; 32].into()),
+ (12u8, [8u8; 32].into())
+ ]
+ );
intermediate_rm_child(&mut v, 11u8);
- assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [6u8; 32].into()),
+ (12u8, [8u8; 32].into())
+ ]
+ );
intermediate_rm_child(&mut v, 6u8);
- assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
-
+ assert_eq!(v, vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
+
intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
- assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]);
+ assert_eq!(
+ v,
+ vec![
+ (4u8, [4u8; 32].into()),
+ (6u8, [7u8; 32].into()),
+ (12u8, [8u8; 32].into())
+ ]
+ );
}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index a62a6c3c..a20f20b7 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -53,7 +53,6 @@ impl TableReplication for TableFullReplication {
fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
let mut ret = vec![];
ret.push([0u8; 32].into());
- ret.push([0xFFu8; 32].into());
ret
}
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 42a742cd..886c7c08 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -44,11 +44,13 @@ impl TableReplication for TableShardedReplication {
fn split_points(&self, ring: &Ring) -> Vec<Hash> {
let mut ret = vec![];
- ret.push([0u8; 32].into());
for entry in ring.ring.iter() {
ret.push(entry.location);
}
- ret.push([0xFFu8; 32].into());
+ if ret.len() > 0 {
+ assert_eq!(ret[0], [0u8; 32].into());
+ }
+
ret
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
new file mode 100644
index 00000000..9c37c286
--- /dev/null
+++ b/src/table/sync.rs
@@ -0,0 +1,632 @@
+use std::collections::VecDeque;
+use std::convert::TryInto;
+use std::sync::{Arc, Mutex};
+use std::time::{Duration, Instant};
+
+use futures::future::join_all;
+use futures::{pin_mut, select};
+use futures_util::future::*;
+use futures_util::stream::*;
+use rand::Rng;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+use tokio::sync::{mpsc, watch};
+
+use garage_rpc::ring::Ring;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+use crate::data::*;
+use crate::merkle::*;
+use crate::replication::*;
+use crate::*;
+
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+// Do anti-entropy every 10 minutes
+const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
+
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+ data: Arc<TableData<F>>,
+ aux: Arc<TableAux<F, R>>,
+
+ todo: Mutex<SyncTodo>,
+}
+
+type RootCk = Vec<(MerklePartition, Hash)>;
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
+pub struct PartitionRange {
+ begin: MerklePartition,
+ // if end is None, go all the way to partition 0xFFFF included
+ end: Option<MerklePartition>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub(crate) enum SyncRPC {
+ RootCkHash(PartitionRange, Hash),
+ RootCkList(PartitionRange, RootCk),
+ CkNoDifference,
+ GetNode(MerkleNodeKey),
+ Node(MerkleNodeKey, MerkleNode),
+ Items(Vec<Arc<ByteBuf>>),
+}
+
+struct SyncTodo {
+ todo: Vec<TodoPartition>,
+}
+
+#[derive(Debug, Clone)]
+struct TodoPartition {
+ range: PartitionRange,
+
+ // Are we a node that stores this partition or not?
+ retain: bool,
+}
+
+impl<F, R> TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub(crate) fn launch(data: Arc<TableData<F>>, aux: Arc<TableAux<F, R>>) -> Arc<Self> {
+ let todo = SyncTodo { todo: vec![] };
+
+ let syncer = Arc::new(Self {
+ data: data.clone(),
+ aux: aux.clone(),
+ todo: Mutex::new(todo),
+ });
+
+ let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
+ let s1 = syncer.clone();
+ aux.system.background.spawn_worker(
+ format!("table sync watcher for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
+ );
+
+ let s2 = syncer.clone();
+ aux.system.background.spawn_worker(
+ format!("table syncer for {}", data.name),
+ move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
+ );
+
+ let s3 = syncer.clone();
+ tokio::spawn(async move {
+ tokio::time::delay_for(Duration::from_secs(20)).await;
+ s3.add_full_sync();
+ });
+
+ syncer
+ }
+
+ async fn watcher_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ mut busy_rx: mpsc::UnboundedReceiver<bool>,
+ ) -> Result<(), Error> {
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
+ let mut nothing_to_do_since = Some(Instant::now());
+
+ while !*must_exit.borrow() {
+ let s_ring_recv = ring_recv.recv().fuse();
+ let s_busy = busy_rx.recv().fuse();
+ let s_must_exit = must_exit.recv().fuse();
+ let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
+ pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
+
+ select! {
+ new_ring_r = s_ring_recv => {
+ if new_ring_r.is_some() {
+ debug!("({}) Adding ring difference to syncer todo list", self.data.name);
+ self.add_full_sync();
+ }
+ }
+ busy_opt = s_busy => {
+ if let Some(busy) = busy_opt {
+ if busy {
+ nothing_to_do_since = None;
+ } else {
+ if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
+ }
+ }
+ }
+ }
+ must_exit_v = s_must_exit => {
+ if must_exit_v.unwrap_or(false) {
+ break;
+ }
+ }
+ _ = s_timeout => {
+ if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
+ nothing_to_do_since = None;
+ debug!("({}) Adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ pub fn add_full_sync(&self) {
+ self.todo
+ .lock()
+ .unwrap()
+ .add_full_sync(&self.data, &self.aux);
+ }
+
+ async fn syncer_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ busy_tx: mpsc::UnboundedSender<bool>,
+ ) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ let task = self.todo.lock().unwrap().pop_task();
+ if let Some(partition) = task {
+ busy_tx.send(true)?;
+ let res = self
+ .clone()
+ .sync_partition(&partition, &mut must_exit)
+ .await;
+ if let Err(e) = res {
+ warn!(
+ "({}) Error while syncing {:?}: {}",
+ self.data.name, partition, e
+ );
+ }
+ } else {
+ busy_tx.send(false)?;
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn sync_partition(
+ self: Arc<Self>,
+ partition: &TodoPartition,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ if partition.retain {
+ let my_id = self.aux.system.id;
+
+ let nodes = self
+ .aux
+ .replication
+ .write_nodes(
+ &hash_of_merkle_partition(partition.range.begin),
+ &self.aux.system,
+ )
+ .into_iter()
+ .filter(|node| *node != my_id)
+ .collect::<Vec<_>>();
+
+ debug!(
+ "({}) Syncing {:?} with {:?}...",
+ self.data.name, partition, nodes
+ );
+ let mut sync_futures = nodes
+ .iter()
+ .map(|node| {
+ self.clone()
+ .do_sync_with(partition.clone(), *node, must_exit.clone())
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let mut n_errors = 0;
+ while let Some(r) = sync_futures.next().await {
+ if let Err(e) = r {
+ n_errors += 1;
+ warn!("({}) Sync error: {}", self.data.name, e);
+ }
+ }
+ if n_errors > self.aux.replication.max_write_errors() {
+ return Err(Error::Message(format!(
+ "Sync failed with too many nodes (should have been: {:?}).",
+ nodes
+ )));
+ }
+ } else {
+ self.offload_partition(
+ &hash_of_merkle_partition(partition.range.begin),
+ &hash_of_merkle_partition_opt(partition.range.end),
+ must_exit,
+ )
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ // Offload partition: this partition is not something we are storing,
+ // so send it out to all other nodes that store it and delete items locally.
+ // We don't bother checking if the remote nodes already have the items,
+ // we just batch-send everything. Offloading isn't supposed to happen very often.
+ // If any of the nodes that are supposed to store the items is unable to
+ // save them, we interrupt the process.
+ async fn offload_partition(
+ self: &Arc<Self>,
+ begin: &Hash,
+ end: &Hash,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut counter: usize = 0;
+
+ while !*must_exit.borrow() {
+ let mut items = Vec::new();
+
+ for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
+ let (key, value) = item?;
+ items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
+
+ if items.len() >= 1024 {
+ break;
+ }
+ }
+
+ if items.len() > 0 {
+ let nodes = self
+ .aux
+ .replication
+ .write_nodes(&begin, &self.aux.system)
+ .into_iter()
+ .collect::<Vec<_>>();
+ if nodes.contains(&self.aux.system.id) {
+ warn!("Interrupting offload as partitions seem to have changed");
+ break;
+ }
+
+ counter += 1;
+ debug!(
+ "Offloading {} items from {:?}..{:?} ({})",
+ items.len(),
+ begin,
+ end,
+ counter
+ );
+ self.offload_items(&items, &nodes[..]).await?;
+ } else {
+ break;
+ }
+ }
+
+ Ok(())
+ }
+
+ async fn offload_items(
+ self: &Arc<Self>,
+ items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
+ nodes: &[UUID],
+ ) -> Result<(), Error> {
+ let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
+ let update_msg = Arc::new(TableRPC::<F>::Update(values));
+
+ for res in join_all(nodes.iter().map(|to| {
+ self.aux
+ .rpc_client
+ .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
+ }))
+ .await
+ {
+ res?;
+ }
+
+ // All remote nodes have written those items, now we can delete them locally
+ let mut not_removed = 0;
+ for (k, v) in items.iter() {
+ if !self.data.delete_if_equal(&k[..], &v[..])? {
+ not_removed += 1;
+ }
+ }
+
+ if not_removed > 0 {
+ debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
+ }
+
+ Ok(())
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- DRIVER SIDE ======
+
+ fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
+ let begin = u16::from_be_bytes(range.begin);
+ let range_iter = match range.end {
+ Some(end) => {
+ let end = u16::from_be_bytes(end);
+ begin..=(end - 1)
+ }
+ None => begin..=0xFFFF,
+ };
+
+ let mut ret = vec![];
+ for i in range_iter {
+ let key = MerkleNodeKey {
+ partition: u16::to_be_bytes(i),
+ prefix: vec![],
+ };
+ match self.data.merkle_updater.read_node(&key)? {
+ MerkleNode::Empty => (),
+ x => {
+ ret.push((key.partition, hash_of(&x)?));
+ }
+ }
+ }
+ Ok(ret)
+ }
+
+ async fn do_sync_with(
+ self: Arc<Self>,
+ partition: TodoPartition,
+ who: UUID,
+ must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let root_ck = self.get_root_ck(partition.range)?;
+ let root_ck_hash = hash_of(&root_ck)?;
+
+ // If their root checksum has level > than us, use that as a reference
+ let root_resp = self
+ .aux
+ .rpc_client
+ .call(
+ who,
+ TableRPC::<F>::SyncRPC(SyncRPC::RootCkHash(partition.range, root_ck_hash)),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?;
+
+ let mut todo = match root_resp {
+ TableRPC::<F>::SyncRPC(SyncRPC::CkNoDifference) => {
+ debug!(
+ "({}) Sync {:?} with {:?}: no difference",
+ self.data.name, partition, who
+ );
+ return Ok(());
+ }
+ TableRPC::<F>::SyncRPC(SyncRPC::RootCkList(_, their_root_ck)) => {
+ let join = join_ordered(&root_ck[..], &their_root_ck[..]);
+ let mut todo = VecDeque::new();
+ for (p, v1, v2) in join.iter() {
+ let diff = match (v1, v2) {
+ (Some(_), None) | (None, Some(_)) => true,
+ (Some(a), Some(b)) => a != b,
+ _ => false,
+ };
+ if diff {
+ todo.push_back(MerkleNodeKey {
+ partition: **p,
+ prefix: vec![],
+ });
+ }
+ }
+ debug!(
+ "({}) Sync {:?} with {:?}: todo.len() = {}",
+ self.data.name,
+ partition,
+ who,
+ todo.len()
+ );
+ todo
+ }
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to RootCkHash RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+
+ let mut todo_items = vec![];
+
+ while !todo.is_empty() && !*must_exit.borrow() {
+ let key = todo.pop_front().unwrap();
+ let node = self.data.merkle_updater.read_node(&key)?;
+
+ match node {
+ MerkleNode::Empty => {
+ // They have items we don't have.
+ // We don't request those items from them, they will send them.
+ // We only bother with pushing items that differ
+ }
+ MerkleNode::Leaf(ik, _) => {
+ // Just send that item directly
+ if let Some(val) = self.data.store.get(ik)? {
+ todo_items.push(val.to_vec());
+ }
+ }
+ MerkleNode::Intermediate(l) => {
+ let remote_node = match self
+ .aux
+ .rpc_client
+ .call(
+ who,
+ TableRPC::<F>::SyncRPC(SyncRPC::GetNode(key.clone())),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?
+ {
+ TableRPC::<F>::SyncRPC(SyncRPC::Node(_, node)) => node,
+ x => {
+ return Err(Error::Message(format!(
+ "Invalid respone to GetNode RPC: {}",
+ debug_serialize(x)
+ )));
+ }
+ };
+ let int_l2 = match remote_node {
+ MerkleNode::Intermediate(l2) => l2,
+ _ => vec![],
+ };
+
+ let join = join_ordered(&l[..], &int_l2[..]);
+ for (p, v1, v2) in join.into_iter() {
+ let diff = match (v1, v2) {
+ (Some(_), None) | (None, Some(_)) => true,
+ (Some(a), Some(b)) => a != b,
+ _ => false,
+ };
+ if diff {
+ todo.push_back(key.add_byte(*p));
+ }
+ }
+ }
+ }
+
+ if todo_items.len() >= 256 {
+ self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
+ .await?;
+ }
+ }
+
+ if !todo_items.is_empty() {
+ self.send_items(who, todo_items).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ info!(
+ "({}) Sending {} items to {:?}",
+ self.data.name,
+ item_list.len(),
+ who
+ );
+
+ let mut values = vec![];
+ for item in item_list.iter() {
+ if let Some(v) = self.data.store.get(&item[..])? {
+ values.push(Arc::new(ByteBuf::from(v.as_ref())));
+ }
+ }
+ let rpc_resp = self
+ .aux
+ .rpc_client
+ .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
+ .await?;
+ if let TableRPC::<F>::Ok = rpc_resp {
+ Ok(())
+ } else {
+ Err(Error::Message(format!(
+ "Unexpected response to RPC Update: {}",
+ debug_serialize(&rpc_resp)
+ )))
+ }
+ }
+
+ // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
+
+ pub(crate) async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+ match message {
+ SyncRPC::RootCkHash(range, h) => {
+ let root_ck = self.get_root_ck(*range)?;
+ let hash = hash_of(&root_ck)?;
+ if hash == *h {
+ Ok(SyncRPC::CkNoDifference)
+ } else {
+ Ok(SyncRPC::RootCkList(*range, root_ck))
+ }
+ }
+ SyncRPC::GetNode(k) => {
+ let node = self.data.merkle_updater.read_node(&k)?;
+ Ok(SyncRPC::Node(k.clone(), node))
+ }
+ _ => Err(Error::Message(format!("Unexpected sync RPC"))),
+ }
+ }
+}
+
+impl SyncTodo {
+ fn add_full_sync<F: TableSchema, R: TableReplication>(
+ &mut self,
+ data: &TableData<F>,
+ aux: &TableAux<F, R>,
+ ) {
+ let my_id = aux.system.id;
+
+ self.todo.clear();
+
+ let ring = aux.system.ring.borrow().clone();
+ let split_points = aux.replication.split_points(&ring);
+
+ for i in 0..split_points.len() {
+ let begin: MerklePartition = {
+ let b = split_points[i];
+ assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
+ b.as_slice()[..2].try_into().unwrap()
+ };
+
+ let end: Option<MerklePartition> = if i + 1 < split_points.len() {
+ let e = split_points[i + 1];
+ assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
+ Some(e.as_slice()[..2].try_into().unwrap())
+ } else {
+ None
+ };
+
+ let begin_hash = hash_of_merkle_partition(begin);
+ let end_hash = hash_of_merkle_partition_opt(end);
+
+ let nodes = aux.replication.replication_nodes(&begin_hash, &ring);
+
+ let retain = nodes.contains(&my_id);
+ if !retain {
+ // Check if we have some data to send, otherwise skip
+ if data.store.range(begin_hash..end_hash).next().is_none() {
+ continue;
+ }
+ }
+
+ self.todo.push(TodoPartition {
+ range: PartitionRange { begin, end },
+ retain,
+ });
+ }
+ }
+
+ fn pop_task(&mut self) -> Option<TodoPartition> {
+ if self.todo.is_empty() {
+ return None;
+ }
+
+ let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
+ if i == self.todo.len() - 1 {
+ self.todo.pop()
+ } else {
+ let replacement = self.todo.pop().unwrap();
+ let ret = std::mem::replace(&mut self.todo[i], replacement);
+ Some(ret)
+ }
+ }
+}
+
+fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
+ Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+}
+
+fn join_ordered<'a, K: Ord + Eq, V1, V2>(
+ x: &'a [(K, V1)],
+ y: &'a [(K, V2)],
+) -> Vec<(&'a K, Option<&'a V1>, Option<&'a V2>)> {
+ let mut ret = vec![];
+ let mut i = 0;
+ let mut j = 0;
+ while i < x.len() || j < y.len() {
+ if i < x.len() && j < y.len() && x[i].0 == y[j].0 {
+ ret.push((&x[i].0, Some(&x[i].1), Some(&y[j].1)));
+ i += 1;
+ j += 1;
+ } else if i < x.len() && (j == y.len() || x[i].0 < y[j].0) {
+ ret.push((&x[i].0, Some(&x[i].1), None));
+ i += 1;
+ } else if j < y.len() && (i == x.len() || x[i].0 > y[j].0) {
+ ret.push((&x[i].0, None, Some(&y[j].1)));
+ j += 1;
+ } else {
+ unreachable!();
+ }
+ }
+ ret
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index a4cb4b24..516c9358 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -15,9 +15,9 @@ use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::data::*;
-use crate::schema::*;
-use crate::table_sync::*;
use crate::replication::*;
+use crate::schema::*;
+use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@@ -50,7 +50,6 @@ pub(crate) enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-
impl<F, R> Table<F, R>
where
F: TableSchema + 'static,
@@ -69,29 +68,17 @@ where
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
- let data = TableData::new(
- name,
- instance,
- db,
- system.background.clone(),
- );
+ let data = TableData::new(name, instance, db, system.background.clone());
- let aux = Arc::new(TableAux{
+ let aux = Arc::new(TableAux {
system,
replication,
rpc_client,
});
- let syncer = TableSyncer::launch(
- data.clone(),
- aux.clone(),
- );
+ let syncer = TableSyncer::launch(data.clone(), aux.clone());
- let table = Arc::new(Self {
- data,
- aux,
- syncer,
- });
+ let table = Arc::new(Self { data, aux, syncer });
table.clone().register_handler(rpc_server, rpc_path);
@@ -106,7 +93,8 @@ where
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRPC::<F>::Update(vec![e_enc]);
- self.aux.rpc_client
+ self.aux
+ .rpc_client
.try_call_many(
&who[..],
rpc,
@@ -135,7 +123,11 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRPC::<F>::Update(entries);
- let resp = self.aux.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ let resp = self
+ .aux
+ .rpc_client
+ .call(node, rpc, TABLE_RPC_TIMEOUT)
+ .await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -200,7 +192,8 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.aux.system
+ self.aux
+ .system
.background
.spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
}
@@ -221,7 +214,8 @@ where
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .aux.rpc_client
+ .aux
+ .rpc_client
.try_call_many(
&who[..],
rpc,
@@ -276,7 +270,8 @@ where
async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.aux.rpc_client
+ self.aux
+ .rpc_client
.try_call_many(
&who[..],
TableRPC::<F>::Update(vec![what_enc]),
@@ -296,7 +291,8 @@ where
});
let self2 = self.clone();
- self.aux.rpc_client
+ self.aux
+ .rpc_client
.set_local_handler(self.aux.system.id, move |msg| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
@@ -318,9 +314,7 @@ where
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
- let response = self.syncer
- .handle_rpc(rpc, self.aux.system.background.stop_signal.clone())
- .await?;
+ let response = self.syncer.handle_rpc(rpc).await?;
Ok(TableRPC::SyncRPC(response))
}
_ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
deleted file mode 100644
index 7394be1b..00000000
--- a/src/table/table_sync.rs
+++ /dev/null
@@ -1,898 +0,0 @@
-use rand::Rng;
-use std::collections::{BTreeMap, VecDeque};
-use std::sync::{Arc, Mutex};
-use std::time::{Duration, Instant};
-
-use futures::future::join_all;
-use futures::{pin_mut, select};
-use futures_util::future::*;
-use futures_util::stream::*;
-use serde::{Deserialize, Serialize};
-use serde_bytes::ByteBuf;
-use tokio::sync::{mpsc, watch};
-
-use garage_rpc::ring::Ring;
-use garage_util::data::*;
-use garage_util::error::Error;
-
-use crate::*;
-use crate::data::*;
-use crate::replication::*;
-
-const MAX_DEPTH: usize = 16;
-
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Do anti-entropy every 10 minutes
-const SCAN_INTERVAL: Duration = Duration::from_secs(10 * 60);
-
-const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
-
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
- data: Arc<TableData<F>>,
- aux: Arc<TableAux<F, R>>,
-
- todo: Mutex<SyncTodo>,
- cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
-}
-
-#[derive(Serialize, Deserialize)]
-pub(crate) enum SyncRPC {
- GetRootChecksumRange(Hash, Hash),
- RootChecksumRange(SyncRange),
- Checksums(Vec<RangeChecksum>),
- Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
-}
-
-struct SyncTodo {
- todo: Vec<TodoPartition>,
-}
-
-#[derive(Debug, Clone)]
-struct TodoPartition {
- // Partition consists in hashes between begin included and end excluded
- begin: Hash,
- end: Hash,
-
- // Are we a node that stores this partition or not?
- retain: bool,
-}
-
-// A SyncRange defines a query on the dataset stored by a node, in the following way:
-// - all items whose key are >= `begin`
-// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
-// - except if the first item of the range has such many leading zero bytes
-// - and stopping at `end` (excluded) if such an item is not found
-// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
-// i.e. of ranges of level `level-1` that cover the same range
-// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
-// See RangeChecksum for the struct that stores this information.
-#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct SyncRange {
- begin: Vec<u8>,
- end: Vec<u8>,
- level: usize,
-}
-
-impl std::cmp::PartialOrd for SyncRange {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-impl std::cmp::Ord for SyncRange {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.begin
- .cmp(&other.begin)
- .then(self.level.cmp(&other.level))
- .then(self.end.cmp(&other.end))
- }
-}
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub(crate) struct RangeChecksum {
- bounds: SyncRange,
- children: Vec<(SyncRange, Hash)>,
- found_limit: Option<Vec<u8>>,
-
- #[serde(skip, default = "std::time::Instant::now")]
- time: Instant,
-}
-
-#[derive(Debug, Clone)]
-struct RangeChecksumCache {
- hash: Option<Hash>, // None if no children
- found_limit: Option<Vec<u8>>,
- time: Instant,
-}
-
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(data: Arc<TableData<F>>,
- aux: Arc<TableAux<F, R>>) -> Arc<Self> {
- let todo = SyncTodo{ todo: vec![] };
-
- let syncer = Arc::new(Self {
- data: data.clone(),
- aux: aux.clone(),
- todo: Mutex::new(todo),
- cache: (0..MAX_DEPTH)
- .map(|_| Mutex::new(BTreeMap::new()))
- .collect::<Vec<_>>(),
- });
-
- let (busy_tx, busy_rx) = mpsc::unbounded_channel();
-
- let s1 = syncer.clone();
- aux.system.background.spawn_worker(
- format!("table sync watcher for {}", data.name),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- );
-
- let s2 = syncer.clone();
- aux.system.background.spawn_worker(
- format!("table syncer for {}", data.name),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- );
-
- let s3 = syncer.clone();
- tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
- s3.add_full_scan();
- });
-
- syncer
- }
-
- async fn watcher_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) -> Result<(), Error> {
- let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
- let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
- let mut nothing_to_do_since = Some(Instant::now());
-
- while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
- select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- debug!("({}) Adding ring difference to syncer todo list", self.data.name);
- self.todo.lock().unwrap().add_ring_difference(&prev_ring, &new_ring, &self.data, &self.aux);
- prev_ring = new_ring;
- }
- }
- busy_opt = s_busy => {
- if let Some(busy) = busy_opt {
- if busy {
- nothing_to_do_since = None;
- } else {
- if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
- }
- }
- }
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
- if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
- nothing_to_do_since = None;
- debug!("({}) Adding full scan to syncer todo list", self.data.name);
- self.add_full_scan();
- }
- }
- }
- }
- Ok(())
- }
-
- pub fn add_full_scan(&self) {
- self.todo.lock().unwrap().add_full_scan(&self.data, &self.aux);
- }
-
- async fn syncer_task(
- self: Arc<Self>,
- mut must_exit: watch::Receiver<bool>,
- busy_tx: mpsc::UnboundedSender<bool>,
- ) -> Result<(), Error> {
- while !*must_exit.borrow() {
- let task = self.todo.lock().unwrap().pop_task();
- if let Some(partition) = task {
- busy_tx.send(true)?;
- let res = self
- .clone()
- .sync_partition(&partition, &mut must_exit)
- .await;
- if let Err(e) = res {
- warn!(
- "({}) Error while syncing {:?}: {}",
- self.data.name, partition, e
- );
- }
- } else {
- busy_tx.send(false)?;
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- }
- Ok(())
- }
-
- async fn sync_partition(
- self: Arc<Self>,
- partition: &TodoPartition,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- if partition.retain {
- let my_id = self.aux.system.id;
- let nodes = self
- .aux
- .replication
- .write_nodes(&partition.begin, &self.aux.system)
- .into_iter()
- .filter(|node| *node != my_id)
- .collect::<Vec<_>>();
-
- debug!(
- "({}) Preparing to sync {:?} with {:?}...",
- self.data.name, partition, nodes
- );
- let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit)?;
-
- let mut sync_futures = nodes
- .iter()
- .map(|node| {
- self.clone().do_sync_with(
- partition.clone(),
- root_cks.clone(),
- *node,
- must_exit.clone(),
- )
- })
- .collect::<FuturesUnordered<_>>();
-
- let mut n_errors = 0;
- while let Some(r) = sync_futures.next().await {
- if let Err(e) = r {
- n_errors += 1;
- warn!("({}) Sync error: {}", self.data.name, e);
- }
- }
- if n_errors > self.aux.replication.max_write_errors() {
- return Err(Error::Message(format!(
- "Sync failed with too many nodes (should have been: {:?}).",
- nodes
- )));
- }
- } else {
- self.offload_partition(&partition.begin, &partition.end, must_exit)
- .await?;
- }
-
- Ok(())
- }
-
- // Offload partition: this partition is not something we are storing,
- // so send it out to all other nodes that store it and delete items locally.
- // We don't bother checking if the remote nodes already have the items,
- // we just batch-send everything. Offloading isn't supposed to happen very often.
- // If any of the nodes that are supposed to store the items is unable to
- // save them, we interrupt the process.
- async fn offload_partition(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut counter: usize = 0;
-
- while !*must_exit.borrow() {
- let mut items = Vec::new();
-
- for item in self.data.store.range(begin.to_vec()..end.to_vec()) {
- let (key, value) = item?;
- items.push((key.to_vec(), Arc::new(ByteBuf::from(value.as_ref()))));
-
- if items.len() >= 1024 {
- break;
- }
- }
-
- if items.len() > 0 {
- let nodes = self
- .aux
- .replication
- .write_nodes(&begin, &self.aux.system)
- .into_iter()
- .collect::<Vec<_>>();
- if nodes.contains(&self.aux.system.id) {
- warn!("Interrupting offload as partitions seem to have changed");
- break;
- }
-
- counter += 1;
- debug!(
- "Offloading {} items from {:?}..{:?} ({})",
- items.len(),
- begin,
- end,
- counter
- );
- self.offload_items(&items, &nodes[..]).await?;
- } else {
- break;
- }
- }
-
- Ok(())
- }
-
- async fn offload_items(
- self: &Arc<Self>,
- items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
- nodes: &[UUID],
- ) -> Result<(), Error> {
- let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- let update_msg = Arc::new(TableRPC::<F>::Update(values));
-
- for res in join_all(nodes.iter().map(|to| {
- self.aux
- .rpc_client
- .call_arc(*to, update_msg.clone(), TABLE_SYNC_RPC_TIMEOUT)
- }))
- .await
- {
- res?;
- }
-
- // All remote nodes have written those items, now we can delete them locally
- let mut not_removed = 0;
- for (k, v) in items.iter() {
- if !self.data.delete_if_equal(&k[..], &v[..])? {
- not_removed += 1;
- }
- }
-
- if not_removed > 0 {
- debug!("{} items not removed during offload because they changed in between (trying again...)", not_removed);
- }
-
- Ok(())
- }
-
- fn root_checksum(
- self: &Arc<Self>,
- begin: &Hash,
- end: &Hash,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- for i in 1..MAX_DEPTH {
- let rc = self.range_checksum(
- &SyncRange {
- begin: begin.to_vec(),
- end: end.to_vec(),
- level: i,
- },
- must_exit,
- )?;
- if rc.found_limit.is_none() {
- return Ok(rc);
- }
- }
- Err(Error::Message(format!(
- "Unable to compute root checksum (this should never happen)"
- )))
- }
-
- fn range_checksum(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksum, Error> {
- assert!(range.level != 0);
- trace!("Call range_checksum {:?}", range);
-
- if range.level == 1 {
- let mut children = vec![];
- for item in self
- .data
- .store
- .range(range.begin.clone()..range.end.clone())
- {
- let (key, value) = item?;
- let key_hash = blake2sum(&key[..]);
- if children.len() > 0
- && key_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(key.to_vec()),
- time: Instant::now(),
- });
- }
- let item_range = SyncRange {
- begin: key.to_vec(),
- end: vec![],
- level: 0,
- };
- children.push((item_range, blake2sum(&value[..])));
- }
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time: Instant::now(),
- })
- } else {
- let mut children = vec![];
- let mut sub_range = SyncRange {
- begin: range.begin.clone(),
- end: range.end.clone(),
- level: range.level - 1,
- };
- let mut time = Instant::now();
- while !*must_exit.borrow() {
- let sub_ck = self.range_checksum_cached_hash(&sub_range, must_exit)?;
-
- if let Some(hash) = sub_ck.hash {
- children.push((sub_range.clone(), hash));
- if sub_ck.time < time {
- time = sub_ck.time;
- }
- }
-
- if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: None,
- time,
- });
- }
- let found_limit = sub_ck.found_limit.unwrap();
-
- let actual_limit_hash = blake2sum(&found_limit[..]);
- if actual_limit_hash.as_slice()[0..range.level]
- .iter()
- .all(|x| *x == 0u8)
- {
- trace!(
- "range_checksum {:?} returning {} items",
- range,
- children.len()
- );
- return Ok(RangeChecksum {
- bounds: range.clone(),
- children,
- found_limit: Some(found_limit.clone()),
- time,
- });
- }
-
- sub_range.begin = found_limit;
- }
- trace!("range_checksum {:?} exiting due to must_exit", range);
- Err(Error::Message(format!("Exiting.")))
- }
- }
-
- fn range_checksum_cached_hash(
- self: &Arc<Self>,
- range: &SyncRange,
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<RangeChecksumCache, Error> {
- {
- let mut cache = self.cache[range.level].lock().unwrap();
- if let Some(v) = cache.get(&range) {
- if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
- return Ok(v.clone());
- }
- }
- cache.remove(&range);
- }
-
- let v = self.range_checksum(&range, must_exit)?;
- trace!(
- "({}) New checksum calculated for {}-{}/{}, {} children",
- self.data.name,
- hex::encode(&range.begin)
- .chars()
- .take(16)
- .collect::<String>(),
- hex::encode(&range.end).chars().take(16).collect::<String>(),
- range.level,
- v.children.len()
- );
-
- let hash = if v.children.len() > 0 {
- Some(blake2sum(&rmp_to_vec_all_named(&v)?[..]))
- } else {
- None
- };
- let cache_entry = RangeChecksumCache {
- hash,
- found_limit: v.found_limit,
- time: v.time,
- };
-
- let mut cache = self.cache[range.level].lock().unwrap();
- cache.insert(range.clone(), cache_entry.clone());
- Ok(cache_entry)
- }
-
- async fn do_sync_with(
- self: Arc<Self>,
- partition: TodoPartition,
- root_ck: RangeChecksum,
- who: UUID,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut todo = VecDeque::new();
-
- // If their root checksum has level > than us, use that as a reference
- let root_cks_resp = self
- .aux
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
- partition.begin.clone(),
- partition.end.clone(),
- )),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
- if range.level > root_ck.bounds.level {
- let their_root_range_ck = self.range_checksum(&range, &mut must_exit)?;
- todo.push_back(their_root_range_ck);
- } else {
- todo.push_back(root_ck);
- }
- } else {
- return Err(Error::Message(format!(
- "Invalid respone to GetRootChecksumRange RPC: {}",
- debug_serialize(root_cks_resp)
- )));
- }
-
- while !todo.is_empty() && !*must_exit.borrow() {
- let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
- trace!(
- "({}) Sync with {:?}: {} ({}) remaining",
- self.data.name,
- who,
- todo.len(),
- total_children
- );
-
- let step_size = std::cmp::min(16, todo.len());
- let step = todo.drain(..step_size).collect::<Vec<_>>();
-
- let rpc_resp = self
- .aux
- .rpc_client
- .call(
- who,
- TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)),
- TABLE_SYNC_RPC_TIMEOUT,
- )
- .await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
- rpc_resp
- {
- if diff_ranges.len() > 0 || diff_items.len() > 0 {
- info!(
- "({}) Sync with {:?}: difference {} ranges, {} items",
- self.data.name,
- who,
- diff_ranges.len(),
- diff_items.len()
- );
- }
- let mut items_to_send = vec![];
- for differing in diff_ranges.drain(..) {
- if differing.level == 0 {
- items_to_send.push(differing.begin);
- } else {
- let checksum = self.range_checksum(&differing, &mut must_exit)?;
- todo.push_back(checksum);
- }
- }
- if diff_items.len() > 0 {
- self.data.update_many(&diff_items[..])?;
- }
- if items_to_send.len() > 0 {
- self.send_items(who, items_to_send).await?;
- }
- } else {
- return Err(Error::Message(format!(
- "Unexpected response to sync RPC checksums: {}",
- debug_serialize(&rpc_resp)
- )));
- }
- }
- Ok(())
- }
-
- async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
- info!(
- "({}) Sending {} items to {:?}",
- self.data.name,
- item_list.len(),
- who
- );
-
- let mut values = vec![];
- for item in item_list.iter() {
- if let Some(v) = self.data.store.get(&item[..])? {
- values.push(Arc::new(ByteBuf::from(v.as_ref())));
- }
- }
- let rpc_resp = self
- .aux
- .rpc_client
- .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
- .await?;
- if let TableRPC::<F>::Ok = rpc_resp {
- Ok(())
- } else {
- Err(Error::Message(format!(
- "Unexpected response to RPC Update: {}",
- debug_serialize(&rpc_resp)
- )))
- }
- }
-
- pub(crate) async fn handle_rpc(
- self: &Arc<Self>,
- message: &SyncRPC,
- mut must_exit: watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- match message {
- SyncRPC::GetRootChecksumRange(begin, end) => {
- let root_cks = self.root_checksum(&begin, &end, &mut must_exit)?;
- Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
- }
- SyncRPC::Checksums(checksums) => {
- self.handle_checksums_rpc(&checksums[..], &mut must_exit)
- .await
- }
- _ => Err(Error::Message(format!("Unexpected sync RPC"))),
- }
- }
-
- async fn handle_checksums_rpc(
- self: &Arc<Self>,
- checksums: &[RangeChecksum],
- must_exit: &mut watch::Receiver<bool>,
- ) -> Result<SyncRPC, Error> {
- let mut ret_ranges = vec![];
- let mut ret_items = vec![];
-
- for their_ckr in checksums.iter() {
- let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit)?;
- for (their_range, their_hash) in their_ckr.children.iter() {
- let differs = match our_ckr
- .children
- .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
- {
- Err(_) => {
- if their_range.level >= 1 {
- let cached_hash =
- self.range_checksum_cached_hash(&their_range, must_exit)?;
- cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
- } else {
- true
- }
- }
- Ok(i) => our_ckr.children[i].1 != *their_hash,
- };
- if differs {
- ret_ranges.push(their_range.clone());
- if their_range.level == 0 {
- if let Some(item_bytes) =
- self.data.store.get(their_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- for (our_range, _hash) in our_ckr.children.iter() {
- if let Some(their_found_limit) = &their_ckr.found_limit {
- if our_range.begin.as_slice() > their_found_limit.as_slice() {
- break;
- }
- }
-
- let not_present = our_ckr
- .children
- .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
- .is_err();
- if not_present {
- if our_range.level > 0 {
- ret_ranges.push(our_range.clone());
- }
- if our_range.level == 0 {
- if let Some(item_bytes) =
- self.data.store.get(our_range.begin.as_slice())?
- {
- ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
- }
- }
- }
- }
- }
- let n_checksums = checksums
- .iter()
- .map(|x| x.children.len())
- .fold(0, |x, y| x + y);
- if ret_ranges.len() > 0 || ret_items.len() > 0 {
- trace!(
- "({}) Checksum comparison RPC: {} different + {} items for {} received",
- self.data.name,
- ret_ranges.len(),
- ret_items.len(),
- n_checksums
- );
- }
- Ok(SyncRPC::Difference(ret_ranges, ret_items))
- }
-
- pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
- for i in 1..MAX_DEPTH {
- let needle = SyncRange {
- begin: item_key.to_vec(),
- end: vec![],
- level: i,
- };
- let mut cache = self.cache[i].lock().unwrap();
- if let Some(cache_entry) = cache.range(..=needle).rev().next() {
- if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
- let index = cache_entry.0.clone();
- drop(cache_entry);
- cache.remove(&index);
- }
- }
- }
- }
-}
-
-impl SyncTodo {
- fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, data: &TableData<F>, aux: &TableAux<F, R>) {
- let my_id = aux.system.id;
-
- self.todo.clear();
-
- let ring = aux.system.ring.borrow().clone();
- let split_points = aux.replication.split_points(&ring);
-
- for i in 0..split_points.len() - 1 {
- let begin = split_points[i];
- let end = split_points[i + 1];
- if begin == end {
- continue;
- }
-
- let nodes = aux.replication.replication_nodes(&begin, &ring);
-
- let retain = nodes.contains(&my_id);
- if !retain {
- // Check if we have some data to send, otherwise skip
- if data.store.range(begin..end).next().is_none() {
- continue;
- }
- }
-
- self.todo.push(TodoPartition { begin, end, retain });
- }
- }
-
- fn add_ring_difference<F: TableSchema, R: TableReplication>(
- &mut self,
- old_ring: &Ring,
- new_ring: &Ring,
- data: &TableData<F>, aux: &TableAux<F, R>,
- ) {
- let my_id = aux.system.id;
-
- // If it is us who are entering or leaving the system,
- // initiate a full sync instead of incremental sync
- if old_ring.config.members.contains_key(&my_id)
- != new_ring.config.members.contains_key(&my_id)
- {
- self.add_full_scan(data, aux);
- return;
- }
-
- let mut all_points = None
- .into_iter()
- .chain(aux.replication.split_points(old_ring).drain(..))
- .chain(aux.replication.split_points(new_ring).drain(..))
- .chain(self.todo.iter().map(|x| x.begin))
- .chain(self.todo.iter().map(|x| x.end))
- .collect::<Vec<_>>();
- all_points.sort();
- all_points.dedup();
-
- let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
- old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
- let mut new_todo = vec![];
-
- for i in 0..all_points.len() - 1 {
- let begin = all_points[i];
- let end = all_points[i + 1];
- let was_ours = aux
- .replication
- .replication_nodes(&begin, &old_ring)
- .contains(&my_id);
- let is_ours = aux
- .replication
- .replication_nodes(&begin, &new_ring)
- .contains(&my_id);
-
- let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
- Ok(_) => true,
- Err(j) => {
- (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
- || (j < old_todo.len()
- && old_todo[j].begin < end && begin < old_todo[j].end)
- }
- };
- if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
- new_todo.push(TodoPartition {
- begin,
- end,
- retain: is_ours,
- });
- }
- }
-
- self.todo = new_todo;
- }
-
- fn pop_task(&mut self) -> Option<TodoPartition> {
- if self.todo.is_empty() {
- return None;
- }
-
- let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
- if i == self.todo.len() - 1 {
- self.todo.pop()
- } else {
- let replacement = self.todo.pop().unwrap();
- let ret = std::mem::replace(&mut self.todo[i], replacement);
- Some(ret)
- }
- }
-}