| field | value |
|---|---|
| author | Alex <alex@adnab.me> (2022-05-10 13:16:57 +0200) |
| committer | Alex <alex@adnab.me> (2022-05-10 13:16:57 +0200) |
| commit | 5768bf362262f78376af14517c4921941986192e |
| tree | b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3/bucket.rs |
| parent | def78c5e6f5da37a0d17b5652c525fbeccbc2e86 |
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples
- [x] Specify the DVVS (dotted version vector set) format used for causality detection
- [x] Specify the K2V index (just a counter of the number of values per partition key; see the sketch after this list)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
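To make the data model above concrete, here is a rough illustrative sketch of what a K2V triple and its index entry boil down to. The type and field names (`K2vItem`, `K2vIndexEntry`, ...) are hypothetical stand-ins, not the actual types introduced by this PR; the linked spec has the authoritative definitions.

```rust
/// Illustrative sketch only: a K2V entry is addressed by (partition key, sort key)
/// and can hold several concurrent values, disambiguated by a causality token
/// derived from the DVVS described in the spec.
struct K2vItem {
    partition_key: String,
    sort_key: String,
    /// Opaque token the client sends back to establish causality (hypothetical encoding).
    causality_token: String,
    /// Concurrent values; collapses to a single value once causality is resolved.
    values: Vec<Vec<u8>>,
}

/// Illustrative sketch only: the K2V index tracked per partition key is just a
/// counter of the number of values stored under it, which is what ReadIndex returns.
struct K2vIndexEntry {
    partition_key: String,
    value_count: u64,
}
```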
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router (a rough dispatch sketch follows this list)
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch
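As a rough picture of what the endpoint and router items above cover, the sketch below shows how a K2V request could be dispatched to the single-item and index endpoints. It is a hypothetical simplification: the function name, the string-based method, and the dispatch rules are illustrative, and the PR's actual router module parses full hyper requests (including the query-string flags used by the batch and poll endpoints).

```rust
// Hypothetical sketch of K2V endpoint dispatch; not the PR's actual router logic.
// partition_key / sort_key are the components extracted from the request URL.
fn route_k2v(method: &str, partition_key: Option<&str>, sort_key: Option<&str>) -> Option<&'static str> {
    match (method, partition_key, sort_key) {
        ("GET", Some(_), Some(_)) => Some("ReadItem"),
        ("PUT", Some(_), Some(_)) => Some("InsertItem"),
        ("DELETE", Some(_), Some(_)) => Some("DeleteItem"),
        ("GET", None, None) => Some("ReadIndex"),
        // InsertBatch / ReadBatch / DeleteBatch and PollItem are dispatched on
        // additional query-string flags that this sketch leaves out.
        _ => None,
    }
}
```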
**Testing:**
- [x] Just a simple Python script that makes some requests so the results can be checked visually (no parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] PollItem test failing randomly
  - [x] when invalid causality tokens are given, errors should be 4xx, not 5xx
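The second fix above is really a question of error classification: a malformed causality token supplied by the client is the caller's fault and should surface as a 400, not a 500. Below is a minimal sketch of that pattern, reusing the same kind of `Error` helpers as the handlers in this diff; `CausalContext` and its `parse` method are hypothetical stand-ins for the K2V causality types.

```rust
/// Hypothetical stand-in for the K2V causal context (the real one encodes a DVVS).
struct CausalContext(Vec<u8>);

impl CausalContext {
    /// Hypothetical decoder: returns None on malformed (here: non-hex) input.
    fn parse(token: &str) -> Option<Self> {
        hex::decode(token).ok().map(CausalContext)
    }
}

/// Surface a malformed client-supplied token as a 400 Bad Request rather than a 500.
fn parse_causality_token(token: &str) -> Result<CausalContext, Error> {
    CausalContext::parse(token).ok_or_bad_request("Invalid causality token")
}
```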
**Improvements:**
- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag
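Putting K2V behind a Cargo feature means the K2V tables, handlers and router are only compiled in when the `k2v` feature is enabled. Below is a minimal sketch of that kind of gating, assuming a `k2v` feature declared in the crate's Cargo.toml (`[features] k2v = []`); the module name and function are illustrative, not taken from the PR.

```rust
// Compile the K2V module only when the `k2v` cargo feature is enabled
// (illustrative module name).
#[cfg(feature = "k2v")]
pub mod k2v {
    // K2V router and handlers would live here.
}

// The same feature can also gate individual code paths:
pub fn enabled_apis() -> Vec<&'static str> {
    let mut apis = vec!["s3"];
    if cfg!(feature = "k2v") {
        apis.push("k2v");
    }
    apis
}
```

Building with `cargo build --features k2v` then includes the K2V endpoints, while a default build leaves them out entirely.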
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/s3/bucket.rs')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | src/api/s3/bucket.rs | 358 |

1 file changed, 358 insertions, 0 deletions
```diff
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
new file mode 100644
index 00000000..93048a8c
--- /dev/null
+++ b/src/api/s3/bucket.rs
@@ -0,0 +1,358 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use hyper::{Body, Request, Response, StatusCode};
+
+use garage_model::bucket_alias_table::*;
+use garage_model::bucket_table::Bucket;
+use garage_model::garage::Garage;
+use garage_model::key_table::Key;
+use garage_model::permission::BucketKeyPerm;
+use garage_model::s3::object_table::ObjectFilter;
+use garage_table::util::*;
+use garage_util::crdt::*;
+use garage_util::data::*;
+use garage_util::time::*;
+
+use crate::error::*;
+use crate::s3::xml as s3_xml;
+use crate::signature::verify_signed_content;
+
+pub fn handle_get_bucket_location(garage: Arc<Garage>) -> Result<Response<Body>, Error> {
+    let loc = s3_xml::LocationConstraint {
+        xmlns: (),
+        region: garage.config.s3_api.s3_region.to_string(),
+    };
+    let xml = s3_xml::to_xml_with_header(&loc)?;
+
+    Ok(Response::builder()
+        .header("Content-Type", "application/xml")
+        .body(Body::from(xml.into_bytes()))?)
+}
+
+pub fn handle_get_bucket_versioning() -> Result<Response<Body>, Error> {
+    let versioning = s3_xml::VersioningConfiguration {
+        xmlns: (),
+        status: None,
+    };
+
+    let xml = s3_xml::to_xml_with_header(&versioning)?;
+
+    Ok(Response::builder()
+        .header("Content-Type", "application/xml")
+        .body(Body::from(xml.into_bytes()))?)
+}
+
+pub async fn handle_list_buckets(garage: &Garage, api_key: &Key) -> Result<Response<Body>, Error> {
+    let key_p = api_key.params().ok_or_internal_error(
+        "Key should not be in deleted state at this point (in handle_list_buckets)",
+    )?;
+
+    // Collect buckets user has access to
+    let ids = api_key
+        .state
+        .as_option()
+        .unwrap()
+        .authorized_buckets
+        .items()
+        .iter()
+        .filter(|(_, perms)| perms.is_any())
+        .map(|(id, _)| *id)
+        .collect::<Vec<_>>();
+
+    let mut buckets_by_id = HashMap::new();
+    let mut aliases = HashMap::new();
+
+    for bucket_id in ids.iter() {
+        let bucket = garage.bucket_table.get(&EmptyKey, bucket_id).await?;
+        if let Some(bucket) = bucket {
+            for (alias, _, _active) in bucket.aliases().iter().filter(|(_, _, active)| *active) {
+                let alias_opt = garage.bucket_alias_table.get(&EmptyKey, alias).await?;
+                if let Some(alias_ent) = alias_opt {
+                    if *alias_ent.state.get() == Some(*bucket_id) {
+                        aliases.insert(alias_ent.name().to_string(), *bucket_id);
+                    }
+                }
+            }
+            if let Deletable::Present(param) = bucket.state {
+                buckets_by_id.insert(bucket_id, param);
+            }
+        }
+    }
+
+    for (alias, _, id_opt) in key_p.local_aliases.items() {
+        if let Some(id) = id_opt {
+            aliases.insert(alias.clone(), *id);
+        }
+    }
+
+    // Generate response
+    let list_buckets = s3_xml::ListAllMyBucketsResult {
+        owner: s3_xml::Owner {
+            display_name: s3_xml::Value(key_p.name.get().to_string()),
+            id: s3_xml::Value(api_key.key_id.to_string()),
+        },
+        buckets: s3_xml::BucketList {
+            entries: aliases
+                .iter()
+                .filter_map(|(name, id)| buckets_by_id.get(id).map(|p| (name, id, p)))
+                .map(|(name, _id, param)| s3_xml::Bucket {
+                    creation_date: s3_xml::Value(msec_to_rfc3339(param.creation_date)),
+                    name: s3_xml::Value(name.to_string()),
+                })
+                .collect(),
+        },
+    };
+
+    let xml = s3_xml::to_xml_with_header(&list_buckets)?;
+    trace!("xml: {}", xml);
+
+    Ok(Response::builder()
+        .header("Content-Type", "application/xml")
+        .body(Body::from(xml))?)
+}
+
+pub async fn handle_create_bucket(
+    garage: &Garage,
+    req: Request<Body>,
+    content_sha256: Option<Hash>,
+    api_key: Key,
+    bucket_name: String,
+) -> Result<Response<Body>, Error> {
+    let body = hyper::body::to_bytes(req.into_body()).await?;
+
+    if let Some(content_sha256) = content_sha256 {
+        verify_signed_content(content_sha256, &body[..])?;
+    }
+
+    let cmd =
+        parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
+
+    if let Some(location_constraint) = cmd {
+        if location_constraint != garage.config.s3_api.s3_region {
+            return Err(Error::BadRequest(format!(
+                "Cannot satisfy location constraint `{}`: buckets can only be created in region `{}`",
+                location_constraint,
+                garage.config.s3_api.s3_region
+            )));
+        }
+    }
+
+    let key_params = api_key
+        .params()
+        .ok_or_internal_error("Key should not be deleted at this point")?;
+
+    let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
+    {
+        Some(*bucket_id)
+    } else {
+        garage
+            .bucket_helper()
+            .resolve_global_bucket_name(&bucket_name)
+            .await?
+    };
+
+    if let Some(bucket_id) = existing_bucket {
+        // Check we have write or owner permission on the bucket,
+        // in that case it's fine, return 200 OK, bucket exists;
+        // otherwise return a forbidden error.
+        let kp = api_key.bucket_permissions(&bucket_id);
+        if !(kp.allow_write || kp.allow_owner) {
+            return Err(Error::BucketAlreadyExists);
+        }
+    } else {
+        // Create the bucket!
+        if !is_valid_bucket_name(&bucket_name) {
+            return Err(Error::BadRequest(format!(
+                "{}: {}",
+                bucket_name, INVALID_BUCKET_NAME_MESSAGE
+            )));
+        }
+
+        let bucket = Bucket::new();
+        garage.bucket_table.insert(&bucket).await?;
+
+        garage
+            .bucket_helper()
+            .set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS)
+            .await?;
+
+        garage
+            .bucket_helper()
+            .set_local_bucket_alias(bucket.id, &api_key.key_id, &bucket_name)
+            .await?;
+    }
+
+    Ok(Response::builder()
+        .header("Location", format!("/{}", bucket_name))
+        .body(Body::empty())
+        .unwrap())
+}
+
+pub async fn handle_delete_bucket(
+    garage: &Garage,
+    bucket_id: Uuid,
+    bucket_name: String,
+    api_key: Key,
+) -> Result<Response<Body>, Error> {
+    let key_params = api_key
+        .params()
+        .ok_or_internal_error("Key should not be deleted at this point")?;
+
+    let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
+
+    let mut bucket = garage
+        .bucket_helper()
+        .get_existing_bucket(bucket_id)
+        .await?;
+    let bucket_state = bucket.state.as_option().unwrap();
+
+    // If the bucket has no other aliases, this is a true deletion.
+    // Otherwise, it is just an alias removal.
+
+    let has_other_global_aliases = bucket_state
+        .aliases
+        .items()
+        .iter()
+        .filter(|(_, _, active)| *active)
+        .any(|(n, _, _)| is_local_alias || (*n != bucket_name));
+
+    let has_other_local_aliases = bucket_state
+        .local_aliases
+        .items()
+        .iter()
+        .filter(|(_, _, active)| *active)
+        .any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id);
+
+    if !has_other_global_aliases && !has_other_local_aliases {
+        // Delete bucket
+
+        // Check bucket is empty
+        let objects = garage
+            .object_table
+            .get_range(
+                &bucket_id,
+                None,
+                Some(ObjectFilter::IsData),
+                10,
+                EnumerationOrder::Forward,
+            )
+            .await?;
+        if !objects.is_empty() {
+            return Err(Error::BucketNotEmpty);
+        }
+
+        // --- done checking, now commit ---
+        // 1. delete bucket alias
+        if is_local_alias {
+            garage
+                .bucket_helper()
+                .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+                .await?;
+        } else {
+            garage
+                .bucket_helper()
+                .unset_global_bucket_alias(bucket_id, &bucket_name)
+                .await?;
+        }
+
+        // 2. delete authorization from keys that had access
+        for (key_id, _) in bucket.authorized_keys() {
+            garage
+                .bucket_helper()
+                .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
+                .await?;
+        }
+
+        // 3. delete bucket
+        bucket.state = Deletable::delete();
+        garage.bucket_table.insert(&bucket).await?;
+    } else if is_local_alias {
+        // Just unalias
+        garage
+            .bucket_helper()
+            .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
+            .await?;
+    } else {
+        // Just unalias (but from global namespace)
+        garage
+            .bucket_helper()
+            .unset_global_bucket_alias(bucket_id, &bucket_name)
+            .await?;
+    }
+
+    Ok(Response::builder()
+        .status(StatusCode::NO_CONTENT)
+        .body(Body::empty())?)
+}
+
+fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
+    // Returns None if invalid data
+    // Returns Some(None) if no location constraint is given
+    // Returns Some(Some("xxxx")) where xxxx is the given location constraint
+
+    let xml_str = std::str::from_utf8(xml_bytes).ok()?;
+    if xml_str.trim_matches(char::is_whitespace).is_empty() {
+        return Some(None);
+    }
+
+    let xml = roxmltree::Document::parse(xml_str).ok()?;
+
+    let cbc = xml.root().first_child()?;
+    if !cbc.has_tag_name("CreateBucketConfiguration") {
+        return None;
+    }
+
+    let mut ret = None;
+    for item in cbc.children() {
+        println!("{:?}", item);
+        if item.has_tag_name("LocationConstraint") {
+            if ret != None {
+                return None;
+            }
+            ret = Some(item.text()?.to_string());
+        } else if !item.is_text() {
+            return None;
+        }
+    }
+
+    Some(ret)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn create_bucket() {
+        assert_eq!(parse_create_bucket_xml(br#""#), Some(None));
+        assert_eq!(
+            parse_create_bucket_xml(
+                br#"
+            <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+            </CreateBucketConfiguration >
+            "#
+            ),
+            Some(None)
+        );
+        assert_eq!(
+            parse_create_bucket_xml(
+                br#"
+            <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+               <LocationConstraint>Europe</LocationConstraint>
+            </CreateBucketConfiguration >
+            "#
+            ),
+            Some(Some("Europe".into()))
+        );
+        assert_eq!(
+            parse_create_bucket_xml(
+                br#"
+            <CreateBucketConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
+            </Crea >
+            "#
+            ),
+            None
+        );
+    }
+}
```