From 4c1aee42d5032066272a051687ac200e874cc13f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Apr 2020 16:16:27 +0200 Subject: Reorganize table API --- src/api_server.rs | 22 ++++++------ src/table.rs | 100 ++++++++++++++++++++++++++++++++++++--------------- src/version_table.rs | 32 ++++++++--------- 3 files changed, 95 insertions(+), 59 deletions(-) (limited to 'src') diff --git a/src/api_server.rs b/src/api_server.rs index 8acd15d8..ff7c536c 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -97,11 +97,9 @@ async fn handle_put(garage: Arc, None => return Err(Error::BadRequest(format!("Empty body"))), }; - let version_key = VersionMetaKey{ - bucket: bucket.to_string(), - key: key.to_string(), - }; - let mut version_value = VersionMetaValue { + let mut version = VersionMeta{ + bucket: bucket.into(), + key: key.into(), timestamp: now_msec(), uuid: version_uuid.clone(), mime_type: mime_type.to_string(), @@ -111,15 +109,15 @@ async fn handle_put(garage: Arc, }; if first_block.len() < INLINE_THRESHOLD { - version_value.data = VersionData::Inline(first_block); - version_value.is_complete = true; - garage.version_table.insert(&version_key, &version_value).await?; + version.data = VersionData::Inline(first_block); + version.is_complete = true; + garage.version_table.insert(&version).await?; return Ok(version_uuid) } let first_block_hash = hash(&first_block[..]); - version_value.data = VersionData::FirstBlock(first_block_hash); - garage.version_table.insert(&version_key, &version_value).await?; + version.data = VersionData::FirstBlock(first_block_hash); + garage.version_table.insert(&version).await?; let block_meta = BlockMeta{ version_uuid: version_uuid.clone(), @@ -145,8 +143,8 @@ async fn handle_put(garage: Arc, // TODO: if at any step we have an error, we should undo everything we did - version_value.is_complete = true; - garage.version_table.insert(&version_key, &version_value).await?; + version.is_complete = true; + garage.version_table.insert(&version).await?; Ok(version_uuid) } diff --git a/src/table.rs b/src/table.rs index e524f821..df82e9c7 100644 --- a/src/table.rs +++ b/src/table.rs @@ -52,10 +52,10 @@ impl TableRpcHandler for TableRpcHandlerAdapter { pub enum TableRPC { Ok, - ReadEntry(F::K, Vec), - ReadEntryResponse(Option), + ReadEntry(F::P, F::S), + ReadEntryResponse(Option), - Update(Vec<(F::K, F::V)>), + Update(Vec), } pub struct Partition { @@ -64,21 +64,59 @@ pub struct Partition { pub other_nodes: Vec, } -pub trait TableKey { +pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync { fn hash(&self) -> Hash; } -pub trait TableValue { - fn sort_key(&self) -> Vec; +pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { + fn sort_key(&self) -> &[u8]; +} + +pub trait Entry: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { + fn partition_key(&self) -> &P; + fn sort_key(&self) -> &S; + fn merge(&mut self, other: &Self); } +#[derive(Clone, Serialize, Deserialize)] +pub struct EmptySortKey; +impl SortKey for EmptySortKey { + fn sort_key(&self) -> &[u8] { + &[] + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct StringKey(String); +impl PartitionKey for StringKey { + fn hash(&self) -> Hash { + hash(self.0.as_bytes()) + } +} +impl SortKey for StringKey { + fn sort_key(&self) -> &[u8] { + self.0.as_bytes() + } +} +impl AsRef for StringKey { + fn as_ref(&self) -> &str { + &self.0 + } +} +impl From<&str> for StringKey { + fn from(s: &str) -> StringKey { + StringKey(s.to_string()) + } +} + #[async_trait] pub trait TableFormat: Send + Sync { - type K: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + TableKey + Send + Sync; - type V: Clone + Serialize + for<'de> Deserialize<'de> + TableValue + Send + Sync; + type P: PartitionKey; + type S: SortKey; + type E: Entry; - async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V); + async fn updated(&self, old: Option<&Self::E>, new: &Self::E); } impl Table { @@ -99,12 +137,12 @@ impl Table { Box::new(TableRpcHandlerAdapter::{ table: self }) } - pub async fn insert(&self, k: &F::K, v: &F::V) -> Result<(), Error> { - let hash = k.hash(); + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { + let hash = e.partition_key().hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::::Update(vec![(k.clone(), v.clone())]); + let rpc = &TableRPC::::Update(vec![e.clone()]); self.rpc_try_call_many(&who[..], &rpc, @@ -112,12 +150,12 @@ impl Table { Ok(()) } - pub async fn get(&self, k: &F::K, sort_key: &[u8]) -> Result, Error> { - let hash = k.hash(); + pub async fn get(&self, partition_key: &F::P, sort_key: &F::S) -> Result, Error> { + let hash = partition_key.hash(); let who = self.system.members.read().await .walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::::ReadEntry(k.clone(), sort_key.to_vec()); + let rpc = &TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self.rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) @@ -179,36 +217,40 @@ impl Table { } } - fn handle_read_entry(&self, key: &F::K, sort_key: &[u8]) -> Result, Error> { - let mut tree_key = key.hash().to_vec(); - tree_key.extend(sort_key); + fn handle_read_entry(&self, p: &F::P, s: &F::S) -> Result, Error> { + let tree_key = self.tree_key(p, s); if let Some(bytes) = self.store.get(&tree_key)? { - let (_, v) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&bytes)?; - Ok(Some(v)) + let e = rmp_serde::decode::from_read_ref::<_, F::E>(&bytes)?; + Ok(Some(e)) } else { Ok(None) } } - async fn handle_update(&self, mut pairs: Vec<(F::K, F::V)>) -> Result<(), Error> { - for mut pair in pairs.drain(..) { - let mut tree_key = pair.0.hash().to_vec(); - tree_key.extend(pair.1.sort_key()); + async fn handle_update(&self, mut entries: Vec) -> Result<(), Error> { + for mut entry in entries.drain(..) { + let tree_key = self.tree_key(entry.partition_key(), entry.sort_key()); let old_val = match self.store.get(&tree_key)? { Some(prev_bytes) => { - let (_, old_val) = rmp_serde::decode::from_read_ref::<_, (F::K, F::V)>(&prev_bytes)?; - pair.1.merge(&old_val); - Some(old_val) + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?; + entry.merge(&old_entry); + Some(old_entry) } None => None }; - let new_bytes = rmp_serde::encode::to_vec_named(&pair)?; + let new_bytes = rmp_serde::encode::to_vec_named(&entry)?; self.store.insert(&tree_key, new_bytes)?; - self.instance.updated(&pair.0, old_val.as_ref(), &pair.1).await; + self.instance.updated(old_val.as_ref(), &entry).await; } Ok(()) } + + fn tree_key(&self, p: &F::P, s: &F::S) -> Vec { + let mut ret = p.hash().to_vec(); + ret.extend(s.sort_key()); + ret + } } diff --git a/src/version_table.rs b/src/version_table.rs index e8360cd1..1542dc42 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -8,14 +8,11 @@ use crate::table::*; use crate::server::Garage; -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct VersionMetaKey { - pub bucket: String, - pub key: String, -} - #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct VersionMetaValue { +pub struct VersionMeta { + pub bucket: StringKey, + pub key: StringKey, + pub timestamp: u64, pub uuid: UUID, @@ -37,16 +34,14 @@ pub struct VersionTable { pub garage: RwLock>>, } -impl TableKey for VersionMetaKey { - fn hash(&self) -> Hash { - hash(self.bucket.as_bytes()) +impl Entry for VersionMeta { + fn partition_key(&self) -> &StringKey { + &self.bucket } -} - -impl TableValue for VersionMetaValue { - fn sort_key(&self) -> Vec { - vec![] + fn sort_key(&self) -> &StringKey { + &self.key } + fn merge(&mut self, other: &Self) { unimplemented!() } @@ -54,10 +49,11 @@ impl TableValue for VersionMetaValue { #[async_trait] impl TableFormat for VersionTable { - type K = VersionMetaKey; - type V = VersionMetaValue; + type P = StringKey; + type S = StringKey; + type E = VersionMeta; - async fn updated(&self, key: &Self::K, old: Option<&Self::V>, new: &Self::V) { + async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { unimplemented!() } } -- cgit v1.2.3