aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/admin_rpc.rs35
-rw-r--r--src/api/api_server.rs77
-rw-r--r--src/store/bucket_table.rs46
-rw-r--r--src/store/key_table.rs121
-rw-r--r--src/store/mod.rs1
-rw-r--r--src/store/object_table.rs47
-rw-r--r--src/store/version_table.rs38
7 files changed, 290 insertions, 75 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index fe59f92e..17ef5072 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -100,12 +100,7 @@ impl AdminRpcHandler {
};
self.garage
.bucket_table
- .insert(&Bucket {
- name: query.name.clone(),
- timestamp: new_time,
- deleted: false,
- authorized_keys: vec![],
- })
+ .insert(&Bucket::new(query.name.clone(), new_time, false, vec![]))
.await?;
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
}
@@ -143,12 +138,12 @@ impl AdminRpcHandler {
}
self.garage
.bucket_table
- .insert(&Bucket {
- name: query.name.clone(),
- timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
- deleted: true,
- authorized_keys: vec![],
- })
+ .insert(&Bucket::new(
+ query.name.clone(),
+ std::cmp::max(bucket.timestamp + 1, now_msec()),
+ true,
+ vec![],
+ ))
.await?;
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
}
@@ -292,7 +287,7 @@ impl AdminRpcHandler {
.get(&version.bucket, &version.key)
.await?;
let version_exists = match object {
- Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid),
+ Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid),
None => {
warn!(
"Repair versions: object for version {:?} not found",
@@ -305,13 +300,13 @@ impl AdminRpcHandler {
info!("Repair versions: marking version as deleted: {:?}", version);
self.garage
.version_table
- .insert(&Version {
- uuid: version.uuid,
- deleted: true,
- blocks: vec![],
- bucket: version.bucket,
- key: version.key,
- })
+ .insert(&Version::new(
+ version.uuid,
+ version.bucket,
+ version.key,
+ true,
+ vec![],
+ ))
.await?;
}
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index a80b2ea2..905ba0dd 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -119,37 +119,29 @@ async fn handle_put(
None => return Err(Error::BadRequest(format!("Empty body"))),
};
- let mut object = Object {
- bucket: bucket.into(),
- key: key.into(),
- versions: Vec::new(),
- };
- object.versions.push(Box::new(ObjectVersion {
+ let mut object_version = ObjectVersion {
uuid: version_uuid,
timestamp: now_msec(),
mime_type: mime_type.to_string(),
size: first_block.len() as u64,
is_complete: false,
data: ObjectVersionData::DeleteMarker,
- }));
+ };
if first_block.len() < INLINE_THRESHOLD {
- object.versions[0].data = ObjectVersionData::Inline(first_block);
- object.versions[0].is_complete = true;
+ object_version.data = ObjectVersionData::Inline(first_block);
+ object_version.is_complete = true;
+
+ let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok(version_uuid);
}
- let version = Version {
- uuid: version_uuid,
- deleted: false,
- blocks: Vec::new(),
- bucket: bucket.into(),
- key: key.into(),
- };
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
let first_block_hash = hash(&first_block[..]);
- object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash);
+ object_version.data = ObjectVersionData::FirstBlock(first_block_hash);
+ let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
let mut next_offset = first_block.len();
@@ -175,9 +167,12 @@ async fn handle_put(
// TODO: if at any step we have an error, we should undo everything we did
- object.versions[0].is_complete = true;
- object.versions[0].size = next_offset as u64;
+ object_version.is_complete = true;
+ object_version.size = next_offset as u64;
+
+ let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
+
Ok(version_uuid)
}
@@ -187,8 +182,9 @@ async fn put_block_meta(
offset: u64,
hash: Hash,
) -> Result<(), Error> {
+ // TODO: don't clone, restart from empty block list ??
let mut version = version.clone();
- version.blocks.push(VersionBlock { offset, hash: hash });
+ version.add_block(VersionBlock { offset, hash }).unwrap();
let block_ref = BlockRef {
block: hash,
@@ -250,7 +246,7 @@ async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<U
None => false,
Some(o) => {
let mut has_active_version = false;
- for v in o.versions.iter() {
+ for v in o.versions().iter() {
if v.data != ObjectVersionData::DeleteMarker {
has_active_version = true;
break;
@@ -267,19 +263,18 @@ async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<U
let version_uuid = gen_uuid();
- let mut object = Object {
- bucket: bucket.into(),
- key: key.into(),
- versions: Vec::new(),
- };
- object.versions.push(Box::new(ObjectVersion {
- uuid: version_uuid,
- timestamp: now_msec(),
- mime_type: "application/x-delete-marker".into(),
- size: 0,
- is_complete: true,
- data: ObjectVersionData::DeleteMarker,
- }));
+ let object = Object::new(
+ bucket.into(),
+ key.into(),
+ vec![ObjectVersion {
+ uuid: version_uuid,
+ timestamp: now_msec(),
+ mime_type: "application/x-delete-marker".into(),
+ size: 0,
+ is_complete: true,
+ data: ObjectVersionData::DeleteMarker,
+ }],
+ );
garage.object_table.insert(&object).await?;
return Ok(version_uuid);
@@ -290,7 +285,7 @@ async fn handle_get(
bucket: &str,
key: &str,
) -> Result<Response<BodyType>, Error> {
- let mut object = match garage
+ let object = match garage
.object_table
.get(&bucket.to_string(), &key.to_string())
.await?
@@ -300,8 +295,8 @@ async fn handle_get(
};
let last_v = match object
- .versions
- .drain(..)
+ .versions()
+ .iter()
.rev()
.filter(|v| v.is_complete)
.next()
@@ -311,13 +306,13 @@ async fn handle_get(
};
let resp_builder = Response::builder()
- .header("Content-Type", last_v.mime_type)
+ .header("Content-Type", last_v.mime_type.to_string())
.status(StatusCode::OK);
- match last_v.data {
+ match &last_v.data {
ObjectVersionData::DeleteMarker => Err(Error::NotFound),
ObjectVersionData::Inline(bytes) => {
- let body: BodyType = Box::new(BytesBody::from(bytes));
+ let body: BodyType = Box::new(BytesBody::from(bytes.to_vec()));
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(first_block_hash) => {
@@ -331,7 +326,7 @@ async fn handle_get(
};
let mut blocks = version
- .blocks
+ .blocks()
.iter()
.map(|vb| (vb.hash, None))
.collect::<Vec<_>>();
diff --git a/src/store/bucket_table.rs b/src/store/bucket_table.rs
index 5604049c..7778b8f9 100644
--- a/src/store/bucket_table.rs
+++ b/src/store/bucket_table.rs
@@ -15,7 +15,44 @@ pub struct Bucket {
pub deleted: bool,
// Authorized keys
- pub authorized_keys: Vec<AllowedKey>,
+ authorized_keys: Vec<AllowedKey>,
+}
+
+impl Bucket {
+ pub fn new(
+ name: String,
+ timestamp: u64,
+ deleted: bool,
+ authorized_keys: Vec<AllowedKey>,
+ ) -> Self {
+ let mut ret = Bucket {
+ name,
+ timestamp,
+ deleted,
+ authorized_keys: vec![],
+ };
+ for key in authorized_keys {
+ ret.add_key(key)
+ .expect("Duplicate AllowedKey in Bucket constructor");
+ }
+ ret
+ }
+ /// Add a key only if it is not already present
+ pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
+ match self
+ .authorized_keys
+ .binary_search_by(|k| k.access_key_id.cmp(&key.access_key_id))
+ {
+ Err(i) => {
+ self.authorized_keys.insert(i, key);
+ Ok(())
+ }
+ Ok(_) => Err(()),
+ }
+ }
+ pub fn authorized_keys(&self) -> &[AllowedKey] {
+ &self.authorized_keys[..]
+ }
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -39,9 +76,10 @@ impl Entry<EmptyKey, String> for Bucket {
*self = other.clone();
return;
}
- if self.timestamp > other.timestamp {
+ if self.timestamp > other.timestamp || self.deleted {
return;
}
+
for ak in other.authorized_keys.iter() {
match self
.authorized_keys
@@ -50,9 +88,7 @@ impl Entry<EmptyKey, String> for Bucket {
Ok(i) => {
let our_ak = &mut self.authorized_keys[i];
if ak.timestamp > our_ak.timestamp {
- our_ak.timestamp = ak.timestamp;
- our_ak.allowed_read = ak.allowed_read;
- our_ak.allowed_write = ak.allowed_write;
+ *our_ak = ak.clone();
}
}
Err(i) => {
diff --git a/src/store/key_table.rs b/src/store/key_table.rs
new file mode 100644
index 00000000..7476622f
--- /dev/null
+++ b/src/store/key_table.rs
@@ -0,0 +1,121 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use crate::error::Error;
+use crate::table::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Key {
+ // Primary key
+ pub access_key_id: String,
+
+ // Associated secret key (immutable)
+ pub secret_access_key: String,
+
+ // Deletion
+ pub deleted: bool,
+
+ // Authorized keys
+ authorized_buckets: Vec<AllowedBucket>,
+}
+
+impl Key {
+ pub fn new(buckets: Vec<AllowedBucket>) -> Self {
+ let access_key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
+ let secret_access_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
+ let mut ret = Self {
+ access_key_id,
+ secret_access_key,
+ deleted: false,
+ authorized_buckets: vec![],
+ };
+ for b in buckets {
+ ret.add_bucket(b);
+ }
+ ret
+ }
+ pub fn delete(access_key_id: String, secret_access_key: String) -> Self {
+ Self {
+ access_key_id,
+ secret_access_key,
+ deleted: true,
+ authorized_buckets: vec![],
+ }
+ }
+ /// Add an authorized bucket, only if it wasn't there before
+ pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
+ match self
+ .authorized_buckets
+ .binary_search_by(|b| b.bucket.cmp(&new.bucket))
+ {
+ Err(i) => {
+ self.authorized_buckets.insert(i, new);
+ Ok(())
+ }
+ Ok(_) => Err(()),
+ }
+ }
+ pub fn authorized_buckets(&self) -> &[AllowedBucket] {
+ &self.authorized_buckets[..]
+ }
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct AllowedBucket {
+ pub bucket: String,
+ pub timestamp: u64,
+ pub allowed_read: bool,
+ pub allowed_write: bool,
+}
+
+impl Entry<EmptyKey, String> for Key {
+ fn partition_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn sort_key(&self) -> &String {
+ &self.access_key_id
+ }
+
+ fn merge(&mut self, other: &Self) {
+ if other.deleted {
+ self.deleted = true;
+ self.authorized_buckets.clear();
+ return;
+ }
+
+ for ab in other.authorized_buckets.iter() {
+ match self
+ .authorized_buckets
+ .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
+ {
+ Ok(i) => {
+ let our_ab = &mut self.authorized_buckets[i];
+ if ab.timestamp > our_ab.timestamp {
+ *our_ab = ab.clone();
+ }
+ }
+ Err(i) => {
+ self.authorized_buckets.insert(i, ab.clone());
+ }
+ }
+ }
+ }
+}
+
+pub struct KeyTable;
+
+#[async_trait]
+impl TableSchema for KeyTable {
+ type P = EmptyKey;
+ type S = String;
+ type E = Key;
+ type Filter = ();
+
+ async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> {
+ Ok(())
+ }
+
+ fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+ !entry.deleted
+ }
+}
diff --git a/src/store/mod.rs b/src/store/mod.rs
index afadc9bb..b6a8dc46 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -1,5 +1,6 @@
pub mod block;
pub mod block_ref_table;
pub mod bucket_table;
+pub mod key_table;
pub mod object_table;
pub mod version_table;
diff --git a/src/store/object_table.rs b/src/store/object_table.rs
index 97de0cdb..f329a7f4 100644
--- a/src/store/object_table.rs
+++ b/src/store/object_table.rs
@@ -20,7 +20,38 @@ pub struct Object {
pub key: String,
// Data
- pub versions: Vec<Box<ObjectVersion>>,
+ versions: Vec<ObjectVersion>,
+}
+
+impl Object {
+ pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
+ let mut ret = Self {
+ bucket,
+ key,
+ versions: vec![],
+ };
+ for v in versions {
+ ret.add_version(v)
+ .expect("Twice the same ObjectVersion in Object constructor");
+ }
+ ret
+ }
+ /// Adds a version if it wasn't already present
+ pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> {
+ match self
+ .versions
+ .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key()))
+ {
+ Err(i) => {
+ self.versions.insert(i, new);
+ Ok(())
+ }
+ Ok(_) => Err(()),
+ }
+ }
+ pub fn versions(&self) -> &[ObjectVersion] {
+ &self.versions[..]
+ }
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -113,13 +144,13 @@ impl TableSchema for ObjectTable {
.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
.is_err()
{
- let deleted_version = Version {
- uuid: v.uuid,
- deleted: true,
- blocks: vec![],
- bucket: old_v.bucket.clone(),
- key: old_v.key.clone(),
- };
+ let deleted_version = Version::new(
+ v.uuid,
+ old_v.bucket.clone(),
+ old_v.key.clone(),
+ true,
+ vec![],
+ );
version_table.insert(&deleted_version).await?;
}
}
diff --git a/src/store/version_table.rs b/src/store/version_table.rs
index d25a56ca..6d304cda 100644
--- a/src/store/version_table.rs
+++ b/src/store/version_table.rs
@@ -18,7 +18,7 @@ pub struct Version {
// Actual data: the blocks for this version
pub deleted: bool,
- pub blocks: Vec<VersionBlock>,
+ blocks: Vec<VersionBlock>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
@@ -26,6 +26,42 @@ pub struct Version {
pub key: String,
}
+impl Version {
+ pub fn new(
+ uuid: UUID,
+ bucket: String,
+ key: String,
+ deleted: bool,
+ blocks: Vec<VersionBlock>,
+ ) -> Self {
+ let mut ret = Self {
+ uuid,
+ deleted,
+ blocks: vec![],
+ bucket,
+ key,
+ };
+ for b in blocks {
+ ret.add_block(b)
+ .expect("Twice the same VersionBlock in Version constructor");
+ }
+ ret
+ }
+ /// Adds a block if it wasn't already present
+ pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
+ match self.blocks.binary_search_by(|b| b.offset.cmp(&new.offset)) {
+ Err(i) => {
+ self.blocks.insert(i, new);
+ Ok(())
+ }
+ Ok(_) => Err(()),
+ }
+ }
+ pub fn blocks(&self) -> &[VersionBlock] {
+ &self.blocks[..]
+ }
+}
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
pub offset: u64,