Diffstat (limited to 'src')
-rw-r--r--  src/api/Cargo.toml              |   8
-rw-r--r--  src/api/api_server.rs           | 109
-rw-r--r--  src/api/s3_bucket.rs            |  62
-rw-r--r--  src/api/s3_copy.rs              |  20
-rw-r--r--  src/api/s3_delete.rs            |  14
-rw-r--r--  src/api/s3_get.rs               |   9
-rw-r--r--  src/api/s3_list.rs              |   8
-rw-r--r--  src/api/s3_put.rs               |  44
-rw-r--r--  src/api/s3_website.rs           |  27
-rw-r--r--  src/api/signature.rs            |   2
-rw-r--r--  src/garage/Cargo.toml           |  14
-rw-r--r--  src/garage/admin.rs             | 545
-rw-r--r--  src/garage/cli/cmd.rs           |   7
-rw-r--r--  src/garage/cli/util.rs          |  53
-rw-r--r--  src/garage/repair.rs            |   4
-rw-r--r--  src/model/Cargo.toml            |   9
-rw-r--r--  src/model/bucket_alias_table.rs |  68
-rw-r--r--  src/model/bucket_helper.rs      |  41
-rw-r--r--  src/model/bucket_table.rs       |  94
-rw-r--r--  src/model/garage.rs             |  17
-rw-r--r--  src/model/key_table.rs          | 102
-rw-r--r--  src/model/lib.rs                |   3
-rw-r--r--  src/model/object_table.rs       |  16
-rw-r--r--  src/model/permission.rs         |  37
-rw-r--r--  src/model/version_table.rs      |  12
-rw-r--r--  src/rpc/Cargo.toml              |   4
-rw-r--r--  src/table/Cargo.toml            |   6
-rw-r--r--  src/table/schema.rs             |   4
-rw-r--r--  src/util/Cargo.toml             |   2
-rw-r--r--  src/util/crdt/deletable.rs      |  72
-rw-r--r--  src/util/crdt/lww.rs            |   5
-rw-r--r--  src/util/crdt/lww_map.rs        |  12
-rw-r--r--  src/util/crdt/mod.rs            |   2
-rw-r--r--  src/util/error.rs               |  29
-rw-r--r--  src/util/time.rs                |   5
-rw-r--r--  src/web/Cargo.toml              |  10
-rw-r--r--  src/web/web_server.rs           |  33
37 files changed, 1043 insertions, 466 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml
index 3ca46764..de58f78b 100644
--- a/src/api/Cargo.toml
+++ b/src/api/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_api"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,9 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_model = { version = "0.5.0", path = "../model" }
-garage_table = { version = "0.5.0", path = "../table" }
-garage_util = { version = "0.5.0", path = "../util" }
+garage_model = { version = "0.6.0", path = "../model" }
+garage_table = { version = "0.6.0", path = "../table" }
+garage_util = { version = "0.6.0", path = "../util" }
base64 = "0.13"
bytes = "1.0"
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 2de86233..cc9b9c38 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -7,9 +7,12 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
+use garage_util::crdt;
+use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
+use garage_model::key_table::Key;
use crate::error::*;
use crate::signature::check_signature;
@@ -105,10 +108,20 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.and_then(|root_domain| host_to_bucket(&host, root_domain));
let endpoint = Endpoint::from_request(&req, bucket.map(ToOwned::to_owned))?;
+
+ let bucket_name = match endpoint.authorization_type() {
+ Authorization::None => {
+ return handle_request_without_bucket(garage, req, api_key, endpoint).await
+ }
+ Authorization::Read(bucket) | Authorization::Write(bucket) => bucket.to_string(),
+ };
+
+ let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
+
let allowed = match endpoint.authorization_type() {
- Authorization::None => true,
- Authorization::Read(bucket) => api_key.allow_read(bucket),
- Authorization::Write(bucket) => api_key.allow_write(bucket),
+ Authorization::Read(_) => api_key.allow_read(&bucket_id),
+ Authorization::Write(_) => api_key.allow_write(&bucket_id),
+ _ => unreachable!(),
};
if !allowed {
@@ -118,19 +131,18 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
}
match endpoint {
- Endpoint::ListBuckets => handle_list_buckets(&api_key),
- Endpoint::HeadObject { bucket, key, .. } => handle_head(garage, &req, &bucket, &key).await,
- Endpoint::GetObject { bucket, key, .. } => handle_get(garage, &req, &bucket, &key).await,
+ Endpoint::HeadObject { key, .. } => handle_head(garage, &req, bucket_id, &key).await,
+ Endpoint::GetObject { key, .. } => handle_get(garage, &req, bucket_id, &key).await,
Endpoint::UploadPart {
- bucket,
key,
part_number,
upload_id,
+ ..
} => {
handle_put_part(
garage,
req,
- &bucket,
+ bucket_id,
&key,
part_number,
&upload_id,
@@ -138,38 +150,46 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)
.await
}
- Endpoint::CopyObject { bucket, key } => {
+ Endpoint::CopyObject { key, .. } => {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
- if !api_key.allow_read(source_bucket) {
+ let source_bucket_id =
+ resolve_bucket(&garage, &source_bucket.to_string(), &api_key).await?;
+ if !api_key.allow_read(&source_bucket_id) {
return Err(Error::Forbidden(format!(
"Reading from bucket {} not allowed for this key",
source_bucket
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
- handle_copy(garage, &req, &bucket, &key, source_bucket, source_key).await
+ handle_copy(garage, &req, bucket_id, &key, source_bucket_id, source_key).await
}
- Endpoint::PutObject { bucket, key } => {
- handle_put(garage, req, &bucket, &key, content_sha256).await
+ Endpoint::PutObject { key, .. } => {
+ handle_put(garage, req, bucket_id, &key, content_sha256).await
}
- Endpoint::AbortMultipartUpload {
- bucket,
- key,
- upload_id,
- } => handle_abort_multipart_upload(garage, &bucket, &key, &upload_id).await,
- Endpoint::DeleteObject { bucket, key, .. } => handle_delete(garage, &bucket, &key).await,
+ Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
+ handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
+ }
+ Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await,
Endpoint::CreateMultipartUpload { bucket, key } => {
- handle_create_multipart_upload(garage, &req, &bucket, &key).await
+ handle_create_multipart_upload(garage, &req, &bucket, bucket_id, &key).await
}
Endpoint::CompleteMultipartUpload {
bucket,
key,
upload_id,
} => {
- handle_complete_multipart_upload(garage, req, &bucket, &key, &upload_id, content_sha256)
- .await
+ handle_complete_multipart_upload(
+ garage,
+ req,
+ &bucket,
+ bucket_id,
+ &key,
+ &upload_id,
+ content_sha256,
+ )
+ .await
}
Endpoint::CreateBucket { bucket } => {
debug!(
@@ -206,7 +226,8 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
garage,
&ListObjectsQuery {
is_v2: false,
- bucket,
+ bucket_name: bucket,
+ bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
max_keys: max_keys.unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
@@ -234,7 +255,8 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
garage,
&ListObjectsQuery {
is_v2: true,
- bucket,
+ bucket_name: bucket,
+ bucket_id,
delimiter: delimiter.map(|d| d.to_string()),
max_keys: max_keys.unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
@@ -252,8 +274,8 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)))
}
}
- Endpoint::DeleteObjects { bucket } => {
- handle_delete_objects(garage, &bucket, req, content_sha256).await
+ Endpoint::DeleteObjects { .. } => {
+ handle_delete_objects(garage, bucket_id, req, content_sha256).await
}
Endpoint::PutBucketWebsite { bucket } => {
handle_put_website(garage, bucket, req, content_sha256).await
@@ -263,6 +285,41 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
}
}
+async fn handle_request_without_bucket(
+ garage: Arc<Garage>,
+ _req: Request<Body>,
+ api_key: Key,
+ endpoint: Endpoint,
+) -> Result<Response<Body>, Error> {
+ match endpoint {
+ Endpoint::ListBuckets => handle_list_buckets(&garage, &api_key).await,
+ endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
+ }
+}
+
+#[allow(clippy::ptr_arg)]
+async fn resolve_bucket(
+ garage: &Garage,
+ bucket_name: &String,
+ api_key: &Key,
+) -> Result<Uuid, Error> {
+ let api_key_params = api_key
+ .state
+ .as_option()
+ .ok_or_else(|| Error::Forbidden("Operation is not allowed for this key.".to_string()))?;
+
+ if let Some(crdt::Deletable::Present(bucket_id)) = api_key_params.local_aliases.get(bucket_name)
+ {
+ Ok(*bucket_id)
+ } else {
+ Ok(garage
+ .bucket_helper()
+ .resolve_global_bucket_name(bucket_name)
+ .await?
+ .ok_or(Error::NotFound)?)
+ }
+}
+
/// Extract the bucket name and the key name from an HTTP path and possibly a bucket provided in
/// the host header of the request
///
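
(Illustration, not part of the patch: a minimal sketch of how the new pieces above fit together for a read request. resolve_bucket() and handle_get() are the functions from this diff; the wrapper function, its name and its imports are assumptions made for the example and rely on being inside the api crate with those handlers in scope.)

    use std::sync::Arc;

    use hyper::{Body, Request, Response};

    use garage_model::garage::Garage;
    use garage_model::key_table::Key;

    use crate::error::Error;

    // Hypothetical wrapper: resolve the bucket *name* carried by the request
    // to the bucket *id* used by every storage table, then do the ACL check
    // against that id before dispatching to the (new-signature) handler.
    async fn get_object_by_name(
        garage: Arc<Garage>,
        req: &Request<Body>,
        api_key: &Key,
        bucket_name: String,
        key: &str,
    ) -> Result<Response<Body>, Error> {
        // resolve_bucket() checks the key's local aliases first,
        // then falls back to the global bucket_alias_table.
        let bucket_id = resolve_bucket(&garage, &bucket_name, api_key).await?;
        if !api_key.allow_read(&bucket_id) {
            return Err(Error::Forbidden(format!(
                "Reading from bucket {} not allowed for this key",
                bucket_name
            )));
        }
        handle_get(garage, req, bucket_id, key).await
    }
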
diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs
index 2be0a818..dc131a31 100644
--- a/src/api/s3_bucket.rs
+++ b/src/api/s3_bucket.rs
@@ -1,9 +1,12 @@
+use std::collections::HashMap;
use std::sync::Arc;
use hyper::{Body, Response};
use garage_model::garage::Garage;
use garage_model::key_table::Key;
+use garage_table::util::EmptyKey;
+use garage_util::crdt::*;
use garage_util::time::*;
use crate::error::*;
@@ -34,20 +37,65 @@ pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
.body(Body::from(xml.into_bytes()))?)
}
-pub fn handle_list_buckets(api_key: &Key) -> Result<Response<Body>, Error> {
+pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+ let key_state = api_key.state.as_option().ok_or_internal_error(
+ "Key should not be in deleted state at this point (internal error)",
+ )?;
+
+ // Collect buckets user has access to
+ let ids = api_key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ .filter(|(_, perms)| perms.allow_read || perms.allow_write)
+ .map(|(id, _)| *id)
+ .collect::<Vec<_>>();
+
+ let mut buckets_by_id = HashMap::new();
+ let mut aliases = HashMap::new();
+
+ for bucket_id in ids.iter() {
+ let bucket = garage.bucket_table.get(bucket_id, &EmptyKey).await?;
+ if let Some(bucket) = bucket {
+ if let Deletable::Present(param) = bucket.state {
+ for (alias, _, active) in param.aliases.items() {
+ if *active {
+ let alias_ent = garage.bucket_alias_table.get(&EmptyKey, alias).await?;
+ if let Some(alias_ent) = alias_ent {
+ if let Some(alias_p) = alias_ent.state.get().as_option() {
+ if alias_p.bucket_id == *bucket_id {
+ aliases.insert(alias_ent.name.clone(), *bucket_id);
+ }
+ }
+ }
+ }
+ }
+ buckets_by_id.insert(bucket_id, param);
+ }
+ }
+ }
+
+ for (alias, _, id) in key_state.local_aliases.items() {
+ if let Some(id) = id.as_option() {
+ aliases.insert(alias.clone(), *id);
+ }
+ }
+
+ // Generate response
let list_buckets = s3_xml::ListAllMyBucketsResult {
owner: s3_xml::Owner {
display_name: s3_xml::Value(api_key.name.get().to_string()),
id: s3_xml::Value(api_key.key_id.to_string()),
},
buckets: s3_xml::BucketList {
- entries: api_key
- .authorized_buckets
- .items()
+ entries: aliases
.iter()
- .filter(|(_, _, perms)| perms.allow_read || perms.allow_write)
- .map(|(name, ts, _)| s3_xml::Bucket {
- creation_date: s3_xml::Value(msec_to_rfc3339(*ts)),
+ .filter_map(|(name, id)| buckets_by_id.get(id).map(|p| (name, id, p)))
+ .map(|(name, _id, param)| s3_xml::Bucket {
+ creation_date: s3_xml::Value(msec_to_rfc3339(param.creation_date)),
name: s3_xml::Value(name.to_string()),
})
.collect(),
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index 9ade6985..4ede8230 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -18,14 +18,14 @@ use crate::s3_xml;
pub async fn handle_copy(
garage: Arc<Garage>,
req: &Request<Body>,
- dest_bucket: &str,
+ dest_bucket_id: Uuid,
dest_key: &str,
- source_bucket: &str,
+ source_bucket_id: Uuid,
source_key: &str,
) -> Result<Response<Body>, Error> {
let source_object = garage
.object_table
- .get(&source_bucket.to_string(), &source_key.to_string())
+ .get(&source_bucket_id, &source_key.to_string())
.await?
.ok_or(Error::NotFound)?;
@@ -76,7 +76,7 @@ pub async fn handle_copy(
)),
};
let dest_object = Object::new(
- dest_bucket.to_string(),
+ dest_bucket_id,
dest_key.to_string(),
vec![dest_object_version],
);
@@ -99,7 +99,7 @@ pub async fn handle_copy(
state: ObjectVersionState::Uploading(new_meta.headers.clone()),
};
let tmp_dest_object = Object::new(
- dest_bucket.to_string(),
+ dest_bucket_id,
dest_key.to_string(),
vec![tmp_dest_object_version],
);
@@ -109,12 +109,8 @@ pub async fn handle_copy(
// this means that the BlockRef entries linked to this version cannot be
// marked as deleted (they are marked as deleted only if the Version
// doesn't exist or is marked as deleted).
- let mut dest_version = Version::new(
- new_uuid,
- dest_bucket.to_string(),
- dest_key.to_string(),
- false,
- );
+ let mut dest_version =
+ Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false);
garage.version_table.insert(&dest_version).await?;
// Fill in block list for version and insert block refs
@@ -151,7 +147,7 @@ pub async fn handle_copy(
)),
};
let dest_object = Object::new(
- dest_bucket.to_string(),
+ dest_bucket_id,
dest_key.to_string(),
vec![dest_object_version],
);
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 425f86d7..1976139b 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -14,12 +14,12 @@ use crate::signature::verify_signed_content;
async fn handle_delete_internal(
garage: &Garage,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
) -> Result<(Uuid, Uuid), Error> {
let object = garage
.object_table
- .get(&bucket.to_string(), &key.to_string())
+ .get(&bucket_id, &key.to_string())
.await?
.ok_or(Error::NotFound)?; // No need to delete
@@ -45,7 +45,7 @@ async fn handle_delete_internal(
let version_uuid = gen_uuid();
let object = Object::new(
- bucket.into(),
+ bucket_id,
key.into(),
vec![ObjectVersion {
uuid: version_uuid,
@@ -61,11 +61,11 @@ async fn handle_delete_internal(
pub async fn handle_delete(
garage: Arc<Garage>,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
) -> Result<Response<Body>, Error> {
let (_deleted_version, delete_marker_version) =
- handle_delete_internal(&garage, bucket, key).await?;
+ handle_delete_internal(&garage, bucket_id, key).await?;
Ok(Response::builder()
.header("x-amz-version-id", hex::encode(delete_marker_version))
@@ -76,7 +76,7 @@ pub async fn handle_delete(
pub async fn handle_delete_objects(
garage: Arc<Garage>,
- bucket: &str,
+ bucket_id: Uuid,
req: Request<Body>,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@@ -90,7 +90,7 @@ pub async fn handle_delete_objects(
let mut ret_errors = Vec::new();
for obj in cmd.objects.iter() {
- match handle_delete_internal(&garage, bucket, &obj.key).await {
+ match handle_delete_internal(&garage, bucket_id, &obj.key).await {
Ok((deleted_version, delete_marker_version)) => {
if cmd.quiet {
continue;
diff --git a/src/api/s3_get.rs b/src/api/s3_get.rs
index 428bbf34..269a3fa8 100644
--- a/src/api/s3_get.rs
+++ b/src/api/s3_get.rs
@@ -7,6 +7,7 @@ use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
use garage_table::EmptyKey;
+use garage_util::data::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@@ -84,12 +85,12 @@ fn try_answer_cached(
pub async fn handle_head(
garage: Arc<Garage>,
req: &Request<Body>,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
) -> Result<Response<Body>, Error> {
let object = garage
.object_table
- .get(&bucket.to_string(), &key.to_string())
+ .get(&bucket_id, &key.to_string())
.await?
.ok_or(Error::NotFound)?;
@@ -123,12 +124,12 @@ pub async fn handle_head(
pub async fn handle_get(
garage: Arc<Garage>,
req: &Request<Body>,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
) -> Result<Response<Body>, Error> {
let object = garage
.object_table
- .get(&bucket.to_string(), &key.to_string())
+ .get(&bucket_id, &key.to_string())
.await?
.ok_or(Error::NotFound)?;
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index df9c3e6b..07efb02d 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use hyper::{Body, Response};
+use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -18,7 +19,8 @@ use crate::s3_xml;
#[derive(Debug)]
pub struct ListObjectsQuery {
pub is_v2: bool,
- pub bucket: String,
+ pub bucket_name: String,
+ pub bucket_id: Uuid,
pub delimiter: Option<String>,
pub max_keys: usize,
pub prefix: String,
@@ -102,7 +104,7 @@ pub async fn handle_list(
let objects = garage
.object_table
.get_range(
- &query.bucket,
+ &query.bucket_id,
Some(next_chunk_start.clone()),
Some(DeletedFilter::NotDeleted),
query.max_keys + 1,
@@ -232,7 +234,7 @@ pub async fn handle_list(
let mut result = s3_xml::ListBucketResult {
xmlns: (),
- name: s3_xml::Value(query.bucket.to_string()),
+ name: s3_xml::Value(query.bucket_name.to_string()),
prefix: uriencode_maybe(&query.prefix, query.urlencode_resp),
marker: None,
next_marker: None,
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index f63e8307..152e59b4 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -24,7 +24,7 @@ use crate::signature::verify_signed_content;
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
@@ -77,7 +77,7 @@ pub async fn handle_put(
)),
};
- let object = Object::new(bucket.into(), key.into(), vec![object_version]);
+ let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok(put_response(version_uuid, data_md5sum_hex));
@@ -90,14 +90,14 @@ pub async fn handle_put(
timestamp: version_timestamp,
state: ObjectVersionState::Uploading(headers.clone()),
};
- let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
+ let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table
// Write this entry now, even with empty block list,
// to prevent block_ref entries from being deleted (they can be deleted
	// if they reference a version that isn't found in the version table)
- let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ let version = Version::new(version_uuid, bucket_id, key.into(), false);
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
@@ -127,7 +127,7 @@ pub async fn handle_put(
Err(e) => {
// Mark object as aborted, this will free the blocks further down
object_version.state = ObjectVersionState::Aborted;
- let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
+ let object = Object::new(bucket_id, key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
return Err(e);
}
@@ -143,7 +143,7 @@ pub async fn handle_put(
},
first_block_hash,
));
- let object = Object::new(bucket.into(), key.into(), vec![object_version]);
+ let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
Ok(put_response(version_uuid, md5sum_hex))
@@ -315,7 +315,8 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,
- bucket: &str,
+ bucket_name: &str,
+ bucket_id: Uuid,
key: &str,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
@@ -327,20 +328,20 @@ pub async fn handle_create_multipart_upload(
timestamp: now_msec(),
state: ObjectVersionState::Uploading(headers),
};
- let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
+ let object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&object).await?;
// Insert empty version so that block_ref entries refer to something
// (they are inserted concurrently with blocks in the version table, so
// there is the possibility that they are inserted before the version table
// is created, in which case it is allowed to delete them, e.g. in repair_*)
- let version = Version::new(version_uuid, bucket.into(), key.into(), false);
+ let version = Version::new(version_uuid, bucket_id, key.into(), false);
garage.version_table.insert(&version).await?;
// Send success response
let result = s3_xml::InitiateMultipartUploadResult {
xmlns: (),
- bucket: s3_xml::Value(bucket.to_string()),
+ bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key.to_string()),
upload_id: s3_xml::Value(hex::encode(version_uuid)),
};
@@ -352,7 +353,7 @@ pub async fn handle_create_multipart_upload(
pub async fn handle_put_part(
garage: Arc<Garage>,
req: Request<Body>,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
part_number: u64,
upload_id: &str,
@@ -366,12 +367,11 @@ pub async fn handle_put_part(
};
	// Read first chunk, and at the same time try to get object to see if it exists
- let bucket = bucket.to_string();
let key = key.to_string();
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
let (object, first_block) =
- futures::try_join!(garage.object_table.get(&bucket, &key), chunker.next(),)?;
+ futures::try_join!(garage.object_table.get(&bucket_id, &key), chunker.next(),)?;
// Check object is valid and multipart block can be accepted
let first_block = first_block.ok_or_else(|| Error::BadRequest("Empty body".to_string()))?;
@@ -386,7 +386,7 @@ pub async fn handle_put_part(
}
// Copy block to store
- let version = Version::new(version_uuid, bucket, key, false);
+ let version = Version::new(version_uuid, bucket_id, key, false);
let first_block_hash = blake2sum(&first_block[..]);
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
&garage,
@@ -424,7 +424,8 @@ pub async fn handle_put_part(
pub async fn handle_complete_multipart_upload(
garage: Arc<Garage>,
req: Request<Body>,
- bucket: &str,
+ bucket_name: &str,
+ bucket_id: Uuid,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
@@ -442,10 +443,9 @@ pub async fn handle_complete_multipart_upload(
let version_uuid = decode_upload_id(upload_id)?;
- let bucket = bucket.to_string();
let key = key.to_string();
let (object, version) = futures::try_join!(
- garage.object_table.get(&bucket, &key),
+ garage.object_table.get(&bucket_id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
)?;
@@ -510,14 +510,14 @@ pub async fn handle_complete_multipart_upload(
version.blocks.items()[0].1.hash,
));
- let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);
+ let final_object = Object::new(bucket_id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
// Send response saying ok we're done
let result = s3_xml::CompleteMultipartUploadResult {
xmlns: (),
location: None,
- bucket: s3_xml::Value(bucket),
+ bucket: s3_xml::Value(bucket_name.to_string()),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
@@ -528,7 +528,7 @@ pub async fn handle_complete_multipart_upload(
pub async fn handle_abort_multipart_upload(
garage: Arc<Garage>,
- bucket: &str,
+ bucket_id: Uuid,
key: &str,
upload_id: &str,
) -> Result<Response<Body>, Error> {
@@ -536,7 +536,7 @@ pub async fn handle_abort_multipart_upload(
let object = garage
.object_table
- .get(&bucket.to_string(), &key.to_string())
+ .get(&bucket_id, &key.to_string())
.await?;
let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?;
@@ -550,7 +550,7 @@ pub async fn handle_abort_multipart_upload(
};
object_version.state = ObjectVersionState::Aborted;
- let final_object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]);
+ let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
Ok(Response::new(Body::from(vec![])))
diff --git a/src/api/s3_website.rs b/src/api/s3_website.rs
index 37c8b86c..da67c4cd 100644
--- a/src/api/s3_website.rs
+++ b/src/api/s3_website.rs
@@ -7,9 +7,10 @@ use serde::{Deserialize, Serialize};
use crate::error::*;
use crate::s3_xml::{xmlns_tag, IntValue, Value};
use crate::signature::verify_signed_content;
-use garage_model::bucket_table::BucketState;
+
use garage_model::garage::Garage;
use garage_table::*;
+use garage_util::crdt;
use garage_util::data::Hash;
pub async fn handle_delete_website(
@@ -17,14 +18,18 @@ pub async fn handle_delete_website(
bucket: String,
) -> Result<Response<Body>, Error> {
let mut bucket = garage
- .bucket_table
+ .bucket_alias_table
.get(&EmptyKey, &bucket)
.await?
.ok_or(Error::NotFound)?;
- if let BucketState::Present(state) = bucket.state.get_mut() {
- state.website.update(false);
- garage.bucket_table.insert(&bucket).await?;
+ if let crdt::Deletable::Present(state) = bucket.state.get_mut() {
+ let mut new_param = state.clone();
+ new_param.website_access = false;
+ bucket.state.update(crdt::Deletable::present(new_param));
+ garage.bucket_alias_table.insert(&bucket).await?;
+ } else {
+ unreachable!();
}
Ok(Response::builder()
@@ -43,7 +48,7 @@ pub async fn handle_put_website(
verify_signed_content(content_sha256, &body[..])?;
let mut bucket = garage
- .bucket_table
+ .bucket_alias_table
.get(&EmptyKey, &bucket)
.await?
.ok_or(Error::NotFound)?;
@@ -51,9 +56,13 @@ pub async fn handle_put_website(
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;
- if let BucketState::Present(state) = bucket.state.get_mut() {
- state.website.update(true);
- garage.bucket_table.insert(&bucket).await?;
+ if let crdt::Deletable::Present(state) = bucket.state.get() {
+ let mut new_param = state.clone();
+ new_param.website_access = true;
+ bucket.state.update(crdt::Deletable::present(new_param));
+ garage.bucket_alias_table.insert(&bucket).await?;
+ } else {
+ unreachable!();
}
Ok(Response::builder()
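
(Illustration, not part of the patch: the website handlers above now do a read-modify-write on the alias's Lww<Deletable<AliasParams>> state instead of flipping a flag on the bucket. A hypothetical helper showing that pattern, using only types and methods that appear in this diff:)

    use garage_model::garage::Garage;
    use garage_table::EmptyKey;
    use garage_util::crdt;

    use crate::error::Error;

    // Hypothetical helper: toggle website access on a bucket alias.
    async fn set_website_access(
        garage: &Garage,
        bucket: &str,
        enabled: bool,
    ) -> Result<(), Error> {
        let mut alias = garage
            .bucket_alias_table
            .get(&EmptyKey, &bucket.to_string())
            .await?
            .ok_or(Error::NotFound)?;
        if let crdt::Deletable::Present(params) = alias.state.get() {
            // Clone the current params, change the flag, write a new LWW value.
            let mut new_params = params.clone();
            new_params.website_access = enabled;
            alias.state.update(crdt::Deletable::present(new_params));
            garage.bucket_alias_table.insert(&alias).await?;
        }
        Ok(())
    }
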
diff --git a/src/api/signature.rs b/src/api/signature.rs
index 53ca2ce5..b5da7b62 100644
--- a/src/api/signature.rs
+++ b/src/api/signature.rs
@@ -64,7 +64,7 @@ pub async fn check_signature(
.key_table
.get(&EmptyKey, &authorization.key_id)
.await?
- .filter(|k| !k.deleted.get())
+ .filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?;
let canonical_request = canonical_request(
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 74a6ab0e..44cacde3 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -15,12 +15,12 @@ path = "main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.5.0", path = "../api" }
-garage_model = { version = "0.5.0", path = "../model" }
-garage_rpc = { version = "0.5.0", path = "../rpc" }
-garage_table = { version = "0.5.0", path = "../table" }
-garage_util = { version = "0.5.0", path = "../util" }
-garage_web = { version = "0.5.0", path = "../web" }
+garage_api = { version = "0.6.0", path = "../api" }
+garage_model = { version = "0.6.0", path = "../model" }
+garage_rpc = { version = "0.6.0", path = "../rpc" }
+garage_table = { version = "0.6.0", path = "../table" }
+garage_util = { version = "0.6.0", path = "../util" }
+garage_web = { version = "0.6.0", path = "../web" }
bytes = "1.0"
git-version = "0.3.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c7472670..6db8bfbe 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -5,17 +5,21 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use garage_util::error::Error;
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::time::*;
-use garage_table::crdt::Crdt;
use garage_table::replication::*;
use garage_table::*;
use garage_rpc::*;
+use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::*;
+use garage_model::permission::*;
use crate::cli::*;
use crate::repair::Repair;
@@ -31,7 +35,7 @@ pub enum AdminRpc {
// Replies
Ok(String),
- BucketList(Vec<String>),
+ BucketList(Vec<BucketAlias>),
BucketInfo(Bucket),
KeyList(Vec<(String, String)>),
KeyInfo(Key),
@@ -56,203 +60,331 @@ impl AdminRpcHandler {
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
- BucketOperation::List => {
- let bucket_names = self
+ BucketOperation::List => self.handle_list_buckets().await,
+ BucketOperation::Info(query) => {
+ let bucket_id = self
.garage
- .bucket_table
- .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.name)
.await?
- .iter()
- .map(|b| b.name.to_string())
- .collect::<Vec<_>>();
- Ok(AdminRpc::BucketList(bucket_names))
- }
- BucketOperation::Info(query) => {
- let bucket = self.get_existing_bucket(&query.name).await?;
- Ok(AdminRpc::BucketInfo(bucket))
- }
- BucketOperation::Create(query) => {
- let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
- Some(mut bucket) => {
- if !bucket.is_deleted() {
- return Err(Error::BadRpc(format!(
- "Bucket {} already exists",
- query.name
- )));
- }
- bucket
- .state
- .update(BucketState::Present(BucketParams::new()));
- bucket
- }
- None => Bucket::new(query.name.clone()),
- };
- self.garage.bucket_table.insert(&bucket).await?;
- Ok(AdminRpc::Ok(format!("Bucket {} was created.", query.name)))
- }
- BucketOperation::Delete(query) => {
- let mut bucket = self.get_existing_bucket(&query.name).await?;
- let objects = self
+ .ok_or_message("Bucket not found")?;
+ let bucket = self
.garage
- .object_table
- .get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10)
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
.await?;
- if !objects.is_empty() {
- return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name)));
- }
- if !query.yes {
- return Err(Error::BadRpc(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
- // --- done checking, now commit ---
- for (key_id, _, _) in bucket.authorized_keys() {
- if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
- if !key.deleted.get() {
- self.update_key_bucket(&key, &bucket.name, false, false)
- .await?;
- }
- } else {
- return Err(Error::Message(format!("Key not found: {}", key_id)));
- }
- }
- bucket.state.update(BucketState::Deleted);
- self.garage.bucket_table.insert(&bucket).await?;
- Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
- }
- BucketOperation::Allow(query) => {
- let key = self.get_existing_key(&query.key_pattern).await?;
- let bucket = self.get_existing_bucket(&query.bucket).await?;
- let allow_read = query.read || key.allow_read(&query.bucket);
- let allow_write = query.write || key.allow_write(&query.bucket);
- self.update_key_bucket(&key, &query.bucket, allow_read, allow_write)
- .await?;
- self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}.",
- &key.key_id, &query.bucket, allow_read, allow_write
- )))
- }
- BucketOperation::Deny(query) => {
- let key = self.get_existing_key(&query.key_pattern).await?;
- let bucket = self.get_existing_bucket(&query.bucket).await?;
- let allow_read = !query.read && key.allow_read(&query.bucket);
- let allow_write = !query.write && key.allow_write(&query.bucket);
- self.update_key_bucket(&key, &query.bucket, allow_read, allow_write)
- .await?;
- self.update_bucket_key(bucket, &key.key_id, allow_read, allow_write)
- .await?;
- Ok(AdminRpc::Ok(format!(
- "New permissions for {} on {}: read {}, write {}.",
- &key.key_id, &query.bucket, allow_read, allow_write
- )))
+ Ok(AdminRpc::BucketInfo(bucket))
}
- BucketOperation::Website(query) => {
- let mut bucket = self.get_existing_bucket(&query.bucket).await?;
+ BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
+ BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
+ BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
+ BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
+ BucketOperation::Website(query) => self.handle_bucket_website(query).await,
+ }
+ }
+
+ async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
+ let bucket_aliases = self
+ .garage
+ .bucket_alias_table
+ .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .await?;
+ Ok(AdminRpc::BucketList(bucket_aliases))
+ }
- if !(query.allow ^ query.deny) {
- return Err(Error::Message(
- "You must specify exactly one flag, either --allow or --deny".to_string(),
- ));
+ #[allow(clippy::ptr_arg)]
+ async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
+ let mut bucket = Bucket::new();
+ let alias = match self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
+ Some(mut alias) => {
+ if !alias.state.get().is_deleted() {
+ return Err(Error::BadRpc(format!("Bucket {} already exists", name)));
}
+ alias.state.update(Deletable::Present(AliasParams {
+ bucket_id: bucket.id,
+ website_access: false,
+ }));
+ alias
+ }
+ None => BucketAlias::new(name.clone(), bucket.id, false),
+ };
+ bucket
+ .state
+ .as_option_mut()
+ .unwrap()
+ .aliases
+ .update_in_place(name.clone(), true);
+ self.garage.bucket_table.insert(&bucket).await?;
+ self.garage.bucket_alias_table.insert(&alias).await?;
+ Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
+ }
- if let BucketState::Present(state) = bucket.state.get_mut() {
- state.website.update(query.allow);
- self.garage.bucket_table.insert(&bucket).await?;
- let msg = if query.allow {
- format!("Website access allowed for {}", &query.bucket)
- } else {
- format!("Website access denied for {}", &query.bucket)
- };
+ async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
+ let mut bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.name)
+ .await?
+ .filter(|a| !a.is_deleted())
+ .ok_or_message(format!("Bucket {} does not exist", query.name))?;
- Ok(AdminRpc::Ok(msg))
- } else {
- unreachable!();
+ let bucket_id = bucket_alias.state.get().as_option().unwrap().bucket_id;
+
+ // Check bucket doesn't have other aliases
+ let mut bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+ if bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(name, _, _)| name != &query.name)
+ {
+ return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+ if bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .any(|(_, _, active)| *active)
+ {
+ return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
+ }
+
+ // Check bucket is empty
+ let objects = self
+ .garage
+ .object_table
+ .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
+ .await?;
+ if !objects.is_empty() {
+ return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name)));
+ }
+
+ if !query.yes {
+ return Err(Error::BadRpc(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
+ if !key.state.is_deleted() {
+ self.update_key_bucket(&key, bucket.id, false, false)
+ .await?;
}
+ } else {
+ return Err(Error::Message(format!("Key not found: {}", key_id)));
}
}
+ // 2. delete bucket alias
+ bucket_alias.state.update(Deletable::Deleted);
+ self.garage.bucket_alias_table.insert(&bucket_alias).await?;
+	// 3. delete bucket
+ bucket.state = Deletable::delete();
+ self.garage.bucket_table.insert(&bucket).await?;
+
+ Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
+ }
+
+ async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_message("Bucket not found")?;
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
+
+ let allow_read = query.read || key.allow_read(&bucket_id);
+ let allow_write = query.write || key.allow_write(&bucket_id);
+
+ let new_perm = self
+ .update_key_bucket(&key, bucket_id, allow_read, allow_write)
+ .await?;
+ self.update_bucket_key(bucket, &key.key_id, new_perm)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write
+ )))
+ }
+
+ async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
+ let bucket_id = self
+ .garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&query.bucket)
+ .await?
+ .ok_or_message("Bucket not found")?;
+ let bucket = self
+ .garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let key = self.get_existing_key(&query.key_pattern).await?;
+
+ let allow_read = !query.read && key.allow_read(&bucket_id);
+ let allow_write = !query.write && key.allow_write(&bucket_id);
+
+ let new_perm = self
+ .update_key_bucket(&key, bucket_id, allow_read, allow_write)
+ .await?;
+ self.update_bucket_key(bucket, &key.key_id, new_perm)
+ .await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "New permissions for {} on {}: read {}, write {}.",
+ &key.key_id, &query.bucket, allow_read, allow_write
+ )))
+ }
+
+ async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
+ let mut bucket_alias = self
+ .garage
+ .bucket_alias_table
+ .get(&EmptyKey, &query.bucket)
+ .await?
+ .filter(|a| !a.is_deleted())
+ .ok_or_message(format!("Bucket {} does not exist", query.bucket))?;
+
+ let mut state = bucket_alias.state.get().as_option().unwrap().clone();
+
+ if !(query.allow ^ query.deny) {
+ return Err(Error::Message(
+ "You must specify exactly one flag, either --allow or --deny".to_string(),
+ ));
+ }
+
+ state.website_access = query.allow;
+ bucket_alias.state.update(Deletable::present(state));
+ self.garage.bucket_alias_table.insert(&bucket_alias).await?;
+
+ let msg = if query.allow {
+ format!("Website access allowed for {}", &query.bucket)
+ } else {
+ format!("Website access denied for {}", &query.bucket)
+ };
+
+ Ok(AdminRpc::Ok(msg))
}
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
- KeyOperation::List => {
- let key_ids = self
- .garage
- .key_table
- .get_range(
- &EmptyKey,
- None,
- Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
- 10000,
- )
- .await?
- .iter()
- .map(|k| (k.key_id.to_string(), k.name.get().clone()))
- .collect::<Vec<_>>();
- Ok(AdminRpc::KeyList(key_ids))
- }
+ KeyOperation::List => self.handle_list_keys().await,
KeyOperation::Info(query) => {
let key = self.get_existing_key(&query.key_pattern).await?;
Ok(AdminRpc::KeyInfo(key))
}
- KeyOperation::New(query) => {
- let key = Key::new(query.name.clone());
- self.garage.key_table.insert(&key).await?;
- Ok(AdminRpc::KeyInfo(key))
- }
- KeyOperation::Rename(query) => {
- let mut key = self.get_existing_key(&query.key_pattern).await?;
- key.name.update(query.new_name.clone());
- self.garage.key_table.insert(&key).await?;
- Ok(AdminRpc::KeyInfo(key))
- }
- KeyOperation::Delete(query) => {
- let key = self.get_existing_key(&query.key_pattern).await?;
- if !query.yes {
- return Err(Error::BadRpc(
- "Add --yes flag to really perform this operation".to_string(),
- ));
- }
- // --- done checking, now commit ---
- for (ab_name, _, _) in key.authorized_buckets.items().iter() {
- if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? {
- if !bucket.is_deleted() {
- self.update_bucket_key(bucket, &key.key_id, false, false)
- .await?;
- }
- } else {
- return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
+ KeyOperation::New(query) => self.handle_create_key(query).await,
+ KeyOperation::Rename(query) => self.handle_rename_key(query).await,
+ KeyOperation::Delete(query) => self.handle_delete_key(query).await,
+ KeyOperation::Import(query) => self.handle_import_key(query).await,
+ }
+ }
+
+ async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
+ let key_ids = self
+ .garage
+ .key_table
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
+ 10000,
+ )
+ .await?
+ .iter()
+ .map(|k| (k.key_id.to_string(), k.name.get().clone()))
+ .collect::<Vec<_>>();
+ Ok(AdminRpc::KeyList(key_ids))
+ }
+
+ async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
+ let key = Key::new(query.name.clone());
+ self.garage.key_table.insert(&key).await?;
+ Ok(AdminRpc::KeyInfo(key))
+ }
+
+ async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
+ let mut key = self.get_existing_key(&query.key_pattern).await?;
+ key.name.update(query.new_name.clone());
+ self.garage.key_table.insert(&key).await?;
+ Ok(AdminRpc::KeyInfo(key))
+ }
+
+ async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
+ let mut key = self.get_existing_key(&query.key_pattern).await?;
+ if !query.yes {
+ return Err(Error::BadRpc(
+ "Add --yes flag to really perform this operation".to_string(),
+ ));
+ }
+ let state = key.state.as_option_mut().unwrap();
+
+ // --- done checking, now commit ---
+ // 1. Delete local aliases
+ for (alias, _, to) in state.local_aliases.items().iter() {
+ if let Deletable::Present(bucket_id) = to {
+ if let Some(mut bucket) = self.garage.bucket_table.get(bucket_id, &EmptyKey).await?
+ {
+ if let Deletable::Present(bucket_state) = &mut bucket.state {
+ bucket_state.local_aliases = bucket_state
+ .local_aliases
+ .update_mutator((key.key_id.to_string(), alias.to_string()), false);
+ self.garage.bucket_table.insert(&bucket).await?;
}
+ } else {
+ // ignore
}
- let del_key = Key::delete(key.key_id.to_string());
- self.garage.key_table.insert(&del_key).await?;
- Ok(AdminRpc::Ok(format!(
- "Key {} was deleted successfully.",
- key.key_id
- )))
}
- KeyOperation::Import(query) => {
- let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
- if prev_key.is_some() {
- return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ // 2. Delete authorized buckets
+ for (ab_id, auth) in state.authorized_buckets.items().iter() {
+ if let Some(bucket) = self.garage.bucket_table.get(ab_id, &EmptyKey).await? {
+ let new_perm = BucketKeyPerm {
+ timestamp: increment_logical_clock(auth.timestamp),
+ allow_read: false,
+ allow_write: false,
+ };
+ if !bucket.is_deleted() {
+ self.update_bucket_key(bucket, &key.key_id, new_perm)
+ .await?;
}
- let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
- self.garage.key_table.insert(&imported_key).await?;
- Ok(AdminRpc::KeyInfo(imported_key))
+ } else {
+ // ignore
}
}
+ // 3. Actually delete key
+ key.state = Deletable::delete();
+ self.garage.key_table.insert(&key).await?;
+
+ Ok(AdminRpc::Ok(format!(
+ "Key {} was deleted successfully.",
+ key.key_id
+ )))
}
- #[allow(clippy::ptr_arg)]
- async fn get_existing_bucket(&self, bucket: &String) -> Result<Bucket, Error> {
- self.garage
- .bucket_table
- .get(&EmptyKey, bucket)
- .await?
- .filter(|b| !b.is_deleted())
- .map(Ok)
- .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket))))
+ async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
+ let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
+ if prev_key.is_some() {
+ return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
+ }
+ let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
+ self.garage.key_table.insert(&imported_key).await?;
+ Ok(AdminRpc::KeyInfo(imported_key))
}
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
@@ -267,7 +399,7 @@ impl AdminRpcHandler {
)
.await?
.into_iter()
- .filter(|k| !k.deleted.get())
+ .filter(|k| !k.state.is_deleted())
.collect::<Vec<_>>();
if candidates.len() != 1 {
Err(Error::Message(format!(
@@ -279,51 +411,48 @@ impl AdminRpcHandler {
}
}
- /// Update **bucket table** to inform of the new linked key
- async fn update_bucket_key(
- &self,
- mut bucket: Bucket,
- key_id: &str,
- allow_read: bool,
- allow_write: bool,
- ) -> Result<(), Error> {
- if let BucketState::Present(params) = bucket.state.get_mut() {
- let ak = &mut params.authorized_keys;
- let old_ak = ak.take_and_clear();
- ak.merge(&old_ak.update_mutator(
- key_id.to_string(),
- PermissionSet {
- allow_read,
- allow_write,
- },
- ));
- } else {
- return Err(Error::Message(
- "Bucket is deleted in update_bucket_key".to_string(),
- ));
- }
- self.garage.bucket_table.insert(&bucket).await?;
- Ok(())
- }
-
/// Update **key table** to inform of the new linked bucket
async fn update_key_bucket(
&self,
key: &Key,
- bucket: &str,
+ bucket_id: Uuid,
allow_read: bool,
allow_write: bool,
- ) -> Result<(), Error> {
+ ) -> Result<BucketKeyPerm, Error> {
let mut key = key.clone();
- let old_map = key.authorized_buckets.take_and_clear();
- key.authorized_buckets.merge(&old_map.update_mutator(
- bucket.to_string(),
- PermissionSet {
+ let mut key_state = key.state.as_option_mut().unwrap();
+
+ let perm = key_state
+ .authorized_buckets
+ .get(&bucket_id)
+ .cloned()
+ .map(|old_perm| BucketKeyPerm {
+ timestamp: increment_logical_clock(old_perm.timestamp),
allow_read,
allow_write,
- },
- ));
+ })
+ .unwrap_or(BucketKeyPerm {
+ timestamp: now_msec(),
+ allow_read,
+ allow_write,
+ });
+
+ key_state.authorized_buckets = Map::put_mutator(bucket_id, perm);
+
self.garage.key_table.insert(&key).await?;
+ Ok(perm)
+ }
+
+ /// Update **bucket table** to inform of the new linked key
+ async fn update_bucket_key(
+ &self,
+ mut bucket: Bucket,
+ key_id: &str,
+ new_perm: BucketKeyPerm,
+ ) -> Result<(), Error> {
+ bucket.state.as_option_mut().unwrap().authorized_keys =
+ Map::put_mutator(key_id.to_string(), new_perm);
+ self.garage.bucket_table.insert(&bucket).await?;
Ok(())
}
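
(Illustration, not part of the patch: what handle_create_bucket() above now produces, a Bucket row keyed by a fresh uuid that records its alias, plus a BucketAlias row keyed by the name pointing back at the bucket id. The helper function below is hypothetical; every method it calls comes from this diff.)

    use garage_model::bucket_alias_table::BucketAlias;
    use garage_model::bucket_table::Bucket;

    // Hypothetical pure helper mirroring the two inserts done in
    // handle_create_bucket() above (bucket_table + bucket_alias_table).
    fn new_bucket_with_alias(name: &str) -> (Bucket, BucketAlias) {
        let mut bucket = Bucket::new(); // id = gen_uuid(), state = Present(BucketParams::new())
        bucket
            .state
            .as_option_mut()
            .unwrap() // safe: the bucket was just created in the Present state
            .aliases
            .update_in_place(name.to_string(), true);
        let alias = BucketAlias::new(name.to_string(), bucket.id, false);
        (bucket, alias)
    }
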
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index a916974e..3cdf4d26 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -161,8 +161,11 @@ pub async fn cmd_admin(
}
AdminRpc::BucketList(bl) => {
println!("List of buckets:");
- for bucket in bl {
- println!("{}", bucket);
+ for alias in bl {
+ if let Some(p) = alias.state.get().as_option() {
+ let wflag = if p.website_access { "W" } else { " " };
+ println!("- {} {} {:?}", wflag, alias.name, p.bucket_id);
+ }
}
}
AdminRpc::BucketInfo(bucket) => {
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index 647a2449..be34183e 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,3 +1,4 @@
+use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
@@ -8,26 +9,50 @@ pub fn print_key_info(key: &Key) {
println!("Key name: {}", key.name.get());
println!("Key ID: {}", key.key_id);
println!("Secret key: {}", key.secret_key);
- if key.deleted.get() {
- println!("Key is deleted.");
- } else {
- println!("Authorized buckets:");
- for (b, _, perm) in key.authorized_buckets.items().iter() {
- println!("- {} R:{} W:{}", b, perm.allow_read, perm.allow_write);
+ match &key.state {
+ Deletable::Present(p) => {
+ println!("\nKey-specific bucket aliases:");
+ for (alias_name, _, alias) in p.local_aliases.items().iter() {
+ if let Some(bucket_id) = alias.as_option() {
+ println!("- {} {:?}", alias_name, bucket_id);
+ }
+ }
+ println!("\nAuthorized buckets:");
+ for (b, perm) in p.authorized_buckets.items().iter() {
+ let rflag = if perm.allow_read { "R" } else { " " };
+ let wflag = if perm.allow_write { "W" } else { " " };
+ println!("- {}{} {:?}", rflag, wflag, b);
+ }
+ }
+ Deletable::Deleted => {
+ println!("\nKey is deleted.");
}
}
}
pub fn print_bucket_info(bucket: &Bucket) {
- println!("Bucket name: {}", bucket.name);
- match bucket.state.get() {
- BucketState::Deleted => println!("Bucket is deleted."),
- BucketState::Present(p) => {
- println!("Authorized keys:");
- for (k, _, perm) in p.authorized_keys.items().iter() {
- println!("- {} R:{} W:{}", k, perm.allow_read, perm.allow_write);
+ println!("Bucket: {}", hex::encode(bucket.id));
+ match &bucket.state {
+ Deletable::Deleted => println!("Bucket is deleted."),
+ Deletable::Present(p) => {
+ println!("\nGlobal aliases:");
+ for (alias, _, active) in p.aliases.items().iter() {
+ if *active {
+ println!("- {}", alias);
+ }
+ }
+ println!("\nKey-specific aliases:");
+ for ((key_id, alias), _, active) in p.local_aliases.items().iter() {
+ if *active {
+ println!("- {} {}", key_id, alias);
+ }
+ }
+ println!("\nAuthorized keys:");
+ for (k, perm) in p.authorized_keys.items().iter() {
+ let rflag = if perm.allow_read { "R" } else { " " };
+ let wflag = if perm.allow_write { "W" } else { " " };
+ println!("- {}{} {}", rflag, wflag, k);
}
- println!("Website access: {}", p.website.get());
}
};
}
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index a786f1f1..3666ca8f 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -77,7 +77,7 @@ impl Repair {
let object = self
.garage
.object_table
- .get(&version.bucket, &version.key)
+ .get(&version.bucket_id, &version.key)
.await?;
let version_exists = match object {
Some(o) => o
@@ -92,7 +92,7 @@ impl Repair {
.version_table
.insert(&Version::new(
version.uuid,
- version.bucket,
+ version.bucket_id,
version.key,
true,
))
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 1d695192..12c08719 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_model"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,9 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.5.0", path = "../rpc" }
-garage_table = { version = "0.5.0", path = "../table" }
-garage_util = { version = "0.5.0", path = "../util" }
+garage_rpc = { version = "0.6.0", path = "../rpc" }
+garage_table = { version = "0.6.0", path = "../table" }
+garage_util = { version = "0.6.0", path = "../util" }
+garage_model_050 = { package = "garage_model", version = "0.5.0" }
async-trait = "0.1.7"
arc-swap = "1.0"
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs
new file mode 100644
index 00000000..4d300d05
--- /dev/null
+++ b/src/model/bucket_alias_table.rs
@@ -0,0 +1,68 @@
+use serde::{Deserialize, Serialize};
+
+use garage_table::crdt::*;
+use garage_table::*;
+use garage_util::data::*;
+
+/// The bucket alias table holds the names given to buckets
+/// in the global namespace.
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct BucketAlias {
+ pub name: String,
+ pub state: crdt::Lww<crdt::Deletable<AliasParams>>,
+}
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct AliasParams {
+ pub bucket_id: Uuid,
+ pub website_access: bool,
+}
+
+impl AutoCrdt for AliasParams {
+ const WARN_IF_DIFFERENT: bool = true;
+}
+
+impl BucketAlias {
+ pub fn new(name: String, bucket_id: Uuid, website_access: bool) -> Self {
+ BucketAlias {
+ name,
+ state: crdt::Lww::new(crdt::Deletable::present(AliasParams {
+ bucket_id,
+ website_access,
+ })),
+ }
+ }
+ pub fn is_deleted(&self) -> bool {
+ self.state.get().is_deleted()
+ }
+}
+
+impl Crdt for BucketAlias {
+ fn merge(&mut self, o: &Self) {
+ self.state.merge(&o.state);
+ }
+}
+
+impl Entry<EmptyKey, String> for BucketAlias {
+ fn partition_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn sort_key(&self) -> &String {
+ &self.name
+ }
+}
+
+pub struct BucketAliasTable;
+
+impl TableSchema for BucketAliasTable {
+ const TABLE_NAME: &'static str = "bucket_alias";
+
+ type P = EmptyKey;
+ type S = String;
+ type E = BucketAlias;
+ type Filter = DeletedFilter;
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ filter.apply(entry.is_deleted())
+ }
+}
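
(Illustration, not part of the patch: the lifecycle of the new BucketAlias entry type, using only the constructors and methods defined in the file above. The example function itself is hypothetical.)

    use garage_model::bucket_alias_table::BucketAlias;
    use garage_util::crdt::Deletable;
    use garage_util::data::gen_uuid;

    fn alias_lifecycle_example() {
        let bucket_id = gen_uuid();

        // An alias starts out Present, carrying the target bucket id
        // and the website flag.
        let mut alias = BucketAlias::new("my-bucket".to_string(), bucket_id, false);
        assert!(!alias.is_deleted());

        // "Deleting" it is just another LWW write of its state;
        // replicas converge on the most recent write when merged.
        alias.state.update(Deletable::Deleted);
        assert!(alias.is_deleted());
    }
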
diff --git a/src/model/bucket_helper.rs b/src/model/bucket_helper.rs
new file mode 100644
index 00000000..e0720b4e
--- /dev/null
+++ b/src/model/bucket_helper.rs
@@ -0,0 +1,41 @@
+use garage_util::data::*;
+use garage_util::error::*;
+
+use garage_table::util::EmptyKey;
+
+use crate::bucket_table::Bucket;
+use crate::garage::Garage;
+
+pub struct BucketHelper<'a>(pub(crate) &'a Garage);
+
+#[allow(clippy::ptr_arg)]
+impl<'a> BucketHelper<'a> {
+ pub async fn resolve_global_bucket_name(
+ &self,
+ bucket_name: &String,
+ ) -> Result<Option<Uuid>, Error> {
+ Ok(self
+ .0
+ .bucket_alias_table
+ .get(&EmptyKey, bucket_name)
+ .await?
+ .map(|x| x.state.get().as_option().map(|x| x.bucket_id))
+ .flatten())
+ }
+
+ #[allow(clippy::ptr_arg)]
+ pub async fn get_existing_bucket(&self, bucket_id: Uuid) -> Result<Bucket, Error> {
+ self.0
+ .bucket_table
+ .get(&bucket_id, &EmptyKey)
+ .await?
+ .filter(|b| !b.is_deleted())
+ .map(Ok)
+ .unwrap_or_else(|| {
+ Err(Error::BadRpc(format!(
+ "Bucket {:?} does not exist",
+ bucket_id
+ )))
+ })
+ }
+}
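
(Illustration, not part of the patch: how the admin handlers above use this helper, resolving a global name to a bucket id and checking that the bucket still exists. The require_bucket function is hypothetical; resolve_global_bucket_name(), get_existing_bucket() and ok_or_message() are from this diff.)

    use garage_model::garage::Garage;
    use garage_util::data::Uuid;
    use garage_util::error::*;

    // Hypothetical: resolve a global bucket name and make sure the bucket
    // still exists, as handle_bucket_allow()/handle_bucket_deny() do above.
    async fn require_bucket(garage: &Garage, name: &str) -> Result<Uuid, Error> {
        let bucket_id = garage
            .bucket_helper()
            .resolve_global_bucket_name(&name.to_string())
            .await?
            .ok_or_message("Bucket not found")?;
        // Errors out (BadRpc) if the bucket row is marked deleted.
        garage.bucket_helper().get_existing_bucket(bucket_id).await?;
        Ok(bucket_id)
    }
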
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 2cb206ce..ac40407e 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -2,8 +2,10 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt;
use garage_table::*;
+use garage_util::data::*;
+use garage_util::time::*;
-use crate::key_table::PermissionSet;
+use crate::permission::BucketKeyPerm;
/// A bucket is a collection of objects
///
@@ -12,49 +14,38 @@ use crate::key_table::PermissionSet;
/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket {
- /// Name of the bucket
- pub name: String,
+ /// ID of the bucket
+ pub id: Uuid,
/// State, and configuration if not deleted, of the bucket
- pub state: crdt::Lww<BucketState>,
-}
-
-/// State of a bucket
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub enum BucketState {
- /// The bucket is deleted
- Deleted,
- /// The bucket exists
- Present(BucketParams),
-}
-
-impl Crdt for BucketState {
- fn merge(&mut self, o: &Self) {
- match o {
- BucketState::Deleted => *self = BucketState::Deleted,
- BucketState::Present(other_params) => {
- if let BucketState::Present(params) = self {
- params.merge(other_params);
- }
- }
- }
- }
+ pub state: crdt::Deletable<BucketParams>,
}
/// Configuration for a bucket
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
+ /// Bucket's creation date
+ pub creation_date: u64,
/// Map of key with access to the bucket, and what kind of access they give
- pub authorized_keys: crdt::LwwMap<String, PermissionSet>,
- /// Is the bucket served as http
- pub website: crdt::Lww<bool>,
+ pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
+ /// Map of aliases that are or have been given to this bucket
+ /// in the global namespace
+ /// (not authoritative: this is just used as an indication to
+ /// map back to aliases when doing ListBuckets)
+ pub aliases: crdt::LwwMap<String, bool>,
+ /// Map of aliases that are or have been given to this bucket
+ /// in namespaces local to keys
+ /// key = (access key id, alias name)
+ pub local_aliases: crdt::LwwMap<(String, String), bool>,
}
impl BucketParams {
	/// Create an empty BucketParams with no authorized keys and no website access
pub fn new() -> Self {
BucketParams {
- authorized_keys: crdt::LwwMap::new(),
- website: crdt::Lww::new(false),
+ creation_date: now_msec(),
+ authorized_keys: crdt::Map::new(),
+ aliases: crdt::LwwMap::new(),
+ local_aliases: crdt::LwwMap::new(),
}
}
}
@@ -62,7 +53,14 @@ impl BucketParams {
impl Crdt for BucketParams {
fn merge(&mut self, o: &Self) {
self.authorized_keys.merge(&o.authorized_keys);
- self.website.merge(&o.website);
+ self.aliases.merge(&o.aliases);
+ self.local_aliases.merge(&o.local_aliases);
+ }
+}
+
+impl Default for Bucket {
+ fn default() -> Self {
+ Self::new()
}
}
@@ -74,34 +72,34 @@ impl Default for BucketParams {
impl Bucket {
/// Initializes a new instance of the Bucket struct
- pub fn new(name: String) -> Self {
+ pub fn new() -> Self {
Bucket {
- name,
- state: crdt::Lww::new(BucketState::Present(BucketParams::new())),
+ id: gen_uuid(),
+ state: crdt::Deletable::present(BucketParams::new()),
}
}
/// Returns true if this represents a deleted bucket
pub fn is_deleted(&self) -> bool {
- *self.state.get() == BucketState::Deleted
+ self.state.is_deleted()
}
/// Return the list of authorized keys, when each was updated, and the permission associated with
/// each key
- pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
- match self.state.get() {
- BucketState::Deleted => &[],
- BucketState::Present(state) => state.authorized_keys.items(),
+ pub fn authorized_keys(&self) -> &[(String, BucketKeyPerm)] {
+ match &self.state {
+ crdt::Deletable::Deleted => &[],
+ crdt::Deletable::Present(state) => state.authorized_keys.items(),
}
}
}
-impl Entry<EmptyKey, String> for Bucket {
- fn partition_key(&self) -> &EmptyKey {
- &EmptyKey
+impl Entry<Uuid, EmptyKey> for Bucket {
+ fn partition_key(&self) -> &Uuid {
+ &self.id
}
- fn sort_key(&self) -> &String {
- &self.name
+ fn sort_key(&self) -> &EmptyKey {
+ &EmptyKey
}
}
@@ -114,10 +112,10 @@ impl Crdt for Bucket {
pub struct BucketTable;
impl TableSchema for BucketTable {
- const TABLE_NAME: &'static str = "bucket";
+ const TABLE_NAME: &'static str = "bucket_v2";
- type P = EmptyKey;
- type S = String;
+ type P = Uuid;
+ type S = EmptyKey;
type E = Bucket;
type Filter = DeletedFilter;
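
With this change a bucket is keyed by a random UUID and its deletion state is carried by crdt::Deletable rather than the old Lww<BucketState>. The following is a minimal sketch of how the new type behaves in isolation, assuming the garage_model and garage_util crates from this tree; the values are illustrative and not part of the patch.

    use garage_model::bucket_table::Bucket;
    use garage_util::crdt::{Crdt, Deletable};

    fn main() {
        // Buckets are now identified by a generated UUID instead of their name.
        let mut bucket = Bucket::new();
        assert!(!bucket.is_deleted());
        assert!(bucket.authorized_keys().is_empty());

        // Deletion is a CRDT merge: a replica that has seen the tombstone wins.
        let mut tombstone = bucket.clone();
        tombstone.state = Deletable::Deleted;
        bucket.merge(&tombstone);
        assert!(bucket.is_deleted());
    }
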
diff --git a/src/model/garage.rs b/src/model/garage.rs
index a874cca8..9db1843c 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -14,6 +14,8 @@ use garage_table::*;
use crate::block::*;
use crate::block_ref_table::*;
+use crate::bucket_alias_table::*;
+use crate::bucket_helper::*;
use crate::bucket_table::*;
use crate::key_table::*;
use crate::object_table::*;
@@ -35,6 +37,8 @@ pub struct Garage {
/// Table containing information about buckets
pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
+ /// Table containing information about bucket aliases
+ pub bucket_alias_table: Arc<Table<BucketAliasTable, TableFullReplication>>,
/// Table containing information about api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
@@ -120,6 +124,14 @@ impl Garage {
info!("Initialize bucket_table...");
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
+ info!("Initialize bucket_alias_table...");
+ let bucket_alias_table = Table::new(
+ BucketAliasTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ );
+
info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
@@ -131,6 +143,7 @@ impl Garage {
system,
block_manager,
bucket_table,
+ bucket_alias_table,
key_table,
object_table,
version_table,
@@ -148,4 +161,8 @@ impl Garage {
pub fn break_reference_cycles(&self) {
self.block_manager.garage.swap(None);
}
+
+ pub fn bucket_helper(&self) -> BucketHelper {
+ BucketHelper(self)
+ }
}
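
bucket_helper() wraps a borrow of the Garage struct, so the UUID-based lookups such as get_existing_bucket are reachable from anything holding the usual Arc<Garage>. A hedged sketch of a possible call site follows; the load_bucket function is hypothetical and not part of the patch.

    use std::sync::Arc;

    use garage_model::bucket_table::Bucket;
    use garage_model::garage::Garage;
    use garage_util::data::Uuid;
    use garage_util::error::Error;

    // Fetch a bucket by ID through the helper; a missing or deleted bucket becomes an error.
    async fn load_bucket(garage: &Arc<Garage>, bucket_id: Uuid) -> Result<Bucket, Error> {
        garage.bucket_helper().get_existing_bucket(bucket_id).await
    }
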
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 225f51c7..e87f5949 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -2,6 +2,9 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
+use garage_util::data::*;
+
+use crate::permission::BucketKeyPerm;
/// An api key
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
@@ -15,12 +18,39 @@ pub struct Key {
/// Name for the key
pub name: crdt::Lww<String>,
- /// Is the key deleted
- pub deleted: crdt::Bool,
+ /// If the key is present, its parameters hold the permissions it grants:
+ /// a map of bucket IDs (UUIDs) to permissions.
+ /// Otherwise, no permissions are granted to the key.
+ pub state: crdt::Deletable<KeyParams>,
+}
+
+/// Configuration for a key
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct KeyParams {
+ pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+ pub local_aliases: crdt::LwwMap<String, crdt::Deletable<Uuid>>,
+}
+
+impl KeyParams {
+ pub fn new() -> Self {
+ KeyParams {
+ authorized_buckets: crdt::Map::new(),
+ local_aliases: crdt::LwwMap::new(),
+ }
+ }
+}
+
+impl Default for KeyParams {
+ fn default() -> Self {
+ Self::new()
+ }
+}
- /// Buckets in which the key is authorized. Empty if `Key` is deleted
- // CRDT interaction: deleted implies authorized_buckets is empty
- pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
+impl Crdt for KeyParams {
+ fn merge(&mut self, o: &Self) {
+ self.authorized_buckets.merge(&o.authorized_buckets);
+ self.local_aliases.merge(&o.local_aliases);
+ }
}
impl Key {
@@ -32,8 +62,7 @@ impl Key {
key_id,
secret_key,
name: crdt::Lww::new(name),
- deleted: crdt::Bool::new(false),
- authorized_buckets: crdt::LwwMap::new(),
+ state: crdt::Deletable::present(KeyParams::new()),
}
}
@@ -43,8 +72,7 @@ impl Key {
key_id: key_id.to_string(),
secret_key: secret_key.to_string(),
name: crdt::Lww::new(name.to_string()),
- deleted: crdt::Bool::new(false),
- authorized_buckets: crdt::LwwMap::new(),
+ state: crdt::Deletable::present(KeyParams::new()),
}
}
@@ -54,41 +82,37 @@ impl Key {
key_id,
secret_key: "".into(),
name: crdt::Lww::new("".to_string()),
- deleted: crdt::Bool::new(true),
- authorized_buckets: crdt::LwwMap::new(),
+ state: crdt::Deletable::Deleted,
}
}
/// Check if `Key` is allowed to read in bucket
- pub fn allow_read(&self, bucket: &str) -> bool {
- self.authorized_buckets
- .get(&bucket.to_string())
- .map(|x| x.allow_read)
- .unwrap_or(false)
+ pub fn allow_read(&self, bucket: &Uuid) -> bool {
+ if let crdt::Deletable::Present(params) = &self.state {
+ params
+ .authorized_buckets
+ .get(bucket)
+ .map(|x| x.allow_read)
+ .unwrap_or(false)
+ } else {
+ false
+ }
}
/// Check if `Key` is allowed to write in bucket
- pub fn allow_write(&self, bucket: &str) -> bool {
- self.authorized_buckets
- .get(&bucket.to_string())
- .map(|x| x.allow_write)
- .unwrap_or(false)
+ pub fn allow_write(&self, bucket: &Uuid) -> bool {
+ if let crdt::Deletable::Present(params) = &self.state {
+ params
+ .authorized_buckets
+ .get(bucket)
+ .map(|x| x.allow_write)
+ .unwrap_or(false)
+ } else {
+ false
+ }
}
}
-/// Permission given to a key in a bucket
-#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct PermissionSet {
- /// The key can be used to read the bucket
- pub allow_read: bool,
- /// The key can be used to write in the bucket
- pub allow_write: bool,
-}
-
-impl AutoCrdt for PermissionSet {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
impl Entry<EmptyKey, String> for Key {
fn partition_key(&self) -> &EmptyKey {
&EmptyKey
@@ -101,13 +125,7 @@ impl Entry<EmptyKey, String> for Key {
impl Crdt for Key {
fn merge(&mut self, other: &Self) {
self.name.merge(&other.name);
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.authorized_buckets.clear();
- } else {
- self.authorized_buckets.merge(&other.authorized_buckets);
- }
+ self.state.merge(&other.state);
}
}
@@ -129,7 +147,7 @@ impl TableSchema for KeyTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
match filter {
- KeyFilter::Deleted(df) => df.apply(entry.deleted.get()),
+ KeyFilter::Deleted(df) => df.apply(entry.state.is_deleted()),
KeyFilter::Matches(pat) => {
let pat = pat.to_lowercase();
entry.key_id.to_lowercase().starts_with(&pat)
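
Key permissions now live under state: crdt::Deletable<KeyParams> and are looked up by bucket UUID rather than by bucket name. A small sketch of granting and checking access, assuming the crates from this tree; the key ID and secret are placeholder values, and Key::import / Map::put are assumed to have their usual signatures.

    use garage_model::key_table::Key;
    use garage_model::permission::BucketKeyPerm;
    use garage_util::crdt::Deletable;
    use garage_util::data::gen_uuid;
    use garage_util::time::now_msec;

    fn main() {
        // A freshly imported key is Present but grants no permissions yet.
        let mut key = Key::import("GKexample", "example-secret", "example-key");
        let bucket_id = gen_uuid();
        assert!(!key.allow_read(&bucket_id));

        // Granting access means inserting a BucketKeyPerm keyed by the bucket's UUID.
        if let Deletable::Present(params) = &mut key.state {
            params.authorized_buckets.put(
                bucket_id,
                BucketKeyPerm {
                    timestamp: now_msec(),
                    allow_read: true,
                    allow_write: false,
                },
            );
        }
        assert!(key.allow_read(&bucket_id));
        assert!(!key.allow_write(&bucket_id));
    }
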
diff --git a/src/model/lib.rs b/src/model/lib.rs
index b4a8ddb7..fe8cfdad 100644
--- a/src/model/lib.rs
+++ b/src/model/lib.rs
@@ -3,8 +3,11 @@ extern crate log;
pub mod block;
pub mod block_ref_table;
+pub mod bucket_alias_table;
+pub mod bucket_helper;
pub mod bucket_table;
pub mod garage;
pub mod key_table;
pub mod object_table;
+pub mod permission;
pub mod version_table;
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 9eec47ff..285cb5a7 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -15,7 +15,7 @@ use crate::version_table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
- pub bucket: String,
+ pub bucket_id: Uuid,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
@@ -26,9 +26,9 @@ pub struct Object {
impl Object {
/// Initialize an Object struct from parts
- pub fn new(bucket: String, key: String, versions: Vec<ObjectVersion>) -> Self {
+ pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self {
let mut ret = Self {
- bucket,
+ bucket_id,
key,
versions: vec![],
};
@@ -164,9 +164,9 @@ impl ObjectVersion {
}
}
-impl Entry<String, String> for Object {
- fn partition_key(&self) -> &String {
- &self.bucket
+impl Entry<Uuid, String> for Object {
+ fn partition_key(&self) -> &Uuid {
+ &self.bucket_id
}
fn sort_key(&self) -> &String {
&self.key
@@ -219,7 +219,7 @@ pub struct ObjectTable {
impl TableSchema for ObjectTable {
const TABLE_NAME: &'static str = "object";
- type P = String;
+ type P = Uuid;
type S = String;
type E = Object;
type Filter = DeletedFilter;
@@ -242,7 +242,7 @@ impl TableSchema for ObjectTable {
};
if newly_deleted {
let deleted_version =
- Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
+ Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
version_table.insert(&deleted_version).await?;
}
}
diff --git a/src/model/permission.rs b/src/model/permission.rs
new file mode 100644
index 00000000..b61c92ce
--- /dev/null
+++ b/src/model/permission.rs
@@ -0,0 +1,37 @@
+use std::cmp::Ordering;
+
+use serde::{Deserialize, Serialize};
+
+use garage_util::crdt::*;
+
+/// Permission given to a key in a bucket
+#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+pub struct BucketKeyPerm {
+ /// Timestamp at which the permission was given
+ pub timestamp: u64,
+
+ /// The key can be used to read the bucket
+ pub allow_read: bool,
+ /// The key can be used to write in the bucket
+ pub allow_write: bool,
+}
+
+impl Crdt for BucketKeyPerm {
+ fn merge(&mut self, other: &Self) {
+ match other.timestamp.cmp(&self.timestamp) {
+ Ordering::Greater => {
+ *self = *other;
+ }
+ Ordering::Equal if other != self => {
+ warn!("Different permission sets with same timestamp: {:?} and {:?}, merging to most restricted permission set.", self, other);
+ if !other.allow_read {
+ self.allow_read = false;
+ }
+ if !other.allow_write {
+ self.allow_write = false;
+ }
+ }
+ _ => (),
+ }
+ }
+}
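
The merge rule keeps whichever permission entry carries the newer timestamp; when two replicas wrote different flags with exactly the same timestamp, it degrades to the most restricted set. A short illustration of both cases (timestamps and flags are made up):

    use garage_model::permission::BucketKeyPerm;
    use garage_util::crdt::Crdt;

    fn main() {
        let mut a = BucketKeyPerm { timestamp: 42, allow_read: true, allow_write: true };
        let b = BucketKeyPerm { timestamp: 42, allow_read: true, allow_write: false };

        // Same timestamp, different flags: keep only what both sides allow.
        a.merge(&b);
        assert!(a.allow_read && !a.allow_write);

        // A strictly newer entry replaces the older one entirely.
        let c = BucketKeyPerm { timestamp: 43, allow_read: false, allow_write: false };
        a.merge(&c);
        assert_eq!(a, c);
    }
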
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 18ec8e1d..4edea0b7 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -29,19 +29,19 @@ pub struct Version {
// Back link to bucket+key so that we can figure out
// whether this was deleted later on
/// Bucket in which the related object is stored
- pub bucket: String,
+ pub bucket_id: Uuid,
/// Key in which the related object is stored
pub key: String,
}
impl Version {
- pub fn new(uuid: Uuid, bucket: String, key: String, deleted: bool) -> Self {
+ pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self {
uuid,
deleted: deleted.into(),
blocks: crdt::Map::new(),
parts_etags: crdt::Map::new(),
- bucket,
+ bucket_id,
key,
}
}
@@ -82,8 +82,8 @@ impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
-impl Entry<Hash, EmptyKey> for Version {
- fn partition_key(&self) -> &Hash {
+impl Entry<Uuid, EmptyKey> for Version {
+ fn partition_key(&self) -> &Uuid {
&self.uuid
}
fn sort_key(&self) -> &EmptyKey {
@@ -116,7 +116,7 @@ pub struct VersionTable {
impl TableSchema for VersionTable {
const TABLE_NAME: &'static str = "version";
- type P = Hash;
+ type P = Uuid;
type S = EmptyKey;
type E = Version;
type Filter = DeletedFilter;
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index d8ebb71e..b49a126a 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_rpc"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,7 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_util = { version = "0.5.0", path = "../util" }
+garage_util = { version = "0.6.0", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index dc37f12c..91d71ddd 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_table"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,8 +14,8 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.5.0", path = "../rpc" }
-garage_util = { version = "0.5.0", path = "../util" }
+garage_rpc = { version = "0.6.0", path = "../rpc" }
+garage_util = { version = "0.6.0", path = "../util" }
async-trait = "0.1.7"
bytes = "1.0"
diff --git a/src/table/schema.rs b/src/table/schema.rs
index fa51fa84..cfe86fba 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -16,7 +16,7 @@ impl PartitionKey for String {
}
}
-impl PartitionKey for Hash {
+impl PartitionKey for FixedBytes32 {
fn hash(&self) -> Hash {
*self
}
@@ -34,7 +34,7 @@ impl SortKey for String {
}
}
-impl SortKey for Hash {
+impl SortKey for FixedBytes32 {
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index e33f8a66..d5200f98 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_util"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
diff --git a/src/util/crdt/deletable.rs b/src/util/crdt/deletable.rs
new file mode 100644
index 00000000..c76f5cbb
--- /dev/null
+++ b/src/util/crdt/deletable.rs
@@ -0,0 +1,72 @@
+use serde::{Deserialize, Serialize};
+
+use crate::crdt::crdt::*;
+
+/// Deletable object (once deleted, cannot go back)
+#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
+pub enum Deletable<T> {
+ Present(T),
+ Deleted,
+}
+
+impl<T: Crdt> Deletable<T> {
+ /// Create a new deletable object that isn't deleted
+ pub fn present(v: T) -> Self {
+ Self::Present(v)
+ }
+ /// Create a new deletable object that is deleted
+ pub fn delete() -> Self {
+ Self::Deleted
+ }
+ /// As option
+ pub fn as_option(&self) -> Option<&T> {
+ match self {
+ Self::Present(v) => Some(v),
+ Self::Deleted => None,
+ }
+ }
+ /// As option, mutable
+ pub fn as_option_mut(&mut self) -> Option<&mut T> {
+ match self {
+ Self::Present(v) => Some(v),
+ Self::Deleted => None,
+ }
+ }
+ /// Into option
+ pub fn into_option(self) -> Option<T> {
+ match self {
+ Self::Present(v) => Some(v),
+ Self::Deleted => None,
+ }
+ }
+ /// Is object deleted?
+ pub fn is_deleted(&self) -> bool {
+ matches!(self, Self::Deleted)
+ }
+}
+
+impl<T> From<Option<T>> for Deletable<T> {
+ fn from(v: Option<T>) -> Self {
+ v.map(Self::Present).unwrap_or(Self::Deleted)
+ }
+}
+
+impl<T> From<Deletable<T>> for Option<T> {
+ fn from(v: Deletable<T>) -> Option<T> {
+ match v {
+ Deletable::Present(v) => Some(v),
+ Deletable::Deleted => None,
+ }
+ }
+}
+
+impl<T: Crdt> Crdt for Deletable<T> {
+ fn merge(&mut self, other: &Self) {
+ if let Deletable::Present(v) = self {
+ match other {
+ Deletable::Deleted => *self = Deletable::Deleted,
+ Deletable::Present(v2) => v.merge(v2),
+ }
+ }
+ }
+}
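
Deletion is absorbing here: once a replica has merged Deleted, later Present values cannot resurrect the object, while two Present values merge their inner CRDTs. A quick sketch using an Lww register as the inner value (illustrative only):

    use garage_util::crdt::{Crdt, Deletable, Lww};

    fn main() {
        let mut a: Deletable<Lww<String>> = Deletable::present(Lww::new("v1".to_string()));

        // Merging a tombstone deletes the object...
        a.merge(&Deletable::delete());
        assert!(a.is_deleted());

        // ...and it stays deleted even if another Present value is merged in afterwards.
        a.merge(&Deletable::present(Lww::new("v2".to_string())));
        assert!(a.as_option().is_none());
    }
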
diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs
index 43d13f27..bc686e05 100644
--- a/src/util/crdt/lww.rs
+++ b/src/util/crdt/lww.rs
@@ -82,6 +82,11 @@ where
&self.v
}
+ /// Take the value inside the CRDT (discards the timestamp)
+ pub fn take(self) -> T {
+ self.v
+ }
+
/// Get a mutable reference to the CRDT's value
///
/// This is useful to mutate the inside value without changing the LWW timestamp.
diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs
index 3e9aba79..21cb6e12 100644
--- a/src/util/crdt/lww_map.rs
+++ b/src/util/crdt/lww_map.rs
@@ -30,8 +30,8 @@ pub struct LwwMap<K, V> {
impl<K, V> LwwMap<K, V>
where
- K: Ord,
- V: Crdt,
+ K: Clone + Ord,
+ V: Clone + Crdt,
{
/// Create a new empty map CRDT
pub fn new() -> Self {
@@ -73,6 +73,10 @@ where
};
Self { vals: new_vals }
}
+
+ pub fn update_in_place(&mut self, k: K, new_v: V) {
+ self.merge(&self.update_mutator(k, new_v));
+ }
/// Takes all of the values of the map and returns them. The current map is reset to the
/// empty map. This is very useful to produce in-place a new map that contains only a delta
/// that modifies a certain value:
@@ -158,8 +162,8 @@ where
impl<K, V> Default for LwwMap<K, V>
where
- K: Ord,
- V: Crdt,
+ K: Clone + Ord,
+ V: Clone + Crdt,
{
fn default() -> Self {
Self::new()
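
update_in_place(k, v) is simply update_mutator(k, v) followed by merging the resulting one-entry map back into self, so repeated updates behave as last-writer-wins on that key. A short sketch, using the same LwwMap<String, bool> shape as the bucket aliases above (values illustrative):

    use garage_util::crdt::LwwMap;

    fn main() {
        let mut aliases: LwwMap<String, bool> = LwwMap::new();

        // Each call merges a one-entry mutator whose timestamp is newer
        // than that of the existing entry, if any.
        aliases.update_in_place("my-bucket".to_string(), true);
        aliases.update_in_place("my-bucket".to_string(), false);

        // The most recent write wins for that key.
        assert_eq!(aliases.get(&"my-bucket".to_string()), Some(&false));
    }
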
diff --git a/src/util/crdt/mod.rs b/src/util/crdt/mod.rs
index 9663a5a5..6ba575ed 100644
--- a/src/util/crdt/mod.rs
+++ b/src/util/crdt/mod.rs
@@ -12,12 +12,14 @@
mod bool;
#[allow(clippy::module_inception)]
mod crdt;
+mod deletable;
mod lww;
mod lww_map;
mod map;
pub use self::bool::*;
pub use crdt::*;
+pub use deletable::*;
pub use lww::*;
pub use lww_map::*;
pub use map::*;
diff --git a/src/util/error.rs b/src/util/error.rs
index ff03d05b..08cf1302 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -119,6 +119,35 @@ where
}
}
+/// Trait to turn any displayable error or missing Option value into an Error::Message with added context
+pub trait OkOrMessage {
+ type S2;
+ fn ok_or_message<M: Into<String>>(self, message: M) -> Self::S2;
+}
+
+impl<T, E> OkOrMessage for Result<T, E>
+where
+ E: std::fmt::Display,
+{
+ type S2 = Result<T, Error>;
+ fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
+ match self {
+ Ok(x) => Ok(x),
+ Err(e) => Err(Error::Message(format!("{}: {}", message.into(), e))),
+ }
+ }
+}
+
+impl<T> OkOrMessage for Option<T> {
+ type S2 = Result<T, Error>;
+ fn ok_or_message<M: Into<String>>(self, message: M) -> Result<T, Error> {
+ match self {
+ Some(x) => Ok(x),
+ None => Err(Error::Message(message.into())),
+ }
+ }
+}
+
// Custom serialization for our error type, for use in RPC.
// Errors are serialized as a string of their Display representation.
// Upon deserialization, they all become a RemoteError with the
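
ok_or_message gives Result and Option the same ergonomics: the error's Display output, or the absence of a value, is wrapped into Error::Message together with a caller-supplied context string. A standalone sketch of what call sites look like (values made up):

    use garage_util::error::{Error, OkOrMessage};

    fn main() -> Result<(), Error> {
        // On a Result, the original error's Display text is appended to the message.
        let port: u16 = "8080".parse().ok_or_message("invalid port number")?;
        assert_eq!(port, 8080);

        // On an Option, None simply becomes Error::Message with the given text.
        let first = [1, 2, 3].iter().copied().next().ok_or_message("empty list")?;
        assert_eq!(first, 1);

        Ok(())
    }
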
diff --git a/src/util/time.rs b/src/util/time.rs
index 238db2c3..d9192443 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -10,6 +10,11 @@ pub fn now_msec() -> u64 {
.as_millis() as u64
}
+/// Increment a logical clock: returns max(prev + 1, now_msec()), so the result always advances past prev
+pub fn increment_logical_clock(prev: u64) -> u64 {
+ std::cmp::max(prev + 1, now_msec())
+}
+
/// Convert a timestamp represented as milliseconds since UNIX Epoch to
/// its RFC3339 representation, such as "2021-01-01T12:30:00Z"
pub fn msec_to_rfc3339(msecs: u64) -> String {
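
increment_logical_clock(prev) computes max(prev + 1, now_msec()): it follows the wall clock when the clock is ahead, but never returns a value less than or equal to prev, so derived timestamps always advance. A tiny sketch:

    use garage_util::time::increment_logical_clock;

    fn main() {
        // Even if prev is far ahead of the wall clock (e.g. after a clock jump),
        // the result is still strictly larger than prev.
        let far_ahead = u64::MAX / 2;
        assert_eq!(increment_logical_clock(far_ahead), far_ahead + 1);

        // Applied repeatedly, timestamps therefore keep increasing.
        let t1 = increment_logical_clock(0);
        let t2 = increment_logical_clock(t1);
        assert!(t2 > t1);
    }
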
diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml
index 72701c90..54211f5d 100644
--- a/src/web/Cargo.toml
+++ b/src/web/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_web"
-version = "0.5.0"
+version = "0.6.0"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2018"
license = "AGPL-3.0"
@@ -14,10 +14,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_api = { version = "0.5.0", path = "../api" }
-garage_model = { version = "0.5.0", path = "../model" }
-garage_util = { version = "0.5.0", path = "../util" }
-garage_table = { version = "0.5.0", path = "../table" }
+garage_api = { version = "0.6.0", path = "../api" }
+garage_model = { version = "0.6.0", path = "../model" }
+garage_util = { version = "0.6.0", path = "../util" }
+garage_table = { version = "0.6.0", path = "../table" }
err-derive = "0.3"
log = "0.4"
diff --git a/src/web/web_server.rs b/src/web/web_server.rs
index 4a603c05..5eb25e93 100644
--- a/src/web/web_server.rs
+++ b/src/web/web_server.rs
@@ -12,7 +12,6 @@ use hyper::{
use crate::error::*;
use garage_api::helpers::{authority_to_host, host_to_bucket};
use garage_api::s3_get::{handle_get, handle_head};
-use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_table::*;
use garage_util::error::Error as GarageError;
@@ -77,31 +76,39 @@ async fn serve_file(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<
// Get bucket
let host = authority_to_host(authority)?;
let root = &garage.config.s3_web.root_domain;
- let bucket = host_to_bucket(&host, root).unwrap_or(&host);
- // Check bucket is exposed as a website
- let bucket_desc = garage
+ let bucket_name = host_to_bucket(&host, root).unwrap_or(&host);
+ let bucket_id = garage
+ .bucket_alias_table
+ .get(&EmptyKey, &bucket_name.to_string())
+ .await?
+ .map(|x| x.state.take().into_option())
+ .flatten()
+ .filter(|param| param.website_access)
+ .map(|param| param.bucket_id)
+ .ok_or(Error::NotFound)?;
+
+ // Sanity check: check bucket isn't deleted
+ garage
.bucket_table
- .get(&EmptyKey, &bucket.to_string())
+ .get(&bucket_id, &EmptyKey)
.await?
.filter(|b| !b.is_deleted())
.ok_or(Error::NotFound)?;
- match bucket_desc.state.get() {
- BucketState::Present(params) if *params.website.get() => Ok(()),
- _ => Err(Error::NotFound),
- }?;
-
// Get path
let path = req.uri().path().to_string();
let index = &garage.config.s3_web.index;
let key = path_to_key(&path, index)?;
- info!("Selected bucket: \"{}\", selected key: \"{}\"", bucket, key);
+ info!(
+ "Selected bucket: \"{}\" {:?}, selected key: \"{}\"",
+ bucket_name, bucket_id, key
+ );
let res = match *req.method() {
- Method::HEAD => handle_head(garage, &req, bucket, &key).await?,
- Method::GET => handle_get(garage, &req, bucket, &key).await?,
+ Method::HEAD => handle_head(garage, &req, bucket_id, &key).await?,
+ Method::GET => handle_get(garage, &req, bucket_id, &key).await?,
_ => return Err(Error::BadRequest("HTTP method not supported".to_string())),
};