diff options
-rw-r--r-- | TODO | 29 | ||||
-rw-r--r-- | src/data.rs | 3 | ||||
-rw-r--r-- | src/error.rs | 3 | ||||
-rw-r--r-- | src/table.rs | 76 | ||||
-rw-r--r-- | src/version_table.rs | 7 |
5 files changed, 78 insertions, 40 deletions
@@ -0,0 +1,29 @@ +Object table +------------ + + +Rename version table to object table +In value handle the different versions + +So that the table becomes (bucket, key) -> CRDT(list of versions) + +CRDT merge rule: +- keep one complete version (the one with the highest timestamp) +- keep all incomplete versions with timestamps higher than the complete version + +Cleanup rule: remove incomplete versions after a given delay (say 24h) + + +Block table +----------- + +Table is version_UUID -> BTreeMap<(offset, block hash)> OR Deleted (= CRDT top) + + +Block reference table +--------------------- + +Table is block_Hash + Sort key: version_UUID -> boolean (true when deleted) + +Since the hash key is the same as for the blocks themselves, +we can simply consider the updates to this table as events that increase/decrease a reference counter. diff --git a/src/data.rs b/src/data.rs index bbe9aa1d..f01d5394 100644 --- a/src/data.rs +++ b/src/data.rs @@ -67,6 +67,9 @@ impl FixedBytes32 { pub fn as_slice_mut(&mut self) -> &mut [u8] { &mut self.0[..] } + pub fn to_vec(&self) -> Vec<u8> { + self.0.to_vec() + } } pub type UUID = FixedBytes32; diff --git a/src/error.rs b/src/error.rs index 0cfafca3..1481234f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,9 +35,6 @@ pub enum Error { #[error(display = "{}", _0)] BadRequest(String), - #[error(display = "Entry not found")] - NotFound, - #[error(display = "{}", _0)] Message(String), } diff --git a/src/table.rs b/src/table.rs index f45f48c2..e524f821 100644 --- a/src/table.rs +++ b/src/table.rs @@ -2,7 +2,6 @@ use std::time::Duration; use std::sync::Arc; use serde::{Serialize, Deserialize}; use async_trait::async_trait; -use reduce::Reduce; use crate::error::Error; use crate::proto::*; @@ -52,7 +51,10 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { #[derive(Serialize, Deserialize)] pub enum TableRPC<F: TableFormat> { Ok, - Read(Vec<F::K>), + + ReadEntry(F::K, Vec<u8>), + ReadEntryResponse(Option<F::V>), + Update(Vec<(F::K, F::V)>), } @@ -62,18 +64,19 @@ pub struct Partition { pub other_nodes: Vec<UUID>, } -pub trait KeyHash { +pub trait TableKey { fn hash(&self) -> Hash; } -pub trait ValueMerge { +pub trait TableValue { + fn sort_key(&self) -> Vec<u8>; fn merge(&mut self, other: &Self); } #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + KeyHash + Send + Sync; - type V: Clone + Serialize + for<'de> Deserialize<'de> + ValueMerge + Send + Sync; + type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + TableKey + Send + Sync; + type V: Clone + Serialize + for<'de> Deserialize<'de> + TableValue + Send + Sync; async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); } @@ -109,33 +112,34 @@ impl<F: TableFormat + 'static> Table<F> { Ok(()) } - pub async fn get(&self, k: &F::K) -> Result<F::V, Error> { + pub async fn get(&self, k: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { let hash = k.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::<F>::Read(vec![k.clone()]); + let rpc = &TableRPC::<F>::ReadEntry(k.clone(), sort_key.to_vec()); let resps = self.rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; - let mut values = vec![]; + let mut ret = None; for resp in resps { - if let TableRPC::Update(mut pairs) = resp { - if pairs.len() == 0 { - continue; - } else if pairs.len() == 1 && pairs[0].0 == *k { - values.push(pairs.drain(..).next().unwrap().1); - continue; + if let TableRPC::ReadEntryResponse(value) = resp { + if let Some(v) = value { + ret = match ret { + None => Some(v), + Some(mut x) => { + x.merge(&v); + Some(x) + } + } } + } else { + return Err(Error::Message(format!("Invalid return value to read"))); } - return Err(Error::Message(format!("Invalid return value to read"))); } - values.drain(..) - .reduce(|mut x, y| { x.merge(&y); x }) - .map(Ok) - .unwrap_or(Err(Error::NotFound)) + Ok(ret) } async fn rpc_try_call_many(&self, who: &[UUID], rpc: &TableRPC<F>, quorum: usize) -> Result<Vec<TableRPC<F>>, Error> { @@ -163,33 +167,35 @@ impl<F: TableFormat + 'static> Table<F> { async fn handle(&self, msg: TableRPC<F>) -> Result<TableRPC<F>, Error> { match msg { - TableRPC::Read(keys) => { - Ok(TableRPC::Update(self.handle_read(&keys)?)) + TableRPC::ReadEntry(key, sort_key) => { + let value = self.handle_read_entry(&key, &sort_key)?; + Ok(TableRPC::ReadEntryResponse(value)) } TableRPC::Update(pairs) => { - self.handle_write(pairs).await?; + self.handle_update(pairs).await?; Ok(TableRPC::Ok) } _ => Err(Error::RPCError(format!("Unexpected table RPC"))) } } - fn handle_read(&self, keys: &[F::K]) -> Result<Vec<(F::K, F::V)>, Error> { - let mut results = vec![]; - for key in keys.iter() { - if let Some(bytes) = self.store.get(&key.hash())? { - let pair = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(bytes.as_ref())?; - results.push(pair); - } + fn handle_read_entry(&self, key: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { + let mut tree_key = key.hash().to_vec(); + tree_key.extend(sort_key); + if let Some(bytes) = self.store.get(&tree_key)? { + let (_, v) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&bytes)?; + Ok(Some(v)) + } else { + Ok(None) } - Ok(results) } - async fn handle_write(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { + async fn handle_update(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { for mut pair in pairs.drain(..) { - let hash = pair.0.hash(); + let mut tree_key = pair.0.hash().to_vec(); + tree_key.extend(pair.1.sort_key()); - let old_val = match self.store.get(&hash)? { + let old_val = match self.store.get(&tree_key)? { Some(prev_bytes) => { let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; pair.1.merge(&old_val); @@ -199,7 +205,7 @@ impl<F: TableFormat + 'static> Table<F> { }; let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; - self.store.insert(&hash, new_bytes)?; + self.store.insert(&tree_key, new_bytes)?; self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; } diff --git a/src/version_table.rs b/src/version_table.rs index 86086421..e8360cd1 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -37,13 +37,16 @@ pub struct VersionTable { pub garage: RwLock<Option<Arc<Garage>>>, } -impl KeyHash for VersionMetaKey { +impl TableKey for VersionMetaKey { fn hash(&self) -> Hash { hash(self.bucket.as_bytes()) } } -impl ValueMerge for VersionMetaValue { +impl TableValue for VersionMetaValue { + fn sort_key(&self) -> Vec<u8> { + vec![] + } fn merge(&mut self, other: &Self) { unimplemented!() } |