diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 92 |
1 files changed, 19 insertions, 73 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 94bacc60..7a5caf4f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -16,6 +15,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::table_sync::*; +use crate::schema::*; const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); @@ -48,70 +48,6 @@ pub enum TableRPC<F: TableSchema> { impl<F: TableSchema> RpcMessage for TableRPC<F> {} -pub trait PartitionKey { - fn hash(&self) -> Hash; -} - -pub trait SortKey { - fn sort_key(&self) -> &[u8]; -} - -pub trait Entry<P: PartitionKey, S: SortKey>: - PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync -{ - fn partition_key(&self) -> &P; - fn sort_key(&self) -> &S; - - fn merge(&mut self, other: &Self); -} - -#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct EmptyKey; -impl SortKey for EmptyKey { - fn sort_key(&self) -> &[u8] { - &[] - } -} -impl PartitionKey for EmptyKey { - fn hash(&self) -> Hash { - [0u8; 32].into() - } -} - -impl PartitionKey for String { - fn hash(&self) -> Hash { - hash(self.as_bytes()) - } -} -impl SortKey for String { - fn sort_key(&self) -> &[u8] { - self.as_bytes() - } -} - -impl PartitionKey for Hash { - fn hash(&self) -> Hash { - self.clone() - } -} -impl SortKey for Hash { - fn sort_key(&self) -> &[u8] { - self.as_slice() - } -} - -#[async_trait] -pub trait TableSchema: Send + Sync { - type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - type E: Entry<Self::P, Self::S>; - type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { - true - } -} pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs @@ -250,7 +186,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; + let v = Self::decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -306,8 +242,7 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = - rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry = Self::decode_entry(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -429,7 +364,7 @@ where let keep = match filter { None => true, Some(f) => { - let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; + let entry = Self::decode_entry(value.as_ref())?; F::matches_filter(&entry, f) } }; @@ -448,15 +383,14 @@ where let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { - let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; + let update = Self::decode_entry(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { - let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) - .map_err(Error::RMPDecode) + let old_entry = Self::decode_entry(&prev_bytes) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); @@ -504,7 +438,7 @@ where break; } if let Some(old_val) = self.store.remove(&key)? { - let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; + let old_entry = Self::decode_entry(&old_val)?; self.instance.updated(Some(old_entry), None).await?; self.system .background @@ -521,4 +455,16 @@ where ret.extend(s.sort_key()); ret } + + fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> { + match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { + Ok(x) => Ok(x), + Err(e) => { + match F::try_migrate(bytes) { + Some(x) => Ok(x), + None => Err(e.into()), + } + } + } + } } |