aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs92
1 files changed, 19 insertions, 73 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 94bacc60..7a5caf4f 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
-use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -16,6 +15,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::table_sync::*;
+use crate::schema::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@@ -48,70 +48,6 @@ pub enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {}
-pub trait PartitionKey {
- fn hash(&self) -> Hash;
-}
-
-pub trait SortKey {
- fn sort_key(&self) -> &[u8];
-}
-
-pub trait Entry<P: PartitionKey, S: SortKey>:
- PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
-{
- fn partition_key(&self) -> &P;
- fn sort_key(&self) -> &S;
-
- fn merge(&mut self, other: &Self);
-}
-
-#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct EmptyKey;
-impl SortKey for EmptyKey {
- fn sort_key(&self) -> &[u8] {
- &[]
- }
-}
-impl PartitionKey for EmptyKey {
- fn hash(&self) -> Hash {
- [0u8; 32].into()
- }
-}
-
-impl PartitionKey for String {
- fn hash(&self) -> Hash {
- hash(self.as_bytes())
- }
-}
-impl SortKey for String {
- fn sort_key(&self) -> &[u8] {
- self.as_bytes()
- }
-}
-
-impl PartitionKey for Hash {
- fn hash(&self) -> Hash {
- self.clone()
- }
-}
-impl SortKey for Hash {
- fn sort_key(&self) -> &[u8] {
- self.as_slice()
- }
-}
-
-#[async_trait]
-pub trait TableSchema: Send + Sync {
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type E: Entry<Self::P, Self::S>;
- type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
- fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
- true
- }
-}
pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
@@ -250,7 +186,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?;
+ let v = Self::decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -306,8 +242,7 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry =
- rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
+ let entry = Self::decode_entry(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
@@ -429,7 +364,7 @@ where
let keep = match filter {
None => true,
Some(f) => {
- let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?;
+ let entry = Self::decode_entry(value.as_ref())?;
F::matches_filter(&entry, f)
}
};
@@ -448,15 +383,14 @@ where
let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() {
- let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
+ let update = Self::decode_entry(update_bytes.as_slice())?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
- let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)
- .map_err(Error::RMPDecode)
+ let old_entry = Self::decode_entry(&prev_bytes)
.map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
@@ -504,7 +438,7 @@ where
break;
}
if let Some(old_val) = self.store.remove(&key)? {
- let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?;
+ let old_entry = Self::decode_entry(&old_val)?;
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
@@ -521,4 +455,16 @@ where
ret.extend(s.sort_key());
ret
}
+
+ fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> {
+ match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
+ Ok(x) => Ok(x),
+ Err(e) => {
+ match F::try_migrate(bytes) {
+ Some(x) => Ok(x),
+ None => Err(e.into()),
+ }
+ }
+ }
+ }
}