diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-08 23:01:49 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-08 23:01:49 +0200 |
commit | cc580da0aef95bcb94bd2ce602258d0d18388969 (patch) | |
tree | c61148b75627dc27989d4ce39ded3b6124c86cfb /src/table.rs | |
parent | bacc76a057bcd90d61bfe3584bd3cdbadc748364 (diff) | |
download | garage-cc580da0aef95bcb94bd2ce602258d0d18388969.tar.gz garage-cc580da0aef95bcb94bd2ce602258d0d18388969.zip |
Some work
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 122 |
1 files changed, 107 insertions, 15 deletions
diff --git a/src/table.rs b/src/table.rs index 5c8e93a5..f45f48c2 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,8 +1,8 @@ -use std::marker::PhantomData; use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; +use reduce::Reduce; use crate::error::Error; use crate::proto::*; @@ -12,7 +12,7 @@ use crate::rpc_client::*; pub struct Table<F: TableFormat> { - phantom: PhantomData<F>, + pub instance: F, pub name: String, @@ -49,9 +49,11 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Serialize, Deserialize)] pub enum TableRPC<F: TableFormat> { - Update(F::K, F::V), + Ok, + Read(Vec<F::K>), + Update(Vec<(F::K, F::V)>), } pub struct Partition { @@ -70,18 +72,18 @@ pub trait ValueMerge { #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; + type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } impl<F: TableFormat + 'static> Table<F> { - pub fn new(system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { + pub fn new(instance: F, system: Arc<System>, db: &sled::Db, name: String, param: TableReplicationParams) -> Self { let store = db.open_tree(&name) .expect("Unable to open DB tree"); Self{ - phantom: PhantomData::default(), + instance, name, system, store, @@ -95,22 +97,112 @@ impl<F: TableFormat + 'static> Table<F> { } pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { - unimplemented!(); + let hash = k.hash(); + let who = self.system.members.read().await + .walk_ring(&hash, self.param.replication_factor); + + let rpc = &TableRPC::<F>::Update(vec![(k.clone(), v.clone())]); + + self.rpc_try_call_many(&who[..], + &rpc, + self.param.write_quorum).await?; + Ok(()) + } + pub async fn get(&self, k: &F::K) -> Result<F::V, Error> { let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let msg = rmp_serde::encode::to_vec_named(&TableRPC::<F>::Update(k.clone(), v.clone()))?; - rpc_try_call_many(self.system.clone(), - &who[..], - &Message::TableRPC(self.name.to_string(), msg), - self.param.write_quorum, + let rpc = &TableRPC::<F>::Read(vec![k.clone()]); + let resps = self.rpc_try_call_many(&who[..], + &rpc, + self.param.read_quorum) + .await?; + + let mut values = vec![]; + for resp in resps { + if let TableRPC::Update(mut pairs) = resp { + if pairs.len() == 0 { + continue; + } else if pairs.len() == 1 && pairs[0].0 == *k { + values.push(pairs.drain(..).next().unwrap().1); + continue; + } + } + return Err(Error::Message(format!("Invalid return value to read"))); + } + values.drain(..) + .reduce(|mut x, y| { x.merge(&y); x }) + .map(Ok) + .unwrap_or(Err(Error::NotFound)) + } + + async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { + let rpc_bytes = rmp_serde::encode::to_vec_named(rpc)?; + let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resps = rpc_try_call_many(self.system.clone(), + who, + &rpc_msg, + quorum, self.param.timeout).await?; - Ok(()) + + let mut resps_vals = vec![]; + for resp in resps { + if let Message::TableRPC(tbl, rep_by) = &resp { + if *tbl == self.name { + resps_vals.push(rmp_serde::decode::from_read_ref(&rep_by)?); + continue; + } + } + return Err(Error::Message(format!("Invalid reply to TableRPC: {:?}", resp))) + } + Ok(resps_vals) } async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { - unimplemented!() + match msg { + TableRPC::Read(keys) => { + Ok(TableRPC::Update(self.handle_read(&keys)?)) + } + TableRPC::Update(pairs) => { + self.handle_write(pairs).await?; + Ok(TableRPC::Ok) + } + _ => Err(Error::RPCError(format!("Unexpected table RPC"))) + } + } + + fn handle_read(&self, keys: &[F::K]) -> Result<Vec<(F::K, F::V)>, Error> { + let mut results = vec![]; + for key in keys.iter() { + if let Some(bytes) = self.store.get(&key.hash())? { + let pair = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(bytes.as_ref())?; + results.push(pair); + } + } + Ok(results) + } + + async fn handle_write(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { + for mut pair in pairs.drain(..) { + let hash = pair.0.hash(); + + let old_val = match self.store.get(&hash)? { + Some(prev_bytes) => { + let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; + pair.1.merge(&old_val); + Some(old_val) + } + None => None + }; + + let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; + self.store.insert(&hash, new_bytes)?; + + self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; + } + Ok(()) } } |