aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/data.rs2
-rw-r--r--src/table.rs14
-rw-r--r--src/table_sync.rs68
3 files changed, 59 insertions, 25 deletions
diff --git a/src/data.rs b/src/data.rs
index c1665d2a..a3b7b23b 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -25,7 +25,7 @@ impl Eq for FixedBytes32 {}
impl fmt::Debug for FixedBytes32 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "{}", hex::encode(self.0))
+ write!(f, "{}…", hex::encode(&self.0[..8]))
}
}
diff --git a/src/table.rs b/src/table.rs
index 40114aec..bd26a79d 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -6,7 +6,7 @@ use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use tokio::sync::RwLock;
+use arc_swap::ArcSwapOption;
use crate::data::*;
use crate::error::Error;
@@ -22,7 +22,7 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
- pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>,
+ pub syncer: ArcSwapOption<TableSyncer<F>>,
pub param: TableReplicationParams,
}
@@ -142,10 +142,10 @@ impl<F: TableSchema + 'static> Table<F> {
system,
store,
param,
- syncer: RwLock::new(None),
+ syncer: ArcSwapOption::from(None),
});
let syncer = TableSyncer::launch(table.clone()).await;
- *table.syncer.write().await = Some(syncer);
+ table.syncer.swap(Some(syncer));
table
}
@@ -389,7 +389,7 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
- let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ let syncer = self.syncer.load_full().unwrap();
let response = syncer
.handle_rpc(&rpc, self.system.background.stop_signal.clone())
.await?;
@@ -408,7 +408,7 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
+ pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@@ -437,7 +437,7 @@ impl<F: TableSchema + 'static> Table<F> {
if old_entry != new_entry {
self.instance.updated(old_entry, new_entry).await;
- let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ let syncer = self.syncer.load_full().unwrap();
self.system.background.spawn(syncer.invalidate(tree_key));
}
}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 5ef13d6d..f96e45ff 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -30,7 +30,7 @@ pub struct TableSyncer<F: TableSchema> {
#[derive(Serialize, Deserialize)]
pub enum SyncRPC {
Checksums(Vec<RangeChecksum>),
- DifferentSet(Vec<SyncRange>),
+ Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
}
pub struct SyncTodo {
@@ -172,10 +172,12 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.root_checksum(&partition.begin, &partition.end, must_exit)
.await?;
+ let my_id = self.table.system.id.clone();
let ring = self.table.system.ring.borrow().clone();
let nodes = ring.walk_ring(&partition.begin, self.table.param.replication_factor);
let mut sync_futures = nodes
.iter()
+ .filter(|node| **node != my_id)
.map(|node| {
self.clone()
.do_sync_with(root_cks.clone(), node.clone(), must_exit.clone())
@@ -364,21 +366,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.table
.rpc_call(&who, &TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step)))
.await?;
- if let TableRPC::<F>::SyncRPC(SyncRPC::DifferentSet(mut s)) = rpc_resp {
- let mut items = vec![];
- for differing in s.drain(..) {
+ if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = rpc_resp {
+ eprintln!("({}) Sync with {:?}: difference {} ranges, {} items", self.table.name, who, diff_ranges.len(), diff_items.len());
+ let mut items_to_send = vec![];
+ for differing in diff_ranges.drain(..) {
if differing.level == 0 {
- items.push(differing.begin);
+ items_to_send.push(differing.begin);
} else {
let checksum = self.range_checksum(&differing, &mut must_exit).await?;
todo.push_back(checksum);
}
}
- if items.len() > 0 {
+ if diff_items.len() > 0 {
+ self.table.handle_update(diff_items).await?;
+ }
+ if items_to_send.len() > 0 {
self.table
.system
.background
- .spawn(self.clone().send_items(who.clone(), items));
+ .spawn(self.clone().send_items(who.clone(), items_to_send));
}
} else {
return Err(Error::Message(format!(
@@ -424,20 +430,47 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
mut must_exit: watch::Receiver<bool>,
) -> Result<SyncRPC, Error> {
if let SyncRPC::Checksums(checksums) = message {
- let mut ret = vec![];
+ let mut ret_ranges = vec![];
+ let mut ret_items = vec![];
for ckr in checksums.iter() {
let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
for (range, hash) in ckr.children.iter() {
- match our_ckr
+ // Only consider items that are in the intersection of the two ranges
+ // (other ranges will be exchanged at some point)
+ if our_ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) {
+ break;
+ }
+
+ let differs = match our_ckr
.children
.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin))
{
- Err(_) => {
- ret.push(range.clone());
+ Err(_) => true,
+ Ok(i) => our_ckr.children[i].1 != *hash,
+ };
+ if differs {
+ ret_ranges.push(range.clone());
+ if range.level == 0 {
+ if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
+ }
}
- Ok(i) => {
- if our_ckr.children[i].1 != *hash {
- ret.push(range.clone());
+ }
+ }
+ for (range, _hash) in our_ckr.children.iter() {
+ if ckr.found_limit.as_ref().map(|x| range.begin.as_slice() >= x.as_slice()).unwrap_or(false) {
+ break;
+ }
+
+ let not_present = ckr
+ .children
+ .binary_search_by(|(their_range, _)| their_range.begin.cmp(&range.begin))
+ .is_err();
+ if not_present {
+ ret_ranges.push(range.clone());
+ if range.level == 0 {
+ if let Some(item_bytes) = self.table.store.get(range.begin.as_slice())? {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
}
}
}
@@ -448,12 +481,13 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
.map(|x| x.children.len())
.fold(0, |x, y| x + y);
eprintln!(
- "({}) Checksum comparison RPC: {} different out of {}",
+ "({}) Checksum comparison RPC: {} different + {} items for {} received",
self.table.name,
- ret.len(),
+ ret_ranges.len(),
+ ret_items.len(),
n_checksums
);
- return Ok(SyncRPC::DifferentSet(ret));
+ return Ok(SyncRPC::Difference(ret_ranges, ret_items));
}
Err(Error::Message(format!("Unexpected sync RPC")))
}