From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 10 May 2022 13:16:57 +0200 Subject: First implementation of K2V (#293) **Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex Co-committed-by: Alex --- src/api/k2v/api_server.rs | 195 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 195 insertions(+) create mode 100644 src/api/k2v/api_server.rs (limited to 'src/api/k2v/api_server.rs') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs new file mode 100644 index 00000000..5f5e9030 --- /dev/null +++ b/src/api/k2v/api_server.rs @@ -0,0 +1,195 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; +use hyper::{Body, Method, Request, Response}; + +use opentelemetry::{trace::SpanRef, KeyValue}; + +use garage_table::util::*; +use garage_util::error::Error as GarageError; + +use garage_model::garage::Garage; + +use crate::error::*; +use crate::generic_server::*; + +use crate::signature::payload::check_payload_signature; +use crate::signature::streaming::*; + +use crate::helpers::*; +use crate::k2v::batch::*; +use crate::k2v::index::*; +use crate::k2v::item::*; +use crate::k2v::router::Endpoint; +use crate::s3::cors::*; + +pub struct K2VApiServer { + garage: Arc, +} + +pub(crate) struct K2VApiEndpoint { + bucket_name: String, + endpoint: Endpoint, +} + +impl K2VApiServer { + pub async fn run( + garage: Arc, + shutdown_signal: impl Future, + ) -> Result<(), GarageError> { + if let Some(cfg) = &garage.config.k2v_api { + let bind_addr = cfg.api_bind_addr; + + ApiServer::new( + garage.config.s3_api.s3_region.clone(), + K2VApiServer { garage }, + ) + .run_server(bind_addr, shutdown_signal) + .await + } else { + Ok(()) + } + } +} + +#[async_trait] +impl ApiHandler for K2VApiServer { + const API_NAME: &'static str = "k2v"; + const API_NAME_DISPLAY: &'static str = "K2V"; + + type Endpoint = K2VApiEndpoint; + + fn parse_endpoint(&self, req: &Request) -> Result { + let (endpoint, bucket_name) = Endpoint::from_request(req)?; + + Ok(K2VApiEndpoint { + bucket_name, + endpoint, + }) + } + + async fn handle( + &self, + req: Request, + endpoint: K2VApiEndpoint, + ) -> Result, Error> { + let K2VApiEndpoint { + bucket_name, + endpoint, + } = endpoint; + let garage = self.garage.clone(); + + // The OPTIONS method is procesed early, before we even check for an API key + if let Endpoint::Options = endpoint { + return handle_options_s3api(garage, &req, Some(bucket_name)).await; + } + + let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; + let api_key = api_key.ok_or_else(|| { + Error::Forbidden("Garage does not support anonymous access yet".to_string()) + })?; + + let req = parse_streaming_body( + &api_key, + req, + &mut content_sha256, + &garage.config.s3_api.s3_region, + "k2v", + )?; + + let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket = garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or(Error::NoSuchBucket)?; + + let allowed = match endpoint.authorization_type() { + Authorization::Read => api_key.allow_read(&bucket_id), + Authorization::Write => api_key.allow_write(&bucket_id), + Authorization::Owner => api_key.allow_owner(&bucket_id), + _ => unreachable!(), + }; + + if !allowed { + return Err(Error::Forbidden( + "Operation is not allowed for this key.".to_string(), + )); + } + + // Look up what CORS rule might apply to response. + // Requests for methods different than GET, HEAD or POST + // are always preflighted, i.e. the browser should make + // an OPTIONS call before to check it is allowed + let matching_cors_rule = match *req.method() { + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, + _ => None, + }; + + let resp = match endpoint { + Endpoint::DeleteItem { + partition_key, + sort_key, + } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await, + Endpoint::InsertItem { + partition_key, + sort_key, + } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await, + Endpoint::ReadItem { + partition_key, + sort_key, + } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + Endpoint::PollItem { + partition_key, + sort_key, + causality_token, + timeout, + } => { + handle_poll_item( + garage, + &req, + bucket_id, + partition_key, + sort_key, + causality_token, + timeout, + ) + .await + } + Endpoint::ReadIndex { + prefix, + start, + end, + limit, + reverse, + } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await, + Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, + Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, + Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + Endpoint::Options => unreachable!(), + }; + + // If request was a success and we have a CORS rule that applies to it, + // add the corresponding CORS headers to the response + let mut resp_ok = resp?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp_ok, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + + Ok(resp_ok) + } +} + +impl ApiEndpoint for K2VApiEndpoint { + fn name(&self) -> &'static str { + self.endpoint.name() + } + + fn add_span_attributes(&self, span: SpanRef<'_>) { + span.set_attribute(KeyValue::new("bucket", self.bucket_name.clone())); + } +} -- cgit v1.2.3 From 382e74c798263d042b1c6ca3788c866a8c69c4f4 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 24 May 2022 12:16:39 +0200 Subject: First version of admin API (#298) **Spec:** - [x] Start writing - [x] Specify all layout endpoints - [x] Specify all endpoints for operations on keys - [x] Specify all endpoints for operations on key/bucket permissions - [x] Specify all endpoints for operations on buckets - [x] Specify all endpoints for operations on bucket aliases View rendered spec at **Code:** - [x] Refactor code for admin api to use common api code that was created for K2V **General endpoints:** - [x] Metrics - [x] GetClusterStatus - [x] ConnectClusterNodes - [x] GetClusterLayout - [x] UpdateClusterLayout - [x] ApplyClusterLayout - [x] RevertClusterLayout **Key-related endpoints:** - [x] ListKeys - [x] CreateKey - [x] ImportKey - [x] GetKeyInfo - [x] UpdateKey - [x] DeleteKey **Bucket-related endpoints:** - [x] ListBuckets - [x] CreateBucket - [x] GetBucketInfo - [x] DeleteBucket - [x] PutBucketWebsite - [x] DeleteBucketWebsite **Operations on key/bucket permissions:** - [x] BucketAllowKey - [x] BucketDenyKey **Operations on bucket aliases:** - [x] GlobalAliasBucket - [x] GlobalUnaliasBucket - [x] LocalAliasBucket - [x] LocalUnaliasBucket **And also:** - [x] Separate error type for the admin API (this PR includes a quite big refactoring of error handling) - [x] Add management of website access - [ ] Check that nothing is missing wrt what can be done using the CLI - [ ] Improve formatting of the spec - [x] Make sure everyone is cool with the API design Fix #231 Fix #295 Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/298 Co-authored-by: Alex Co-committed-by: Alex --- src/api/k2v/api_server.rs | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'src/api/k2v/api_server.rs') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 5f5e9030..eb0fbdd7 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -7,13 +7,12 @@ use hyper::{Body, Method, Request, Response}; use opentelemetry::{trace::SpanRef, KeyValue}; -use garage_table::util::*; use garage_util::error::Error as GarageError; use garage_model::garage::Garage; -use crate::error::*; use crate::generic_server::*; +use crate::k2v::error::*; use crate::signature::payload::check_payload_signature; use crate::signature::streaming::*; @@ -60,6 +59,7 @@ impl ApiHandler for K2VApiServer { const API_NAME_DISPLAY: &'static str = "K2V"; type Endpoint = K2VApiEndpoint; + type Error = Error; fn parse_endpoint(&self, req: &Request) -> Result { let (endpoint, bucket_name) = Endpoint::from_request(req)?; @@ -83,13 +83,14 @@ impl ApiHandler for K2VApiServer { // The OPTIONS method is procesed early, before we even check for an API key if let Endpoint::Options = endpoint { - return handle_options_s3api(garage, &req, Some(bucket_name)).await; + return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) + .await + .ok_or_bad_request("Error handling OPTIONS")?); } let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; - let api_key = api_key.ok_or_else(|| { - Error::Forbidden("Garage does not support anonymous access yet".to_string()) - })?; + let api_key = api_key + .ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?; let req = parse_streaming_body( &api_key, @@ -99,13 +100,14 @@ impl ApiHandler for K2VApiServer { "k2v", )?; - let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket_id = garage + .bucket_helper() + .resolve_bucket(&bucket_name, &api_key) + .await?; let bucket = garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .filter(|b| !b.state.is_deleted()) - .ok_or(Error::NoSuchBucket)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -115,9 +117,7 @@ impl ApiHandler for K2VApiServer { }; if !allowed { - return Err(Error::Forbidden( - "Operation is not allowed for this key.".to_string(), - )); + return Err(Error::forbidden("Operation is not allowed for this key.")); } // Look up what CORS rule might apply to response. @@ -125,7 +125,8 @@ impl ApiHandler for K2VApiServer { // are always preflighted, i.e. the browser should make // an OPTIONS call before to check it is allowed let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req) + .ok_or_internal_error("Error looking up CORS rule")?, _ => None, }; -- cgit v1.2.3 From 2559f63e9bb58a66da70f33e852ebbd5f909876e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 17:54:16 +0200 Subject: Make all HTTP services optionnal --- src/api/k2v/api_server.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'src/api/k2v/api_server.rs') diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index eb0fbdd7..084867b5 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::sync::Arc; use async_trait::async_trait; @@ -36,20 +37,13 @@ pub(crate) struct K2VApiEndpoint { impl K2VApiServer { pub async fn run( garage: Arc, + bind_addr: SocketAddr, + s3_region: String, shutdown_signal: impl Future, ) -> Result<(), GarageError> { - if let Some(cfg) = &garage.config.k2v_api { - let bind_addr = cfg.api_bind_addr; - - ApiServer::new( - garage.config.s3_api.s3_region.clone(), - K2VApiServer { garage }, - ) + ApiServer::new(s3_region, K2VApiServer { garage }) .run_server(bind_addr, shutdown_signal) .await - } else { - Ok(()) - } } } -- cgit v1.2.3