aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-10 23:11:52 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-10 23:11:52 +0200
commitcf8fd948fc4bb6a9f48100ebf89df3752371805d (patch)
tree77fcb45f80ab75da2e5cdf520a50ab7192f9e25c /src
parentff4fb9756810abeff17c360f3055c3865160b240 (diff)
downloadgarage-cf8fd948fc4bb6a9f48100ebf89df3752371805d.tar.gz
garage-cf8fd948fc4bb6a9f48100ebf89df3752371805d.zip
Add block ref table
Diffstat (limited to 'src')
-rw-r--r--src/api_server.rs29
-rw-r--r--src/block_ref_table.rs51
-rw-r--r--src/data.rs11
-rw-r--r--src/http_util.rs4
-rw-r--r--src/main.rs1
-rw-r--r--src/server.rs16
-rw-r--r--src/version_table.rs4
7 files changed, 93 insertions, 23 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index 024da7d5..153991b2 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -133,7 +133,7 @@ async fn handle_put(
}
let version = Version {
- version: version_uuid.clone(),
+ uuid: version_uuid.clone(),
deleted: false,
blocks: Vec::new(),
bucket: bucket.into(),
@@ -146,7 +146,7 @@ async fn handle_put(
let mut next_offset = first_block.len();
let mut put_curr_version_block =
- put_version_block(garage.clone(), &version, 0, first_block_hash.clone());
+ put_block_meta(garage.clone(), &version, 0, first_block_hash.clone());
let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block);
loop {
@@ -155,7 +155,7 @@ async fn handle_put(
if let Some(block) = next_block {
let block_hash = hash(&block[..]);
let block_len = block.len();
- put_curr_version_block = put_version_block(
+ put_curr_version_block = put_block_meta(
garage.clone(),
&version,
next_offset as u64,
@@ -176,15 +176,28 @@ async fn handle_put(
Ok(version_uuid)
}
-async fn put_version_block(
+async fn put_block_meta(
garage: Arc<Garage>,
version: &Version,
offset: u64,
hash: Hash,
) -> Result<(), Error> {
let mut version = version.clone();
- version.blocks.push(VersionBlock { offset, hash });
- garage.version_table.insert(&version).await?;
+ version.blocks.push(VersionBlock {
+ offset,
+ hash: hash.clone(),
+ });
+
+ let block_ref = BlockRef {
+ block: hash,
+ version: version.uuid.clone(),
+ deleted: false,
+ };
+
+ futures::try_join!(
+ garage.version_table.insert(&version),
+ garage.block_ref_table.insert(&block_ref),
+ )?;
Ok(())
}
@@ -308,9 +321,7 @@ async fn handle_get(
}
})
.buffered(2);
- let body: BodyType = Box::new(StreamBody::new(
- Box::pin(body_stream),
- ));
+ let body: BodyType = Box::new(StreamBody::new(Box::pin(body_stream)));
Ok(resp_builder.body(body)?)
}
}
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
new file mode 100644
index 00000000..9ba87f0c
--- /dev/null
+++ b/src/block_ref_table.rs
@@ -0,0 +1,51 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+use crate::data::*;
+use crate::server::Garage;
+use crate::table::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct BlockRef {
+ // Primary key
+ pub block: Hash,
+
+ // Sort key
+ pub version: UUID,
+
+ // Keep track of deleted status
+ pub deleted: bool,
+}
+
+impl Entry<Hash, UUID> for BlockRef {
+ fn partition_key(&self) -> &Hash {
+ &self.block
+ }
+ fn sort_key(&self) -> &UUID {
+ &self.version
+ }
+
+ fn merge(&mut self, other: &Self) {
+ if other.deleted {
+ self.deleted = true;
+ }
+ }
+}
+
+pub struct BlockRefTable {
+ pub garage: RwLock<Option<Arc<Garage>>>,
+}
+
+#[async_trait]
+impl TableFormat for BlockRefTable {
+ type P = Hash;
+ type S = UUID;
+ type E = BlockRef;
+
+ async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
+ //unimplemented!()
+ // TODO
+ }
+}
diff --git a/src/data.rs b/src/data.rs
index 62aba0a1..fd7f9a8b 100644
--- a/src/data.rs
+++ b/src/data.rs
@@ -129,15 +129,6 @@ pub struct NetworkConfigEntry {
pub const INLINE_THRESHOLD: usize = 3072;
-#[derive(Debug, Serialize, Deserialize)]
-pub struct SplitpointMeta {
- pub bucket: String,
- pub key: String,
-
- pub timestamp: u64,
- pub uuid: UUID,
- pub deleted: bool,
-}
-
+pub use crate::block_ref_table::*;
pub use crate::object_table::*;
pub use crate::version_table::*;
diff --git a/src/http_util.rs b/src/http_util.rs
index 24e64c36..228448f0 100644
--- a/src/http_util.rs
+++ b/src/http_util.rs
@@ -15,7 +15,7 @@ pub struct StreamBody {
impl StreamBody {
pub fn new(stream: StreamType) -> Self {
- Self{stream}
+ Self { stream }
}
}
@@ -47,7 +47,7 @@ pub struct BytesBody {
impl BytesBody {
pub fn new(bytes: Bytes) -> Self {
- Self{bytes: Some(bytes)}
+ Self { bytes: Some(bytes) }
}
}
diff --git a/src/main.rs b/src/main.rs
index 7c2af45b..15848f2e 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,6 +6,7 @@ mod membership;
mod table;
mod block;
+mod block_ref_table;
mod object_table;
mod version_table;
diff --git a/src/server.rs b/src/server.rs
index 3df6ca59..b62c18cc 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -24,6 +24,7 @@ pub struct Garage {
pub object_table: Arc<Table<ObjectTable>>,
pub version_table: Arc<Table<VersionTable>>,
+ pub block_ref_table: Arc<Table<BlockRefTable>>,
}
impl Garage {
@@ -55,6 +56,15 @@ impl Garage {
"version".to_string(),
meta_rep_param.clone(),
));
+ let block_ref_table = Arc::new(Table::new(
+ BlockRefTable {
+ garage: RwLock::new(None),
+ },
+ system.clone(),
+ &db,
+ "block_ref".to_string(),
+ meta_rep_param.clone(),
+ ));
let mut garage = Self {
db,
@@ -63,6 +73,7 @@ impl Garage {
table_rpc_handlers: HashMap::new(),
object_table,
version_table,
+ block_ref_table,
};
garage.table_rpc_handlers.insert(
@@ -73,11 +84,16 @@ impl Garage {
garage.version_table.name.clone(),
garage.version_table.clone().rpc_handler(),
);
+ garage.table_rpc_handlers.insert(
+ garage.block_ref_table.name.clone(),
+ garage.block_ref_table.clone().rpc_handler(),
+ );
let garage = Arc::new(garage);
*garage.object_table.instance.garage.write().await = Some(garage.clone());
*garage.version_table.instance.garage.write().await = Some(garage.clone());
+ *garage.block_ref_table.instance.garage.write().await = Some(garage.clone());
garage
}
diff --git a/src/version_table.rs b/src/version_table.rs
index 28ee2e01..d037d344 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -10,7 +10,7 @@ use crate::table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
// Primary key
- pub version: UUID,
+ pub uuid: UUID,
// Actual data: the blocks for this version
pub deleted: bool,
@@ -30,7 +30,7 @@ pub struct VersionBlock {
impl Entry<Hash, EmptySortKey> for Version {
fn partition_key(&self) -> &Hash {
- &self.version
+ &self.uuid
}
fn sort_key(&self) -> &EmptySortKey {
&EmptySortKey