Diffstat (limited to 'src/table_sync.rs')
-rw-r--r--  src/table_sync.rs  88
1 file changed, 81 insertions(+), 7 deletions(-)
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 3dd9df33..92aa8c2a 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,5 +1,5 @@
use rand::Rng;
-use std::collections::{BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -10,19 +10,21 @@ use futures_util::future::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
use serde::{Serialize, Deserialize};
+use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
use crate::membership::Ring;
use crate::table::*;
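+// Maximum number of levels in the hierarchy of range checksums
+// (one checksum cache is kept per level).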
+const MAX_DEPTH: usize = 16;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
pub struct TableSyncer<F: TableSchema> {
pub table: Arc<Table<F>>,
pub todo: Mutex<SyncTodo>,
- pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>,
+ pub cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksum>>>,
}
pub struct SyncTodo {
@@ -43,6 +45,17 @@ pub struct SyncRange {
pub level: usize,
}
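+// Checksum cache entries are ordered by the begin bound of their range only,
+// which lets invalidate() below locate the entry covering a given key with a
+// BTreeMap range query.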
+impl std::cmp::PartialOrd for SyncRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+impl std::cmp::Ord for SyncRange {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.begin.cmp(&other.begin)
+ }
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RangeChecksum {
pub bounds: SyncRange,
@@ -59,7 +72,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
let syncer = Arc::new(TableSyncer {
table: table.clone(),
todo: Mutex::new(todo),
- cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(),
+ cache: (0..MAX_DEPTH).map(|_| Mutex::new(BTreeMap::new())).collect::<Vec<_>>(),
});
let s1 = syncer.clone();
@@ -83,12 +96,14 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
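+ // Wait ten seconds after startup before triggering the initial full scan.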
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+
self.todo.lock().await.add_full_scan(&self.table);
let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
- loop {
+ while !*must_exit.borrow() {
let s_ring_recv = ring_recv.recv().fuse();
let s_must_exit = must_exit.recv().fuse();
pin_mut!(s_ring_recv, s_must_exit);
@@ -96,21 +111,24 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
select! {
_ = next_full_scan => {
next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
+ eprintln!("Adding full scan to syncer todo list");
self.todo.lock().await.add_full_scan(&self.table);
}
new_ring_r = s_ring_recv => {
if let Some(new_ring) = new_ring_r {
+ eprintln!("Adding ring difference to syncer todo list");
self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
prev_ring = new_ring;
}
}
must_exit_v = s_must_exit => {
if must_exit_v.unwrap_or(false) {
- return Ok(())
+ break;
}
}
}
}
+ Ok(())
}
async fn syncer_task(
@@ -131,6 +149,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ eprintln!("Calculating root checksum for {:?}...", partition);
let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
eprintln!("Root checksum for {:?}: {:?}", partition, root_cks);
@@ -152,7 +171,7 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
- for i in 1..32 {
+ for i in 1..MAX_DEPTH {
let rc = self.range_checksum(&SyncRange{
begin: begin.to_vec(),
end: end.to_vec(),
@@ -262,13 +281,49 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
todo.push_back(root_ck);
while !todo.is_empty() && !*must_exit.borrow() {
+ eprintln!("Sync with {:?}: {} remaining", who, todo.len());
+
let end = std::cmp::min(16, todo.len());
let step = todo.drain(..end).collect::<Vec<_>>();
- unimplemented!()
+
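+ // Ask the peer which of these checksummed ranges differ on its side.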
+ let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::SyncChecksums(step)).await?;
+ if let TableRPC::<F>::SyncDifferentSet(mut s) = rpc_resp {
+ let mut items = vec![];
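+ // A level-0 range identifies a single item by its begin key;
+ // higher-level ranges are re-checksummed and re-enqueued for comparison.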
+ for differing in s.drain(..) {
+ if differing.level == 0 {
+ items.push(differing.begin);
+ } else {
+ let checksum = self.range_checksum(&differing, &mut must_exit).await?;
+ todo.push_back(checksum);
+ }
+ }
+ if !items.is_empty() {
+ self.table.system.background.spawn(self.clone().send_items(who.clone(), items));
+ }
+ } else {
+ return Err(Error::Message(format!("Unexpected response to RPC SyncChecksums: {}", debug_serialize(&rpc_resp))));
+ }
}
Ok(())
}
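+ /// Send the current values of the given items to node `who` via an Update RPC.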
+ async fn send_items(self: Arc<Self>, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ eprintln!("Sending {} items to {:?}", item_list.len(), who);
+
+ let mut values = vec![];
+ for item in item_list.iter() {
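+ // Skip keys that no longer exist in the local store.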
+ if let Some(v) = self.table.store.get(&item[..])? {
+ values.push(Arc::new(ByteBuf::from(v.as_ref())));
+ }
+ }
+ let rpc_resp = self.table.rpc_call(&who, &TableRPC::<F>::Update(values)).await?;
+ if let TableRPC::<F>::Ok = rpc_resp {
+ Ok(())
+ } else {
+ Err(Error::Message(format!("Unexpected response to RPC Update: {}", debug_serialize(&rpc_resp))))
+ }
+ }
+
pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
let mut ret = vec![];
for ckr in checksums.iter() {
@@ -288,6 +343,25 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
}
Ok(ret)
}
+
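+ /// Drop, at every level, the cached checksum of the range that contains item_key.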
+ pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
+ for i in 1..MAX_DEPTH {
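+ // Ord on SyncRange compares only the begin bound, so the last cache
+ // entry at or before this needle is the candidate range covering item_key.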
+ let needle = SyncRange{
+ begin: item_key.to_vec(),
+ end: vec![],
+ level: i,
+ };
+ let mut cache = self.cache[i].lock().await;
+ if let Some(cache_entry) = cache.range(..=needle).rev().next() {
+ if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
+ let index = cache_entry.0.clone();
+ drop(cache_entry);
+ cache.remove(&index);
+ }
+ }
+ }
+ Ok(())
+ }
}
impl SyncTodo {