diff options
-rw-r--r-- | src/admin_rpc.rs | 35 | ||||
-rw-r--r-- | src/api/api_server.rs | 77 | ||||
-rw-r--r-- | src/store/bucket_table.rs | 46 | ||||
-rw-r--r-- | src/store/key_table.rs | 121 | ||||
-rw-r--r-- | src/store/mod.rs | 1 | ||||
-rw-r--r-- | src/store/object_table.rs | 47 | ||||
-rw-r--r-- | src/store/version_table.rs | 38 |
7 files changed, 290 insertions, 75 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs index fe59f92e..17ef5072 100644 --- a/src/admin_rpc.rs +++ b/src/admin_rpc.rs @@ -100,12 +100,7 @@ impl AdminRpcHandler { }; self.garage .bucket_table - .insert(&Bucket { - name: query.name.clone(), - timestamp: new_time, - deleted: false, - authorized_keys: vec![], - }) + .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) .await?; Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } @@ -143,12 +138,12 @@ impl AdminRpcHandler { } self.garage .bucket_table - .insert(&Bucket { - name: query.name.clone(), - timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()), - deleted: true, - authorized_keys: vec![], - }) + .insert(&Bucket::new( + query.name.clone(), + std::cmp::max(bucket.timestamp + 1, now_msec()), + true, + vec![], + )) .await?; Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } @@ -292,7 +287,7 @@ impl AdminRpcHandler { .get(&version.bucket, &version.key) .await?; let version_exists = match object { - Some(o) => o.versions.iter().any(|x| x.uuid == version.uuid), + Some(o) => o.versions().iter().any(|x| x.uuid == version.uuid), None => { warn!( "Repair versions: object for version {:?} not found", @@ -305,13 +300,13 @@ impl AdminRpcHandler { info!("Repair versions: marking version as deleted: {:?}", version); self.garage .version_table - .insert(&Version { - uuid: version.uuid, - deleted: true, - blocks: vec![], - bucket: version.bucket, - key: version.key, - }) + .insert(&Version::new( + version.uuid, + version.bucket, + version.key, + true, + vec![], + )) .await?; } diff --git a/src/api/api_server.rs b/src/api/api_server.rs index a80b2ea2..905ba0dd 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -119,37 +119,29 @@ async fn handle_put( None => return Err(Error::BadRequest(format!("Empty body"))), }; - let mut object = Object { - bucket: bucket.into(), - key: key.into(), - versions: Vec::new(), - }; - object.versions.push(Box::new(ObjectVersion { + let mut object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), mime_type: mime_type.to_string(), size: first_block.len() as u64, is_complete: false, data: ObjectVersionData::DeleteMarker, - })); + }; if first_block.len() < INLINE_THRESHOLD { - object.versions[0].data = ObjectVersionData::Inline(first_block); - object.versions[0].is_complete = true; + object_version.data = ObjectVersionData::Inline(first_block); + object_version.is_complete = true; + + let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok(version_uuid); } - let version = Version { - uuid: version_uuid, - deleted: false, - blocks: Vec::new(), - bucket: bucket.into(), - key: key.into(), - }; + let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let first_block_hash = hash(&first_block[..]); - object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash); + object_version.data = ObjectVersionData::FirstBlock(first_block_hash); + let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; let mut next_offset = first_block.len(); @@ -175,9 +167,12 @@ async fn handle_put( // TODO: if at any step we have an error, we should undo everything we did - object.versions[0].is_complete = true; - object.versions[0].size = next_offset as u64; + object_version.is_complete = true; + object_version.size = next_offset as u64; + + let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + Ok(version_uuid) } @@ -187,8 +182,9 @@ async fn put_block_meta( offset: u64, hash: Hash, ) -> Result<(), Error> { + // TODO: don't clone, restart from empty block list ?? let mut version = version.clone(); - version.blocks.push(VersionBlock { offset, hash: hash }); + version.add_block(VersionBlock { offset, hash }).unwrap(); let block_ref = BlockRef { block: hash, @@ -250,7 +246,7 @@ async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<U None => false, Some(o) => { let mut has_active_version = false; - for v in o.versions.iter() { + for v in o.versions().iter() { if v.data != ObjectVersionData::DeleteMarker { has_active_version = true; break; @@ -267,19 +263,18 @@ async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<U let version_uuid = gen_uuid(); - let mut object = Object { - bucket: bucket.into(), - key: key.into(), - versions: Vec::new(), - }; - object.versions.push(Box::new(ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - mime_type: "application/x-delete-marker".into(), - size: 0, - is_complete: true, - data: ObjectVersionData::DeleteMarker, - })); + let object = Object::new( + bucket.into(), + key.into(), + vec![ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + mime_type: "application/x-delete-marker".into(), + size: 0, + is_complete: true, + data: ObjectVersionData::DeleteMarker, + }], + ); garage.object_table.insert(&object).await?; return Ok(version_uuid); @@ -290,7 +285,7 @@ async fn handle_get( bucket: &str, key: &str, ) -> Result<Response<BodyType>, Error> { - let mut object = match garage + let object = match garage .object_table .get(&bucket.to_string(), &key.to_string()) .await? @@ -300,8 +295,8 @@ async fn handle_get( }; let last_v = match object - .versions - .drain(..) + .versions() + .iter() .rev() .filter(|v| v.is_complete) .next() @@ -311,13 +306,13 @@ async fn handle_get( }; let resp_builder = Response::builder() - .header("Content-Type", last_v.mime_type) + .header("Content-Type", last_v.mime_type.to_string()) .status(StatusCode::OK); - match last_v.data { + match &last_v.data { ObjectVersionData::DeleteMarker => Err(Error::NotFound), ObjectVersionData::Inline(bytes) => { - let body: BodyType = Box::new(BytesBody::from(bytes)); + let body: BodyType = Box::new(BytesBody::from(bytes.to_vec())); Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(first_block_hash) => { @@ -331,7 +326,7 @@ async fn handle_get( }; let mut blocks = version - .blocks + .blocks() .iter() .map(|vb| (vb.hash, None)) .collect::<Vec<_>>(); diff --git a/src/store/bucket_table.rs b/src/store/bucket_table.rs index 5604049c..7778b8f9 100644 --- a/src/store/bucket_table.rs +++ b/src/store/bucket_table.rs @@ -15,7 +15,44 @@ pub struct Bucket { pub deleted: bool, // Authorized keys - pub authorized_keys: Vec<AllowedKey>, + authorized_keys: Vec<AllowedKey>, +} + +impl Bucket { + pub fn new( + name: String, + timestamp: u64, + deleted: bool, + authorized_keys: Vec<AllowedKey>, + ) -> Self { + let mut ret = Bucket { + name, + timestamp, + deleted, + authorized_keys: vec![], + }; + for key in authorized_keys { + ret.add_key(key) + .expect("Duplicate AllowedKey in Bucket constructor"); + } + ret + } + /// Add a key only if it is not already present + pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { + match self + .authorized_keys + .binary_search_by(|k| k.access_key_id.cmp(&key.access_key_id)) + { + Err(i) => { + self.authorized_keys.insert(i, key); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn authorized_keys(&self) -> &[AllowedKey] { + &self.authorized_keys[..] + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -39,9 +76,10 @@ impl Entry<EmptyKey, String> for Bucket { *self = other.clone(); return; } - if self.timestamp > other.timestamp { + if self.timestamp > other.timestamp || self.deleted { return; } + for ak in other.authorized_keys.iter() { match self .authorized_keys @@ -50,9 +88,7 @@ impl Entry<EmptyKey, String> for Bucket { Ok(i) => { let our_ak = &mut self.authorized_keys[i]; if ak.timestamp > our_ak.timestamp { - our_ak.timestamp = ak.timestamp; - our_ak.allowed_read = ak.allowed_read; - our_ak.allowed_write = ak.allowed_write; + *our_ak = ak.clone(); } } Err(i) => { diff --git a/src/store/key_table.rs b/src/store/key_table.rs new file mode 100644 index 00000000..7476622f --- /dev/null +++ b/src/store/key_table.rs @@ -0,0 +1,121 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; + +use crate::error::Error; +use crate::table::*; + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct Key { + // Primary key + pub access_key_id: String, + + // Associated secret key (immutable) + pub secret_access_key: String, + + // Deletion + pub deleted: bool, + + // Authorized keys + authorized_buckets: Vec<AllowedBucket>, +} + +impl Key { + pub fn new(buckets: Vec<AllowedBucket>) -> Self { + let access_key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); + let secret_access_key = hex::encode(&rand::random::<[u8; 32]>()[..]); + let mut ret = Self { + access_key_id, + secret_access_key, + deleted: false, + authorized_buckets: vec![], + }; + for b in buckets { + ret.add_bucket(b); + } + ret + } + pub fn delete(access_key_id: String, secret_access_key: String) -> Self { + Self { + access_key_id, + secret_access_key, + deleted: true, + authorized_buckets: vec![], + } + } + /// Add an authorized bucket, only if it wasn't there before + pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { + match self + .authorized_buckets + .binary_search_by(|b| b.bucket.cmp(&new.bucket)) + { + Err(i) => { + self.authorized_buckets.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn authorized_buckets(&self) -> &[AllowedBucket] { + &self.authorized_buckets[..] + } +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct AllowedBucket { + pub bucket: String, + pub timestamp: u64, + pub allowed_read: bool, + pub allowed_write: bool, +} + +impl Entry<EmptyKey, String> for Key { + fn partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn sort_key(&self) -> &String { + &self.access_key_id + } + + fn merge(&mut self, other: &Self) { + if other.deleted { + self.deleted = true; + self.authorized_buckets.clear(); + return; + } + + for ab in other.authorized_buckets.iter() { + match self + .authorized_buckets + .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) + { + Ok(i) => { + let our_ab = &mut self.authorized_buckets[i]; + if ab.timestamp > our_ab.timestamp { + *our_ab = ab.clone(); + } + } + Err(i) => { + self.authorized_buckets.insert(i, ab.clone()); + } + } + } + } +} + +pub struct KeyTable; + +#[async_trait] +impl TableSchema for KeyTable { + type P = EmptyKey; + type S = String; + type E = Key; + type Filter = (); + + async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> { + Ok(()) + } + + fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { + !entry.deleted + } +} diff --git a/src/store/mod.rs b/src/store/mod.rs index afadc9bb..b6a8dc46 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -1,5 +1,6 @@ pub mod block; pub mod block_ref_table; pub mod bucket_table; +pub mod key_table; pub mod object_table; pub mod version_table; diff --git a/src/store/object_table.rs b/src/store/object_table.rs index 97de0cdb..f329a7f4 100644 --- a/src/store/object_table.rs +++ b/src/store/object_table.rs @@ -20,7 +20,38 @@ pub struct Object { pub key: String, // Data - pub versions: Vec<Box<ObjectVersion>>, + versions: Vec<ObjectVersion>, +} + +impl Object { + pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self { + let mut ret = Self { + bucket, + key, + versions: vec![], + }; + for v in versions { + ret.add_version(v) + .expect("Twice the same ObjectVersion in Object constructor"); + } + ret + } + /// Adds a version if it wasn't already present + pub fn add_version(&mut self, new: ObjectVersion) -> Result<(), ()> { + match self + .versions + .binary_search_by(|v| v.cmp_key().cmp(&new.cmp_key())) + { + Err(i) => { + self.versions.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn versions(&self) -> &[ObjectVersion] { + &self.versions[..] + } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -113,13 +144,13 @@ impl TableSchema for ObjectTable { .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) .is_err() { - let deleted_version = Version { - uuid: v.uuid, - deleted: true, - blocks: vec![], - bucket: old_v.bucket.clone(), - key: old_v.key.clone(), - }; + let deleted_version = Version::new( + v.uuid, + old_v.bucket.clone(), + old_v.key.clone(), + true, + vec![], + ); version_table.insert(&deleted_version).await?; } } diff --git a/src/store/version_table.rs b/src/store/version_table.rs index d25a56ca..6d304cda 100644 --- a/src/store/version_table.rs +++ b/src/store/version_table.rs @@ -18,7 +18,7 @@ pub struct Version { // Actual data: the blocks for this version pub deleted: bool, - pub blocks: Vec<VersionBlock>, + blocks: Vec<VersionBlock>, // Back link to bucket+key so that we can figure if // this was deleted later on @@ -26,6 +26,42 @@ pub struct Version { pub key: String, } +impl Version { + pub fn new( + uuid: UUID, + bucket: String, + key: String, + deleted: bool, + blocks: Vec<VersionBlock>, + ) -> Self { + let mut ret = Self { + uuid, + deleted, + blocks: vec![], + bucket, + key, + }; + for b in blocks { + ret.add_block(b) + .expect("Twice the same VersionBlock in Version constructor"); + } + ret + } + /// Adds a block if it wasn't already present + pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> { + match self.blocks.binary_search_by(|b| b.offset.cmp(&new.offset)) { + Err(i) => { + self.blocks.insert(i, new); + Ok(()) + } + Ok(_) => Err(()), + } + } + pub fn blocks(&self) -> &[VersionBlock] { + &self.blocks[..] + } +} + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct VersionBlock { pub offset: u64, |