diff options
Diffstat (limited to 'src/table_sync.rs')
-rw-r--r-- | src/table_sync.rs | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/src/table_sync.rs b/src/table_sync.rs new file mode 100644 index 00000000..5097c1b0 --- /dev/null +++ b/src/table_sync.rs @@ -0,0 +1,204 @@ +use rand::Rng; +use std::sync::Arc; +use std::time::Duration; +use std::collections::BTreeSet; + +use futures::{pin_mut, select}; +use futures_util::future::*; +use tokio::sync::watch; +use tokio::sync::Mutex; + +use crate::data::*; +use crate::error::Error; +use crate::membership::{Ring, System}; +use crate::table::*; + +const SCAN_INTERVAL: Duration = Duration::from_secs(3600); + +pub struct TableSyncer<F: TableSchema> { + pub table: Arc<Table<F>>, + + pub todo: Mutex<SyncTodo>, +} + +pub struct SyncTodo { + pub todo: Vec<Partition>, +} + +#[derive(Debug, Clone)] +pub struct Partition { + pub begin: Hash, + pub end: Hash, +} + +impl<F: TableSchema + 'static> TableSyncer<F> { + pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> { + let todo = SyncTodo { todo: Vec::new() }; + let syncer = Arc::new(TableSyncer { + table: table.clone(), + todo: Mutex::new(todo), + }); + + let s1 = syncer.clone(); + table + .system + .background + .spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit)) + .await; + + let s2 = syncer.clone(); + table + .system + .background + .spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit)) + .await; + + syncer + } + + async fn watcher_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + self.todo.lock().await.add_full_scan(&self.table); + let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); + let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone(); + let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone(); + + loop { + let s_ring_recv = ring_recv.recv().fuse(); + let s_must_exit = must_exit.recv().fuse(); + pin_mut!(s_ring_recv, s_must_exit); + + select! { + _ = next_full_scan => { + next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse(); + self.todo.lock().await.add_full_scan(&self.table); + } + new_ring_r = s_ring_recv => { + if let Some(new_ring) = new_ring_r { + self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring); + prev_ring = new_ring; + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + return Ok(()) + } + } + } + } + } + + async fn syncer_task( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + loop { + let s_pop_task = self.pop_task().fuse(); + let s_must_exit = must_exit.recv().fuse(); + pin_mut!(s_must_exit, s_pop_task); + + select! { + task = s_pop_task => { + if let Some(partition) = task { + let res = self.sync_partition(&partition).await; + if let Err(e) = res { + eprintln!("Error while syncing {:?}: {}", partition, e); + } + } else { + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } + must_exit_v = s_must_exit => { + if must_exit_v.unwrap_or(false) { + return Ok(()) + } + } + } + } + } + + async fn pop_task(&self) -> Option<Partition> { + self.todo.lock().await.pop_task() + } + + async fn sync_partition(self: &Arc<Self>, partition: &Partition) -> Result<(), Error> { + unimplemented!() + } +} + +impl SyncTodo { + fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) { + let my_id = table.system.id.clone(); + + self.todo.clear(); + + let ring: Arc<Ring> = table.system.ring.borrow().clone(); + for i in 0..ring.ring.len() { + let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); + let begin = ring.ring[i].location.clone(); + + if i == ring.ring.len() - 1 { + let end = ring.ring[0].location.clone(); + self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); + self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id); + } else { + let end = ring.ring[i + 1].location.clone(); + self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); + } + } + } + + fn add_full_scan_aux<F: TableSchema>( + &mut self, + table: &Table<F>, + begin: Hash, + end: Hash, + nodes: &[UUID], + my_id: &UUID, + ) { + if !nodes.contains(my_id) { + // Check if we have some data to send, otherwise skip + if table + .store + .range(begin.clone()..end.clone()) + .next() + .is_none() + {} + } + + self.todo.push(Partition { begin, end }); + } + + fn add_ring_difference<F: TableSchema>(&mut self, table: &Table<F>, old: &Ring, new: &Ring) { + let old_ring = ring_points(old); + let new_ring = ring_points(new); + unimplemented!() + } + + fn pop_task(&mut self) -> Option<Partition> { + if self.todo.is_empty() { + return None; + } + + let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len()); + if i == self.todo.len() - 1 { + self.todo.pop() + } else { + let replacement = self.todo.pop().unwrap(); + let ret = std::mem::replace(&mut self.todo[i], replacement); + Some(ret) + } + } +} + +fn ring_points(ring: &Ring) -> BTreeSet<Hash> { + let mut ret = BTreeSet::new(); + ret.insert([0u8; 32].into()); + ret.insert([0xFFu8; 32].into()); + for i in 0..ring.ring.len() { + ret.insert(ring.ring[i].location.clone()); + } + ret +} |