diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 100 |
1 files changed, 71 insertions, 29 deletions
diff --git a/src/table.rs b/src/table.rs index e524f821..df82e9c7 100644 --- a/src/table.rs +++ b/src/table.rs @@ -52,10 +52,10 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> { pub enum TableRPC<F: TableFormat> { Ok, - ReadEntry(F::K, Vec<u8>), - ReadEntryResponse(Option<F::V>), + ReadEntry(F::P, F::S), + ReadEntryResponse(Option<F::E>), - Update(Vec<(F::K, F::V)>), + Update(Vec<F::E>), } pub struct Partition { @@ -64,21 +64,59 @@ pub struct Partition { pub other_nodes: Vec<UUID>, } -pub trait TableKey { +pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn hash(&self) -> Hash; } -pub trait TableValue { - fn sort_key(&self) -> Vec<u8>; +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { + fn sort_key(&self) -> &[u8]; +} + +pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { + fn partition_key(&self) -> &P; + fn sort_key(&self) -> &S; + fn merge(&mut self, other: &Self); } +#[derive(Clone, Serialize, Deserialize)] +pub struct EmptySortKey; +impl SortKey for EmptySortKey { + fn sort_key(&self) -> &[u8] { + &[] + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct StringKey(String); +impl PartitionKey for StringKey { + fn hash(&self) -> Hash { + hash(self.0.as_bytes()) + } +} +impl SortKey for StringKey { + fn sort_key(&self) -> &[u8] { + self.0.as_bytes() + } +} +impl AsRef<str> for StringKey { + fn as_ref(&self) -> &str { + &self.0 + } +} +impl From<&str> for StringKey { + fn from(s: &str) -> StringKey { + StringKey(s.to_string()) + } +} + #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + TableKey + Send + Sync; - type V: Clone + Serialize + for<'de> Deserialize<'de> + TableValue + Send + Sync; + type P: PartitionKey; + type S: SortKey; + type E: Entry<Self::P, Self::S>; - async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); + async fn updated(&self, old: Option<&Self::E>, new: &Self::E); } impl<F: TableFormat + 'static> Table<F> { @@ -99,12 +137,12 @@ impl<F: TableFormat + 'static> Table<F> { Box::new(TableRpcHandlerAdapter::<F>{ table: self }) } - pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { - let hash = k.hash(); + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let hash = e.partition_key().hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::<F>::Update(vec![(k.clone(), v.clone())]); + let rpc = &TableRPC::<F>::Update(vec![e.clone()]); self.rpc_try_call_many(&who[..], &rpc, @@ -112,12 +150,12 @@ impl<F: TableFormat + 'static> Table<F> { Ok(()) } - pub async fn get(&self, k: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { - let hash = k.hash(); + pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> { + let hash = partition_key.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::<F>::ReadEntry(k.clone(), sort_key.to_vec()); + let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self.rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) @@ -179,36 +217,40 @@ impl<F: TableFormat + 'static> Table<F> { } } - fn handle_read_entry(&self, key: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> { - let mut tree_key = key.hash().to_vec(); - tree_key.extend(sort_key); + fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> { + let tree_key = self.tree_key(p, s); if let Some(bytes) = self.store.get(&tree_key)? { - let (_, v) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&bytes)?; - Ok(Some(v)) + let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?; + Ok(Some(e)) } else { Ok(None) } } - async fn handle_update(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { - for mut pair in pairs.drain(..) { - let mut tree_key = pair.0.hash().to_vec(); - tree_key.extend(pair.1.sort_key()); + async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> { + for mut entry in entries.drain(..) { + let tree_key = self.tree_key(entry.partition_key(), entry.sort_key()); let old_val = match self.store.get(&tree_key)? { Some(prev_bytes) => { - let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; - pair.1.merge(&old_val); - Some(old_val) + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?; + entry.merge(&old_entry); + Some(old_entry) } None => None }; - let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; + let new_bytes = rmp_serde::encode::to_vec_named(&entry)?; self.store.insert(&tree_key, new_bytes)?; - self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; + self.instance.updated(old_val.as_ref(), &entry).await; } Ok(()) } + + fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } } |