aboutsummaryrefslogblamecommitdiff
path: root/src/table.rs
blob: 5c8e93a54452bd88059725e7f0f9f023b7bc5eea (plain) (tree)



















































































































                                                                                                             
use std::marker::PhantomData;
use std::time::Duration;
use std::sync::Arc;
use serde::{Serialize, Deserialize};
use async_trait::async_trait;

use crate::error::Error;
use crate::proto::*;
use crate::data::*;
use crate::membership::System;
use crate::rpc_client::*;


/// A named table whose key and value types are supplied by the
/// `TableFormat` implementation `F`.
///
/// Bundles the shared `System` handle, the sled tree opened for this table,
/// and the replication parameters used when sending updates to other nodes.
pub struct Table<F: TableFormat> {
	// Zero-sized marker: `F` only contributes associated types, no value of
	// type `F` is stored in the table.
	phantom: PhantomData<F>,

	/// Table name; `Table::new` opens the sled tree under this name and
	/// `Table::insert` uses it to tag `Message::TableRPC`.
	pub name: String,

	/// Shared system handle; read for ring membership and passed to RPC calls.
	pub system: Arc<System>,
	/// sled tree opened under `name`.
	pub store: sled::Tree,
	/// Partitions of this table; initialised empty by `Table::new`.
	/// NOTE(review): not read anywhere in the visible code.
	pub partitions: Vec<Partition>,

	/// Replication factor, quorums and RPC timeout for this table.
	pub param: TableReplicationParams,
}

/// Replication settings attached to a table.
///
/// Every field is a plain value (`usize` / `Duration`), so the type derives
/// `Debug`, `PartialEq` and `Eq` in addition to `Clone`; this lets the
/// parameters be logged and compared.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TableReplicationParams {
	/// Node count handed to `walk_ring` when `Table::insert` picks the
	/// targets for a key.
	pub replication_factor: usize,
	/// NOTE(review): presumably the number of replies needed for a read;
	/// not used anywhere in the visible code.
	pub read_quorum: usize,
	/// Quorum handed to `rpc_try_call_many` when an update is sent.
	pub write_quorum: usize,
	/// Timeout handed to `rpc_try_call_many` when an update is sent.
	pub timeout: Duration,
}

/// Type-erased entry point for table RPCs: takes a request as raw bytes and
/// returns the reply as raw bytes.
///
/// The trait itself has no type parameter, so it can be used as
/// `dyn TableRpcHandler` (see `Table::rpc_handler`).
#[async_trait]
pub trait TableRpcHandler {
	/// Handles one serialized request and returns the serialized reply, or an
	/// `Error` if handling fails.
	async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error>;
}

/// Wraps a shared `Table<F>` so it can be exposed through the untyped
/// `TableRpcHandler` trait.
struct TableRpcHandlerAdapter<F: TableFormat> {
	/// The table that decoded requests are forwarded to.
	table: Arc<Table<F>>,
}

#[async_trait]
impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
	/// Decodes `rpc` with `rmp_serde` as a `TableRPC<F>`, forwards it to the
	/// wrapped table's `handle`, and encodes the reply with
	/// `rmp_serde::encode::to_vec_named`.
	///
	/// # Errors
	///
	/// Propagates (via `?`) any decoding error, any error returned by the
	/// table, and any encoding error.
	async fn handle(&self, rpc: &[u8]) -> Result<Vec<u8>, Error> {
		let request: TableRPC<F> = rmp_serde::decode::from_read_ref(rpc)?;
		let reply = self.table.handle(request).await?;
		let encoded = rmp_serde::encode::to_vec_named(&reply)?;
		Ok(encoded)
	}
}

/// Messages exchanged for a table of format `F`; serialized with serde.
///
/// `Table::handle` both receives and returns this type.
#[derive(Debug, Serialize, Deserialize)]
pub enum TableRPC<F: TableFormat> {
	/// Carries a key and a value; this is the message built by `Table::insert`.
	Update(F::K, F::V),
}

/// A partition described by two hashes and a list of node identifiers.
///
/// NOTE(review): presumably `begin`/`end` delimit a range of the hash space
/// and `other_nodes` are the peers that also hold it; nothing in the visible
/// code reads these fields, so bound inclusivity is unconfirmed.
pub struct Partition {
	/// First hash bound of the partition.
	pub begin: Hash,
	/// Second hash bound of the partition.
	pub end: Hash,
	/// Identifiers of other nodes associated with this partition.
	pub other_nodes: Vec<UUID>,
}

/// Implemented by table keys so they can be mapped to a `Hash`.
///
/// `Table::insert` passes the resulting hash to `walk_ring` to pick the
/// nodes an update is sent to.
pub trait KeyHash {
	/// Returns the hash of this key.
	fn hash(&self) -> Hash;
}

/// Implemented by table values that can absorb another value of the same type.
///
/// NOTE(review): presumably used to reconcile concurrent versions of a
/// value; there is no call site in the visible code to confirm this.
pub trait ValueMerge {
	/// Merges `other` into `self` in place; `other` is left unchanged.
	fn merge(&mut self, other: &Self);
}

/// Describes the key and value types of a table and provides an update hook.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait TableFormat: Send + Sync {
	/// Key type: cloneable, serde-serializable in both directions, hashable
	/// through `KeyHash`, and `Send + Sync`.
	type K: Clone + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync;
	/// Value type: cloneable, serde-serializable in both directions, mergeable
	/// through `ValueMerge`, and `Send + Sync`.
	type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync;

	/// Hook receiving a key, its optional previous value and its new value.
	///
	/// NOTE(review): presumably called after a stored entry changes; no call
	/// site exists in the visible code.
	async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V);
}

impl<F: TableFormat + 'static> Table<F> {
	pub fn new(system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self {
		let store = db.open_tree(&name)
				.expect("Unable to open DB tree");
		Self{
			phantom: PhantomData::default(),
			name,
			system,
			store,
			partitions: Vec::new(),
			param,
		}
	}

	pub fn rpc_handler(self: Arc<Self>) -> Box<dyn TableRpcHandler + Send + Sync> {
		Box::new(TableRpcHandlerAdapter::<F>{ table: self })
	}

	pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> {
		unimplemented!();

		let hash = k.hash();
		let who = self.system.members.read().await
			.walk_ring(&hash, self.param.replication_factor);

		let msg = rmp_serde::encode::to_vec_named(&TableRPC::<F>::Update(k.clone(), v.clone()))?;
		rpc_try_call_many(self.system.clone(),
						  &who[..],
						  &Message::TableRPC(self.name.to_string(), msg),
						  self.param.write_quorum,
						  self.param.timeout).await?;
		Ok(())
	}

	async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> {
		unimplemented!()
	}
}