From cf8fd948fc4bb6a9f48100ebf89df3752371805d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 10 Apr 2020 23:11:52 +0200 Subject: Add block ref table --- src/api_server.rs | 29 +++++++++++++++++++--------- src/block_ref_table.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/data.rs | 11 +---------- src/http_util.rs | 4 ++-- src/main.rs | 1 + src/server.rs | 16 ++++++++++++++++ src/version_table.rs | 4 ++-- 7 files changed, 93 insertions(+), 23 deletions(-) create mode 100644 src/block_ref_table.rs diff --git a/src/api_server.rs b/src/api_server.rs index 024da7d5..153991b2 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -133,7 +133,7 @@ async fn handle_put( } let version = Version { - version: version_uuid.clone(), + uuid: version_uuid.clone(), deleted: false, blocks: Vec::new(), bucket: bucket.into(), @@ -146,7 +146,7 @@ async fn handle_put( let mut next_offset = first_block.len(); let mut put_curr_version_block = - put_version_block(garage.clone(), &version, 0, first_block_hash.clone()); + put_block_meta(garage.clone(), &version, 0, first_block_hash.clone()); let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); loop { @@ -155,7 +155,7 @@ async fn handle_put( if let Some(block) = next_block { let block_hash = hash(&block[..]); let block_len = block.len(); - put_curr_version_block = put_version_block( + put_curr_version_block = put_block_meta( garage.clone(), &version, next_offset as u64, @@ -176,15 +176,28 @@ async fn handle_put( Ok(version_uuid) } -async fn put_version_block( +async fn put_block_meta( garage: Arc, version: &Version, offset: u64, hash: Hash, ) -> Result<(), Error> { let mut version = version.clone(); - version.blocks.push(VersionBlock { offset, hash }); - garage.version_table.insert(&version).await?; + version.blocks.push(VersionBlock { + offset, + hash: hash.clone(), + }); + + let block_ref = BlockRef { + block: hash, + version: version.uuid.clone(), + deleted: false, + }; + + futures::try_join!( + garage.version_table.insert(&version), + garage.block_ref_table.insert(&block_ref), + )?; Ok(()) } @@ -308,9 +321,7 @@ async fn handle_get( } }) .buffered(2); - let body: BodyType = Box::new(StreamBody::new( - Box::pin(body_stream), - )); + let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream))); Ok(resp_builder.body(body)?) } } diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs new file mode 100644 index 00000000..9ba87f0c --- /dev/null +++ b/src/block_ref_table.rs @@ -0,0 +1,51 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; + +use crate::data::*; +use crate::server::Garage; +use crate::table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BlockRef { + // Primary key + pub block: Hash, + + // Sort key + pub version: UUID, + + // Keep track of deleted status + pub deleted: bool, +} + +impl Entry for BlockRef { + fn partition_key(&self) -> &Hash { + &self.block + } + fn sort_key(&self) -> &UUID { + &self.version + } + + fn merge(&mut self, other: &Self) { + if other.deleted { + self.deleted = true; + } + } +} + +pub struct BlockRefTable { + pub garage: RwLock>>, +} + +#[async_trait] +impl TableFormat for BlockRefTable { + type P = Hash; + type S = UUID; + type E = BlockRef; + + async fn updated(&self, old: Option<&Self::E>, new: &Self::E) { + //unimplemented!() + // TODO + } +} diff --git a/src/data.rs b/src/data.rs index 62aba0a1..fd7f9a8b 100644 --- a/src/data.rs +++ b/src/data.rs @@ -129,15 +129,6 @@ pub struct NetworkConfigEntry { pub const INLINE_THRESHOLD: usize = 3072; -#[derive(Debug, Serialize, Deserialize)] -pub struct SplitpointMeta { - pub bucket: String, - pub key: String, - - pub timestamp: u64, - pub uuid: UUID, - pub deleted: bool, -} - +pub use crate::block_ref_table::*; pub use crate::object_table::*; pub use crate::version_table::*; diff --git a/src/http_util.rs b/src/http_util.rs index 24e64c36..228448f0 100644 --- a/src/http_util.rs +++ b/src/http_util.rs @@ -15,7 +15,7 @@ pub struct StreamBody { impl StreamBody { pub fn new(stream: StreamType) -> Self { - Self{stream} + Self { stream } } } @@ -47,7 +47,7 @@ pub struct BytesBody { impl BytesBody { pub fn new(bytes: Bytes) -> Self { - Self{bytes: Some(bytes)} + Self { bytes: Some(bytes) } } } diff --git a/src/main.rs b/src/main.rs index 7c2af45b..15848f2e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod membership; mod table; mod block; +mod block_ref_table; mod object_table; mod version_table; diff --git a/src/server.rs b/src/server.rs index 3df6ca59..b62c18cc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -24,6 +24,7 @@ pub struct Garage { pub object_table: Arc>, pub version_table: Arc>, + pub block_ref_table: Arc>, } impl Garage { @@ -55,6 +56,15 @@ impl Garage { "version".to_string(), meta_rep_param.clone(), )); + let block_ref_table = Arc::new(Table::new( + BlockRefTable { + garage: RwLock::new(None), + }, + system.clone(), + &db, + "block_ref".to_string(), + meta_rep_param.clone(), + )); let mut garage = Self { db, @@ -63,6 +73,7 @@ impl Garage { table_rpc_handlers: HashMap::new(), object_table, version_table, + block_ref_table, }; garage.table_rpc_handlers.insert( @@ -73,11 +84,16 @@ impl Garage { garage.version_table.name.clone(), garage.version_table.clone().rpc_handler(), ); + garage.table_rpc_handlers.insert( + garage.block_ref_table.name.clone(), + garage.block_ref_table.clone().rpc_handler(), + ); let garage = Arc::new(garage); *garage.object_table.instance.garage.write().await = Some(garage.clone()); *garage.version_table.instance.garage.write().await = Some(garage.clone()); + *garage.block_ref_table.instance.garage.write().await = Some(garage.clone()); garage } diff --git a/src/version_table.rs b/src/version_table.rs index 28ee2e01..d037d344 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -10,7 +10,7 @@ use crate::table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Version { // Primary key - pub version: UUID, + pub uuid: UUID, // Actual data: the blocks for this version pub deleted: bool, @@ -30,7 +30,7 @@ pub struct VersionBlock { impl Entry for Version { fn partition_key(&self) -> &Hash { - &self.version + &self.uuid } fn sort_key(&self) -> &EmptySortKey { &EmptySortKey -- cgit v1.2.3