diff options
author | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
commit | 5768bf362262f78376af14517c4921941986192e (patch) | |
tree | b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/generic_server.rs | |
parent | def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff) | |
download | garage-5768bf362262f78376af14517c4921941986192e.tar.gz garage-5768bf362262f78376af14517c4921941986192e.zip |
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch
**Testing:**
- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
- [x] Adapt testing framework
- [x] Simple test with InsertItem + ReadItem
- [x] Test with several Insert/Read/DeleteItem + ReadIndex
- [x] Test all combinations of return formats for ReadItem
- [x] Test with ReadBatch, InsertBatch, DeleteBatch
- [x] Test with PollItem
- [x] Test error codes
- [ ] Fix most broken stuff
- [x] test PollItem broken randomly
- [x] when invalid causality tokens are given, errors should be 4xx not 5xx
**Improvements:**
- [x] Descending range queries
- [x] Specify
- [x] Implement
- [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/generic_server.rs')
-rw-r--r-- | src/api/generic_server.rs | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/src/api/generic_server.rs b/src/api/generic_server.rs new file mode 100644 index 00000000..9281e596 --- /dev/null +++ b/src/api/generic_server.rs @@ -0,0 +1,202 @@ +use std::net::SocketAddr; +use std::sync::Arc; + +use async_trait::async_trait; + +use futures::future::Future; + +use hyper::server::conn::AddrStream; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server}; + +use opentelemetry::{ + global, + metrics::{Counter, ValueRecorder}, + trace::{FutureExt, SpanRef, TraceContextExt, Tracer}, + Context, KeyValue, +}; + +use garage_util::error::Error as GarageError; +use garage_util::metrics::{gen_trace_id, RecordDuration}; + +use crate::error::*; + +pub(crate) trait ApiEndpoint: Send + Sync + 'static { + fn name(&self) -> &'static str; + fn add_span_attributes(&self, span: SpanRef<'_>); +} + +#[async_trait] +pub(crate) trait ApiHandler: Send + Sync + 'static { + const API_NAME: &'static str; + const API_NAME_DISPLAY: &'static str; + + type Endpoint: ApiEndpoint; + + fn parse_endpoint(&self, r: &Request<Body>) -> Result<Self::Endpoint, Error>; + async fn handle( + &self, + req: Request<Body>, + endpoint: Self::Endpoint, + ) -> Result<Response<Body>, Error>; +} + +pub(crate) struct ApiServer<A: ApiHandler> { + region: String, + api_handler: A, + + // Metrics + request_counter: Counter<u64>, + error_counter: Counter<u64>, + request_duration: ValueRecorder<f64>, +} + +impl<A: ApiHandler> ApiServer<A> { + pub fn new(region: String, api_handler: A) -> Arc<Self> { + let meter = global::meter("garage/api"); + Arc::new(Self { + region, + api_handler, + request_counter: meter + .u64_counter(format!("api.{}.request_counter", A::API_NAME)) + .with_description(format!( + "Number of API calls to the various {} API endpoints", + A::API_NAME_DISPLAY + )) + .init(), + error_counter: meter + .u64_counter(format!("api.{}.error_counter", A::API_NAME)) + .with_description(format!( + "Number of API calls to the various {} API endpoints that resulted in errors", + A::API_NAME_DISPLAY + )) + .init(), + request_duration: meter + .f64_value_recorder(format!("api.{}.request_duration", A::API_NAME)) + .with_description(format!( + "Duration of API calls to the various {} API endpoints", + A::API_NAME_DISPLAY + )) + .init(), + }) + } + + pub async fn run_server( + self: Arc<Self>, + bind_addr: SocketAddr, + shutdown_signal: impl Future<Output = ()>, + ) -> Result<(), GarageError> { + let service = make_service_fn(|conn: &AddrStream| { + let this = self.clone(); + + let client_addr = conn.remote_addr(); + async move { + Ok::<_, GarageError>(service_fn(move |req: Request<Body>| { + let this = this.clone(); + + this.handler(req, client_addr) + })) + } + }); + + let server = Server::bind(&bind_addr).serve(service); + + let graceful = server.with_graceful_shutdown(shutdown_signal); + info!( + "{} API server listening on http://{}", + A::API_NAME_DISPLAY, + bind_addr + ); + + graceful.await?; + Ok(()) + } + + async fn handler( + self: Arc<Self>, + req: Request<Body>, + addr: SocketAddr, + ) -> Result<Response<Body>, GarageError> { + let uri = req.uri().clone(); + info!("{} {} {}", addr, req.method(), uri); + debug!("{:?}", req); + + let tracer = opentelemetry::global::tracer("garage"); + let span = tracer + .span_builder(format!("{} API call (unknown)", A::API_NAME_DISPLAY)) + .with_trace_id(gen_trace_id()) + .with_attributes(vec![ + KeyValue::new("method", format!("{}", req.method())), + KeyValue::new("uri", req.uri().to_string()), + ]) + .start(&tracer); + + let res = self + .handler_stage2(req) + .with_context(Context::current_with_span(span)) + .await; + + match res { + Ok(x) => { + debug!("{} {:?}", x.status(), x.headers()); + Ok(x) + } + Err(e) => { + let body: Body = Body::from(e.aws_xml(&self.region, uri.path())); + let mut http_error_builder = Response::builder() + .status(e.http_status_code()) + .header("Content-Type", "application/xml"); + + if let Some(header_map) = http_error_builder.headers_mut() { + e.add_headers(header_map) + } + + let http_error = http_error_builder.body(body)?; + + if e.http_status_code().is_server_error() { + warn!("Response: error {}, {}", e.http_status_code(), e); + } else { + info!("Response: error {}, {}", e.http_status_code(), e); + } + Ok(http_error) + } + } + } + + async fn handler_stage2(&self, req: Request<Body>) -> Result<Response<Body>, Error> { + let endpoint = self.api_handler.parse_endpoint(&req)?; + debug!("Endpoint: {}", endpoint.name()); + + let current_context = Context::current(); + let current_span = current_context.span(); + current_span.update_name::<String>(format!("S3 API {}", endpoint.name())); + current_span.set_attribute(KeyValue::new("endpoint", endpoint.name())); + endpoint.add_span_attributes(current_span); + + let metrics_tags = &[KeyValue::new("api_endpoint", endpoint.name())]; + + let res = self + .api_handler + .handle(req, endpoint) + .record_duration(&self.request_duration, &metrics_tags[..]) + .await; + + self.request_counter.add(1, &metrics_tags[..]); + + let status_code = match &res { + Ok(r) => r.status(), + Err(e) => e.http_status_code(), + }; + if status_code.is_client_error() || status_code.is_server_error() { + self.error_counter.add( + 1, + &[ + metrics_tags[0].clone(), + KeyValue::new("status_code", status_code.as_str().to_string()), + ], + ); + } + + res + } +} |