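//! Background synchronization of table contents between the nodes that
//! replicate each range of the hash space.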
use rand::Rng;
use std::sync::Arc;
use std::time::Duration;
use std::collections::BTreeSet;

use futures::{pin_mut, select};
use futures_util::future::*;
use tokio::sync::watch;
use tokio::sync::Mutex;

use crate::data::*;
use crate::error::Error;
use crate::membership::{Ring, System};
use crate::table::*;

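/// A full scan of all partitions is re-queued at this interval.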
const SCAN_INTERVAL: Duration = Duration::from_secs(3600);

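/// Background synchronizer for one table: `watcher_task` keeps the todo
/// queue up to date while `syncer_task` drains it.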
pub struct TableSyncer<F: TableSchema> {
	pub table: Arc<Table<F>>,

	pub todo: Mutex<SyncTodo>,
}

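/// Queue of hash-space partitions that remain to be synchronized.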
pub struct SyncTodo {
	pub todo: Vec<Partition>,
}

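/// A range of the hash space, from `begin` to `end`.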
#[derive(Debug, Clone)]
pub struct Partition {
	pub begin: Hash,
	pub end: Hash,
}

impl<F: TableSchema + 'static> TableSyncer<F> {
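	/// Creates the syncer for `table` and spawns its two background
	/// workers on `system.background`.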
	pub async fn launch(table: Arc<Table<F>>) -> Arc<Self> {
		let todo = SyncTodo { todo: Vec::new() };
		let syncer = Arc::new(TableSyncer {
			table: table.clone(),
			todo: Mutex::new(todo),
		});

		let s1 = syncer.clone();
		table
			.system
			.background
			.spawn_worker(move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit))
			.await;

		let s2 = syncer.clone();
		table
			.system
			.background
			.spawn_worker(move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit))
			.await;

		syncer
	}

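	/// Background task that fills the todo queue: a full scan at startup
	/// and then every SCAN_INTERVAL, plus the ring difference whenever
	/// the ring changes. Exits when `must_exit` becomes true.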
	async fn watcher_task(
		self: Arc<Self>,
		mut must_exit: watch::Receiver<bool>,
	) -> Result<(), Error> {
		self.todo.lock().await.add_full_scan(&self.table);
		let mut next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
		let mut prev_ring: Arc<Ring> = self.table.system.ring.borrow().clone();
		let mut ring_recv: watch::Receiver<Arc<Ring>> = self.table.system.ring.clone();

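		// select! needs fused, pinned futures, so the watch receivers
		// are re-wrapped with .fuse() and pin_mut! on every iteration.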
		loop {
			let s_ring_recv = ring_recv.recv().fuse();
			let s_must_exit = must_exit.recv().fuse();
			pin_mut!(s_ring_recv, s_must_exit);

			select! {
				_ = next_full_scan => {
					next_full_scan = tokio::time::delay_for(SCAN_INTERVAL).fuse();
					self.todo.lock().await.add_full_scan(&self.table);
				}
				new_ring_r = s_ring_recv => {
					if let Some(new_ring) = new_ring_r {
						self.todo.lock().await.add_ring_difference(&self.table, &prev_ring, &new_ring);
						prev_ring = new_ring;
					}
				}
				must_exit_v = s_must_exit => {
					if must_exit_v.unwrap_or(false) {
						return Ok(())
					}
				}
			}
		}
	}

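	/// Background task that drains the todo queue, syncing one random
	/// partition at a time and sleeping for a second whenever the queue
	/// is empty. Exits when `must_exit` becomes true.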
	async fn syncer_task(
		self: Arc<Self>,
		mut must_exit: watch::Receiver<bool>,
	) -> Result<(), Error> {
		loop {
			let s_pop_task = self.pop_task().fuse();
			let s_must_exit = must_exit.recv().fuse();
			pin_mut!(s_must_exit, s_pop_task);

			select! {
				task = s_pop_task => {
					if let Some(partition) = task {
						let res = self.sync_partition(&partition).await;
						if let Err(e) = res {
							eprintln!("Error while syncing {:?}: {}", partition, e);
						}
					} else {
						tokio::time::delay_for(Duration::from_secs(1)).await;
					}
				}
				must_exit_v = s_must_exit => {
					if must_exit_v.unwrap_or(false) {
						return Ok(())
					}
				}
			}
		}
	}

	async fn pop_task(&self) -> Option<Partition> {
		self.todo.lock().await.pop_task()
	}

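	// Not implemented yet. Presumably this will compare the local entries
	// of the [begin, end) range with the other replicas of that range and
	// exchange whatever differs; the details are left open here.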
	async fn sync_partition(self: &Arc<Self>, _partition: &Partition) -> Result<(), Error> {
		unimplemented!()
	}
}

impl SyncTodo {
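	/// Rebuilds the whole queue, one partition per segment of the ring.
	/// The segment starting at the last ring position wraps around and is
	/// split in two: [last, 0xFF..FF] and [0x00..00, first].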
	fn add_full_scan<F: TableSchema>(&mut self, table: &Table<F>) {
		let my_id = table.system.id.clone();

		self.todo.clear();

		let ring: Arc<Ring> = table.system.ring.borrow().clone();
		for i in 0..ring.ring.len() {
			let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor);
			let begin = ring.ring[i].location.clone();

			if i == ring.ring.len() - 1 {
				let end = ring.ring[0].location.clone();
				self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id);
				self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id);
			} else {
				let end = ring.ring[i + 1].location.clone();
				self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id);
			}
		}
	}

	fn add_full_scan_aux<F: TableSchema>(
		&mut self,
		table: &Table<F>,
		begin: Hash,
		end: Hash,
		nodes: &[UUID],
		my_id: &UUID,
	) {
		if !nodes.contains(my_id) {
			// We are not among the replicas for this range: queue it only
			// if we still hold some data to send them, otherwise skip it.
			if table
				.store
				.range(begin.clone()..end.clone())
				.next()
				.is_none()
			{
				return;
			}
		}

		self.todo.push(Partition { begin, end });
	}

	fn add_ring_difference<F: TableSchema>(&mut self, _table: &Table<F>, old: &Ring, new: &Ring) {
		// TODO: diff the two boundary sets and queue only the ranges
		// that actually changed, instead of a full scan.
		let _old_ring = ring_points(old);
		let _new_ring = ring_points(new);
		unimplemented!()
	}

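	/// Removes and returns a random pending partition, in O(1): the
	/// chosen element is swapped with the last one, since the order of
	/// the queue does not matter.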
	fn pop_task(&mut self) -> Option<Partition> {
		if self.todo.is_empty() {
			return None;
		}

		let i = rand::thread_rng().gen_range::<usize, _, _>(0, self.todo.len());
		if i == self.todo.len() - 1 {
			self.todo.pop()
		} else {
			let replacement = self.todo.pop().unwrap();
			let ret = std::mem::replace(&mut self.todo[i], replacement);
			Some(ret)
		}
	}
}

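/// Returns the set of all partition boundaries of a ring, bracketed by
/// the two extreme values of the hash space.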
fn ring_points(ring: &Ring) -> BTreeSet<Hash> {
	let mut ret = BTreeSet::new();
	ret.insert([0u8; 32].into());
	ret.insert([0xFFu8; 32].into());
	for entry in ring.ring.iter() {
		ret.insert(entry.location.clone());
	}
	ret
}
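
// A minimal sketch of a unit test for the swap-remove logic of
// `SyncTodo::pop_task`. It assumes `Hash: From<[u8; 32]> + Ord`, which the
// rest of this file already relies on; the test is illustrative and not
// part of the original commit.
#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn pop_task_drains_all_partitions() {
		let mut todo = SyncTodo { todo: Vec::new() };
		for i in 0..10u8 {
			todo.todo.push(Partition {
				begin: [i; 32].into(),
				end: [i + 1; 32].into(),
			});
		}

		// Every queued partition must come out exactly once,
		// in some random order.
		let mut seen = BTreeSet::new();
		while let Some(p) = todo.pop_task() {
			seen.insert(p.begin);
		}
		assert_eq!(seen.len(), 10);
		assert!(todo.pop_task().is_none());
	}
}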