Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/mod.rs            |   6
-rw-r--r-- | src/table/table.rs          | 524
-rw-r--r-- | src/table/table_fullcopy.rs | 100
-rw-r--r-- | src/table/table_sharded.rs  |  55
-rw-r--r-- | src/table/table_sync.rs     | 791
5 files changed, 1476 insertions, 0 deletions
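This commit introduces the generic table abstraction (src/table/table.rs), two replication strategies (full-copy and sharded), and the anti-entropy syncer. For orientation before reading the diff, here is a minimal, hypothetical sketch of a schema written against the TableSchema and Entry traits defined below; the ExampleEntry/ExampleTable names and their fields are illustrative only and are not part of this commit.

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use crate::error::Error;
use crate::table::*;

// A tiny "object metadata" table: partitioned by bucket name, sorted by object key.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
pub struct ExampleEntry {
    pub bucket: String, // partition key: hashed to pick the responsible ring nodes
    pub key: String,    // sort key: ordering inside the partition
    pub version: u64,
    pub deleted: bool,
}

impl Entry<String, String> for ExampleEntry {
    fn partition_key(&self) -> &String {
        &self.bucket
    }
    fn sort_key(&self) -> &String {
        &self.key
    }
    // merge() should be commutative and idempotent so that replicas converge
    // regardless of the order in which they receive concurrent updates.
    fn merge(&mut self, other: &Self) {
        if other.version > self.version {
            *self = other.clone();
        }
    }
}

pub struct ExampleTable;

#[async_trait]
impl TableSchema for ExampleTable {
    type P = String;
    type S = String;
    type E = ExampleEntry;
    type Filter = (); // no server-side filtering in this sketch

    async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
        // Called after a local write changes an entry; a real schema could
        // update derived tables or counters here.
        Ok(())
    }
}

Such a schema would then be handed to Table::new() together with a replication strategy, the System handle, the sled database and an RPC server (matching the constructor added in this diff), and queried through insert/get/get_range.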
diff --git a/src/table/mod.rs b/src/table/mod.rs new file mode 100644 index 00000000..e03b8d0b --- /dev/null +++ b/src/table/mod.rs @@ -0,0 +1,6 @@ +pub mod table; +pub mod table_fullcopy; +pub mod table_sharded; +pub mod table_sync; + +pub use table::*; diff --git a/src/table/table.rs b/src/table/table.rs new file mode 100644 index 00000000..50e8739a --- /dev/null +++ b/src/table/table.rs @@ -0,0 +1,524 @@ +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use std::time::Duration; + +use arc_swap::ArcSwapOption; +use async_trait::async_trait; +use futures::stream::*; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; + +use crate::data::*; +use crate::error::Error; + +use crate::rpc::membership::{Ring, System}; +use crate::rpc::rpc_client::*; +use crate::rpc::rpc_server::*; + +use crate::table::table_sync::*; + +const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); + +pub struct Table<F: TableSchema, R: TableReplication> { + pub instance: F, + pub replication: R, + + pub name: String, + pub rpc_client: Arc<RpcClient<TableRPC<F>>>, + + pub system: Arc<System>, + pub store: sled::Tree, + pub syncer: ArcSwapOption<TableSyncer<F, R>>, +} + +#[derive(Serialize, Deserialize)] +pub enum TableRPC<F: TableSchema> { + Ok, + + ReadEntry(F::P, F::S), + ReadEntryResponse(Option<ByteBuf>), + + // Read range: read all keys in partition P, possibly starting at a certain sort key offset + ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize), + + Update(Vec<Arc<ByteBuf>>), + + SyncRPC(SyncRPC), +} + +impl<F: TableSchema> RpcMessage for TableRPC<F> {} + +pub trait PartitionKey { + fn hash(&self) -> Hash; +} + +pub trait SortKey { + fn sort_key(&self) -> &[u8]; +} + +pub trait Entry<P: PartitionKey, S: SortKey>: + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync +{ + fn partition_key(&self) -> &P; + fn sort_key(&self) -> &S; + + fn merge(&mut self, other: &Self); +} + +#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EmptyKey; +impl SortKey for EmptyKey { + fn sort_key(&self) -> &[u8] { + &[] + } +} +impl PartitionKey for EmptyKey { + fn hash(&self) -> Hash { + [0u8; 32].into() + } +} + +impl<T: AsRef<str>> PartitionKey for T { + fn hash(&self) -> Hash { + hash(self.as_ref().as_bytes()) + } +} +impl<T: AsRef<str>> SortKey for T { + fn sort_key(&self) -> &[u8] { + self.as_ref().as_bytes() + } +} + +impl PartitionKey for Hash { + fn hash(&self) -> Hash { + self.clone() + } +} +impl SortKey for Hash { + fn sort_key(&self) -> &[u8] { + self.as_slice() + } +} + +#[async_trait] +pub trait TableSchema: Send + Sync { + type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + type E: Entry<Self::P, Self::S>; + type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; + + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + true + } +} + +pub trait TableReplication: Send + Sync { + // See examples in table_sharded.rs and table_fullcopy.rs + // To understand various replication methods + + // Which nodes to send reads from + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn read_quorum(&self) -> usize; + + // Which nodes to send writes to + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>; + fn write_quorum(&self) -> usize; + fn max_write_errors(&self) -> 
usize; + fn epidemic_writes(&self) -> bool; + + // Which are the nodes that do actually replicate the data + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>; + fn split_points(&self, ring: &Ring) -> Vec<Hash>; +} + +impl<F, R> Table<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== + + pub async fn new( + instance: F, + replication: R, + system: Arc<System>, + db: &sled::Db, + name: String, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { + let store = db.open_tree(&name).expect("Unable to open DB tree"); + + let rpc_path = format!("table_{}", name); + let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + + let table = Arc::new(Self { + instance, + replication, + name, + rpc_client, + system, + store, + syncer: ArcSwapOption::from(None), + }); + table.clone().register_handler(rpc_server, rpc_path); + + let syncer = TableSyncer::launch(table.clone()).await; + table.syncer.swap(Some(syncer)); + + table + } + + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let hash = e.partition_key().hash(); + let who = self.replication.write_nodes(&hash, &self.system); + //eprintln!("insert who: {:?}", who); + + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let rpc = TableRPC::<F>::Update(vec![e_enc]); + + self.rpc_client + .try_call_many( + &who[..], + rpc, + RequestStrategy::with_quorum(self.replication.write_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; + Ok(()) + } + + pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { + let mut call_list = HashMap::new(); + + for entry in entries.iter() { + let hash = entry.partition_key().hash(); + let who = self.replication.write_nodes(&hash, &self.system); + let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + for node in who { + if !call_list.contains_key(&node) { + call_list.insert(node, vec![]); + } + call_list.get_mut(&node).unwrap().push(e_enc.clone()); + } + } + + let call_futures = call_list.drain().map(|(node, entries)| async move { + let rpc = TableRPC::<F>::Update(entries); + + let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?; + Ok::<_, Error>((node, resp)) + }); + let mut resps = call_futures.collect::<FuturesUnordered<_>>(); + let mut errors = vec![]; + + while let Some(resp) = resps.next().await { + if let Err(e) = resp { + errors.push(e); + } + } + if errors.len() > self.replication.max_write_errors() { + Err(Error::Message("Too many errors".into())) + } else { + Ok(()) + } + } + + pub async fn get( + self: &Arc<Self>, + partition_key: &F::P, + sort_key: &F::S, + ) -> Result<Option<F::E>, Error> { + let hash = partition_key.hash(); + let who = self.replication.read_nodes(&hash, &self.system); + //eprintln!("get who: {:?}", who); + + let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); + let resps = self + .rpc_client + .try_call_many( + &who[..], + rpc, + RequestStrategy::with_quorum(self.replication.read_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + let mut ret = None; + let mut not_all_same = false; + for resp in resps { + if let TableRPC::ReadEntryResponse(value) = resp { + if let Some(v_bytes) = value { + let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; + ret = match ret { + None => Some(v), + Some(mut x) => { + if x != v { + not_all_same = true; + x.merge(&v); + } + Some(x) + } + } + } + } else { + 
return Err(Error::Message(format!("Invalid return value to read"))); + } + } + if let Some(ret_entry) = &ret { + if not_all_same { + let self2 = self.clone(); + let ent2 = ret_entry.clone(); + self.system + .background + .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + } + } + Ok(ret) + } + + pub async fn get_range( + self: &Arc<Self>, + partition_key: &F::P, + begin_sort_key: Option<F::S>, + filter: Option<F::Filter>, + limit: usize, + ) -> Result<Vec<F::E>, Error> { + let hash = partition_key.hash(); + let who = self.replication.read_nodes(&hash, &self.system); + + let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit); + + let resps = self + .rpc_client + .try_call_many( + &who[..], + rpc, + RequestStrategy::with_quorum(self.replication.read_quorum()) + .with_timeout(TABLE_RPC_TIMEOUT) + .interrupt_after_quorum(true), + ) + .await?; + + let mut ret = BTreeMap::new(); + let mut to_repair = BTreeMap::new(); + for resp in resps { + if let TableRPC::Update(entries) = resp { + for entry_bytes in entries.iter() { + let entry = + rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); + match ret.remove(&entry_key) { + None => { + ret.insert(entry_key, Some(entry)); + } + Some(Some(mut prev)) => { + let must_repair = prev != entry; + prev.merge(&entry); + if must_repair { + to_repair.insert(entry_key.clone(), Some(prev.clone())); + } + ret.insert(entry_key, Some(prev)); + } + Some(None) => unreachable!(), + } + } + } + } + if !to_repair.is_empty() { + let self2 = self.clone(); + self.system.background.spawn_cancellable(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); + } + let ret_vec = ret + .iter_mut() + .take(limit) + .map(|(_k, v)| v.take().unwrap()) + .collect::<Vec<_>>(); + Ok(ret_vec) + } + + // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== + + async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> { + let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); + self.rpc_client + .try_call_many( + &who[..], + TableRPC::<F>::Update(vec![what_enc]), + RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), + ) + .await?; + Ok(()) + } + + // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ============== + + fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { + let self2 = self.clone(); + rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + + let self2 = self.clone(); + self.rpc_client + .set_local_handler(self.system.id, move |msg| { + let self2 = self2.clone(); + async move { self2.handle(&msg).await } + }); + } + + async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> { + match msg { + TableRPC::ReadEntry(key, sort_key) => { + let value = self.handle_read_entry(key, sort_key)?; + Ok(TableRPC::ReadEntryResponse(value)) + } + TableRPC::ReadRange(key, begin_sort_key, filter, limit) => { + let values = self.handle_read_range(key, begin_sort_key, filter, *limit)?; + Ok(TableRPC::Update(values)) + } + TableRPC::Update(pairs) => { + self.handle_update(pairs).await?; + Ok(TableRPC::Ok) + } + TableRPC::SyncRPC(rpc) => { + let syncer = self.syncer.load_full().unwrap(); + let response = syncer + .handle_rpc(rpc, 
self.system.background.stop_signal.clone()) + .await?; + Ok(TableRPC::SyncRPC(response)) + } + _ => Err(Error::BadRequest(format!("Unexpected table RPC"))), + } + } + + fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<ByteBuf>, Error> { + let tree_key = self.tree_key(p, s); + if let Some(bytes) = self.store.get(&tree_key)? { + Ok(Some(ByteBuf::from(bytes.to_vec()))) + } else { + Ok(None) + } + } + + fn handle_read_range( + &self, + p: &F::P, + s: &Option<F::S>, + filter: &Option<F::Filter>, + limit: usize, + ) -> Result<Vec<Arc<ByteBuf>>, Error> { + let partition_hash = p.hash(); + let first_key = match s { + None => partition_hash.to_vec(), + Some(sk) => self.tree_key(p, sk), + }; + let mut ret = vec![]; + for item in self.store.range(first_key..) { + let (key, value) = item?; + if &key[..32] != partition_hash.as_slice() { + break; + } + let keep = match filter { + None => true, + Some(f) => { + let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; + F::matches_filter(&entry, f) + } + }; + if keep { + ret.push(Arc::new(ByteBuf::from(value.as_ref()))); + } + if ret.len() >= limit { + break; + } + } + Ok(ret) + } + + pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { + let syncer = self.syncer.load_full().unwrap(); + let mut epidemic_propagate = vec![]; + + for update_bytes in entries.iter() { + let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; + + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let (old_entry, new_entry) = self.store.transaction(|db| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) + .map_err(Error::RMPDecode) + .map_err(sled::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::ConflictableTransactionError::Abort)?; + db.insert(tree_key.clone(), new_bytes)?; + Ok((old_entry, new_entry)) + })?; + + if old_entry.as_ref() != Some(&new_entry) { + if self.replication.epidemic_writes() { + epidemic_propagate.push(new_entry.clone()); + } + + self.instance.updated(old_entry, Some(new_entry)).await?; + self.system + .background + .spawn_cancellable(syncer.clone().invalidate(tree_key)); + } + } + + if epidemic_propagate.len() > 0 { + let self2 = self.clone(); + self.system + .background + .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await }); + } + + Ok(()) + } + + pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { + let syncer = self.syncer.load_full().unwrap(); + + debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); + let mut count = 0; + while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { + if key.as_ref() < begin.as_slice() { + break; + } + if let Some(old_val) = self.store.remove(&key)? 
{ + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; + self.instance.updated(Some(old_entry), None).await?; + self.system + .background + .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); + count += 1; + } + } + debug!("({}) {} entries deleted", self.name, count); + Ok(()) + } + + fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } +} diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs new file mode 100644 index 00000000..2cd2e464 --- /dev/null +++ b/src/table/table_fullcopy.rs @@ -0,0 +1,100 @@ +use arc_swap::ArcSwapOption; +use std::sync::Arc; + +use crate::data::*; +use crate::rpc::membership::{Ring, System}; +use crate::table::*; + +#[derive(Clone)] +pub struct TableFullReplication { + pub write_factor: usize, + pub write_quorum: usize, + + neighbors: ArcSwapOption<Neighbors>, +} + +#[derive(Clone)] +struct Neighbors { + ring: Arc<Ring>, + neighbors: Vec<UUID>, +} + +impl TableFullReplication { + pub fn new(write_factor: usize, write_quorum: usize) -> Self { + TableFullReplication { + write_factor, + write_quorum, + neighbors: ArcSwapOption::from(None), + } + } + + fn get_neighbors(&self, system: &System) -> Vec<UUID> { + let neighbors = self.neighbors.load_full(); + if let Some(n) = neighbors { + if Arc::ptr_eq(&n.ring, &system.ring.borrow()) { + return n.neighbors.clone(); + } + } + + // Recalculate neighbors + let ring = system.ring.borrow().clone(); + let my_id = system.id; + + let mut nodes = vec![]; + for (node, _) in ring.config.members.iter() { + let node_ranking = hash(&[node.as_slice(), my_id.as_slice()].concat()); + nodes.push((*node, node_ranking)); + } + nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2)); + let mut neighbors = nodes + .drain(..) 
+ .map(|(node, _)| node) + .filter(|node| *node != my_id) + .take(self.write_factor) + .collect::<Vec<_>>(); + neighbors.push(my_id); + self.neighbors.swap(Some(Arc::new(Neighbors { + ring, + neighbors: neighbors.clone(), + }))); + neighbors + } +} + +impl TableReplication for TableFullReplication { + // Full replication schema: all nodes store everything + // Writes are disseminated in an epidemic manner in the network + + // Advantage: do all reads locally, extremely fast + // Inconvenient: only suitable to reasonably small tables + + fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> { + vec![system.id] + } + fn read_quorum(&self) -> usize { + 1 + } + + fn write_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> { + self.get_neighbors(system) + } + fn write_quorum(&self) -> usize { + self.write_quorum + } + fn max_write_errors(&self) -> usize { + self.write_factor - self.write_quorum + } + fn epidemic_writes(&self) -> bool { + true + } + + fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> { + ring.config.members.keys().cloned().collect::<Vec<_>>() + } + fn split_points(&self, _ring: &Ring) -> Vec<Hash> { + let mut ret = vec![]; + ret.push([0u8; 32].into()); + ret.push([0xFFu8; 32].into()); + ret + } +} diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs new file mode 100644 index 00000000..5190f5d4 --- /dev/null +++ b/src/table/table_sharded.rs @@ -0,0 +1,55 @@ +use crate::data::*; +use crate::rpc::membership::{Ring, System}; +use crate::table::*; + +#[derive(Clone)] +pub struct TableShardedReplication { + pub replication_factor: usize, + pub read_quorum: usize, + pub write_quorum: usize, +} + +impl TableReplication for TableShardedReplication { + // Sharded replication schema: + // - based on the ring of nodes, a certain set of neighbors + // store entries, given as a function of the position of the + // entry's hash in the ring + // - reads are done on all of the nodes that replicate the data + // - writes as well + + fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { + let ring = system.ring.borrow().clone(); + ring.walk_ring(&hash, self.replication_factor) + } + fn read_quorum(&self) -> usize { + self.read_quorum + } + + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> { + let ring = system.ring.borrow().clone(); + ring.walk_ring(&hash, self.replication_factor) + } + fn write_quorum(&self) -> usize { + self.write_quorum + } + fn max_write_errors(&self) -> usize { + self.replication_factor - self.write_quorum + } + fn epidemic_writes(&self) -> bool { + false + } + + fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> { + ring.walk_ring(&hash, self.replication_factor) + } + fn split_points(&self, ring: &Ring) -> Vec<Hash> { + let mut ret = vec![]; + + ret.push([0u8; 32].into()); + for entry in ring.ring.iter() { + ret.push(entry.location); + } + ret.push([0xFFu8; 32].into()); + ret + } +} diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs new file mode 100644 index 00000000..8f6582a7 --- /dev/null +++ b/src/table/table_sync.rs @@ -0,0 +1,791 @@ +use rand::Rng; +use std::collections::{BTreeMap, VecDeque}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use futures::future::BoxFuture; +use futures::{pin_mut, select}; +use futures_util::future::*; +use futures_util::stream::*; +use serde::{Deserialize, Serialize}; +use serde_bytes::ByteBuf; +use tokio::sync::Mutex; +use tokio::sync::{mpsc, watch}; + +use crate::data::*; +use crate::error::Error; +use 
crate::rpc::membership::Ring; +use crate::table::*; + +const MAX_DEPTH: usize = 16; +const SCAN_INTERVAL: Duration = Duration::from_secs(3600); +const CHECKSUM_CACHE_TIMEOUT: Duration = Duration::from_secs(1800); +const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30); + +pub struct TableSyncer<F: TableSchema, R: TableReplication> { + table: Arc<Table<F, R>>, + todo: Mutex<SyncTodo>, + cache: Vec<Mutex<BTreeMap<SyncRange, RangeChecksumCache>>>, +} + +#[derive(Serialize, Deserialize)] +pub enum SyncRPC { + GetRootChecksumRange(Hash, Hash), + RootChecksumRange(SyncRange), + Checksums(Vec<RangeChecksum>, bool), + Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>), +} + +pub struct SyncTodo { + todo: Vec<TodoPartition>, +} + +#[derive(Debug, Clone)] +struct TodoPartition { + begin: Hash, + end: Hash, + retain: bool, +} + +// A SyncRange defines a query on the dataset stored by a node, in the following way: +// - all items whose key are >= `begin` +// - stopping at the first item whose key hash has at least `level` leading zero bytes (excluded) +// - except if the first item of the range has such many leading zero bytes +// - and stopping at `end` (excluded) if such an item is not found +// The checksum itself does not store all of the items in the database, only the hashes of the "sub-ranges" +// i.e. of ranges of level `level-1` that cover the same range +// (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin) +// See RangeChecksum for the struct that stores this information. +#[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +pub struct SyncRange { + begin: Vec<u8>, + end: Vec<u8>, + level: usize, +} + +impl std::cmp::PartialOrd for SyncRange { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} +impl std::cmp::Ord for SyncRange { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.begin + .cmp(&other.begin) + .then(self.level.cmp(&other.level)) + .then(self.end.cmp(&other.end)) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RangeChecksum { + bounds: SyncRange, + children: Vec<(SyncRange, Hash)>, + found_limit: Option<Vec<u8>>, + + #[serde(skip, default = "std::time::Instant::now")] + time: Instant, +} + +#[derive(Debug, Clone)] +pub struct RangeChecksumCache { + hash: Option<Hash>, // None if no children + found_limit: Option<Vec<u8>>, + time: Instant, +} + +impl<F, R> TableSyncer<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { + let todo = SyncTodo { todo: Vec::new() }; + let syncer = Arc::new(TableSyncer { + table: table.clone(), + todo: Mutex::new(todo), + cache: (0..MAX_DEPTH) + .map(|_| Mutex::new(BTreeMap::new())) + .collect::<Vec<_>>(), + }); + + let (busy_tx, busy_rx) = mpsc::unbounded_channel(); + + let s1 = syncer.clone(); + table + .system + .background + .spawn_worker( + format!("table sync watcher for {}", table.name), + move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), + ) + .await; + + let s2 = syncer.clone(); + table + .system + .background + .spawn_worker( + format!("table syncer for {}", table.name), + move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), + ) + .await; + + let s3 = syncer.clone(); + tokio::spawn(async move { + tokio::time::delay_for(Duration::from_secs(20)).await; + s3.add_full_scan().await; + }); + + syncer + } + + async fn watcher_task( + self: Arc<Self>, + mut must_exit: 
watch::Receiver<bool>, + mut busy_rx: mpsc::UnboundedReceiver<bool>, + ) -> Result<(), Error> { + let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); + let mut nothing_to_do_since = Some(Instant::now()); + + while !*must_exit.borrow() { + let s_ring_recv = ring_recv.recv().fuse(); + let s_busy = busy_rx.recv().fuse(); + let s_must_exit = must_exit.recv().fuse(); + let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); + pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); + + select! { + new_ring_r = s_ring_recv => { + if let Some(new_ring) = new_ring_r { + debug!("({}) Adding ring difference to syncer todo list", self.table.name); + self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); + prev_ring = new_ring; + } + } + busy_opt = s_busy => { + if let Some(busy) = busy_opt { + if busy { + nothing_to_do_since = None; + } else { + if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); + } + } + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + break; + } + } + _ = s_timeout => { + if nothing_to_do_since.map(|t| Instant::now() - t >= SCAN_INTERVAL).unwrap_or(false) { + nothing_to_do_since = None; + debug!("({}) Adding full scan to syncer todo list", self.table.name); + self.add_full_scan().await; + } + } + } + } + Ok(()) + } + + pub async fn add_full_scan(&self) { + self.todo.lock().await.add_full_scan(&self.table); + } + + async fn syncer_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + busy_tx: mpsc::UnboundedSender<bool>, + ) -> Result<(), Error> { + while !*must_exit.borrow() { + if let Some(partition) = self.todo.lock().await.pop_task() { + busy_tx.send(true)?; + let res = self + .clone() + .sync_partition(&partition, &mut must_exit) + .await; + if let Err(e) = res { + warn!( + "({}) Error while syncing {:?}: {}", + self.table.name, partition, e + ); + } + } else { + busy_tx.send(false)?; + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + Ok(()) + } + + async fn sync_partition( + self: Arc<Self>, + partition: &TodoPartition, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<(), Error> { + let my_id = self.table.system.id; + let nodes = self + .table + .replication + .write_nodes(&partition.begin, &self.table.system) + .into_iter() + .filter(|node| *node != my_id) + .collect::<Vec<_>>(); + + debug!( + "({}) Preparing to sync {:?} with {:?}...", + self.table.name, partition, nodes + ); + let root_cks = self + .root_checksum(&partition.begin, &partition.end, must_exit) + .await?; + + let mut sync_futures = nodes + .iter() + .map(|node| { + self.clone().do_sync_with( + partition.clone(), + root_cks.clone(), + *node, + partition.retain, + must_exit.clone(), + ) + }) + .collect::<FuturesUnordered<_>>(); + + let mut n_errors = 0; + while let Some(r) = sync_futures.next().await { + if let Err(e) = r { + n_errors += 1; + warn!("({}) Sync error: {}", self.table.name, e); + } + } + if n_errors > self.table.replication.max_write_errors() { + return Err(Error::Message(format!( + "Sync failed with too many nodes (should have been: {:?}).", + nodes + ))); + } + + if !partition.retain { + self.table + .delete_range(&partition.begin, &partition.end) + .await?; + } + + Ok(()) + } + + async fn root_checksum( + self: &Arc<Self>, + begin: &Hash, + end: &Hash, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { + for i in 1..MAX_DEPTH { + let rc = 
self + .range_checksum( + &SyncRange { + begin: begin.to_vec(), + end: end.to_vec(), + level: i, + }, + must_exit, + ) + .await?; + if rc.found_limit.is_none() { + return Ok(rc); + } + } + Err(Error::Message(format!( + "Unable to compute root checksum (this should never happen)" + ))) + } + + async fn range_checksum( + self: &Arc<Self>, + range: &SyncRange, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<RangeChecksum, Error> { + assert!(range.level != 0); + + if range.level == 1 { + let mut children = vec![]; + for item in self + .table + .store + .range(range.begin.clone()..range.end.clone()) + { + let (key, value) = item?; + let key_hash = hash(&key[..]); + if children.len() > 0 + && key_hash.as_slice()[0..range.level] + .iter() + .all(|x| *x == 0u8) + { + return Ok(RangeChecksum { + bounds: range.clone(), + children, + found_limit: Some(key.to_vec()), + time: Instant::now(), + }); + } + let item_range = SyncRange { + begin: key.to_vec(), + end: vec![], + level: 0, + }; + children.push((item_range, hash(&value[..]))); + } + Ok(RangeChecksum { + bounds: range.clone(), + children, + found_limit: None, + time: Instant::now(), + }) + } else { + let mut children = vec![]; + let mut sub_range = SyncRange { + begin: range.begin.clone(), + end: range.end.clone(), + level: range.level - 1, + }; + let mut time = Instant::now(); + while !*must_exit.borrow() { + let sub_ck = self + .range_checksum_cached_hash(&sub_range, must_exit) + .await?; + + if let Some(hash) = sub_ck.hash { + children.push((sub_range.clone(), hash)); + if sub_ck.time < time { + time = sub_ck.time; + } + } + + if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { + return Ok(RangeChecksum { + bounds: range.clone(), + children, + found_limit: None, + time, + }); + } + let found_limit = sub_ck.found_limit.unwrap(); + + let actual_limit_hash = hash(&found_limit[..]); + if actual_limit_hash.as_slice()[0..range.level] + .iter() + .all(|x| *x == 0u8) + { + return Ok(RangeChecksum { + bounds: range.clone(), + children, + found_limit: Some(found_limit.clone()), + time, + }); + } + + sub_range.begin = found_limit; + } + Err(Error::Message(format!("Exiting."))) + } + } + + fn range_checksum_cached_hash<'a>( + self: &'a Arc<Self>, + range: &'a SyncRange, + must_exit: &'a mut watch::Receiver<bool>, + ) -> BoxFuture<'a, Result<RangeChecksumCache, Error>> { + async move { + let mut cache = self.cache[range.level].lock().await; + if let Some(v) = cache.get(&range) { + if Instant::now() - v.time < CHECKSUM_CACHE_TIMEOUT { + return Ok(v.clone()); + } + } + cache.remove(&range); + drop(cache); + + let v = self.range_checksum(&range, must_exit).await?; + trace!( + "({}) New checksum calculated for {}-{}/{}, {} children", + self.table.name, + hex::encode(&range.begin) + .chars() + .take(16) + .collect::<String>(), + hex::encode(&range.end).chars().take(16).collect::<String>(), + range.level, + v.children.len() + ); + + let hash = if v.children.len() > 0 { + Some(hash(&rmp_to_vec_all_named(&v)?[..])) + } else { + None + }; + let cache_entry = RangeChecksumCache { + hash, + found_limit: v.found_limit, + time: v.time, + }; + + let mut cache = self.cache[range.level].lock().await; + cache.insert(range.clone(), cache_entry.clone()); + Ok(cache_entry) + } + .boxed() + } + + async fn do_sync_with( + self: Arc<Self>, + partition: TodoPartition, + root_ck: RangeChecksum, + who: UUID, + retain: bool, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + let mut todo = VecDeque::new(); + + // If their root checksum has level > 
than us, use that as a reference + let root_cks_resp = self + .table + .rpc_client + .call( + who, + TableRPC::<F>::SyncRPC(SyncRPC::GetRootChecksumRange( + partition.begin.clone(), + partition.end.clone(), + )), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await?; + if let TableRPC::<F>::SyncRPC(SyncRPC::RootChecksumRange(range)) = root_cks_resp { + if range.level > root_ck.bounds.level { + let their_root_range_ck = self.range_checksum(&range, &mut must_exit).await?; + todo.push_back(their_root_range_ck); + } else { + todo.push_back(root_ck); + } + } else { + return Err(Error::BadRequest(format!( + "Invalid respone to GetRootChecksumRange RPC: {}", + debug_serialize(root_cks_resp) + ))); + } + + while !todo.is_empty() && !*must_exit.borrow() { + let total_children = todo.iter().map(|x| x.children.len()).fold(0, |x, y| x + y); + trace!( + "({}) Sync with {:?}: {} ({}) remaining", + self.table.name, + who, + todo.len(), + total_children + ); + + let step_size = std::cmp::min(16, todo.len()); + let step = todo.drain(..step_size).collect::<Vec<_>>(); + + let rpc_resp = self + .table + .rpc_client + .call( + who, + TableRPC::<F>::SyncRPC(SyncRPC::Checksums(step, retain)), + TABLE_SYNC_RPC_TIMEOUT, + ) + .await?; + if let TableRPC::<F>::SyncRPC(SyncRPC::Difference(mut diff_ranges, diff_items)) = + rpc_resp + { + if diff_ranges.len() > 0 || diff_items.len() > 0 { + info!( + "({}) Sync with {:?}: difference {} ranges, {} items", + self.table.name, + who, + diff_ranges.len(), + diff_items.len() + ); + } + let mut items_to_send = vec![]; + for differing in diff_ranges.drain(..) { + if differing.level == 0 { + items_to_send.push(differing.begin); + } else { + let checksum = self.range_checksum(&differing, &mut must_exit).await?; + todo.push_back(checksum); + } + } + if retain && diff_items.len() > 0 { + self.table.handle_update(&diff_items[..]).await?; + } + if items_to_send.len() > 0 { + self.send_items(who, items_to_send).await?; + } + } else { + return Err(Error::BadRequest(format!( + "Unexpected response to sync RPC checksums: {}", + debug_serialize(&rpc_resp) + ))); + } + } + Ok(()) + } + + async fn send_items(&self, who: UUID, item_list: Vec<Vec<u8>>) -> Result<(), Error> { + info!( + "({}) Sending {} items to {:?}", + self.table.name, + item_list.len(), + who + ); + + let mut values = vec![]; + for item in item_list.iter() { + if let Some(v) = self.table.store.get(&item[..])? 
{ + values.push(Arc::new(ByteBuf::from(v.as_ref()))); + } + } + let rpc_resp = self + .table + .rpc_client + .call(who, TableRPC::<F>::Update(values), TABLE_SYNC_RPC_TIMEOUT) + .await?; + if let TableRPC::<F>::Ok = rpc_resp { + Ok(()) + } else { + Err(Error::Message(format!( + "Unexpected response to RPC Update: {}", + debug_serialize(&rpc_resp) + ))) + } + } + + pub async fn handle_rpc( + self: &Arc<Self>, + message: &SyncRPC, + mut must_exit: watch::Receiver<bool>, + ) -> Result<SyncRPC, Error> { + match message { + SyncRPC::GetRootChecksumRange(begin, end) => { + let root_cks = self.root_checksum(&begin, &end, &mut must_exit).await?; + Ok(SyncRPC::RootChecksumRange(root_cks.bounds)) + } + SyncRPC::Checksums(checksums, retain) => { + self.handle_checksums_rpc(&checksums[..], *retain, &mut must_exit) + .await + } + _ => Err(Error::Message(format!("Unexpected sync RPC"))), + } + } + + async fn handle_checksums_rpc( + self: &Arc<Self>, + checksums: &[RangeChecksum], + retain: bool, + must_exit: &mut watch::Receiver<bool>, + ) -> Result<SyncRPC, Error> { + let mut ret_ranges = vec![]; + let mut ret_items = vec![]; + + for their_ckr in checksums.iter() { + let our_ckr = self.range_checksum(&their_ckr.bounds, must_exit).await?; + for (their_range, their_hash) in their_ckr.children.iter() { + let differs = match our_ckr + .children + .binary_search_by(|(our_range, _)| our_range.cmp(&their_range)) + { + Err(_) => { + if their_range.level >= 1 { + let cached_hash = self + .range_checksum_cached_hash(&their_range, must_exit) + .await?; + cached_hash.hash.map(|h| h != *their_hash).unwrap_or(true) + } else { + true + } + } + Ok(i) => our_ckr.children[i].1 != *their_hash, + }; + if differs { + ret_ranges.push(their_range.clone()); + if retain && their_range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(their_range.begin.as_slice())? + { + ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); + } + } + } + } + for (our_range, _hash) in our_ckr.children.iter() { + if let Some(their_found_limit) = &their_ckr.found_limit { + if our_range.begin.as_slice() > their_found_limit.as_slice() { + break; + } + } + + let not_present = our_ckr + .children + .binary_search_by(|(their_range, _)| their_range.cmp(&our_range)) + .is_err(); + if not_present { + if our_range.level > 0 { + ret_ranges.push(our_range.clone()); + } + if retain && our_range.level == 0 { + if let Some(item_bytes) = + self.table.store.get(our_range.begin.as_slice())? 
+ { + ret_items.push(Arc::new(ByteBuf::from(item_bytes.to_vec()))); + } + } + } + } + } + let n_checksums = checksums + .iter() + .map(|x| x.children.len()) + .fold(0, |x, y| x + y); + if ret_ranges.len() > 0 || ret_items.len() > 0 { + trace!( + "({}) Checksum comparison RPC: {} different + {} items for {} received", + self.table.name, + ret_ranges.len(), + ret_items.len(), + n_checksums + ); + } + Ok(SyncRPC::Difference(ret_ranges, ret_items)) + } + + pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> { + for i in 1..MAX_DEPTH { + let needle = SyncRange { + begin: item_key.to_vec(), + end: vec![], + level: i, + }; + let mut cache = self.cache[i].lock().await; + if let Some(cache_entry) = cache.range(..=needle).rev().next() { + if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key { + let index = cache_entry.0.clone(); + drop(cache_entry); + cache.remove(&index); + } + } + } + Ok(()) + } +} + +impl SyncTodo { + fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) { + let my_id = table.system.id; + + self.todo.clear(); + + let ring = table.system.ring.borrow().clone(); + let split_points = table.replication.split_points(&ring); + + for i in 0..split_points.len() - 1 { + let begin = split_points[i]; + let end = split_points[i + 1]; + let nodes = table.replication.replication_nodes(&begin, &ring); + + let retain = nodes.contains(&my_id); + if !retain { + // Check if we have some data to send, otherwise skip + if table.store.range(begin..end).next().is_none() { + continue; + } + } + + self.todo.push(TodoPartition { begin, end, retain }); + } + } + + fn add_ring_difference<F: TableSchema, R: TableReplication>( + &mut self, + table: &Table<F, R>, + old_ring: &Ring, + new_ring: &Ring, + ) { + let my_id = table.system.id; + + // If it is us who are entering or leaving the system, + // initiate a full sync instead of incremental sync + if old_ring.config.members.contains_key(&my_id) + != new_ring.config.members.contains_key(&my_id) + { + self.add_full_scan(table); + return; + } + + let mut all_points = None + .into_iter() + .chain(table.replication.split_points(old_ring).drain(..)) + .chain(table.replication.split_points(new_ring).drain(..)) + .chain(self.todo.iter().map(|x| x.begin)) + .chain(self.todo.iter().map(|x| x.end)) + .collect::<Vec<_>>(); + all_points.sort(); + all_points.dedup(); + + let mut old_todo = std::mem::replace(&mut self.todo, vec![]); + old_todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut new_todo = vec![]; + + for i in 0..all_points.len() - 1 { + let begin = all_points[i]; + let end = all_points[i + 1]; + let was_ours = table + .replication + .replication_nodes(&begin, &old_ring) + .contains(&my_id); + let is_ours = table + .replication + .replication_nodes(&begin, &new_ring) + .contains(&my_id); + + let was_todo = match old_todo.binary_search_by(|x| x.begin.cmp(&begin)) { + Ok(_) => true, + Err(j) => { + (j > 0 && old_todo[j - 1].begin < end && begin < old_todo[j - 1].end) + || (j < old_todo.len() + && old_todo[j].begin < end && begin < old_todo[j].end) + } + }; + if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { + new_todo.push(TodoPartition { + begin, + end, + retain: is_ours, + }); + } + } + + self.todo = new_todo; + } + + fn pop_task(&mut self) -> Option<TodoPartition> { + if self.todo.is_empty() { + return None; + } + + let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len()); + if i == self.todo.len() - 1 { + self.todo.pop() + } else { + let replacement = 
self.todo.pop().unwrap(); + let ret = std::mem::replace(&mut self.todo[i], replacement); + Some(ret) } } }
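As a closing note on the two TableReplication implementations added above, the sketch below (hypothetical, not part of this commit) shows how they would typically be parameterized and what the quorum arithmetic implies.

use crate::table::table_fullcopy::TableFullReplication;
use crate::table::table_sharded::TableShardedReplication;

// With 3 replicas and a write quorum of 2, max_write_errors() = 3 - 2 = 1:
// one unreachable node does not fail a write, and read_quorum + write_quorum > replication_factor
// ensures that any read quorum intersects the nodes that acknowledged the latest write.
fn example_replication_params() -> (TableShardedReplication, TableFullReplication) {
    // Sharded: each entry lives on replication_factor ring neighbors of its partition hash.
    let sharded = TableShardedReplication {
        replication_factor: 3,
        read_quorum: 2,
        write_quorum: 2,
    };
    // Full copy: every node stores everything, reads are purely local (quorum 1),
    // and writes are pushed epidemically to write_factor ranked neighbors plus ourselves.
    let fullcopy = TableFullReplication::new(3, 2);
    (sharded, fullcopy)
}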