aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/background.rs2
-rw-r--r--src/table.rs21
-rw-r--r--src/table_sync.rs210
3 files changed, 205 insertions, 28 deletions
diff --git a/src/background.rs b/src/background.rs
index 772745a6..f4b889ea 100644
--- a/src/background.rs
+++ b/src/background.rs
@@ -14,7 +14,7 @@ type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
pub struct BackgroundRunner {
n_runners: usize,
- stop_signal: watch::Receiver<bool>,
+ pub stop_signal: watch::Receiver<bool>,
queue_in: mpsc::UnboundedSender<(Job, bool)>,
queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
diff --git a/src/table.rs b/src/table.rs
index 533b4291..99ac77bb 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -4,6 +4,7 @@ use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
+use tokio::sync::RwLock;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -12,7 +13,7 @@ use crate::error::Error;
use crate::membership::System;
use crate::proto::*;
use crate::rpc_client::*;
-use crate::table_sync::TableSyncer;
+use crate::table_sync::*;
pub struct Table<F: TableSchema> {
pub instance: F,
@@ -21,6 +22,7 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
+ pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>,
pub param: TableReplicationParams,
}
@@ -59,6 +61,9 @@ pub enum TableRPC<F: TableSchema> {
ReadEntryResponse(Option<ByteBuf>),
Update(Vec<Arc<ByteBuf>>),
+
+ SyncChecksums(Vec<RangeChecksum>),
+ SyncDifferentSet(Vec<SyncRange>),
}
pub trait PartitionKey {
@@ -132,8 +137,10 @@ impl<F: TableSchema + 'static> Table<F> {
system,
store,
param,
+ syncer: RwLock::new(None),
});
- TableSyncer::launch(table.clone()).await;
+ let syncer = TableSyncer::launch(table.clone()).await;
+ *table.syncer.write().await = Some(syncer);
table
}
@@ -309,6 +316,11 @@ impl<F: TableSchema + 'static> Table<F> {
self.handle_update(pairs).await?;
Ok(TableRPC::Ok)
}
+ TableRPC::SyncChecksums(checksums) => {
+ let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ let differing = syncer.handle_checksum_rpc(&checksums[..], self.system.background.stop_signal.clone()).await?;
+ Ok(TableRPC::SyncDifferentSet(differing))
+ }
_ => Err(Error::RPCError(format!("Unexpected table RPC"))),
}
}
@@ -353,6 +365,11 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(())
}
+ pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
+ // TODO
+ Ok(())
+ }
+
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
ret.extend(s.sort_key());
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 039dab6d..3dd9df33 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -1,12 +1,15 @@
use rand::Rng;
-use std::collections::BTreeSet;
+use std::collections::{BTreeSet, HashMap, VecDeque};
use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};
use futures::{pin_mut, select};
+use futures::future::BoxFuture;
+use futures_util::stream::*;
use futures_util::future::*;
use tokio::sync::watch;
use tokio::sync::Mutex;
+use serde::{Serialize, Deserialize};
use crate::data::*;
use crate::error::Error;
@@ -14,11 +17,12 @@ use crate::membership::Ring;
use crate::table::*;
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
+const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
pub struct TableSyncer<F: TableSchema> {
pub table: Arc<Table<F>>,
-
pub todo: Mutex<SyncTodo>,
+ pub cache: Vec<Mutex<HashMap<SyncRange, RangeChecksum>>>,
}
pub struct SyncTodo {
@@ -32,12 +36,30 @@ pub struct Partition {
pub retain: bool,
}
+#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
+pub struct SyncRange {
+ pub begin: Vec<u8>,
+ pub end: Vec<u8>,
+ pub level: usize,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RangeChecksum {
+ pub bounds: SyncRange,
+ pub children: Vec<(SyncRange, Hash)>,
+ pub found_limit: Option<Vec<u8>>,
+
+ #[serde(skip, default="std::time::Instant::now")]
+ pub time: Instant,
+}
+
impl<F: TableSchema + 'static> TableSyncer<F> {
pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer {
table: table.clone(),
todo: Mutex::new(todo),
+ cache: (0..32).map(|_| Mutex::new(HashMap::new())).collect::<Vec<_>>(),
});
let s1 = syncer.clone();
@@ -95,39 +117,177 @@ impl<F: TableSchema + 'static> TableSyncer<F> {
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
- loop {
- let s_pop_task = self.pop_task().fuse();
- let s_must_exit = must_exit.recv().fuse();
- pin_mut!(s_must_exit, s_pop_task);
+ while !*must_exit.borrow() {
+ if let Some(partition) = self.todo.lock().await.pop_task() {
+ let res = self.clone().sync_partition(&partition, &mut must_exit).await;
+ if let Err(e) = res {
+ eprintln!("Error while syncing {:?}: {}", partition, e);
+ }
+ } else {
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
+ }
+ Ok(())
+ }
- select! {
- task = s_pop_task => {
- if let Some(partition) = task {
- let res = self.sync_partition(&partition).await;
- if let Err(e) = res {
- eprintln!("Error while syncing {:?}: {}", partition, e);
- }
- } else {
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
+ async fn sync_partition(self: Arc<Self>, partition: &Partition, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ let root_cks = self.root_checksum(&partition.begin, &partition.end, must_exit).await?;
+ eprintln!("Root checksum for {:?}: {:?}", partition, root_cks);
+
+ let nodes = self.table.system.ring.borrow().clone().walk_ring(&partition.begin, self.table.param.replication_factor);
+ let mut sync_futures = nodes.iter()
+ .map(|node| self.clone().do_sync_with(root_cks.clone(), node.clone(), must_exit.clone()))
+ .collect::<FuturesUnordered<_>>();
+
+ while let Some(r) = sync_futures.next().await {
+ if let Err(e) = r {
+ eprintln!("Sync error: {}", e);
+ }
+ }
+ if !partition.retain {
+ self.table.delete_range(&partition.begin, &partition.end).await?;
+ }
+
+ Ok(())
+ }
+
+ async fn root_checksum(self: &Arc<Self>, begin: &Hash, end: &Hash, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
+ for i in 1..32 {
+ let rc = self.range_checksum(&SyncRange{
+ begin: begin.to_vec(),
+ end: end.to_vec(),
+ level: i,
+ }, must_exit).await?;
+ if rc.found_limit.is_none() {
+ return Ok(rc);
+ }
+ }
+ Err(Error::Message(format!("Unable to compute root checksum (this should never happen")))
+ }
+
+ fn range_checksum<'a>(self: &'a Arc<Self>, range: &'a SyncRange, must_exit: &'a mut watch::Receiver<bool>) -> BoxFuture<'a, Result<RangeChecksum, Error>> {
+ async move {
+ let mut cache = self.cache[range.level].lock().await;
+ if let Some(v) = cache.get(&range) {
+ if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
+ return Ok(v.clone());
}
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- return Ok(())
+ }
+ cache.remove(&range);
+ drop(cache);
+
+ let v = self.range_checksum_inner(&range, must_exit).await?;
+
+ let mut cache = self.cache[range.level].lock().await;
+ eprintln!("Checksum for {:?}: {:?}", range, v);
+ cache.insert(range.clone(), v.clone());
+ Ok(v)
+ }.boxed()
+ }
+
+ async fn range_checksum_inner(self: &Arc<Self>, range: &SyncRange, must_exit: &mut watch::Receiver<bool>) -> Result<RangeChecksum, Error> {
+ if range.level == 1 {
+ let mut children = vec![];
+ for item in self.table.store.range(range.begin.clone()..range.end.clone()) {
+ let (key, value) = item?;
+ let key_hash = hash(&key[..]);
+ if key != range.begin && key_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
+ return Ok(RangeChecksum{
+ bounds: range.clone(),
+ children,
+ found_limit: Some(key.to_vec()),
+ time: Instant::now(),
+ })
+ }
+ let item_range = SyncRange{
+ begin: key.to_vec(),
+ end: vec![],
+ level: 0,
+ };
+ children.push((item_range, hash(&value[..])));
+ }
+ Ok(RangeChecksum{
+ bounds: range.clone(),
+ children,
+ found_limit: None,
+ time: Instant::now(),
+ })
+ } else {
+ let mut children = vec![];
+ let mut sub_range = SyncRange{
+ begin: range.begin.clone(),
+ end: range.end.clone(),
+ level: range.level - 1,
+ };
+ let mut time = Instant::now();
+ while !*must_exit.borrow() {
+ let sub_ck = self.range_checksum(&sub_range, must_exit).await?;
+
+ if sub_ck.children.len() > 0 {
+ let sub_ck_hash = hash(&rmp_to_vec_all_named(&sub_ck)?[..]);
+ children.push((sub_range.clone(), sub_ck_hash));
+ if sub_ck.time < time {
+ time = sub_ck.time;
}
}
+
+ if sub_ck.found_limit.is_none() || sub_ck.children.len() == 0 {
+ return Ok(RangeChecksum{
+ bounds: range.clone(),
+ children,
+ found_limit: None,
+ time,
+ });
+ }
+ let found_limit = sub_ck.found_limit.unwrap();
+
+ let actual_limit_hash = hash(&found_limit[..]);
+ if actual_limit_hash.as_slice()[0..range.level].iter().all(|x| *x == 0) {
+ return Ok(RangeChecksum{
+ bounds: range.clone(),
+ children,
+ found_limit: Some(found_limit.clone()),
+ time,
+ });
+ }
+
+ sub_range.begin = found_limit;
}
+ Err(Error::Message(format!("Exiting.")))
}
}
- async fn pop_task(&self) -> Option<Partition> {
- self.todo.lock().await.pop_task()
- }
+ async fn do_sync_with(self: Arc<Self>, root_ck: RangeChecksum, who: UUID, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ let mut todo = VecDeque::new();
+ todo.push_back(root_ck);
- async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
- eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition);
+ while !todo.is_empty() && !*must_exit.borrow() {
+ let end = std::cmp::min(16, todo.len());
+ let step = todo.drain(..end).collect::<Vec<_>>();
+ unimplemented!()
+ }
Ok(())
}
+
+ pub async fn handle_checksum_rpc(self: &Arc<Self>, checksums: &[RangeChecksum], mut must_exit: watch::Receiver<bool>) -> Result<Vec<SyncRange>, Error> {
+ let mut ret = vec![];
+ for ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&ckr.bounds, &mut must_exit).await?;
+ for (range, hash) in ckr.children.iter() {
+ match our_ckr.children.binary_search_by(|(our_range, _)| our_range.begin.cmp(&range.begin)) {
+ Err(_) => {
+ ret.push(range.clone());
+ }
+ Ok(i) => {
+ if our_ckr.children[i].1 != *hash {
+ ret.push(range.clone());
+ }
+ }
+ }
+ }
+ }
+ Ok(ret)
+ }
}
impl SyncTodo {