Diffstat (limited to 'src/table_sync.rs')
 src/table_sync.rs | 228 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 228 insertions(+), 0 deletions(-)
diff --git a/src/table_sync.rs b/src/table_sync.rs
new file mode 100644
index 00000000..5097c1b0
--- /dev/null
+++ b/src/table_sync.rs
@@ -0,0 +1,228 @@
+use std::collections::BTreeSet;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::{pin_mut, select};
+use futures_util::future::*;
+use rand::Rng;
+use tokio::sync::watch;
+use tokio::sync::Mutex;
+
+use crate::data::*;
+use crate::error::Error;
+use crate::membership::{Ring, System};
+use crate::table::*;
+
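+// Interval between two full scans of the whole hash space (one hour)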
+const SCAN_INTERVAL: Duration = Duration::from_secs(3600);
+
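+// Synchronization logic for the entries of one table: watches for
+// conditions that require a sync and keeps a to-do list of partitions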
+pub struct TableSyncer<F: TableSchema> {
+    pub table: Arc<Table<F>>,
+
+    pub todo: Mutex<SyncTodo>,
+}
+
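+// List of partitions that remain to be synchronized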
+pub struct SyncTodo {
+    pub todo: Vec<Partition>,
+}
+
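+// A contiguous range of the hash space, bounded by two ring positions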
+#[derive(Debug, Clone)]
+pub struct Partition {
+    pub begin: Hash,
+    pub end: Hash,
+}
+
+impl<F: TableSchema + 'static> TableSyncer<F> {
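+    // Create the syncer and spawn its two background workers:
+    // the watcher fills the to-do list, the syncer processes it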
+    pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
+        let todo = SyncTodo { todo: Vec::new() };
+        let syncer = Arc::new(TableSyncer {
+            table: table.clone(),
+            todo: Mutex::new(todo),
+        });
+
+        let s1 = syncer.clone();
+        table
+            .system
+            .background
+            .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
+            .await;
+
+        let s2 = syncer.clone();
+        table
+            .system
+            .background
+            .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
+            .await;
+
+        syncer
+    }
+
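+    // Watcher: queues a full scan at startup and then every SCAN_INTERVAL,
+    // and reacts to ring changes as they are published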
+    async fn watcher_task(
+        self: Arc<Self>,
+        mut must_exit: watch::Receiver<bool>,
+    ) -> Result<(), Error> {
+        self.todo.lock().await.add_full_scan(&self.table);
+        let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
+        let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
+        let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();
+
+        loop {
+            let s_ring_recv = ring_recv.recv().fuse();
+            let s_must_exit = must_exit.recv().fuse();
+            pin_mut!(s_ring_recv, s_must_exit);
+
+            select! {
+                _ = next_full_scan => {
+                    next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
+                    self.todo.lock().await.add_full_scan(&self.table);
+                }
+                new_ring_r = s_ring_recv => {
+                    if let Some(new_ring) = new_ring_r {
+                        self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
+                        prev_ring = new_ring;
+                    }
+                }
+                must_exit_v = s_must_exit => {
+                    if must_exit_v.unwrap_or(false) {
+                        return Ok(());
+                    }
+                }
+            }
+        }
+    }
+
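+    // Syncer: pops partitions from the to-do list and synchronizes them,
+    // sleeping for a second when the list is empty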
+    async fn syncer_task(
+        self: Arc<Self>,
+        mut must_exit: watch::Receiver<bool>,
+    ) -> Result<(), Error> {
+        loop {
+            let s_pop_task = self.pop_task().fuse();
+            let s_must_exit = must_exit.recv().fuse();
+            pin_mut!(s_must_exit, s_pop_task);
+
+            select! {
+                task = s_pop_task => {
+                    if let Some(partition) = task {
+                        let res = self.sync_partition(&partition).await;
+                        if let Err(e) = res {
+                            eprintln!("Error while syncing {:?}: {}", partition, e);
+                        }
+                    } else {
+                        tokio::time::delay_for(Duration::from_secs(1)).await;
+                    }
+                }
+                must_exit_v = s_must_exit => {
+                    if must_exit_v.unwrap_or(false) {
+                        return Ok(());
+                    }
+                }
+            }
+        }
+    }
+
+    async fn pop_task(&self) -> Option<Partition> {
+        self.todo.lock().await.pop_task()
+    }
+
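+    // Synchronize one partition with the other responsible nodes
+    // (not implemented yet)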
+    async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> {
+        unimplemented!()
+    }
+}
+
+impl SyncTodo {
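+    // Rebuild the to-do list so that it covers the whole hash space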
+    fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) {
+        let my_id = table.system.id.clone();
+
+        self.todo.clear();
+
+        let ring: Arc<Ring> = table.system.ring.borrow().clone();
+        for i in 0..ring.ring.len() {
+            let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
+            let begin = ring.ring[i].location.clone();
+
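+            // The last ring entry wraps around the end of the hash space:
+            // cover it as two ranges, [begin, 0xFF...] and [0x00..., end]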
+            if i == ring.ring.len() - 1 {
+                let end = ring.ring[0].location.clone();
+                self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
+                self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id);
+            } else {
+                let end = ring.ring[i + 1].location.clone();
+                self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
+            }
+        }
+    }
+
+    fn add_full_scan_aux<F: TableSchema>(
+        &mut self,
+        table: &Table<F>,
+        begin: Hash,
+        end: Hash,
+        nodes: &[UUID],
+        my_id: &UUID,
+    ) {
+        if !nodes.contains(my_id) {
+            // We are not among the nodes responsible for this range: queue it
+            // only if we still hold some of its data to send, otherwise skip
+            if table
+                .store
+                .range(begin.clone()..end.clone())
+                .next()
+                .is_none()
+            {
+                return;
+            }
+        }
+
+        self.todo.push(Partition { begin, end });
+    }
+
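+    // Queue only the ranges affected by a change from ring `old` to
+    // ring `new` (not implemented yet)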
+    fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) {
+        let old_ring = ring_points(old);
+        let new_ring = ring_points(new);
+        unimplemented!()
+    }
+
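+    // Pick a random partition and remove it in O(1): pop the last
+    // element and swap it into the vacated slot (swap-remove)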
+    fn pop_task(&mut self) -> Option<Partition> {
+        if self.todo.is_empty() {
+            return None;
+        }
+
+        let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
+        if i == self.todo.len() - 1 {
+            self.todo.pop()
+        } else {
+            let replacement = self.todo.pop().unwrap();
+            let ret = std::mem::replace(&mut self.todo[i], replacement);
+            Some(ret)
+        }
+    }
+}
+
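+// Set of all partition boundaries defined by a ring, including the two
+// extremities of the hash space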
+fn ring_points(ring: &Ring) -> BTreeSet<Hash> {
+    let mut ret = BTreeSet::new();
+    ret.insert([0u8; 32].into());
+    ret.insert([0xFFu8; 32].into());
+    for entry in ring.ring.iter() {
+        ret.insert(entry.location.clone());
+    }
+    ret
+}