aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/mod.rs6
-rw-r--r--src/table/table.rs524
-rw-r--r--src/table/table_fullcopy.rs100
-rw-r--r--src/table/table_sharded.rs55
-rw-r--r--src/table/table_sync.rs791
5 files changed, 1476 insertions, 0 deletions
diff --git a/src/table/mod.rs b/src/table/mod.rs
new file mode 100644
index 00000000..e03b8d0b
--- /dev/null
+++ b/src/table/mod.rs
@@ -0,0 +1,6 @@
+pub mod table;
+pub mod table_fullcopy;
+pub mod table_sharded;
+pub mod table_sync;
+
+pub use table::*;
diff --git a/src/table/table.rs b/src/table/table.rs
new file mode 100644
index 00000000..50e8739a
--- /dev/null
+++ b/src/table/table.rs
@@ -0,0 +1,524 @@
+use std::collections::{BTreeMap, HashMap};
+use std::sync::Arc;
+use std::time::Duration;
+
+use arc_swap::ArcSwapOption;
+use async_trait::async_trait;
+use futures::stream::*;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+
+use crate::data::*;
+use crate::error::Error;
+
+use crate::rpc::membership::{Ring, System};
+use crate::rpc::rpc_client::*;
+use crate::rpc::rpc_server::*;
+
+use crate::table::table_sync::*;
+
+const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+
+pub struct Table<F: TableSchema, R: TableReplication> {
+ pub instance: F,
+ pub replication: R,
+
+ pub name: String,
+ pub rpc_client: Arc<RpcClient<TableRPC<F>>>,
+
+ pub system: Arc<System>,
+ pub store: sled::Tree,
+ pub syncer: ArcSwapOption<TableSyncer<F, R>>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum TableRPC<F: TableSchema> {
+ Ok,
+
+ ReadEntry(F::P, F::S),
+ ReadEntryResponse(Option<ByteBuf>),
+
+ // Read range: read all keys in partition P, possibly starting at a certain sort key offset
+ ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
+
+ Update(Vec<Arc<ByteBuf>>),
+
+ SyncRPC(SyncRPC),
+}
+
+impl<F: TableSchema> RpcMessage for TableRPC<F> {}
+
+pub trait PartitionKey {
+ fn hash(&self) -> Hash;
+}
+
+pub trait SortKey {
+ fn sort_key(&self) -> &[u8];
+}
+
+pub trait Entry<P: PartitionKey, S: SortKey>:
+ PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+{
+ fn partition_key(&self) -> &P;
+ fn sort_key(&self) -> &S;
+
+ fn merge(&mut self, other: &Self);
+}
+
+#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct EmptyKey;
+impl SortKey for EmptyKey {
+ fn sort_key(&self) -> &[u8] {
+ &[]
+ }
+}
+impl PartitionKey for EmptyKey {
+ fn hash(&self) -> Hash {
+ [0u8; 32].into()
+ }
+}
+
+impl<T: AsRef<str>> PartitionKey for T {
+ fn hash(&self) -> Hash {
+ hash(self.as_ref().as_bytes())
+ }
+}
+impl<T: AsRef<str>> SortKey for T {
+ fn sort_key(&self) -> &[u8] {
+ self.as_ref().as_bytes()
+ }
+}
+
+impl PartitionKey for Hash {
+ fn hash(&self) -> Hash {
+ self.clone()
+ }
+}
+impl SortKey for Hash {
+ fn sort_key(&self) -> &[u8] {
+ self.as_slice()
+ }
+}
+
+#[async_trait]
+pub trait TableSchema: Send + Sync {
+ type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type E: Entry<Self::P, Self::S>;
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
+ fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
+ true
+ }
+}
+
+pub trait TableReplication: Send + Sync {
+ // See examples in table_sharded.rs and table_fullcopy.rs
+ // To understand various replication methods
+
+ // Which nodes to send reads from
+ fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn read_quorum(&self) -> usize;
+
+ // Which nodes to send writes to
+ fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn write_quorum(&self) -> usize;
+ fn max_write_errors(&self) -> usize;
+ fn epidemic_writes(&self) -> bool;
+
+ // Which are the nodes that do actually replicate the data
+ fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
+ fn split_points(&self, ring: &Ring) -> Vec<Hash>;
+}
+
+impl<F, R> Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
+
+ pub async fn new(
+ instance: F,
+ replication: R,
+ system: Arc<System>,
+ db: &sled::Db,
+ name: String,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
+ let store = db.open_tree(&name).expect("Unable to open DB tree");
+
+ let rpc_path = format!("table_{}", name);
+ let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+
+ let table = Arc::new(Self {
+ instance,
+ replication,
+ name,
+ rpc_client,
+ system,
+ store,
+ syncer: ArcSwapOption::from(None),
+ });
+ table.clone().register_handler(rpc_server, rpc_path);
+
+ let syncer = TableSyncer::launch(table.clone()).await;
+ table.syncer.swap(Some(syncer));
+
+ table
+ }
+
+ pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+ let hash = e.partition_key().hash();
+ let who = self.replication.write_nodes(&hash, &self.system);
+ //eprintln!("insert who: {:?}", who);
+
+ let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+ let rpc = TableRPC::<F>::Update(vec![e_enc]);
+
+ self.rpc_client
+ .try_call_many(
+ &who[..],
+ rpc,
+ RequestStrategy::with_quorum(self.replication.write_quorum())
+ .with_timeout(TABLE_RPC_TIMEOUT),
+ )
+ .await?;
+ Ok(())
+ }
+
+ pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
+ let mut call_list = HashMap::new();
+
+ for entry in entries.iter() {
+ let hash = entry.partition_key().hash();
+ let who = self.replication.write_nodes(&hash, &self.system);
+ let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+ for node in who {
+ if !call_list.contains_key(&node) {
+ call_list.insert(node, vec![]);
+ }
+ call_list.get_mut(&node).unwrap().push(e_enc.clone());
+ }
+ }
+
+ let call_futures = call_list.drain().map(|(node, entries)| async move {
+ let rpc = TableRPC::<F>::Update(entries);
+
+ let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ Ok::<_, Error>((node, resp))
+ });
+ let mut resps = call_futures.collect::<FuturesUnordered<_>>();
+ let mut errors = vec![];
+
+ while let Some(resp) = resps.next().await {
+ if let Err(e) = resp {
+ errors.push(e);
+ }
+ }
+ if errors.len() > self.replication.max_write_errors() {
+ Err(Error::Message("Too many errors".into()))
+ } else {
+ Ok(())
+ }
+ }
+
+ pub async fn get(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ sort_key: &F::S,
+ ) -> Result<Option<F::E>, Error> {
+ let hash = partition_key.hash();
+ let who = self.replication.read_nodes(&hash, &self.system);
+ //eprintln!("get who: {:?}", who);
+
+ let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
+ let resps = self
+ .rpc_client
+ .try_call_many(
+ &who[..],
+ rpc,
+ RequestStrategy::with_quorum(self.replication.read_quorum())
+ .with_timeout(TABLE_RPC_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ let mut ret = None;
+ let mut not_all_same = false;
+ for resp in resps {
+ if let TableRPC::ReadEntryResponse(value) = resp {
+ if let Some(v_bytes) = value {
+ let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?;
+ ret = match ret {
+ None => Some(v),
+ Some(mut x) => {
+ if x != v {
+ not_all_same = true;
+ x.merge(&v);
+ }
+ Some(x)
+ }
+ }
+ }
+ } else {
+ return Err(Error::Message(format!("Invalid return value to read")));
+ }
+ }
+ if let Some(ret_entry) = &ret {
+ if not_all_same {
+ let self2 = self.clone();
+ let ent2 = ret_entry.clone();
+ self.system
+ .background
+ .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+ }
+ }
+ Ok(ret)
+ }
+
+ pub async fn get_range(
+ self: &Arc<Self>,
+ partition_key: &F::P,
+ begin_sort_key: Option<F::S>,
+ filter: Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<F::E>, Error> {
+ let hash = partition_key.hash();
+ let who = self.replication.read_nodes(&hash, &self.system);
+
+ let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+
+ let resps = self
+ .rpc_client
+ .try_call_many(
+ &who[..],
+ rpc,
+ RequestStrategy::with_quorum(self.replication.read_quorum())
+ .with_timeout(TABLE_RPC_TIMEOUT)
+ .interrupt_after_quorum(true),
+ )
+ .await?;
+
+ let mut ret = BTreeMap::new();
+ let mut to_repair = BTreeMap::new();
+ for resp in resps {
+ if let TableRPC::Update(entries) = resp {
+ for entry_bytes in entries.iter() {
+ let entry =
+ rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
+ let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
+ match ret.remove(&entry_key) {
+ None => {
+ ret.insert(entry_key, Some(entry));
+ }
+ Some(Some(mut prev)) => {
+ let must_repair = prev != entry;
+ prev.merge(&entry);
+ if must_repair {
+ to_repair.insert(entry_key.clone(), Some(prev.clone()));
+ }
+ ret.insert(entry_key, Some(prev));
+ }
+ Some(None) => unreachable!(),
+ }
+ }
+ }
+ }
+ if !to_repair.is_empty() {
+ let self2 = self.clone();
+ self.system.background.spawn_cancellable(async move {
+ for (_, v) in to_repair.iter_mut() {
+ self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+ }
+ Ok(())
+ });
+ }
+ let ret_vec = ret
+ .iter_mut()
+ .take(limit)
+ .map(|(_k, v)| v.take().unwrap())
+ .collect::<Vec<_>>();
+ Ok(ret_vec)
+ }
+
+ // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
+
+ async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
+ let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
+ self.rpc_client
+ .try_call_many(
+ &who[..],
+ TableRPC::<F>::Update(vec![what_enc]),
+ RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
+ )
+ .await?;
+ Ok(())
+ }
+
+ // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ==============
+
+ fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ let self2 = self.clone();
+ rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| {
+ let self2 = self2.clone();
+ async move { self2.handle(&msg).await }
+ });
+
+ let self2 = self.clone();
+ self.rpc_client
+ .set_local_handler(self.system.id, move |msg| {
+ let self2 = self2.clone();
+ async move { self2.handle(&msg).await }
+ });
+ }
+
+ async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
+ match msg {
+ TableRPC::ReadEntry(key, sort_key) => {
+ let value = self.handle_read_entry(key, sort_key)?;
+ Ok(TableRPC::ReadEntryResponse(value))
+ }
+ TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
+ let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?;
+ Ok(TableRPC::Update(values))
+ }
+ TableRPC::Update(pairs) => {
+ self.handle_update(pairs).await?;
+ Ok(TableRPC::Ok)
+ }
+ TableRPC::SyncRPC(rpc) => {
+ let syncer = self.syncer.load_full().unwrap();
+ let response = syncer
+ .handle_rpc(rpc, self.system.background.stop_signal.clone())
+ .await?;
+ Ok(TableRPC::SyncRPC(response))
+ }
+ _ => Err(Error::BadRequest(format!("Unexpected table RPC"))),
+ }
+ }
+
+ fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> {
+ let tree_key = self.tree_key(p, s);
+ if let Some(bytes) = self.store.get(&tree_key)? {
+ Ok(Some(ByteBuf::from(bytes.to_vec())))
+ } else {
+ Ok(None)
+ }
+ }
+
+ fn handle_read_range(
+ &self,
+ p: &F::P,
+ s: &Option<F::S>,
+ filter: &Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<Arc<ByteBuf>>, Error> {
+ let partition_hash = p.hash();
+ let first_key = match s {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(p, sk),
+ };
+ let mut ret = vec![];
+ for item in self.store.range(first_key..) {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let keep = match filter {
+ None => true,
+ Some(f) => {
+ let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?;
+ F::matches_filter(&entry, f)
+ }
+ };
+ if keep {
+ ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ }
+ if ret.len() >= limit {
+ break;
+ }
+ }
+ Ok(ret)
+ }
+
+ pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
+ let syncer = self.syncer.load_full().unwrap();
+ let mut epidemic_propagate = vec![];
+
+ for update_bytes in entries.iter() {
+ let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
+
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let (old_entry, new_entry) = self.store.transaction(|db| {
+ let (old_entry, new_entry) = match db.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)
+ .map_err(Error::RMPDecode)
+ .map_err(sled::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ None => (None, update.clone()),
+ };
+
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::ConflictableTransactionError::Abort)?;
+ db.insert(tree_key.clone(), new_bytes)?;
+ Ok((old_entry, new_entry))
+ })?;
+
+ if old_entry.as_ref() != Some(&new_entry) {
+ if self.replication.epidemic_writes() {
+ epidemic_propagate.push(new_entry.clone());
+ }
+
+ self.instance.updated(old_entry, Some(new_entry)).await?;
+ self.system
+ .background
+ .spawn_cancellable(syncer.clone().invalidate(tree_key));
+ }
+ }
+
+ if epidemic_propagate.len() > 0 {
+ let self2 = self.clone();
+ self.system
+ .background
+ .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await });
+ }
+
+ Ok(())
+ }
+
+ pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
+ let syncer = self.syncer.load_full().unwrap();
+
+ debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
+ let mut count = 0;
+ while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
+ if key.as_ref() < begin.as_slice() {
+ break;
+ }
+ if let Some(old_val) = self.store.remove(&key)? {
+ let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
+ self.instance.updated(Some(old_entry), None).await?;
+ self.system
+ .background
+ .spawn_cancellable(syncer.clone().invalidate(key.to_vec()));
+ count += 1;
+ }
+ }
+ debug!("({}) {} entries deleted", self.name, count);
+ Ok(())
+ }
+
+ fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+ let mut ret = p.hash().to_vec();
+ ret.extend(s.sort_key());
+ ret
+ }
+}
diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs
new file mode 100644
index 00000000..2cd2e464
--- /dev/null
+++ b/src/table/table_fullcopy.rs
@@ -0,0 +1,100 @@
+use arc_swap::ArcSwapOption;
+use std::sync::Arc;
+
+use crate::data::*;
+use crate::rpc::membership::{Ring, System};
+use crate::table::*;
+
+#[derive(Clone)]
+pub struct TableFullReplication {
+ pub write_factor: usize,
+ pub write_quorum: usize,
+
+ neighbors: ArcSwapOption<Neighbors>,
+}
+
+#[derive(Clone)]
+struct Neighbors {
+ ring: Arc<Ring>,
+ neighbors: Vec<UUID>,
+}
+
+impl TableFullReplication {
+ pub fn new(write_factor: usize, write_quorum: usize) -> Self {
+ TableFullReplication {
+ write_factor,
+ write_quorum,
+ neighbors: ArcSwapOption::from(None),
+ }
+ }
+
+ fn get_neighbors(&self, system: &System) -> Vec<UUID> {
+ let neighbors = self.neighbors.load_full();
+ if let Some(n) = neighbors {
+ if Arc::ptr_eq(&n.ring, &system.ring.borrow()) {
+ return n.neighbors.clone();
+ }
+ }
+
+ // Recalculate neighbors
+ let ring = system.ring.borrow().clone();
+ let my_id = system.id;
+
+ let mut nodes = vec![];
+ for (node, _) in ring.config.members.iter() {
+ let node_ranking = hash(&[node.as_slice(), my_id.as_slice()].concat());
+ nodes.push((*node, node_ranking));
+ }
+ nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2));
+ let mut neighbors = nodes
+ .drain(..)
+ .map(|(node, _)| node)
+ .filter(|node| *node != my_id)
+ .take(self.write_factor)
+ .collect::<Vec<_>>();
+ neighbors.push(my_id);
+ self.neighbors.swap(Some(Arc::new(Neighbors {
+ ring,
+ neighbors: neighbors.clone(),
+ })));
+ neighbors
+ }
+}
+
+impl TableReplication for TableFullReplication {
+ // Full replication schema: all nodes store everything
+ // Writes are disseminated in an epidemic manner in the network
+
+ // Advantage: do all reads locally, extremely fast
+ // Inconvenient: only suitable to reasonably small tables
+
+ fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
+ vec![system.id]
+ }
+ fn read_quorum(&self) -> usize {
+ 1
+ }
+
+ fn write_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
+ self.get_neighbors(system)
+ }
+ fn write_quorum(&self) -> usize {
+ self.write_quorum
+ }
+ fn max_write_errors(&self) -> usize {
+ self.write_factor - self.write_quorum
+ }
+ fn epidemic_writes(&self) -> bool {
+ true
+ }
+
+ fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
+ ring.config.members.keys().cloned().collect::<Vec<_>>()
+ }
+ fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
+ let mut ret = vec![];
+ ret.push([0u8; 32].into());
+ ret.push([0xFFu8; 32].into());
+ ret
+ }
+}
diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs
new file mode 100644
index 00000000..5190f5d4
--- /dev/null
+++ b/src/table/table_sharded.rs
@@ -0,0 +1,55 @@
+use crate::data::*;
+use crate::rpc::membership::{Ring, System};
+use crate::table::*;
+
+#[derive(Clone)]
+pub struct TableShardedReplication {
+ pub replication_factor: usize,
+ pub read_quorum: usize,
+ pub write_quorum: usize,
+}
+
+impl TableReplication for TableShardedReplication {
+ // Sharded replication schema:
+ // - based on the ring of nodes, a certain set of neighbors
+ // store entries, given as a function of the position of the
+ // entry's hash in the ring
+ // - reads are done on all of the nodes that replicate the data
+ // - writes as well
+
+ fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
+ let ring = system.ring.borrow().clone();
+ ring.walk_ring(&hash, self.replication_factor)
+ }
+ fn read_quorum(&self) -> usize {
+ self.read_quorum
+ }
+
+ fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
+ let ring = system.ring.borrow().clone();
+ ring.walk_ring(&hash, self.replication_factor)
+ }
+ fn write_quorum(&self) -> usize {
+ self.write_quorum
+ }
+ fn max_write_errors(&self) -> usize {
+ self.replication_factor - self.write_quorum
+ }
+ fn epidemic_writes(&self) -> bool {
+ false
+ }
+
+ fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
+ ring.walk_ring(&hash, self.replication_factor)
+ }
+ fn split_points(&self, ring: &Ring) -> Vec<Hash> {
+ let mut ret = vec![];
+
+ ret.push([0u8; 32].into());
+ for entry in ring.ring.iter() {
+ ret.push(entry.location);
+ }
+ ret.push([0xFFu8; 32].into());
+ ret
+ }
+}
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
new file mode 100644
index 00000000..8f6582a7
--- /dev/null
+++ b/src/table/table_sync.rs
@@ -0,0 +1,791 @@
+use rand::Rng;
+use std::collections::{BTreeMap, VecDeque};
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use futures::future::BoxFuture;
+use futures::{pin_mut, select};
+use futures_util::future::*;
+use futures_util::stream::*;
+use serde::{Deserialize, Serialize};
+use serde_bytes::ByteBuf;
+use tokio::sync::Mutex;
+use tokio::sync::{mpsc, watch};
+
+use crate::data::*;
+use crate::error::Error;
+use crate::rpc::membership::Ring;
+use crate::table::*;
+
+const MAX_DEPTH: usize = 16;
+const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
+const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800);
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+ table: Arc<Table<F, R>>,
+ todo: Mutex<SyncTodo>,
+ cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub enum SyncRPC {
+ GetRootChecksumRange(Hash, Hash),
+ RootChecksumRange(SyncRange),
+ Checksums(Vec<RangeChecksum>, bool),
+ Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
+}
+
+pub struct SyncTodo {
+ todo: Vec<TodoPartition>,
+}
+
+#[derive(Debug, Clone)]
+struct TodoPartition {
+ begin: Hash,
+ end: Hash,
+ retain: bool,
+}
+
+// A SyncRange defines a query on the dataset stored by a node, in the following way:
+// - all items whose key are >= `begin`
+// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded)
+// - except if the first item of the range has such many leading zero bytes
+// - and stopping at `end` (excluded) if such an item is not found
+// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges"
+// i.e. of ranges of level `level-1` that cover the same range
+// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
+// See RangeChecksum for the struct that stores this information.
+#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
+pub struct SyncRange {
+ begin: Vec<u8>,
+ end: Vec<u8>,
+ level: usize,
+}
+
+impl std::cmp::PartialOrd for SyncRange {
+ fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
+impl std::cmp::Ord for SyncRange {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.begin
+ .cmp(&other.begin)
+ .then(self.level.cmp(&other.level))
+ .then(self.end.cmp(&other.end))
+ }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RangeChecksum {
+ bounds: SyncRange,
+ children: Vec<(SyncRange, Hash)>,
+ found_limit: Option<Vec<u8>>,
+
+ #[serde(skip, default = "std::time::Instant::now")]
+ time: Instant,
+}
+
+#[derive(Debug, Clone)]
+pub struct RangeChecksumCache {
+ hash: Option<Hash>, // None if no children
+ found_limit: Option<Vec<u8>>,
+ time: Instant,
+}
+
+impl<F, R> TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
+ let todo = SyncTodo { todo: Vec::new() };
+ let syncer = Arc::new(TableSyncer {
+ table: table.clone(),
+ todo: Mutex::new(todo),
+ cache: (0..MAX_DEPTH)
+ .map(|_| Mutex::new(BTreeMap::new()))
+ .collect::<Vec<_>>(),
+ });
+
+ let (busy_tx, busy_rx) = mpsc::unbounded_channel();
+
+ let s1 = syncer.clone();
+ table
+ .system
+ .background
+ .spawn_worker(
+ format!("table sync watcher for {}", table.name),
+ move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
+ )
+ .await;
+
+ let s2 = syncer.clone();
+ table
+ .system
+ .background
+ .spawn_worker(
+ format!("table syncer for {}", table.name),
+ move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
+ )
+ .await;
+
+ let s3 = syncer.clone();
+ tokio::spawn(async move {
+ tokio::time::delay_for(Duration::from_secs(20)).await;
+ s3.add_full_scan().await;
+ });
+
+ syncer
+ }
+
+ async fn watcher_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ mut busy_rx: mpsc::UnboundedReceiver<bool>,
+ ) -> Result<(), Error> {
+ let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
+ let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+ let mut nothing_to_do_since = Some(Instant::now());
+
+ while !*must_exit.borrow() {
+ let s_ring_recv = ring_recv.recv().fuse();
+ let s_busy = busy_rx.recv().fuse();
+ let s_must_exit = must_exit.recv().fuse();
+ let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
+ pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
+
+ select! {
+ new_ring_r = s_ring_recv => {
+ if let Some(new_ring) = new_ring_r {
+ debug!("({}) Adding ring difference to syncer todo list", self.table.name);
+ self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
+ prev_ring = new_ring;
+ }
+ }
+ busy_opt = s_busy => {
+ if let Some(busy) = busy_opt {
+ if busy {
+ nothing_to_do_since = None;
+ } else {
+ if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
+ }
+ }
+ }
+ }
+ must_exit_v = s_must_exit => {
+ if must_exit_v.unwrap_or(false) {
+ break;
+ }
+ }
+ _ = s_timeout => {
+ if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) {
+ nothing_to_do_since = None;
+ debug!("({}) Adding full scan to syncer todo list", self.table.name);
+ self.add_full_scan().await;
+ }
+ }
+ }
+ }
+ Ok(())
+ }
+
+ pub async fn add_full_scan(&self) {
+ self.todo.lock().await.add_full_scan(&self.table);
+ }
+
+ async fn syncer_task(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ busy_tx: mpsc::UnboundedSender<bool>,
+ ) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ if let Some(partition) = self.todo.lock().await.pop_task() {
+ busy_tx.send(true)?;
+ let res = self
+ .clone()
+ .sync_partition(&partition, &mut must_exit)
+ .await;
+ if let Err(e) = res {
+ warn!(
+ "({}) Error while syncing {:?}: {}",
+ self.table.name, partition, e
+ );
+ }
+ } else {
+ busy_tx.send(false)?;
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
+ }
+ Ok(())
+ }
+
+ async fn sync_partition(
+ self: Arc<Self>,
+ partition: &TodoPartition,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let my_id = self.table.system.id;
+ let nodes = self
+ .table
+ .replication
+ .write_nodes(&partition.begin, &self.table.system)
+ .into_iter()
+ .filter(|node| *node != my_id)
+ .collect::<Vec<_>>();
+
+ debug!(
+ "({}) Preparing to sync {:?} with {:?}...",
+ self.table.name, partition, nodes
+ );
+ let root_cks = self
+ .root_checksum(&partition.begin, &partition.end, must_exit)
+ .await?;
+
+ let mut sync_futures = nodes
+ .iter()
+ .map(|node| {
+ self.clone().do_sync_with(
+ partition.clone(),
+ root_cks.clone(),
+ *node,
+ partition.retain,
+ must_exit.clone(),
+ )
+ })
+ .collect::<FuturesUnordered<_>>();
+
+ let mut n_errors = 0;
+ while let Some(r) = sync_futures.next().await {
+ if let Err(e) = r {
+ n_errors += 1;
+ warn!("({}) Sync error: {}", self.table.name, e);
+ }
+ }
+ if n_errors > self.table.replication.max_write_errors() {
+ return Err(Error::Message(format!(
+ "Sync failed with too many nodes (should have been: {:?}).",
+ nodes
+ )));
+ }
+
+ if !partition.retain {
+ self.table
+ .delete_range(&partition.begin, &partition.end)
+ .await?;
+ }
+
+ Ok(())
+ }
+
+ async fn root_checksum(
+ self: &Arc<Self>,
+ begin: &Hash,
+ end: &Hash,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksum, Error> {
+ for i in 1..MAX_DEPTH {
+ let rc = self
+ .range_checksum(
+ &SyncRange {
+ begin: begin.to_vec(),
+ end: end.to_vec(),
+ level: i,
+ },
+ must_exit,
+ )
+ .await?;
+ if rc.found_limit.is_none() {
+ return Ok(rc);
+ }
+ }
+ Err(Error::Message(format!(
+ "Unable to compute root checksum (this should never happen)"
+ )))
+ }
+
+ async fn range_checksum(
+ self: &Arc<Self>,
+ range: &SyncRange,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<RangeChecksum, Error> {
+ assert!(range.level != 0);
+
+ if range.level == 1 {
+ let mut children = vec![];
+ for item in self
+ .table
+ .store
+ .range(range.begin.clone()..range.end.clone())
+ {
+ let (key, value) = item?;
+ let key_hash = hash(&key[..]);
+ if children.len() > 0
+ && key_hash.as_slice()[0..range.level]
+ .iter()
+ .all(|x| *x == 0u8)
+ {
+ return Ok(RangeChecksum {
+ bounds: range.clone(),
+ children,
+ found_limit: Some(key.to_vec()),
+ time: Instant::now(),
+ });
+ }
+ let item_range = SyncRange {
+ begin: key.to_vec(),
+ end: vec![],
+ level: 0,
+ };
+ children.push((item_range, hash(&value[..])));
+ }
+ Ok(RangeChecksum {
+ bounds: range.clone(),
+ children,
+ found_limit: None,
+ time: Instant::now(),
+ })
+ } else {
+ let mut children = vec![];
+ let mut sub_range = SyncRange {
+ begin: range.begin.clone(),
+ end: range.end.clone(),
+ level: range.level - 1,
+ };
+ let mut time = Instant::now();
+ while !*must_exit.borrow() {
+ let sub_ck = self
+ .range_checksum_cached_hash(&sub_range, must_exit)
+ .await?;
+
+ if let Some(hash) = sub_ck.hash {
+ children.push((sub_range.clone(), hash));
+ if sub_ck.time < time {
+ time = sub_ck.time;
+ }
+ }
+
+ if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
+ return Ok(RangeChecksum {
+ bounds: range.clone(),
+ children,
+ found_limit: None,
+ time,
+ });
+ }
+ let found_limit = sub_ck.found_limit.unwrap();
+
+ let actual_limit_hash = hash(&found_limit[..]);
+ if actual_limit_hash.as_slice()[0..range.level]
+ .iter()
+ .all(|x| *x == 0u8)
+ {
+ return Ok(RangeChecksum {
+ bounds: range.clone(),
+ children,
+ found_limit: Some(found_limit.clone()),
+ time,
+ });
+ }
+
+ sub_range.begin = found_limit;
+ }
+ Err(Error::Message(format!("Exiting.")))
+ }
+ }
+
+ fn range_checksum_cached_hash<'a>(
+ self: &'a Arc<Self>,
+ range: &'a SyncRange,
+ must_exit: &'a mut watch::Receiver<bool>,
+ ) -> BoxFuture<'a, Result<RangeChecksumCache, Error>> {
+ async move {
+ let mut cache = self.cache[range.level].lock().await;
+ if let Some(v) = cache.get(&range) {
+ if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT {
+ return Ok(v.clone());
+ }
+ }
+ cache.remove(&range);
+ drop(cache);
+
+ let v = self.range_checksum(&range, must_exit).await?;
+ trace!(
+ "({}) New checksum calculated for {}-{}/{}, {} children",
+ self.table.name,
+ hex::encode(&range.begin)
+ .chars()
+ .take(16)
+ .collect::<String>(),
+ hex::encode(&range.end).chars().take(16).collect::<String>(),
+ range.level,
+ v.children.len()
+ );
+
+ let hash = if v.children.len() > 0 {
+ Some(hash(&rmp_to_vec_all_named(&v)?[..]))
+ } else {
+ None
+ };
+ let cache_entry = RangeChecksumCache {
+ hash,
+ found_limit: v.found_limit,
+ time: v.time,
+ };
+
+ let mut cache = self.cache[range.level].lock().await;
+ cache.insert(range.clone(), cache_entry.clone());
+ Ok(cache_entry)
+ }
+ .boxed()
+ }
+
+ async fn do_sync_with(
+ self: Arc<Self>,
+ partition: TodoPartition,
+ root_ck: RangeChecksum,
+ who: UUID,
+ retain: bool,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut todo = VecDeque::new();
+
+ // If their root checksum has level > than us, use that as a reference
+ let root_cks_resp = self
+ .table
+ .rpc_client
+ .call(
+ who,
+ TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange(
+ partition.begin.clone(),
+ partition.end.clone(),
+ )),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?;
+ if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp {
+ if range.level > root_ck.bounds.level {
+ let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?;
+ todo.push_back(their_root_range_ck);
+ } else {
+ todo.push_back(root_ck);
+ }
+ } else {
+ return Err(Error::BadRequest(format!(
+ "Invalid respone to GetRootChecksumRange RPC: {}",
+ debug_serialize(root_cks_resp)
+ )));
+ }
+
+ while !todo.is_empty() && !*must_exit.borrow() {
+ let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y);
+ trace!(
+ "({}) Sync with {:?}: {} ({}) remaining",
+ self.table.name,
+ who,
+ todo.len(),
+ total_children
+ );
+
+ let step_size = std::cmp::min(16, todo.len());
+ let step = todo.drain(..step_size).collect::<Vec<_>>();
+
+ let rpc_resp = self
+ .table
+ .rpc_client
+ .call(
+ who,
+ TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)),
+ TABLE_SYNC_RPC_TIMEOUT,
+ )
+ .await?;
+ if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) =
+ rpc_resp
+ {
+ if diff_ranges.len() > 0 || diff_items.len() > 0 {
+ info!(
+ "({}) Sync with {:?}: difference {} ranges, {} items",
+ self.table.name,
+ who,
+ diff_ranges.len(),
+ diff_items.len()
+ );
+ }
+ let mut items_to_send = vec![];
+ for differing in diff_ranges.drain(..) {
+ if differing.level == 0 {
+ items_to_send.push(differing.begin);
+ } else {
+ let checksum = self.range_checksum(&differing, &mut must_exit).await?;
+ todo.push_back(checksum);
+ }
+ }
+ if retain && diff_items.len() > 0 {
+ self.table.handle_update(&diff_items[..]).await?;
+ }
+ if items_to_send.len() > 0 {
+ self.send_items(who, items_to_send).await?;
+ }
+ } else {
+ return Err(Error::BadRequest(format!(
+ "Unexpected response to sync RPC checksums: {}",
+ debug_serialize(&rpc_resp)
+ )));
+ }
+ }
+ Ok(())
+ }
+
+ async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ info!(
+ "({}) Sending {} items to {:?}",
+ self.table.name,
+ item_list.len(),
+ who
+ );
+
+ let mut values = vec![];
+ for item in item_list.iter() {
+ if let Some(v) = self.table.store.get(&item[..])? {
+ values.push(Arc::new(ByteBuf::from(v.as_ref())));
+ }
+ }
+ let rpc_resp = self
+ .table
+ .rpc_client
+ .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT)
+ .await?;
+ if let TableRPC::<F>::Ok = rpc_resp {
+ Ok(())
+ } else {
+ Err(Error::Message(format!(
+ "Unexpected response to RPC Update: {}",
+ debug_serialize(&rpc_resp)
+ )))
+ }
+ }
+
+ pub async fn handle_rpc(
+ self: &Arc<Self>,
+ message: &SyncRPC,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<SyncRPC, Error> {
+ match message {
+ SyncRPC::GetRootChecksumRange(begin, end) => {
+ let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?;
+ Ok(SyncRPC::RootChecksumRange(root_cks.bounds))
+ }
+ SyncRPC::Checksums(checksums, retain) => {
+ self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit)
+ .await
+ }
+ _ => Err(Error::Message(format!("Unexpected sync RPC"))),
+ }
+ }
+
+ async fn handle_checksums_rpc(
+ self: &Arc<Self>,
+ checksums: &[RangeChecksum],
+ retain: bool,
+ must_exit: &mut watch::Receiver<bool>,
+ ) -> Result<SyncRPC, Error> {
+ let mut ret_ranges = vec![];
+ let mut ret_items = vec![];
+
+ for their_ckr in checksums.iter() {
+ let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?;
+ for (their_range, their_hash) in their_ckr.children.iter() {
+ let differs = match our_ckr
+ .children
+ .binary_search_by(|(our_range, _)| our_range.cmp(&their_range))
+ {
+ Err(_) => {
+ if their_range.level >= 1 {
+ let cached_hash = self
+ .range_checksum_cached_hash(&their_range, must_exit)
+ .await?;
+ cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true)
+ } else {
+ true
+ }
+ }
+ Ok(i) => our_ckr.children[i].1 != *their_hash,
+ };
+ if differs {
+ ret_ranges.push(their_range.clone());
+ if retain && their_range.level == 0 {
+ if let Some(item_bytes) =
+ self.table.store.get(their_range.begin.as_slice())?
+ {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
+ }
+ }
+ }
+ }
+ for (our_range, _hash) in our_ckr.children.iter() {
+ if let Some(their_found_limit) = &their_ckr.found_limit {
+ if our_range.begin.as_slice() > their_found_limit.as_slice() {
+ break;
+ }
+ }
+
+ let not_present = our_ckr
+ .children
+ .binary_search_by(|(their_range, _)| their_range.cmp(&our_range))
+ .is_err();
+ if not_present {
+ if our_range.level > 0 {
+ ret_ranges.push(our_range.clone());
+ }
+ if retain && our_range.level == 0 {
+ if let Some(item_bytes) =
+ self.table.store.get(our_range.begin.as_slice())?
+ {
+ ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec())));
+ }
+ }
+ }
+ }
+ }
+ let n_checksums = checksums
+ .iter()
+ .map(|x| x.children.len())
+ .fold(0, |x, y| x + y);
+ if ret_ranges.len() > 0 || ret_items.len() > 0 {
+ trace!(
+ "({}) Checksum comparison RPC: {} different + {} items for {} received",
+ self.table.name,
+ ret_ranges.len(),
+ ret_items.len(),
+ n_checksums
+ );
+ }
+ Ok(SyncRPC::Difference(ret_ranges, ret_items))
+ }
+
+ pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
+ for i in 1..MAX_DEPTH {
+ let needle = SyncRange {
+ begin: item_key.to_vec(),
+ end: vec![],
+ level: i,
+ };
+ let mut cache = self.cache[i].lock().await;
+ if let Some(cache_entry) = cache.range(..=needle).rev().next() {
+ if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
+ let index = cache_entry.0.clone();
+ drop(cache_entry);
+ cache.remove(&index);
+ }
+ }
+ }
+ Ok(())
+ }
+}
+
+impl SyncTodo {
+ fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) {
+ let my_id = table.system.id;
+
+ self.todo.clear();
+
+ let ring = table.system.ring.borrow().clone();
+ let split_points = table.replication.split_points(&ring);
+
+ for i in 0..split_points.len() - 1 {
+ let begin = split_points[i];
+ let end = split_points[i + 1];
+ let nodes = table.replication.replication_nodes(&begin, &ring);
+
+ let retain = nodes.contains(&my_id);
+ if !retain {
+ // Check if we have some data to send, otherwise skip
+ if table.store.range(begin..end).next().is_none() {
+ continue;
+ }
+ }
+
+ self.todo.push(TodoPartition { begin, end, retain });
+ }
+ }
+
+ fn add_ring_difference<F: TableSchema, R: TableReplication>(
+ &mut self,
+ table: &Table<F, R>,
+ old_ring: &Ring,
+ new_ring: &Ring,
+ ) {
+ let my_id = table.system.id;
+
+ // If it is us who are entering or leaving the system,
+ // initiate a full sync instead of incremental sync
+ if old_ring.config.members.contains_key(&my_id)
+ != new_ring.config.members.contains_key(&my_id)
+ {
+ self.add_full_scan(table);
+ return;
+ }
+
+ let mut all_points = None
+ .into_iter()
+ .chain(table.replication.split_points(old_ring).drain(..))
+ .chain(table.replication.split_points(new_ring).drain(..))
+ .chain(self.todo.iter().map(|x| x.begin))
+ .chain(self.todo.iter().map(|x| x.end))
+ .collect::<Vec<_>>();
+ all_points.sort();
+ all_points.dedup();
+
+ let mut old_todo = std::mem::replace(&mut self.todo, vec![]);
+ old_todo.sort_by(|x, y| x.begin.cmp(&y.begin));
+ let mut new_todo = vec![];
+
+ for i in 0..all_points.len() - 1 {
+ let begin = all_points[i];
+ let end = all_points[i + 1];
+ let was_ours = table
+ .replication
+ .replication_nodes(&begin, &old_ring)
+ .contains(&my_id);
+ let is_ours = table
+ .replication
+ .replication_nodes(&begin, &new_ring)
+ .contains(&my_id);
+
+ let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) {
+ Ok(_) => true,
+ Err(j) => {
+ (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end)
+ || (j < old_todo.len()
+ && old_todo[j].begin < end && begin < old_todo[j].end)
+ }
+ };
+ if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) {
+ new_todo.push(TodoPartition {
+ begin,
+ end,
+ retain: is_ours,
+ });
+ }
+ }
+
+ self.todo = new_todo;
+ }
+
+ fn pop_task(&mut self) -> Option<TodoPartition> {
+ if self.todo.is_empty() {
+ return None;
+ }
+
+ let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
+ if i == self.todo.len() - 1 {
+ self.todo.pop()
+ } else {
+ let replacement = self.todo.pop().unwrap();
+ let ret = std::mem::replace(&mut self.todo[i], replacement);
+ Some(ret)
+ }
+ }
+}