aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs100
1 files changed, 71 insertions, 29 deletions
diff --git a/src/table.rs b/src/table.rs
index e524f821..df82e9c7 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -52,10 +52,10 @@ impl<F: TableFormat + 'static> TableRpcHandler for TableRpcHandlerAdapter<F> {
pub enum TableRPC<F: TableFormat> {
Ok,
- ReadEntry(F::K, Vec<u8>),
- ReadEntryResponse(Option<F::V>),
+ ReadEntry(F::P, F::S),
+ ReadEntryResponse(Option<F::E>),
- Update(Vec<(F::K, F::V)>),
+ Update(Vec<F::E>),
}
pub struct Partition {
@@ -64,21 +64,59 @@ pub struct Partition {
pub other_nodes: Vec<UUID>,
}
-pub trait TableKey {
+pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync {
fn hash(&self) -> Hash;
}
-pub trait TableValue {
- fn sort_key(&self) -> Vec<u8>;
+pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
+ fn sort_key(&self) -> &[u8];
+}
+
+pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync {
+ fn partition_key(&self) -> &P;
+ fn sort_key(&self) -> &S;
+
fn merge(&mut self, other: &Self);
}
+#[derive(Clone, Serialize, Deserialize)]
+pub struct EmptySortKey;
+impl SortKey for EmptySortKey {
+ fn sort_key(&self) -> &[u8] {
+ &[]
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct StringKey(String);
+impl PartitionKey for StringKey {
+ fn hash(&self) -> Hash {
+ hash(self.0.as_bytes())
+ }
+}
+impl SortKey for StringKey {
+ fn sort_key(&self) -> &[u8] {
+ self.0.as_bytes()
+ }
+}
+impl AsRef<str> for StringKey {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+impl From<&str> for StringKey {
+ fn from(s: &str) -> StringKey {
+ StringKey(s.to_string())
+ }
+}
+
#[async_trait]
pub trait TableFormat: Send + Sync {
- type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + TableKey + Send + Sync;
- type V: Clone + Serialize + for<'de> Deserialize<'de> + TableValue + Send + Sync;
+ type P: PartitionKey;
+ type S: SortKey;
+ type E: Entry<Self::P, Self::S>;
- async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V);
+ async fn updated(&self, old: Option<&Self::E>, new: &Self::E);
}
impl<F: TableFormat + 'static> Table<F> {
@@ -99,12 +137,12 @@ impl<F: TableFormat + 'static> Table<F> {
Box::new(TableRpcHandlerAdapter::<F>{ table: self })
}
- pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> {
- let hash = k.hash();
+ pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
+ let hash = e.partition_key().hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
- let rpc = &TableRPC::<F>::Update(vec![(k.clone(), v.clone())]);
+ let rpc = &TableRPC::<F>::Update(vec![e.clone()]);
self.rpc_try_call_many(&who[..],
&rpc,
@@ -112,12 +150,12 @@ impl<F: TableFormat + 'static> Table<F> {
Ok(())
}
- pub async fn get(&self, k: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> {
- let hash = k.hash();
+ pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result<Option<F::E>, Error> {
+ let hash = partition_key.hash();
let who = self.system.members.read().await
.walk_ring(&hash, self.param.replication_factor);
- let rpc = &TableRPC::<F>::ReadEntry(k.clone(), sort_key.to_vec());
+ let rpc = &TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self.rpc_try_call_many(&who[..],
&rpc,
self.param.read_quorum)
@@ -179,36 +217,40 @@ impl<F: TableFormat + 'static> Table<F> {
}
}
- fn handle_read_entry(&self, key: &F::K, sort_key: &[u8]) -> Result<Option<F::V>, Error> {
- let mut tree_key = key.hash().to_vec();
- tree_key.extend(sort_key);
+ fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result<Option<F::E>, Error> {
+ let tree_key = self.tree_key(p, s);
if let Some(bytes) = self.store.get(&tree_key)? {
- let (_, v) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&bytes)?;
- Ok(Some(v))
+ let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?;
+ Ok(Some(e))
} else {
Ok(None)
}
}
- async fn handle_update(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> {
- for mut pair in pairs.drain(..) {
- let mut tree_key = pair.0.hash().to_vec();
- tree_key.extend(pair.1.sort_key());
+ async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
+ for mut entry in entries.drain(..) {
+ let tree_key = self.tree_key(entry.partition_key(), entry.sort_key());
let old_val = match self.store.get(&tree_key)? {
Some(prev_bytes) => {
- let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?;
- pair.1.merge(&old_val);
- Some(old_val)
+ let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?;
+ entry.merge(&old_entry);
+ Some(old_entry)
}
None => None
};
- let new_bytes = rmp_serde::encode::to_vec_named(&pair)?;
+ let new_bytes = rmp_serde::encode::to_vec_named(&entry)?;
self.store.insert(&tree_key, new_bytes)?;
- self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await;
+ self.instance.updated(old_val.as_ref(), &entry).await;
}
Ok(())
}
+
+ fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
+ let mut ret = p.hash().to_vec();
+ ret.extend(s.sort_key());
+ ret
+ }
}