diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 76 |
1 files changed, 41 insertions, 35 deletions
diff --git a/src/table.rs b/src/table.rs index f45f48c2..e524f821 100644 --- a/src/table.rs +++ b/src/table.rs @@ -2,7 +2,6 @@ use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; -use reduce::Reduce; use crate::error::Error; use crate::proto::*; @@ -52,7 +51,10 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { #[derive(Serialize, Deserialize)] pub enum TableRPC<F: TableFormat> { Ok, - Read(Vec<F::K>), + + ReadEntry(F::K, Vec<u8>), + ReadEntryResponse(Option<F::V>), + Update(Vec<(F::K, F::V)>), } @@ -62,18 +64,19 @@ pub struct Partition { pub other_nodes: Vec<UUID>, } -pub trait KeyHash { +pub trait TableKey { fn hash(&self) -> Hash; } -pub trait ValueMerge { +pub trait TableValue { + fn sort_key(&self) -> Vec<u8>; fn merge(&mut self, other: &Self); } #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; - type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; + type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + TableKey + Send + Sync; + type V: Clone + Serialize + for<'de> Deserialize<'de> + TableValue + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } @@ -109,33 +112,34 @@ impl<F: TableFormat + 'static> Table<F> { Ok(()) } - pub async fn get(&self, k: &F::K) -> Result<F::V, Error> { + pub async fn get(&self, k: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::<F>::Read(vec![k.clone()]); + let rpc = &TableRPC::<F>::ReadEntry(k.clone(), sort_key.to_vec()); let resps = self.rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; - let mut values = vec![]; + let mut ret = None; for resp in resps { - if let TableRPC::Update(mut pairs) = resp { - if pairs.len() == 0 { - continue; - } else if pairs.len() == 1 && pairs[0].0 == *k { - values.push(pairs.drain(..).next().unwrap().1); - continue; + if let TableRPC::ReadEntryResponse(value) = resp { + if let Some(v) = value { + ret = match ret { + None => Some(v), + Some(mut x) => { + x.merge(&v); + Some(x) + } + } } + } else { + return Err(Error::Message(format!("Invalid return value to read"))); } - return Err(Error::Message(format!("Invalid return value to read"))); } - values.drain(..) - .reduce(|mut x, y| { x.merge(&y); x }) - .map(Ok) - .unwrap_or(Err(Error::NotFound)) + Ok(ret) } async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { @@ -163,33 +167,35 @@ impl<F: TableFormat + 'static> Table<F> { async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { - TableRPC::Read(keys) => { - Ok(TableRPC::Update(self.handle_read(&keys)?)) + TableRPC::ReadEntry(key, sort_key) => { + let value = self.handle_read_entry(&key, &sort_key)?; + Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::Update(pairs) => { - self.handle_write(pairs).await?; + self.handle_update(pairs).await?; Ok(TableRPC::Ok) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))) } } - fn handle_read(&self, keys: &[F::K]) -> Result<Vec<(F::K, F::V)>, Error> { - let mut results = vec![]; - for key in keys.iter() { - if let Some(bytes) = self.store.get(&key.hash())? { - let pair = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(bytes.as_ref())?; - results.push(pair); - } + fn handle_read_entry(&self, key: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { + let mut tree_key = key.hash().to_vec(); + tree_key.extend(sort_key); + if let Some(bytes) = self.store.get(&tree_key)? { + let (_, v) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&bytes)?; + Ok(Some(v)) + } else { + Ok(None) } - Ok(results) } - async fn handle_write(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { + async fn handle_update(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { for mut pair in pairs.drain(..) { - let hash = pair.0.hash(); + let mut tree_key = pair.0.hash().to_vec(); + tree_key.extend(pair.1.sort_key()); - let old_val = match self.store.get(&hash)? { + let old_val = match self.store.get(&tree_key)? { Some(prev_bytes) => { let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; pair.1.merge(&old_val); @@ -199,7 +205,7 @@ impl<F: TableFormat + 'static> Table<F> { }; let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; - self.store.insert(&hash, new_bytes)?; + self.store.insert(&tree_key, new_bytes)?; self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; } |