From 5b1117e582db16cc5aa50840a685875cbd5501f4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Dec 2021 13:55:11 +0100 Subject: New model for buckets --- src/api/Cargo.toml | 8 +- src/api/api_server.rs | 109 ++++++-- src/api/s3_bucket.rs | 62 ++++- src/api/s3_copy.rs | 20 +- src/api/s3_delete.rs | 14 +- src/api/s3_get.rs | 9 +- src/api/s3_list.rs | 8 +- src/api/s3_put.rs | 44 ++-- src/api/s3_website.rs | 27 +- src/api/signature.rs | 2 +- src/garage/Cargo.toml | 14 +- src/garage/admin.rs | 545 +++++++++++++++++++++++++--------------- src/garage/cli/cmd.rs | 7 +- src/garage/cli/util.rs | 53 ++-- src/garage/repair.rs | 4 +- src/model/Cargo.toml | 9 +- src/model/bucket_alias_table.rs | 68 +++++ src/model/bucket_helper.rs | 41 +++ src/model/bucket_table.rs | 94 ++++--- src/model/garage.rs | 17 ++ src/model/key_table.rs | 102 ++++---- src/model/lib.rs | 3 + src/model/object_table.rs | 16 +- src/model/permission.rs | 37 +++ src/model/version_table.rs | 12 +- src/rpc/Cargo.toml | 4 +- src/table/Cargo.toml | 6 +- src/table/schema.rs | 4 +- src/util/Cargo.toml | 2 +- src/util/crdt/deletable.rs | 72 ++++++ src/util/crdt/lww.rs | 5 + src/util/crdt/lww_map.rs | 12 +- src/util/crdt/mod.rs | 2 + src/util/error.rs | 29 +++ src/util/time.rs | 5 + src/web/Cargo.toml | 10 +- src/web/web_server.rs | 33 ++- 37 files changed, 1043 insertions(+), 466 deletions(-) create mode 100644 src/model/bucket_alias_table.rs create mode 100644 src/model/bucket_helper.rs create mode 100644 src/model/permission.rs create mode 100644 src/util/crdt/deletable.rs (limited to 'src') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 3ca46764..de58f78b 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_api" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,9 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.5.0", path = "../model" } -garage_table = { version = "0.5.0", path = "../table" } -garage_util = { version = "0.5.0", path = "../util" } +garage_model = { version = "0.6.0", path = "../model" } +garage_table = { version = "0.6.0", path = "../table" } +garage_util = { version = "0.6.0", path = "../util" } base64 = "0.13" bytes = "1.0" diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 2de86233..cc9b9c38 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -7,9 +7,12 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; +use garage_util::crdt; +use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_model::garage::Garage; +use garage_model::key_table::Key; use crate::error::*; use crate::signature::check_signature; @@ -105,10 +108,20 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + return handle_request_without_bucket(garage, req, api_key, endpoint).await + } + Authorization::Read(bucket) | Authorization::Write(bucket) => bucket.to_string(), + }; + + let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let allowed = match endpoint.authorization_type() { - Authorization::None => true, - Authorization::Read(bucket) => api_key.allow_read(bucket), - Authorization::Write(bucket) => api_key.allow_write(bucket), + Authorization::Read(_) => api_key.allow_read(&bucket_id), + Authorization::Write(_) => api_key.allow_write(&bucket_id), + _ => unreachable!(), }; if !allowed { @@ -118,19 +131,18 @@ async fn handler_inner(garage: Arc, req: Request) -> Result handle_list_buckets(&api_key), - Endpoint::HeadObject { bucket, key, .. } => handle_head(garage, &req, &bucket, &key).await, - Endpoint::GetObject { bucket, key, .. } => handle_get(garage, &req, &bucket, &key).await, + Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await, + Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await, Endpoint::UploadPart { - bucket, key, part_number, upload_id, + .. } => { handle_put_part( garage, req, - &bucket, + bucket_id, &key, part_number, &upload_id, @@ -138,38 +150,46 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { + Endpoint::CopyObject { key, .. } => { let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; - if !api_key.allow_read(source_bucket) { + let source_bucket_id = + resolve_bucket(&garage, &source_bucket.to_string(), &api_key).await?; + if !api_key.allow_read(&source_bucket_id) { return Err(Error::Forbidden(format!( "Reading from bucket {} not allowed for this key", source_bucket ))); } let source_key = source_key.ok_or_bad_request("No source key specified")?; - handle_copy(garage, &req, &bucket, &key, source_bucket, source_key).await + handle_copy(garage, &req, bucket_id, &key, source_bucket_id, source_key).await } - Endpoint::PutObject { bucket, key } => { - handle_put(garage, req, &bucket, &key, content_sha256).await + Endpoint::PutObject { key, .. } => { + handle_put(garage, req, bucket_id, &key, content_sha256).await } - Endpoint::AbortMultipartUpload { - bucket, - key, - upload_id, - } => handle_abort_multipart_upload(garage, &bucket, &key, &upload_id).await, - Endpoint::DeleteObject { bucket, key, .. } => handle_delete(garage, &bucket, &key).await, + Endpoint::AbortMultipartUpload { key, upload_id, .. } => { + handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await + } + Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await, Endpoint::CreateMultipartUpload { bucket, key } => { - handle_create_multipart_upload(garage, &req, &bucket, &key).await + handle_create_multipart_upload(garage, &req, &bucket, bucket_id, &key).await } Endpoint::CompleteMultipartUpload { bucket, key, upload_id, } => { - handle_complete_multipart_upload(garage, req, &bucket, &key, &upload_id, content_sha256) - .await + handle_complete_multipart_upload( + garage, + req, + &bucket, + bucket_id, + &key, + &upload_id, + content_sha256, + ) + .await } Endpoint::CreateBucket { bucket } => { debug!( @@ -206,7 +226,8 @@ async fn handler_inner(garage: Arc, req: Request) -> Result, req: Request) -> Result, req: Request) -> Result { - handle_delete_objects(garage, &bucket, req, content_sha256).await + Endpoint::DeleteObjects { .. } => { + handle_delete_objects(garage, bucket_id, req, content_sha256).await } Endpoint::PutBucketWebsite { bucket } => { handle_put_website(garage, bucket, req, content_sha256).await @@ -263,6 +285,41 @@ async fn handler_inner(garage: Arc, req: Request) -> Result, + _req: Request, + api_key: Key, + endpoint: Endpoint, +) -> Result, Error> { + match endpoint { + Endpoint::ListBuckets => handle_list_buckets(&garage, &api_key).await, + endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + } +} + +#[allow(clippy::ptr_arg)] +async fn resolve_bucket( + garage: &Garage, + bucket_name: &String, + api_key: &Key, +) -> Result { + let api_key_params = api_key + .state + .as_option() + .ok_or_else(|| Error::Forbidden("Operation is not allowed for this key.".to_string()))?; + + if let Some(crdt::Deletable::Present(bucket_id)) = api_key_params.local_aliases.get(bucket_name) + { + Ok(*bucket_id) + } else { + Ok(garage + .bucket_helper() + .resolve_global_bucket_name(bucket_name) + .await? + .ok_or(Error::NotFound)?) + } +} + /// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in /// the host header of the request /// diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index 2be0a818..dc131a31 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -1,9 +1,12 @@ +use std::collections::HashMap; use std::sync::Arc; use hyper::{Body, Response}; use garage_model::garage::Garage; use garage_model::key_table::Key; +use garage_table::util::EmptyKey; +use garage_util::crdt::*; use garage_util::time::*; use crate::error::*; @@ -34,20 +37,65 @@ pub fn handle_get_bucket_versioning() -> Result, Error> { .body(Body::from(xml.into_bytes()))?) } -pub fn handle_list_buckets(api_key: &Key) -> Result, Error> { +pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result, Error> { + let key_state = api_key.state.as_option().ok_or_internal_error( + "Key should not be in deleted state at this point (internal error)", + )?; + + // Collect buckets user has access to + let ids = api_key + .state + .as_option() + .unwrap() + .authorized_buckets + .items() + .iter() + .filter(|(_, perms)| perms.allow_read || perms.allow_write) + .map(|(id, _)| *id) + .collect::>(); + + let mut buckets_by_id = HashMap::new(); + let mut aliases = HashMap::new(); + + for bucket_id in ids.iter() { + let bucket = garage.bucket_table.get(bucket_id, &EmptyKey).await?; + if let Some(bucket) = bucket { + if let Deletable::Present(param) = bucket.state { + for (alias, _, active) in param.aliases.items() { + if *active { + let alias_ent = garage.bucket_alias_table.get(&EmptyKey, alias).await?; + if let Some(alias_ent) = alias_ent { + if let Some(alias_p) = alias_ent.state.get().as_option() { + if alias_p.bucket_id == *bucket_id { + aliases.insert(alias_ent.name.clone(), *bucket_id); + } + } + } + } + } + buckets_by_id.insert(bucket_id, param); + } + } + } + + for (alias, _, id) in key_state.local_aliases.items() { + if let Some(id) = id.as_option() { + aliases.insert(alias.clone(), *id); + } + } + + // Generate response let list_buckets = s3_xml::ListAllMyBucketsResult { owner: s3_xml::Owner { display_name: s3_xml::Value(api_key.name.get().to_string()), id: s3_xml::Value(api_key.key_id.to_string()), }, buckets: s3_xml::BucketList { - entries: api_key - .authorized_buckets - .items() + entries: aliases .iter() - .filter(|(_, _, perms)| perms.allow_read || perms.allow_write) - .map(|(name, ts, _)| s3_xml::Bucket { - creation_date: s3_xml::Value(msec_to_rfc3339(*ts)), + .filter_map(|(name, id)| buckets_by_id.get(id).map(|p| (name, id, p))) + .map(|(name, _id, param)| s3_xml::Bucket { + creation_date: s3_xml::Value(msec_to_rfc3339(param.creation_date)), name: s3_xml::Value(name.to_string()), }) .collect(), diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index 9ade6985..4ede8230 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -18,14 +18,14 @@ use crate::s3_xml; pub async fn handle_copy( garage: Arc, req: &Request, - dest_bucket: &str, + dest_bucket_id: Uuid, dest_key: &str, - source_bucket: &str, + source_bucket_id: Uuid, source_key: &str, ) -> Result, Error> { let source_object = garage .object_table - .get(&source_bucket.to_string(), &source_key.to_string()) + .get(&source_bucket_id, &source_key.to_string()) .await? .ok_or(Error::NotFound)?; @@ -76,7 +76,7 @@ pub async fn handle_copy( )), }; let dest_object = Object::new( - dest_bucket.to_string(), + dest_bucket_id, dest_key.to_string(), vec![dest_object_version], ); @@ -99,7 +99,7 @@ pub async fn handle_copy( state: ObjectVersionState::Uploading(new_meta.headers.clone()), }; let tmp_dest_object = Object::new( - dest_bucket.to_string(), + dest_bucket_id, dest_key.to_string(), vec![tmp_dest_object_version], ); @@ -109,12 +109,8 @@ pub async fn handle_copy( // this means that the BlockRef entries linked to this version cannot be // marked as deleted (they are marked as deleted only if the Version // doesn't exist or is marked as deleted). - let mut dest_version = Version::new( - new_uuid, - dest_bucket.to_string(), - dest_key.to_string(), - false, - ); + let mut dest_version = + Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false); garage.version_table.insert(&dest_version).await?; // Fill in block list for version and insert block refs @@ -151,7 +147,7 @@ pub async fn handle_copy( )), }; let dest_object = Object::new( - dest_bucket.to_string(), + dest_bucket_id, dest_key.to_string(), vec![dest_object_version], ); diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs index 425f86d7..1976139b 100644 --- a/src/api/s3_delete.rs +++ b/src/api/s3_delete.rs @@ -14,12 +14,12 @@ use crate::signature::verify_signed_content; async fn handle_delete_internal( garage: &Garage, - bucket: &str, + bucket_id: Uuid, key: &str, ) -> Result<(Uuid, Uuid), Error> { let object = garage .object_table - .get(&bucket.to_string(), &key.to_string()) + .get(&bucket_id, &key.to_string()) .await? .ok_or(Error::NotFound)?; // No need to delete @@ -45,7 +45,7 @@ async fn handle_delete_internal( let version_uuid = gen_uuid(); let object = Object::new( - bucket.into(), + bucket_id, key.into(), vec![ObjectVersion { uuid: version_uuid, @@ -61,11 +61,11 @@ async fn handle_delete_internal( pub async fn handle_delete( garage: Arc, - bucket: &str, + bucket_id: Uuid, key: &str, ) -> Result, Error> { let (_deleted_version, delete_marker_version) = - handle_delete_internal(&garage, bucket, key).await?; + handle_delete_internal(&garage, bucket_id, key).await?; Ok(Response::builder() .header("x-amz-version-id", hex::encode(delete_marker_version)) @@ -76,7 +76,7 @@ pub async fn handle_delete( pub async fn handle_delete_objects( garage: Arc, - bucket: &str, + bucket_id: Uuid, req: Request, content_sha256: Option, ) -> Result, Error> { @@ -90,7 +90,7 @@ pub async fn handle_delete_objects( let mut ret_errors = Vec::new(); for obj in cmd.objects.iter() { - match handle_delete_internal(&garage, bucket, &obj.key).await { + match handle_delete_internal(&garage, bucket_id, &obj.key).await { Ok((deleted_version, delete_marker_version)) => { if cmd.quiet { continue; diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs index 428bbf34..269a3fa8 100644 --- a/src/api/s3_get.rs +++ b/src/api/s3_get.rs @@ -7,6 +7,7 @@ use hyper::body::Bytes; use hyper::{Body, Request, Response, StatusCode}; use garage_table::EmptyKey; +use garage_util::data::*; use garage_model::garage::Garage; use garage_model::object_table::*; @@ -84,12 +85,12 @@ fn try_answer_cached( pub async fn handle_head( garage: Arc, req: &Request, - bucket: &str, + bucket_id: Uuid, key: &str, ) -> Result, Error> { let object = garage .object_table - .get(&bucket.to_string(), &key.to_string()) + .get(&bucket_id, &key.to_string()) .await? .ok_or(Error::NotFound)?; @@ -123,12 +124,12 @@ pub async fn handle_head( pub async fn handle_get( garage: Arc, req: &Request, - bucket: &str, + bucket_id: Uuid, key: &str, ) -> Result, Error> { let object = garage .object_table - .get(&bucket.to_string(), &key.to_string()) + .get(&bucket_id, &key.to_string()) .await? .ok_or(Error::NotFound)?; diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index df9c3e6b..07efb02d 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use hyper::{Body, Response}; +use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -18,7 +19,8 @@ use crate::s3_xml; #[derive(Debug)] pub struct ListObjectsQuery { pub is_v2: bool, - pub bucket: String, + pub bucket_name: String, + pub bucket_id: Uuid, pub delimiter: Option, pub max_keys: usize, pub prefix: String, @@ -102,7 +104,7 @@ pub async fn handle_list( let objects = garage .object_table .get_range( - &query.bucket, + &query.bucket_id, Some(next_chunk_start.clone()), Some(DeletedFilter::NotDeleted), query.max_keys + 1, @@ -232,7 +234,7 @@ pub async fn handle_list( let mut result = s3_xml::ListBucketResult { xmlns: (), - name: s3_xml::Value(query.bucket.to_string()), + name: s3_xml::Value(query.bucket_name.to_string()), prefix: uriencode_maybe(&query.prefix, query.urlencode_resp), marker: None, next_marker: None, diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index f63e8307..152e59b4 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -24,7 +24,7 @@ use crate::signature::verify_signed_content; pub async fn handle_put( garage: Arc, req: Request, - bucket: &str, + bucket_id: Uuid, key: &str, content_sha256: Option, ) -> Result, Error> { @@ -77,7 +77,7 @@ pub async fn handle_put( )), }; - let object = Object::new(bucket.into(), key.into(), vec![object_version]); + let object = Object::new(bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok(put_response(version_uuid, data_md5sum_hex)); @@ -90,14 +90,14 @@ pub async fn handle_put( timestamp: version_timestamp, state: ObjectVersionState::Uploading(headers.clone()), }; - let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); + let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table // Write this entry now, even with empty block list, // to prevent block_ref entries from being deleted (they can be deleted // if the reference a version that isn't found in the version table) - let version = Version::new(version_uuid, bucket.into(), key.into(), false); + let version = Version::new(version_uuid, bucket_id, key.into(), false); garage.version_table.insert(&version).await?; // Transfer data and verify checksum @@ -127,7 +127,7 @@ pub async fn handle_put( Err(e) => { // Mark object as aborted, this will free the blocks further down object_version.state = ObjectVersionState::Aborted; - let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); + let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; return Err(e); } @@ -143,7 +143,7 @@ pub async fn handle_put( }, first_block_hash, )); - let object = Object::new(bucket.into(), key.into(), vec![object_version]); + let object = Object::new(bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; Ok(put_response(version_uuid, md5sum_hex)) @@ -315,7 +315,8 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { pub async fn handle_create_multipart_upload( garage: Arc, req: &Request, - bucket: &str, + bucket_name: &str, + bucket_id: Uuid, key: &str, ) -> Result, Error> { let version_uuid = gen_uuid(); @@ -327,20 +328,20 @@ pub async fn handle_create_multipart_upload( timestamp: now_msec(), state: ObjectVersionState::Uploading(headers), }; - let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); + let object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; // Insert empty version so that block_ref entries refer to something // (they are inserted concurrently with blocks in the version table, so // there is the possibility that they are inserted before the version table // is created, in which case it is allowed to delete them, e.g. in repair_*) - let version = Version::new(version_uuid, bucket.into(), key.into(), false); + let version = Version::new(version_uuid, bucket_id, key.into(), false); garage.version_table.insert(&version).await?; // Send success response let result = s3_xml::InitiateMultipartUploadResult { xmlns: (), - bucket: s3_xml::Value(bucket.to_string()), + bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key.to_string()), upload_id: s3_xml::Value(hex::encode(version_uuid)), }; @@ -352,7 +353,7 @@ pub async fn handle_create_multipart_upload( pub async fn handle_put_part( garage: Arc, req: Request, - bucket: &str, + bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, @@ -366,12 +367,11 @@ pub async fn handle_put_part( }; // Read first chuck, and at the same time try to get object to see if it exists - let bucket = bucket.to_string(); let key = key.to_string(); let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); let (object, first_block) = - futures::try_join!(garage.object_table.get(&bucket, &key), chunker.next(),)?; + futures::try_join!(garage.object_table.get(&bucket_id, &key), chunker.next(),)?; // Check object is valid and multipart block can be accepted let first_block = first_block.ok_or_else(|| Error::BadRequest("Empty body".to_string()))?; @@ -386,7 +386,7 @@ pub async fn handle_put_part( } // Copy block to store - let version = Version::new(version_uuid, bucket, key, false); + let version = Version::new(version_uuid, bucket_id, key, false); let first_block_hash = blake2sum(&first_block[..]); let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, @@ -424,7 +424,8 @@ pub async fn handle_put_part( pub async fn handle_complete_multipart_upload( garage: Arc, req: Request, - bucket: &str, + bucket_name: &str, + bucket_id: Uuid, key: &str, upload_id: &str, content_sha256: Option, @@ -442,10 +443,9 @@ pub async fn handle_complete_multipart_upload( let version_uuid = decode_upload_id(upload_id)?; - let bucket = bucket.to_string(); let key = key.to_string(); let (object, version) = futures::try_join!( - garage.object_table.get(&bucket, &key), + garage.object_table.get(&bucket_id, &key), garage.version_table.get(&version_uuid, &EmptyKey), )?; @@ -510,14 +510,14 @@ pub async fn handle_complete_multipart_upload( version.blocks.items()[0].1.hash, )); - let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); + let final_object = Object::new(bucket_id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done let result = s3_xml::CompleteMultipartUploadResult { xmlns: (), location: None, - bucket: s3_xml::Value(bucket), + bucket: s3_xml::Value(bucket_name.to_string()), key: s3_xml::Value(key), etag: s3_xml::Value(etag), }; @@ -528,7 +528,7 @@ pub async fn handle_complete_multipart_upload( pub async fn handle_abort_multipart_upload( garage: Arc, - bucket: &str, + bucket_id: Uuid, key: &str, upload_id: &str, ) -> Result, Error> { @@ -536,7 +536,7 @@ pub async fn handle_abort_multipart_upload( let object = garage .object_table - .get(&bucket.to_string(), &key.to_string()) + .get(&bucket_id, &key.to_string()) .await?; let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?; @@ -550,7 +550,7 @@ pub async fn handle_abort_multipart_upload( }; object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); + let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; Ok(Response::new(Body::from(vec![]))) diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs index 37c8b86c..da67c4cd 100644 --- a/src/api/s3_website.rs +++ b/src/api/s3_website.rs @@ -7,9 +7,10 @@ use serde::{Deserialize, Serialize}; use crate::error::*; use crate::s3_xml::{xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; -use garage_model::bucket_table::BucketState; + use garage_model::garage::Garage; use garage_table::*; +use garage_util::crdt; use garage_util::data::Hash; pub async fn handle_delete_website( @@ -17,14 +18,18 @@ pub async fn handle_delete_website( bucket: String, ) -> Result, Error> { let mut bucket = garage - .bucket_table + .bucket_alias_table .get(&EmptyKey, &bucket) .await? .ok_or(Error::NotFound)?; - if let BucketState::Present(state) = bucket.state.get_mut() { - state.website.update(false); - garage.bucket_table.insert(&bucket).await?; + if let crdt::Deletable::Present(state) = bucket.state.get_mut() { + let mut new_param = state.clone(); + new_param.website_access = false; + bucket.state.update(crdt::Deletable::present(new_param)); + garage.bucket_alias_table.insert(&bucket).await?; + } else { + unreachable!(); } Ok(Response::builder() @@ -43,7 +48,7 @@ pub async fn handle_put_website( verify_signed_content(content_sha256, &body[..])?; let mut bucket = garage - .bucket_table + .bucket_alias_table .get(&EmptyKey, &bucket) .await? .ok_or(Error::NotFound)?; @@ -51,9 +56,13 @@ pub async fn handle_put_website( let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; - if let BucketState::Present(state) = bucket.state.get_mut() { - state.website.update(true); - garage.bucket_table.insert(&bucket).await?; + if let crdt::Deletable::Present(state) = bucket.state.get() { + let mut new_param = state.clone(); + new_param.website_access = true; + bucket.state.update(crdt::Deletable::present(new_param)); + garage.bucket_alias_table.insert(&bucket).await?; + } else { + unreachable!(); } Ok(Response::builder() diff --git a/src/api/signature.rs b/src/api/signature.rs index 53ca2ce5..b5da7b62 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -64,7 +64,7 @@ pub async fn check_signature( .key_table .get(&EmptyKey, &authorization.key_id) .await? - .filter(|k| !k.deleted.get()) + .filter(|k| !k.state.is_deleted()) .ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?; let canonical_request = canonical_request( diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 74a6ab0e..44cacde3 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -15,12 +15,12 @@ path = "main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.5.0", path = "../api" } -garage_model = { version = "0.5.0", path = "../model" } -garage_rpc = { version = "0.5.0", path = "../rpc" } -garage_table = { version = "0.5.0", path = "../table" } -garage_util = { version = "0.5.0", path = "../util" } -garage_web = { version = "0.5.0", path = "../web" } +garage_api = { version = "0.6.0", path = "../api" } +garage_model = { version = "0.6.0", path = "../model" } +garage_rpc = { version = "0.6.0", path = "../rpc" } +garage_table = { version = "0.6.0", path = "../table" } +garage_util = { version = "0.6.0", path = "../util" } +garage_web = { version = "0.6.0", path = "../web" } bytes = "1.0" git-version = "0.3.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c7472670..6db8bfbe 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -5,17 +5,21 @@ use std::sync::Arc; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use garage_util::error::Error; +use garage_util::crdt::*; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::time::*; -use garage_table::crdt::Crdt; use garage_table::replication::*; use garage_table::*; use garage_rpc::*; +use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_model::key_table::*; +use garage_model::permission::*; use crate::cli::*; use crate::repair::Repair; @@ -31,7 +35,7 @@ pub enum AdminRpc { // Replies Ok(String), - BucketList(Vec), + BucketList(Vec), BucketInfo(Bucket), KeyList(Vec<(String, String)>), KeyInfo(Key), @@ -56,203 +60,331 @@ impl AdminRpcHandler { async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { match cmd { - BucketOperation::List => { - let bucket_names = self + BucketOperation::List => self.handle_list_buckets().await, + BucketOperation::Info(query) => { + let bucket_id = self .garage - .bucket_table - .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .bucket_helper() + .resolve_global_bucket_name(&query.name) .await? - .iter() - .map(|b| b.name.to_string()) - .collect::>(); - Ok(AdminRpc::BucketList(bucket_names)) - } - BucketOperation::Info(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; - Ok(AdminRpc::BucketInfo(bucket)) - } - BucketOperation::Create(query) => { - let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { - Some(mut bucket) => { - if !bucket.is_deleted() { - return Err(Error::BadRpc(format!( - "Bucket {} already exists", - query.name - ))); - } - bucket - .state - .update(BucketState::Present(BucketParams::new())); - bucket - } - None => Bucket::new(query.name.clone()), - }; - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name))) - } - BucketOperation::Delete(query) => { - let mut bucket = self.get_existing_bucket(&query.name).await?; - let objects = self + .ok_or_message("Bucket not found")?; + let bucket = self .garage - .object_table - .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10) + .bucket_helper() + .get_existing_bucket(bucket_id) .await?; - if !objects.is_empty() { - return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); - } - if !query.yes { - return Err(Error::BadRpc( - "Add --yes flag to really perform this operation".to_string(), - )); - } - // --- done checking, now commit --- - for (key_id, _, _) in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { - if !key.deleted.get() { - self.update_key_bucket(&key, &bucket.name, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Key not found: {}", key_id))); - } - } - bucket.state.update(BucketState::Deleted); - self.garage.bucket_table.insert(&bucket).await?; - Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) - } - BucketOperation::Allow(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = query.read || key.allow_read(&query.bucket); - let allow_write = query.write || key.allow_write(&query.bucket); - self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) - .await?; - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &key.key_id, &query.bucket, allow_read, allow_write - ))) - } - BucketOperation::Deny(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - let bucket = self.get_existing_bucket(&query.bucket).await?; - let allow_read = !query.read && key.allow_read(&query.bucket); - let allow_write = !query.write && key.allow_write(&query.bucket); - self.update_key_bucket(&key, &query.bucket, allow_read, allow_write) - .await?; - self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write) - .await?; - Ok(AdminRpc::Ok(format!( - "New permissions for {} on {}: read {}, write {}.", - &key.key_id, &query.bucket, allow_read, allow_write - ))) + Ok(AdminRpc::BucketInfo(bucket)) } - BucketOperation::Website(query) => { - let mut bucket = self.get_existing_bucket(&query.bucket).await?; + BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await, + BucketOperation::Delete(query) => self.handle_delete_bucket(query).await, + BucketOperation::Allow(query) => self.handle_bucket_allow(query).await, + BucketOperation::Deny(query) => self.handle_bucket_deny(query).await, + BucketOperation::Website(query) => self.handle_bucket_website(query).await, + } + } + + async fn handle_list_buckets(&self) -> Result { + let bucket_aliases = self + .garage + .bucket_alias_table + .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .await?; + Ok(AdminRpc::BucketList(bucket_aliases)) + } - if !(query.allow ^ query.deny) { - return Err(Error::Message( - "You must specify exactly one flag, either --allow or --deny".to_string(), - )); + #[allow(clippy::ptr_arg)] + async fn handle_create_bucket(&self, name: &String) -> Result { + let mut bucket = Bucket::new(); + let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? { + Some(mut alias) => { + if !alias.state.get().is_deleted() { + return Err(Error::BadRpc(format!("Bucket {} already exists", name))); } + alias.state.update(Deletable::Present(AliasParams { + bucket_id: bucket.id, + website_access: false, + })); + alias + } + None => BucketAlias::new(name.clone(), bucket.id, false), + }; + bucket + .state + .as_option_mut() + .unwrap() + .aliases + .update_in_place(name.clone(), true); + self.garage.bucket_table.insert(&bucket).await?; + self.garage.bucket_alias_table.insert(&alias).await?; + Ok(AdminRpc::Ok(format!("Bucket {} was created.", name))) + } - if let BucketState::Present(state) = bucket.state.get_mut() { - state.website.update(query.allow); - self.garage.bucket_table.insert(&bucket).await?; - let msg = if query.allow { - format!("Website access allowed for {}", &query.bucket) - } else { - format!("Website access denied for {}", &query.bucket) - }; + async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result { + let mut bucket_alias = self + .garage + .bucket_alias_table + .get(&EmptyKey, &query.name) + .await? + .filter(|a| !a.is_deleted()) + .ok_or_message(format!("Bucket {} does not exist", query.name))?; - Ok(AdminRpc::Ok(msg)) - } else { - unreachable!(); + let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id; + + // Check bucket doesn't have other aliases + let mut bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let bucket_state = bucket.state.as_option().unwrap(); + if bucket_state + .aliases + .items() + .iter() + .filter(|(_, _, active)| *active) + .any(|(name, _, _)| name != &query.name) + { + return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name))); + } + if bucket_state + .local_aliases + .items() + .iter() + .any(|(_, _, active)| *active) + { + return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name))); + } + + // Check bucket is empty + let objects = self + .garage + .object_table + .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10) + .await?; + if !objects.is_empty() { + return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name))); + } + + if !query.yes { + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); + } + + // --- done checking, now commit --- + // 1. delete authorization from keys that had access + for (key_id, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { + if !key.state.is_deleted() { + self.update_key_bucket(&key, bucket.id, false, false) + .await?; } + } else { + return Err(Error::Message(format!("Key not found: {}", key_id))); } } + // 2. delete bucket alias + bucket_alias.state.update(Deletable::Deleted); + self.garage.bucket_alias_table.insert(&bucket_alias).await?; + // 3. delete bucket alias + bucket.state = Deletable::delete(); + self.garage.bucket_table.insert(&bucket).await?; + + Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name))) + } + + async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_message("Bucket not found")?; + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let key = self.get_existing_key(&query.key_pattern).await?; + + let allow_read = query.read || key.allow_read(&bucket_id); + let allow_write = query.write || key.allow_write(&bucket_id); + + let new_perm = self + .update_key_bucket(&key, bucket_id, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &key.key_id, new_perm) + .await?; + + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &key.key_id, &query.bucket, allow_read, allow_write + ))) + } + + async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result { + let bucket_id = self + .garage + .bucket_helper() + .resolve_global_bucket_name(&query.bucket) + .await? + .ok_or_message("Bucket not found")?; + let bucket = self + .garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + let key = self.get_existing_key(&query.key_pattern).await?; + + let allow_read = !query.read && key.allow_read(&bucket_id); + let allow_write = !query.write && key.allow_write(&bucket_id); + + let new_perm = self + .update_key_bucket(&key, bucket_id, allow_read, allow_write) + .await?; + self.update_bucket_key(bucket, &key.key_id, new_perm) + .await?; + + Ok(AdminRpc::Ok(format!( + "New permissions for {} on {}: read {}, write {}.", + &key.key_id, &query.bucket, allow_read, allow_write + ))) + } + + async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result { + let mut bucket_alias = self + .garage + .bucket_alias_table + .get(&EmptyKey, &query.bucket) + .await? + .filter(|a| !a.is_deleted()) + .ok_or_message(format!("Bucket {} does not exist", query.bucket))?; + + let mut state = bucket_alias.state.get().as_option().unwrap().clone(); + + if !(query.allow ^ query.deny) { + return Err(Error::Message( + "You must specify exactly one flag, either --allow or --deny".to_string(), + )); + } + + state.website_access = query.allow; + bucket_alias.state.update(Deletable::present(state)); + self.garage.bucket_alias_table.insert(&bucket_alias).await?; + + let msg = if query.allow { + format!("Website access allowed for {}", &query.bucket) + } else { + format!("Website access denied for {}", &query.bucket) + }; + + Ok(AdminRpc::Ok(msg)) } async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result { match cmd { - KeyOperation::List => { - let key_ids = self - .garage - .key_table - .get_range( - &EmptyKey, - None, - Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), - 10000, - ) - .await? - .iter() - .map(|k| (k.key_id.to_string(), k.name.get().clone())) - .collect::>(); - Ok(AdminRpc::KeyList(key_ids)) - } + KeyOperation::List => self.handle_list_keys().await, KeyOperation::Info(query) => { let key = self.get_existing_key(&query.key_pattern).await?; Ok(AdminRpc::KeyInfo(key)) } - KeyOperation::New(query) => { - let key = Key::new(query.name.clone()); - self.garage.key_table.insert(&key).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::Rename(query) => { - let mut key = self.get_existing_key(&query.key_pattern).await?; - key.name.update(query.new_name.clone()); - self.garage.key_table.insert(&key).await?; - Ok(AdminRpc::KeyInfo(key)) - } - KeyOperation::Delete(query) => { - let key = self.get_existing_key(&query.key_pattern).await?; - if !query.yes { - return Err(Error::BadRpc( - "Add --yes flag to really perform this operation".to_string(), - )); - } - // --- done checking, now commit --- - for (ab_name, _, _) in key.authorized_buckets.items().iter() { - if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { - if !bucket.is_deleted() { - self.update_bucket_key(bucket, &key.key_id, false, false) - .await?; - } - } else { - return Err(Error::Message(format!("Bucket not found: {}", ab_name))); + KeyOperation::New(query) => self.handle_create_key(query).await, + KeyOperation::Rename(query) => self.handle_rename_key(query).await, + KeyOperation::Delete(query) => self.handle_delete_key(query).await, + KeyOperation::Import(query) => self.handle_import_key(query).await, + } + } + + async fn handle_list_keys(&self) -> Result { + let key_ids = self + .garage + .key_table + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + ) + .await? + .iter() + .map(|k| (k.key_id.to_string(), k.name.get().clone())) + .collect::>(); + Ok(AdminRpc::KeyList(key_ids)) + } + + async fn handle_create_key(&self, query: &KeyNewOpt) -> Result { + let key = Key::new(query.name.clone()); + self.garage.key_table.insert(&key).await?; + Ok(AdminRpc::KeyInfo(key)) + } + + async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result { + let mut key = self.get_existing_key(&query.key_pattern).await?; + key.name.update(query.new_name.clone()); + self.garage.key_table.insert(&key).await?; + Ok(AdminRpc::KeyInfo(key)) + } + + async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result { + let mut key = self.get_existing_key(&query.key_pattern).await?; + if !query.yes { + return Err(Error::BadRpc( + "Add --yes flag to really perform this operation".to_string(), + )); + } + let state = key.state.as_option_mut().unwrap(); + + // --- done checking, now commit --- + // 1. Delete local aliases + for (alias, _, to) in state.local_aliases.items().iter() { + if let Deletable::Present(bucket_id) = to { + if let Some(mut bucket) = self.garage.bucket_table.get(bucket_id, &EmptyKey).await? + { + if let Deletable::Present(bucket_state) = &mut bucket.state { + bucket_state.local_aliases = bucket_state + .local_aliases + .update_mutator((key.key_id.to_string(), alias.to_string()), false); + self.garage.bucket_table.insert(&bucket).await?; } + } else { + // ignore } - let del_key = Key::delete(key.key_id.to_string()); - self.garage.key_table.insert(&del_key).await?; - Ok(AdminRpc::Ok(format!( - "Key {} was deleted successfully.", - key.key_id - ))) } - KeyOperation::Import(query) => { - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; - if prev_key.is_some() { - return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + // 2. Delete authorized buckets + for (ab_id, auth) in state.authorized_buckets.items().iter() { + if let Some(bucket) = self.garage.bucket_table.get(ab_id, &EmptyKey).await? { + let new_perm = BucketKeyPerm { + timestamp: increment_logical_clock(auth.timestamp), + allow_read: false, + allow_write: false, + }; + if !bucket.is_deleted() { + self.update_bucket_key(bucket, &key.key_id, new_perm) + .await?; } - let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); - self.garage.key_table.insert(&imported_key).await?; - Ok(AdminRpc::KeyInfo(imported_key)) + } else { + // ignore } } + // 3. Actually delete key + key.state = Deletable::delete(); + self.garage.key_table.insert(&key).await?; + + Ok(AdminRpc::Ok(format!( + "Key {} was deleted successfully.", + key.key_id + ))) } - #[allow(clippy::ptr_arg)] - async fn get_existing_bucket(&self, bucket: &String) -> Result { - self.garage - .bucket_table - .get(&EmptyKey, bucket) - .await? - .filter(|b| !b.is_deleted()) - .map(Ok) - .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket)))) + async fn handle_import_key(&self, query: &KeyImportOpt) -> Result { + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; + if prev_key.is_some() { + return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); + } + let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); + self.garage.key_table.insert(&imported_key).await?; + Ok(AdminRpc::KeyInfo(imported_key)) } async fn get_existing_key(&self, pattern: &str) -> Result { @@ -267,7 +399,7 @@ impl AdminRpcHandler { ) .await? .into_iter() - .filter(|k| !k.deleted.get()) + .filter(|k| !k.state.is_deleted()) .collect::>(); if candidates.len() != 1 { Err(Error::Message(format!( @@ -279,51 +411,48 @@ impl AdminRpcHandler { } } - /// Update **bucket table** to inform of the new linked key - async fn update_bucket_key( - &self, - mut bucket: Bucket, - key_id: &str, - allow_read: bool, - allow_write: bool, - ) -> Result<(), Error> { - if let BucketState::Present(params) = bucket.state.get_mut() { - let ak = &mut params.authorized_keys; - let old_ak = ak.take_and_clear(); - ak.merge(&old_ak.update_mutator( - key_id.to_string(), - PermissionSet { - allow_read, - allow_write, - }, - )); - } else { - return Err(Error::Message( - "Bucket is deleted in update_bucket_key".to_string(), - )); - } - self.garage.bucket_table.insert(&bucket).await?; - Ok(()) - } - /// Update **key table** to inform of the new linked bucket async fn update_key_bucket( &self, key: &Key, - bucket: &str, + bucket_id: Uuid, allow_read: bool, allow_write: bool, - ) -> Result<(), Error> { + ) -> Result { let mut key = key.clone(); - let old_map = key.authorized_buckets.take_and_clear(); - key.authorized_buckets.merge(&old_map.update_mutator( - bucket.to_string(), - PermissionSet { + let mut key_state = key.state.as_option_mut().unwrap(); + + let perm = key_state + .authorized_buckets + .get(&bucket_id) + .cloned() + .map(|old_perm| BucketKeyPerm { + timestamp: increment_logical_clock(old_perm.timestamp), allow_read, allow_write, - }, - )); + }) + .unwrap_or(BucketKeyPerm { + timestamp: now_msec(), + allow_read, + allow_write, + }); + + key_state.authorized_buckets = Map::put_mutator(bucket_id, perm); + self.garage.key_table.insert(&key).await?; + Ok(perm) + } + + /// Update **bucket table** to inform of the new linked key + async fn update_bucket_key( + &self, + mut bucket: Bucket, + key_id: &str, + new_perm: BucketKeyPerm, + ) -> Result<(), Error> { + bucket.state.as_option_mut().unwrap().authorized_keys = + Map::put_mutator(key_id.to_string(), new_perm); + self.garage.bucket_table.insert(&bucket).await?; Ok(()) } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index a916974e..3cdf4d26 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -161,8 +161,11 @@ pub async fn cmd_admin( } AdminRpc::BucketList(bl) => { println!("List of buckets:"); - for bucket in bl { - println!("{}", bucket); + for alias in bl { + if let Some(p) = alias.state.get().as_option() { + let wflag = if p.website_access { "W" } else { " " }; + println!("- {} {} {:?}", wflag, alias.name, p.bucket_id); + } } } AdminRpc::BucketInfo(bucket) => { diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 647a2449..be34183e 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -1,3 +1,4 @@ +use garage_util::crdt::*; use garage_util::data::Uuid; use garage_util::error::*; @@ -8,26 +9,50 @@ pub fn print_key_info(key: &Key) { println!("Key name: {}", key.name.get()); println!("Key ID: {}", key.key_id); println!("Secret key: {}", key.secret_key); - if key.deleted.get() { - println!("Key is deleted."); - } else { - println!("Authorized buckets:"); - for (b, _, perm) in key.authorized_buckets.items().iter() { - println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write); + match &key.state { + Deletable::Present(p) => { + println!("\nKey-specific bucket aliases:"); + for (alias_name, _, alias) in p.local_aliases.items().iter() { + if let Some(bucket_id) = alias.as_option() { + println!("- {} {:?}", alias_name, bucket_id); + } + } + println!("\nAuthorized buckets:"); + for (b, perm) in p.authorized_buckets.items().iter() { + let rflag = if perm.allow_read { "R" } else { " " }; + let wflag = if perm.allow_write { "W" } else { " " }; + println!("- {}{} {:?}", rflag, wflag, b); + } + } + Deletable::Deleted => { + println!("\nKey is deleted."); } } } pub fn print_bucket_info(bucket: &Bucket) { - println!("Bucket name: {}", bucket.name); - match bucket.state.get() { - BucketState::Deleted => println!("Bucket is deleted."), - BucketState::Present(p) => { - println!("Authorized keys:"); - for (k, _, perm) in p.authorized_keys.items().iter() { - println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write); + println!("Bucket: {}", hex::encode(bucket.id)); + match &bucket.state { + Deletable::Deleted => println!("Bucket is deleted."), + Deletable::Present(p) => { + println!("\nGlobal aliases:"); + for (alias, _, active) in p.aliases.items().iter() { + if *active { + println!("- {}", alias); + } + } + println!("\nKey-specific aliases:"); + for ((key_id, alias), _, active) in p.local_aliases.items().iter() { + if *active { + println!("- {} {}", key_id, alias); + } + } + println!("\nAuthorized keys:"); + for (k, perm) in p.authorized_keys.items().iter() { + let rflag = if perm.allow_read { "R" } else { " " }; + let wflag = if perm.allow_write { "W" } else { " " }; + println!("- {}{} {}", rflag, wflag, k); } - println!("Website access: {}", p.website.get()); } }; } diff --git a/src/garage/repair.rs b/src/garage/repair.rs index a786f1f1..3666ca8f 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -77,7 +77,7 @@ impl Repair { let object = self .garage .object_table - .get(&version.bucket, &version.key) + .get(&version.bucket_id, &version.key) .await?; let version_exists = match object { Some(o) => o @@ -92,7 +92,7 @@ impl Repair { .version_table .insert(&Version::new( version.uuid, - version.bucket, + version.bucket_id, version.key, true, )) diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 1d695192..12c08719 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,9 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.5.0", path = "../rpc" } -garage_table = { version = "0.5.0", path = "../table" } -garage_util = { version = "0.5.0", path = "../util" } +garage_rpc = { version = "0.6.0", path = "../rpc" } +garage_table = { version = "0.6.0", path = "../table" } +garage_util = { version = "0.6.0", path = "../util" } +garage_model_050 = { package = "garage_model", version = "0.5.0" } async-trait = "0.1.7" arc-swap = "1.0" diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs new file mode 100644 index 00000000..4d300d05 --- /dev/null +++ b/src/model/bucket_alias_table.rs @@ -0,0 +1,68 @@ +use serde::{Deserialize, Serialize}; + +use garage_table::crdt::*; +use garage_table::*; +use garage_util::data::*; + +/// The bucket alias table holds the names given to buckets +/// in the global namespace. +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct BucketAlias { + pub name: String, + pub state: crdt::Lww>, +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct AliasParams { + pub bucket_id: Uuid, + pub website_access: bool, +} + +impl AutoCrdt for AliasParams { + const WARN_IF_DIFFERENT: bool = true; +} + +impl BucketAlias { + pub fn new(name: String, bucket_id: Uuid, website_access: bool) -> Self { + BucketAlias { + name, + state: crdt::Lww::new(crdt::Deletable::present(AliasParams { + bucket_id, + website_access, + })), + } + } + pub fn is_deleted(&self) -> bool { + self.state.get().is_deleted() + } +} + +impl Crdt for BucketAlias { + fn merge(&mut self, o: &Self) { + self.state.merge(&o.state); + } +} + +impl Entry for BucketAlias { + fn partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn sort_key(&self) -> &String { + &self.name + } +} + +pub struct BucketAliasTable; + +impl TableSchema for BucketAliasTable { + const TABLE_NAME: &'static str = "bucket_alias"; + + type P = EmptyKey; + type S = String; + type E = BucketAlias; + type Filter = DeletedFilter; + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.is_deleted()) + } +} diff --git a/src/model/bucket_helper.rs b/src/model/bucket_helper.rs new file mode 100644 index 00000000..e0720b4e --- /dev/null +++ b/src/model/bucket_helper.rs @@ -0,0 +1,41 @@ +use garage_util::data::*; +use garage_util::error::*; + +use garage_table::util::EmptyKey; + +use crate::bucket_table::Bucket; +use crate::garage::Garage; + +pub struct BucketHelper<'a>(pub(crate) &'a Garage); + +#[allow(clippy::ptr_arg)] +impl<'a> BucketHelper<'a> { + pub async fn resolve_global_bucket_name( + &self, + bucket_name: &String, + ) -> Result, Error> { + Ok(self + .0 + .bucket_alias_table + .get(&EmptyKey, bucket_name) + .await? + .map(|x| x.state.get().as_option().map(|x| x.bucket_id)) + .flatten()) + } + + #[allow(clippy::ptr_arg)] + pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result { + self.0 + .bucket_table + .get(&bucket_id, &EmptyKey) + .await? + .filter(|b| !b.is_deleted()) + .map(Ok) + .unwrap_or_else(|| { + Err(Error::BadRpc(format!( + "Bucket {:?} does not exist", + bucket_id + ))) + }) + } +} diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 2cb206ce..ac40407e 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -2,8 +2,10 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::Crdt; use garage_table::*; +use garage_util::data::*; +use garage_util::time::*; -use crate::key_table::PermissionSet; +use crate::permission::BucketKeyPerm; /// A bucket is a collection of objects /// @@ -12,49 +14,38 @@ use crate::key_table::PermissionSet; /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present. #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { - /// Name of the bucket - pub name: String, + /// ID of the bucket + pub id: Uuid, /// State, and configuration if not deleted, of the bucket - pub state: crdt::Lww, -} - -/// State of a bucket -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub enum BucketState { - /// The bucket is deleted - Deleted, - /// The bucket exists - Present(BucketParams), -} - -impl Crdt for BucketState { - fn merge(&mut self, o: &Self) { - match o { - BucketState::Deleted => *self = BucketState::Deleted, - BucketState::Present(other_params) => { - if let BucketState::Present(params) = self { - params.merge(other_params); - } - } - } - } + pub state: crdt::Deletable, } /// Configuration for a bucket #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct BucketParams { + /// Bucket's creation date + pub creation_date: u64, /// Map of key with access to the bucket, and what kind of access they give - pub authorized_keys: crdt::LwwMap, - /// Is the bucket served as http - pub website: crdt::Lww, + pub authorized_keys: crdt::Map, + /// Map of aliases that are or have been given to this bucket + /// in the global namespace + /// (not authoritative: this is just used as an indication to + /// map back to aliases when doing ListBuckets) + pub aliases: crdt::LwwMap, + /// Map of aliases that are or have been given to this bucket + /// in namespaces local to keys + /// key = (access key id, alias name) + pub local_aliases: crdt::LwwMap<(String, String), bool>, } impl BucketParams { /// Create an empty BucketParams with no authorized keys and no website accesss pub fn new() -> Self { BucketParams { - authorized_keys: crdt::LwwMap::new(), - website: crdt::Lww::new(false), + creation_date: now_msec(), + authorized_keys: crdt::Map::new(), + aliases: crdt::LwwMap::new(), + local_aliases: crdt::LwwMap::new(), } } } @@ -62,7 +53,14 @@ impl BucketParams { impl Crdt for BucketParams { fn merge(&mut self, o: &Self) { self.authorized_keys.merge(&o.authorized_keys); - self.website.merge(&o.website); + self.aliases.merge(&o.aliases); + self.local_aliases.merge(&o.local_aliases); + } +} + +impl Default for Bucket { + fn default() -> Self { + Self::new() } } @@ -74,34 +72,34 @@ impl Default for BucketParams { impl Bucket { /// Initializes a new instance of the Bucket struct - pub fn new(name: String) -> Self { + pub fn new() -> Self { Bucket { - name, - state: crdt::Lww::new(BucketState::Present(BucketParams::new())), + id: gen_uuid(), + state: crdt::Deletable::present(BucketParams::new()), } } /// Returns true if this represents a deleted bucket pub fn is_deleted(&self) -> bool { - *self.state.get() == BucketState::Deleted + self.state.is_deleted() } /// Return the list of authorized keys, when each was updated, and the permission associated to /// the key - pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { - match self.state.get() { - BucketState::Deleted => &[], - BucketState::Present(state) => state.authorized_keys.items(), + pub fn authorized_keys(&self) -> &[(String, BucketKeyPerm)] { + match &self.state { + crdt::Deletable::Deleted => &[], + crdt::Deletable::Present(state) => state.authorized_keys.items(), } } } -impl Entry for Bucket { - fn partition_key(&self) -> &EmptyKey { - &EmptyKey +impl Entry for Bucket { + fn partition_key(&self) -> &Uuid { + &self.id } - fn sort_key(&self) -> &String { - &self.name + fn sort_key(&self) -> &EmptyKey { + &EmptyKey } } @@ -114,10 +112,10 @@ impl Crdt for Bucket { pub struct BucketTable; impl TableSchema for BucketTable { - const TABLE_NAME: &'static str = "bucket"; + const TABLE_NAME: &'static str = "bucket_v2"; - type P = EmptyKey; - type S = String; + type P = Uuid; + type S = EmptyKey; type E = Bucket; type Filter = DeletedFilter; diff --git a/src/model/garage.rs b/src/model/garage.rs index a874cca8..9db1843c 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -14,6 +14,8 @@ use garage_table::*; use crate::block::*; use crate::block_ref_table::*; +use crate::bucket_alias_table::*; +use crate::bucket_helper::*; use crate::bucket_table::*; use crate::key_table::*; use crate::object_table::*; @@ -35,6 +37,8 @@ pub struct Garage { /// Table containing informations about buckets pub bucket_table: Arc>, + /// Table containing informations about bucket aliases + pub bucket_alias_table: Arc>, /// Table containing informations about api keys pub key_table: Arc>, @@ -120,6 +124,14 @@ impl Garage { info!("Initialize bucket_table..."); let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); + info!("Initialize bucket_alias_table..."); + let bucket_alias_table = Table::new( + BucketAliasTable, + control_rep_param.clone(), + system.clone(), + &db, + ); + info!("Initialize key_table_table..."); let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); @@ -131,6 +143,7 @@ impl Garage { system, block_manager, bucket_table, + bucket_alias_table, key_table, object_table, version_table, @@ -148,4 +161,8 @@ impl Garage { pub fn break_reference_cycles(&self) { self.block_manager.garage.swap(None); } + + pub fn bucket_helper(&self) -> BucketHelper { + BucketHelper(self) + } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 225f51c7..e87f5949 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -2,6 +2,9 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::*; use garage_table::*; +use garage_util::data::*; + +use crate::permission::BucketKeyPerm; /// An api key #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -15,12 +18,39 @@ pub struct Key { /// Name for the key pub name: crdt::Lww, - /// Is the key deleted - pub deleted: crdt::Bool, + /// If the key is present: it gives some permissions, + /// a map of bucket IDs (uuids) to permissions. + /// Otherwise no permissions are granted to key + pub state: crdt::Deletable, +} + +/// Configuration for a key +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct KeyParams { + pub authorized_buckets: crdt::Map, + pub local_aliases: crdt::LwwMap>, +} + +impl KeyParams { + pub fn new() -> Self { + KeyParams { + authorized_buckets: crdt::Map::new(), + local_aliases: crdt::LwwMap::new(), + } + } +} + +impl Default for KeyParams { + fn default() -> Self { + Self::new() + } +} - /// Buckets in which the key is authorized. Empty if `Key` is deleted - // CRDT interaction: deleted implies authorized_buckets is empty - pub authorized_buckets: crdt::LwwMap, +impl Crdt for KeyParams { + fn merge(&mut self, o: &Self) { + self.authorized_buckets.merge(&o.authorized_buckets); + self.local_aliases.merge(&o.local_aliases); + } } impl Key { @@ -32,8 +62,7 @@ impl Key { key_id, secret_key, name: crdt::Lww::new(name), - deleted: crdt::Bool::new(false), - authorized_buckets: crdt::LwwMap::new(), + state: crdt::Deletable::present(KeyParams::new()), } } @@ -43,8 +72,7 @@ impl Key { key_id: key_id.to_string(), secret_key: secret_key.to_string(), name: crdt::Lww::new(name.to_string()), - deleted: crdt::Bool::new(false), - authorized_buckets: crdt::LwwMap::new(), + state: crdt::Deletable::present(KeyParams::new()), } } @@ -54,41 +82,37 @@ impl Key { key_id, secret_key: "".into(), name: crdt::Lww::new("".to_string()), - deleted: crdt::Bool::new(true), - authorized_buckets: crdt::LwwMap::new(), + state: crdt::Deletable::Deleted, } } /// Check if `Key` is allowed to read in bucket - pub fn allow_read(&self, bucket: &str) -> bool { - self.authorized_buckets - .get(&bucket.to_string()) - .map(|x| x.allow_read) - .unwrap_or(false) + pub fn allow_read(&self, bucket: &Uuid) -> bool { + if let crdt::Deletable::Present(params) = &self.state { + params + .authorized_buckets + .get(bucket) + .map(|x| x.allow_read) + .unwrap_or(false) + } else { + false + } } /// Check if `Key` is allowed to write in bucket - pub fn allow_write(&self, bucket: &str) -> bool { - self.authorized_buckets - .get(&bucket.to_string()) - .map(|x| x.allow_write) - .unwrap_or(false) + pub fn allow_write(&self, bucket: &Uuid) -> bool { + if let crdt::Deletable::Present(params) = &self.state { + params + .authorized_buckets + .get(bucket) + .map(|x| x.allow_write) + .unwrap_or(false) + } else { + false + } } } -/// Permission given to a key in a bucket -#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] -pub struct PermissionSet { - /// The key can be used to read the bucket - pub allow_read: bool, - /// The key can be used to write in the bucket - pub allow_write: bool, -} - -impl AutoCrdt for PermissionSet { - const WARN_IF_DIFFERENT: bool = true; -} - impl Entry for Key { fn partition_key(&self) -> &EmptyKey { &EmptyKey @@ -101,13 +125,7 @@ impl Entry for Key { impl Crdt for Key { fn merge(&mut self, other: &Self) { self.name.merge(&other.name); - self.deleted.merge(&other.deleted); - - if self.deleted.get() { - self.authorized_buckets.clear(); - } else { - self.authorized_buckets.merge(&other.authorized_buckets); - } + self.state.merge(&other.state); } } @@ -129,7 +147,7 @@ impl TableSchema for KeyTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { match filter { - KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), + KeyFilter::Deleted(df) => df.apply(entry.state.is_deleted()), KeyFilter::Matches(pat) => { let pat = pat.to_lowercase(); entry.key_id.to_lowercase().starts_with(&pat) diff --git a/src/model/lib.rs b/src/model/lib.rs index b4a8ddb7..fe8cfdad 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -3,8 +3,11 @@ extern crate log; pub mod block; pub mod block_ref_table; +pub mod bucket_alias_table; +pub mod bucket_helper; pub mod bucket_table; pub mod garage; pub mod key_table; pub mod object_table; +pub mod permission; pub mod version_table; diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 9eec47ff..285cb5a7 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -15,7 +15,7 @@ use crate::version_table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { /// The bucket in which the object is stored, used as partition key - pub bucket: String, + pub bucket_id: Uuid, /// The key at which the object is stored in its bucket, used as sorting key pub key: String, @@ -26,9 +26,9 @@ pub struct Object { impl Object { /// Initialize an Object struct from parts - pub fn new(bucket: String, key: String, versions: Vec) -> Self { + pub fn new(bucket_id: Uuid, key: String, versions: Vec) -> Self { let mut ret = Self { - bucket, + bucket_id, key, versions: vec![], }; @@ -164,9 +164,9 @@ impl ObjectVersion { } } -impl Entry for Object { - fn partition_key(&self) -> &String { - &self.bucket +impl Entry for Object { + fn partition_key(&self) -> &Uuid { + &self.bucket_id } fn sort_key(&self) -> &String { &self.key @@ -219,7 +219,7 @@ pub struct ObjectTable { impl TableSchema for ObjectTable { const TABLE_NAME: &'static str = "object"; - type P = String; + type P = Uuid; type S = String; type E = Object; type Filter = DeletedFilter; @@ -242,7 +242,7 @@ impl TableSchema for ObjectTable { }; if newly_deleted { let deleted_version = - Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true); + Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); version_table.insert(&deleted_version).await?; } } diff --git a/src/model/permission.rs b/src/model/permission.rs new file mode 100644 index 00000000..b61c92ce --- /dev/null +++ b/src/model/permission.rs @@ -0,0 +1,37 @@ +use std::cmp::Ordering; + +use serde::{Deserialize, Serialize}; + +use garage_util::crdt::*; + +/// Permission given to a key in a bucket +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] +pub struct BucketKeyPerm { + /// Timestamp at which the permission was given + pub timestamp: u64, + + /// The key can be used to read the bucket + pub allow_read: bool, + /// The key can be used to write in the bucket + pub allow_write: bool, +} + +impl Crdt for BucketKeyPerm { + fn merge(&mut self, other: &Self) { + match other.timestamp.cmp(&self.timestamp) { + Ordering::Greater => { + *self = *other; + } + Ordering::Equal if other != self => { + warn!("Different permission sets with same timestamp: {:?} and {:?}, merging to most restricted permission set.", self, other); + if !other.allow_read { + self.allow_read = false; + } + if !other.allow_write { + self.allow_write = false; + } + } + _ => (), + } + } +} diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 18ec8e1d..4edea0b7 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -29,19 +29,19 @@ pub struct Version { // Back link to bucket+key so that we can figure if // this was deleted later on /// Bucket in which the related object is stored - pub bucket: String, + pub bucket_id: Uuid, /// Key in which the related object is stored pub key: String, } impl Version { - pub fn new(uuid: Uuid, bucket: String, key: String, deleted: bool) -> Self { + pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { Self { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), parts_etags: crdt::Map::new(), - bucket, + bucket_id, key, } } @@ -82,8 +82,8 @@ impl AutoCrdt for VersionBlock { const WARN_IF_DIFFERENT: bool = true; } -impl Entry for Version { - fn partition_key(&self) -> &Hash { +impl Entry for Version { + fn partition_key(&self) -> &Uuid { &self.uuid } fn sort_key(&self) -> &EmptyKey { @@ -116,7 +116,7 @@ pub struct VersionTable { impl TableSchema for VersionTable { const TABLE_NAME: &'static str = "version"; - type P = Hash; + type P = Uuid; type S = EmptyKey; type E = Version; type Filter = DeletedFilter; diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index d8ebb71e..b49a126a 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_rpc" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,7 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.5.0", path = "../util" } +garage_util = { version = "0.6.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index dc37f12c..91d71ddd 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_table" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,8 +14,8 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.5.0", path = "../rpc" } -garage_util = { version = "0.5.0", path = "../util" } +garage_rpc = { version = "0.6.0", path = "../rpc" } +garage_util = { version = "0.6.0", path = "../util" } async-trait = "0.1.7" bytes = "1.0" diff --git a/src/table/schema.rs b/src/table/schema.rs index fa51fa84..cfe86fba 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -16,7 +16,7 @@ impl PartitionKey for String { } } -impl PartitionKey for Hash { +impl PartitionKey for FixedBytes32 { fn hash(&self) -> Hash { *self } @@ -34,7 +34,7 @@ impl SortKey for String { } } -impl SortKey for Hash { +impl SortKey for FixedBytes32 { fn sort_key(&self) -> &[u8] { self.as_slice() } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index e33f8a66..d5200f98 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_util" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" diff --git a/src/util/crdt/deletable.rs b/src/util/crdt/deletable.rs new file mode 100644 index 00000000..c76f5cbb --- /dev/null +++ b/src/util/crdt/deletable.rs @@ -0,0 +1,72 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Deletable object (once deleted, cannot go back) +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub enum Deletable { + Present(T), + Deleted, +} + +impl Deletable { + /// Create a new deletable object that isn't deleted + pub fn present(v: T) -> Self { + Self::Present(v) + } + /// Create a new deletable object that is deleted + pub fn delete() -> Self { + Self::Deleted + } + /// As option + pub fn as_option(&self) -> Option<&T> { + match self { + Self::Present(v) => Some(v), + Self::Deleted => None, + } + } + /// As option, mutable + pub fn as_option_mut(&mut self) -> Option<&mut T> { + match self { + Self::Present(v) => Some(v), + Self::Deleted => None, + } + } + /// Into option + pub fn into_option(self) -> Option { + match self { + Self::Present(v) => Some(v), + Self::Deleted => None, + } + } + /// Is object deleted? + pub fn is_deleted(&self) -> bool { + matches!(self, Self::Deleted) + } +} + +impl From> for Deletable { + fn from(v: Option) -> Self { + v.map(Self::Present).unwrap_or(Self::Deleted) + } +} + +impl From> for Option { + fn from(v: Deletable) -> Option { + match v { + Deletable::Present(v) => Some(v), + Deletable::Deleted => None, + } + } +} + +impl Crdt for Deletable { + fn merge(&mut self, other: &Self) { + if let Deletable::Present(v) = self { + match other { + Deletable::Deleted => *self = Deletable::Deleted, + Deletable::Present(v2) => v.merge(v2), + } + } + } +} diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs index 43d13f27..bc686e05 100644 --- a/src/util/crdt/lww.rs +++ b/src/util/crdt/lww.rs @@ -82,6 +82,11 @@ where &self.v } + /// Take the value inside the CRDT (discards the timesamp) + pub fn take(self) -> T { + self.v + } + /// Get a mutable reference to the CRDT's value /// /// This is usefull to mutate the inside value without changing the LWW timestamp. diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs index 3e9aba79..21cb6e12 100644 --- a/src/util/crdt/lww_map.rs +++ b/src/util/crdt/lww_map.rs @@ -30,8 +30,8 @@ pub struct LwwMap { impl LwwMap where - K: Ord, - V: Crdt, + K: Clone + Ord, + V: Clone + Crdt, { /// Create a new empty map CRDT pub fn new() -> Self { @@ -73,6 +73,10 @@ where }; Self { vals: new_vals } } + + pub fn update_in_place(&mut self, k: K, new_v: V) { + self.merge(&self.update_mutator(k, new_v)); + } /// Takes all of the values of the map and returns them. The current map is reset to the /// empty map. This is very usefull to produce in-place a new map that contains only a delta /// that modifies a certain value: @@ -158,8 +162,8 @@ where impl Default for LwwMap where - K: Ord, - V: Crdt, + K: Clone + Ord, + V: Clone + Crdt, { fn default() -> Self { Self::new() diff --git a/src/util/crdt/mod.rs b/src/util/crdt/mod.rs index 9663a5a5..6ba575ed 100644 --- a/src/util/crdt/mod.rs +++ b/src/util/crdt/mod.rs @@ -12,12 +12,14 @@ mod bool; #[allow(clippy::module_inception)] mod crdt; +mod deletable; mod lww; mod lww_map; mod map; pub use self::bool::*; pub use crdt::*; +pub use deletable::*; pub use lww::*; pub use lww_map::*; pub use map::*; diff --git a/src/util/error.rs b/src/util/error.rs index ff03d05b..08cf1302 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -119,6 +119,35 @@ where } } +/// Trait to map error to the Bad Request error code +pub trait OkOrMessage { + type S2; + fn ok_or_message>(self, message: M) -> Self::S2; +} + +impl OkOrMessage for Result +where + E: std::fmt::Display, +{ + type S2 = Result; + fn ok_or_message>(self, message: M) -> Result { + match self { + Ok(x) => Ok(x), + Err(e) => Err(Error::Message(format!("{}: {}", message.into(), e))), + } + } +} + +impl OkOrMessage for Option { + type S2 = Result; + fn ok_or_message>(self, message: M) -> Result { + match self { + Some(x) => Ok(x), + None => Err(Error::Message(message.into())), + } + } +} + // Custom serialization for our error type, for use in RPC. // Errors are serialized as a string of their Display representation. // Upon deserialization, they all become a RemoteError with the diff --git a/src/util/time.rs b/src/util/time.rs index 238db2c3..d9192443 100644 --- a/src/util/time.rs +++ b/src/util/time.rs @@ -10,6 +10,11 @@ pub fn now_msec() -> u64 { .as_millis() as u64 } +/// Increment logical clock +pub fn increment_logical_clock(prev: u64) -> u64 { + std::cmp::max(prev + 1, now_msec()) +} + /// Convert a timestamp represented as milliseconds since UNIX Epoch to /// its RFC3339 representation, such as "2021-01-01T12:30:00Z" pub fn msec_to_rfc3339(msecs: u64) -> String { diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 72701c90..54211f5d 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_web" -version = "0.5.0" +version = "0.6.0" authors = ["Alex Auvolat ", "Quentin Dufour "] edition = "2018" license = "AGPL-3.0" @@ -14,10 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.5.0", path = "../api" } -garage_model = { version = "0.5.0", path = "../model" } -garage_util = { version = "0.5.0", path = "../util" } -garage_table = { version = "0.5.0", path = "../table" } +garage_api = { version = "0.6.0", path = "../api" } +garage_model = { version = "0.6.0", path = "../model" } +garage_util = { version = "0.6.0", path = "../util" } +garage_table = { version = "0.6.0", path = "../table" } err-derive = "0.3" log = "0.4" diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 4a603c05..5eb25e93 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -12,7 +12,6 @@ use hyper::{ use crate::error::*; use garage_api::helpers::{authority_to_host, host_to_bucket}; use garage_api::s3_get::{handle_get, handle_head}; -use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_table::*; use garage_util::error::Error as GarageError; @@ -77,31 +76,39 @@ async fn serve_file(garage: Arc, req: Request) -> Result Ok(()), - _ => Err(Error::NotFound), - }?; - // Get path let path = req.uri().path().to_string(); let index = &garage.config.s3_web.index; let key = path_to_key(&path, index)?; - info!("Selected bucket: \"{}\", selected key: \"{}\"", bucket, key); + info!( + "Selected bucket: \"{}\" {:?}, selected key: \"{}\"", + bucket_name, bucket_id, key + ); let res = match *req.method() { - Method::HEAD => handle_head(garage, &req, bucket, &key).await?, - Method::GET => handle_get(garage, &req, bucket, &key).await?, + Method::HEAD => handle_head(garage, &req, bucket_id, &key).await?, + Method::GET => handle_get(garage, &req, bucket_id, &key).await?, _ => return Err(Error::BadRequest("HTTP method not supported".to_string())), }; -- cgit v1.2.3