aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/bucket.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
committerAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
commit5768bf362262f78376af14517c4921941986192e (patch)
treeb4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3/bucket.rs
parentdef78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
downloadgarage-5768bf362262f78376af14517c4921941986192e.tar.gz
garage-5768bf362262f78376af14517c4921941986192e.zip
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/s3/bucket.rs')
-rw-r--r--src/api/s3/bucket.rs358
1 files changed, 358 insertions, 0 deletions
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
new file mode 100644
index 00000000..93048a8c
--- /dev/null
+++ b/src/api/s3/bucket.rs
@@ -0,0 +1,358 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::Bucket;
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+use garage_model::permission::BucketKeyPerm;
+use garage_model::s3::object_table::ObjectFilter;
+use garage_table::util::*;
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::time::*;
+
+use crate::error::*;
+use crate::s3::xml as s3_xml;
+use crate::signature::verify_signed_content;
+
+pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
+ let loc = s3_xml::LocationConstraint {
+ xmlns: (),
+ region: garage.config.s3_api.s3_region.to_string(),
+ };
+ let xml = s3_xml::to_xml_with_header(&loc)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
+ let versioning = s3_xml::VersioningConfiguration {
+ xmlns: (),
+ status: None,
+ };
+
+ let xml = s3_xml::to_xml_with_header(&versioning)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+ let key_p = api_key.params().ok_or_internal_error(
+ "Key should not be in deleted state at this point (in handle_list_buckets)",
+ )?;
+
+ // Collect buckets user has access to
+ let ids = api_key
+ .state
+ .as_option()
+ .unwrap()
+ .authorized_buckets
+ .items()
+ .iter()
+ .filter(|(_, perms)| perms.is_any())
+ .map(|(id, _)| *id)
+ .collect::<Vec<_>>();
+
+ let mut buckets_by_id = HashMap::new();
+ let mut aliases = HashMap::new();
+
+ for bucket_id in ids.iter() {
+ let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?;
+ if let Some(bucket) = bucket {
+ for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) {
+ let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?;
+ if let Some(alias_ent) = alias_opt {
+ if *alias_ent.state.get() == Some(*bucket_id) {
+ aliases.insert(alias_ent.name().to_string(), *bucket_id);
+ }
+ }
+ }
+ if let Deletable::Present(param) = bucket.state {
+ buckets_by_id.insert(bucket_id, param);
+ }
+ }
+ }
+
+ for (alias, _, id_opt) in key_p.local_aliases.items() {
+ if let Some(id) = id_opt {
+ aliases.insert(alias.clone(), *id);
+ }
+ }
+
+ // Generate response
+ let list_buckets = s3_xml::ListAllMyBucketsResult {
+ owner: s3_xml::Owner {
+ display_name: s3_xml::Value(key_p.name.get().to_string()),
+ id: s3_xml::Value(api_key.key_id.to_string()),
+ },
+ buckets: s3_xml::BucketList {
+ entries: aliases
+ .iter()
+ .filter_map(|(name, id)| buckets_by_id.get(id).map(|p| (name, id, p)))
+ .map(|(name, _id, param)| s3_xml::Bucket {
+ creation_date: s3_xml::Value(msec_to_rfc3339(param.creation_date)),
+ name: s3_xml::Value(name.to_string()),
+ })
+ .collect(),
+ },
+ };
+
+ let xml = s3_xml::to_xml_with_header(&list_buckets)?;
+ trace!("xml: {}", xml);
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml))?)
+}
+
+pub async fn handle_create_bucket(
+ garage: &Garage,
+ req: Request<Body>,
+ content_sha256: Option<Hash>,
+ api_key: Key,
+ bucket_name: String,
+) -> Result<Response<Body>, Error> {
+ let body = hyper::body::to_bytes(req.into_body()).await?;
+
+ if let Some(content_sha256) = content_sha256 {
+ verify_signed_content(content_sha256, &body[..])?;
+ }
+
+ let cmd =
+ parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
+
+ if let Some(location_constraint) = cmd {
+ if location_constraint != garage.config.s3_api.s3_region {
+ return Err(Error::BadRequest(format!(
+ "Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`",
+ location_constraint,
+ garage.config.s3_api.s3_region
+ )));
+ }
+ }
+
+ let key_params = api_key
+ .params()
+ .ok_or_internal_error("Key should not be deleted at this point")?;
+
+ let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
+ {
+ Some(*bucket_id)
+ } else {
+ garage
+ .bucket_helper()
+ .resolve_global_bucket_name(&bucket_name)
+ .await?
+ };
+
+ if let Some(bucket_id) = existing_bucket {
+ // Check we have write or owner permission on the bucket,
+ // in that case it's fine, return 200 OK, bucket exists;
+ // otherwise return a forbidden error.
+ let kp = api_key.bucket_permissions(&bucket_id);
+ if !(kp.allow_write || kp.allow_owner) {
+ return Err(Error::BucketAlreadyExists);
+ }
+ } else {
+ // Create the bucket!
+ if !is_valid_bucket_name(&bucket_name) {
+ return Err(Error::BadRequest(format!(
+ "{}: {}",
+ bucket_name, INVALID_BUCKET_NAME_MESSAGE
+ )));
+ }
+
+ let bucket = Bucket::new();
+ garage.bucket_table.insert(&bucket).await?;
+
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS)
+ .await?;
+
+ garage
+ .bucket_helper()
+ .set_local_bucket_alias(bucket.id, &api_key.key_id, &bucket_name)
+ .await?;
+ }
+
+ Ok(Response::builder()
+ .header("Location", format!("/{}", bucket_name))
+ .body(Body::empty())
+ .unwrap())
+}
+
+pub async fn handle_delete_bucket(
+ garage: &Garage,
+ bucket_id: Uuid,
+ bucket_name: String,
+ api_key: Key,
+) -> Result<Response<Body>, Error> {
+ let key_params = api_key
+ .params()
+ .ok_or_internal_error("Key should not be deleted at this point")?;
+
+ let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
+
+ let mut bucket = garage
+ .bucket_helper()
+ .get_existing_bucket(bucket_id)
+ .await?;
+ let bucket_state = bucket.state.as_option().unwrap();
+
+ // If the bucket has no other aliases, this is a true deletion.
+ // Otherwise, it is just an alias removal.
+
+ let has_other_global_aliases = bucket_state
+ .aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|(n, _, _)| is_local_alias || (*n != bucket_name));
+
+ let has_other_local_aliases = bucket_state
+ .local_aliases
+ .items()
+ .iter()
+ .filter(|(_, _, active)| *active)
+ .any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
+
+ if !has_other_global_aliases && !has_other_local_aliases {
+ // Delete bucket
+
+ // Check bucket is empty
+ let objects = garage
+ .object_table
+ .get_range(
+ &bucket_id,
+ None,
+ Some(ObjectFilter::IsData),
+ 10,
+ EnumerationOrder::Forward,
+ )
+ .await?;
+ if !objects.is_empty() {
+ return Err(Error::BucketNotEmpty);
+ }
+
+ // --- done checking, now commit ---
+ // 1. delete bucket alias
+ if is_local_alias {
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+ .await?;
+ } else {
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &bucket_name)
+ .await?;
+ }
+
+ // 2. delete authorization from keys that had access
+ for (key_id, _) in bucket.authorized_keys() {
+ garage
+ .bucket_helper()
+ .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+ .await?;
+ }
+
+ // 3. delete bucket
+ bucket.state = Deletable::delete();
+ garage.bucket_table.insert(&bucket).await?;
+ } else if is_local_alias {
+ // Just unalias
+ garage
+ .bucket_helper()
+ .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+ .await?;
+ } else {
+ // Just unalias (but from global namespace)
+ garage
+ .bucket_helper()
+ .unset_global_bucket_alias(bucket_id, &bucket_name)
+ .await?;
+ }
+
+ Ok(Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())?)
+}
+
+fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
+ // Returns None if invalid data
+ // Returns Some(None) if no location constraint is given
+ // Returns Some(Some("xxxx")) where xxxx is the given location constraint
+
+ let xml_str = std::str::from_utf8(xml_bytes).ok()?;
+ if xml_str.trim_matches(char::is_whitespace).is_empty() {
+ return Some(None);
+ }
+
+ let xml = roxmltree::Document::parse(xml_str).ok()?;
+
+ let cbc = xml.root().first_child()?;
+ if !cbc.has_tag_name("CreateBucketConfiguration") {
+ return None;
+ }
+
+ let mut ret = None;
+ for item in cbc.children() {
+ println!("{:?}", item);
+ if item.has_tag_name("LocationConstraint") {
+ if ret != None {
+ return None;
+ }
+ ret = Some(item.text()?.to_string());
+ } else if !item.is_text() {
+ return None;
+ }
+ }
+
+ Some(ret)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn create_bucket() {
+ assert_eq!(parse_create_bucket_xml(br#""#), Some(None));
+ assert_eq!(
+ parse_create_bucket_xml(
+ br#"
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ </CreateBucketConfiguration >
+ "#
+ ),
+ Some(None)
+ );
+ assert_eq!(
+ parse_create_bucket_xml(
+ br#"
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ <LocationConstraint>Europe</LocationConstraint>
+ </CreateBucketConfiguration >
+ "#
+ ),
+ Some(Some("Europe".into()))
+ );
+ assert_eq!(
+ parse_create_bucket_xml(
+ br#"
+ <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+ </Crea >
+ "#
+ ),
+ None
+ );
+ }
+}