From cc580da0aef95bcb94bd2ce602258d0d18388969 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Apr 2020 23:01:49 +0200 Subject: Some work --- src/table.rs | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 107 insertions(+), 15 deletions(-) (limited to 'src/table.rs') diff --git a/src/table.rs b/src/table.rs index 5c8e93a5..f45f48c2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,8 +1,8 @@ -use std::marker::PhantomData; use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use reduce::Reduce; use crate::error::Error; use crate::proto::*; @@ -12,7 +12,7 @@ use crate::rpc_client::*; pub struct Table { - phantom: PhantomData, + pub instance: F, pub name: String, @@ -49,9 +49,11 @@ impl TableRpcHandler for TableRpcHandlerAdapter { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub enum TableRPC { - Update(F::K, F::V), + Ok, + Read(Vec), + Update(Vec<(F::K, F::V)>), } pub struct Partition { @@ -70,18 +72,18 @@ pub trait ValueMerge { #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; + type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } impl Table { - pub fn new(system: Arc, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { + pub fn new(instance: F, system: Arc, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { let store = db.open_tree(&name) .expect("Unable to open DB tree"); Self{ - phantom: PhantomData::default(), + instance, name, system, store, @@ -95,22 +97,112 @@ impl Table { } pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { - unimplemented!(); + let hash = k.hash(); + let who = self.system.members.read().await + .walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::::Update(vec![(k.clone(), v.clone())]); + + self.rpc_try_call_many(&who[..], + &rpc, + self.param.write_quorum).await?; + Ok(()) + } + pub async fn get(&self, k: &F::K) -> Result { let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let msg = rmp_serde::encode::to_vec_named(&TableRPC::::Update(k.clone(), v.clone()))?; - rpc_try_call_many(self.system.clone(), - &who[..], - &Message::TableRPC(self.name.to_string(), msg), - self.param.write_quorum, + let rpc = &TableRPC::::Read(vec![k.clone()]); + let resps = self.rpc_try_call_many(&who[..], + &rpc, + self.param.read_quorum) + .await?; + + let mut values = vec![]; + for resp in resps { + if let TableRPC::Update(mut pairs) = resp { + if pairs.len() == 0 { + continue; + } else if pairs.len() == 1 && pairs[0].0 == *k { + values.push(pairs.drain(..).next().unwrap().1); + continue; + } + } + return Err(Error::Message(format!("Invalid return value to read"))); + } + values.drain(..) + .reduce(|mut x, y| { x.merge(&y); x }) + .map(Ok) + .unwrap_or(Err(Error::NotFound)) + } + + async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC, quorum: usize) -> Result>, Error> { + let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resps = rpc_try_call_many(self.system.clone(), + who, + &rpc_msg, + quorum, self.param.timeout).await?; - Ok(()) + + let mut resps_vals = vec![]; + for resp in resps { + if let Message::TableRPC(tbl, rep_by) = &resp { + if *tbl == self.name { + resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); + continue; + } + } + return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + } + Ok(resps_vals) } async fn handle(&self, msg: TableRPC) -> Result, Error> { - unimplemented!() + match msg { + TableRPC::Read(keys) => { + Ok(TableRPC::Update(self.handle_read(&keys)?)) + } + TableRPC::Update(pairs) => { + self.handle_write(pairs).await?; + Ok(TableRPC::Ok) + } + _ => Err(Error::RPCError(format!("Unexpected table RPC"))) + } + } + + fn handle_read(&self, keys: &[F::K]) -> Result, Error> { + let mut results = vec![]; + for key in keys.iter() { + if let Some(bytes) = self.store.get(&key.hash())? { + let pair = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(bytes.as_ref())?; + results.push(pair); + } + } + Ok(results) + } + + async fn handle_write(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { + for mut pair in pairs.drain(..) { + let hash = pair.0.hash(); + + let old_val = match self.store.get(&hash)? { + Some(prev_bytes) => { + let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; + pair.1.merge(&old_val); + Some(old_val) + } + None => None + }; + + let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; + self.store.insert(&hash, new_bytes)?; + + self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; + } + Ok(()) } } -- cgit v1.2.3