diff options
author | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
commit | 5768bf362262f78376af14517c4921941986192e (patch) | |
tree | b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/garage | |
parent | def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff) | |
download | garage-5768bf362262f78376af14517c4921941986192e.tar.gz garage-5768bf362262f78376af14517c4921941986192e.zip |
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch
**Testing:**
- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
- [x] Adapt testing framework
- [x] Simple test with InsertItem + ReadItem
- [x] Test with several Insert/Read/DeleteItem + ReadIndex
- [x] Test all combinations of return formats for ReadItem
- [x] Test with ReadBatch, InsertBatch, DeleteBatch
- [x] Test with PollItem
- [x] Test error codes
- [ ] Fix most broken stuff
- [x] test PollItem broken randomly
- [x] when invalid causality tokens are given, errors should be 4xx not 5xx
**Improvements:**
- [x] Descending range queries
- [x] Specify
- [x] Implement
- [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/garage')
23 files changed, 1690 insertions, 65 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 59f402ff..3b69d7bc 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -63,3 +63,11 @@ hyper = { version = "0.14", features = ["client", "http1", "runtime"] } sha2 = "0.9" static_init = "1.0" +assert-json-diff = "2.0" +serde_json = "1.0" +base64 = "0.13" + + +[features] +kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] +k2v = [ "garage_util/k2v", "garage_api/k2v" ] diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 0b20bb20..af0c3f22 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -21,8 +21,8 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; -use garage_model::object_table::ObjectFilter; use garage_model::permission::*; +use garage_model::s3::object_table::ObjectFilter; use crate::cli::*; use crate::repair::Repair; @@ -80,7 +80,13 @@ impl AdminRpcHandler { let buckets = self .garage .bucket_table - .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) + .get_range( + &EmptyKey, + None, + Some(DeletedFilter::NotDeleted), + 10000, + EnumerationOrder::Forward, + ) .await?; Ok(AdminRpc::BucketList(buckets)) } @@ -210,7 +216,13 @@ impl AdminRpcHandler { let objects = self .garage .object_table - .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) + .get_range( + &bucket_id, + None, + Some(ObjectFilter::IsData), + 10, + EnumerationOrder::Forward, + ) .await?; if !objects.is_empty() { return Err(Error::BadRequest(format!( @@ -445,6 +457,7 @@ impl AdminRpcHandler { None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), 10000, + EnumerationOrder::Forward, ) .await? .iter() diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index a90277a0..2a799868 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -85,13 +85,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> format_table(healthy_nodes); let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); - let failure_case_1 = status.iter().any(|adv| !adv.is_up); + let failure_case_1 = status + .iter() + .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_))))); let failure_case_2 = layout .roles .items() .iter() - .filter(|(_, _, v)| v.0.is_some()) - .any(|(id, _, _)| !status_keys.contains(id)); + .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some()); if failure_case_1 || failure_case_2 { println!("\n==== FAILED NODES ===="); let mut failed_nodes = diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 3666ca8f..830eac71 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -2,10 +2,10 @@ use std::sync::Arc; use tokio::sync::watch; -use garage_model::block_ref_table::*; use garage_model::garage::Garage; -use garage_model::object_table::*; -use garage_model::version_table::*; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; use garage_table::*; use garage_util::error::Error; diff --git a/src/garage/server.rs b/src/garage/server.rs index 58c9e782..24bb25b3 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -8,10 +8,13 @@ use garage_util::error::Error; use garage_admin::metrics::*; use garage_admin::tracing_setup::*; -use garage_api::run_api_server; +use garage_api::s3::api_server::S3ApiServer; use garage_model::garage::Garage; use garage_web::run_web_server; +#[cfg(feature = "k2v")] +use garage_api::k2v::api_server::K2VApiServer; + use crate::admin::*; async fn wait_from(mut chan: watch::Receiver<bool>) { @@ -56,12 +59,21 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Create admin RPC handler..."); AdminRpcHandler::new(garage.clone()); - info!("Initializing API server..."); - let api_server = tokio::spawn(run_api_server( + info!("Initializing S3 API server..."); + let s3_api_server = tokio::spawn(S3ApiServer::run( garage.clone(), wait_from(watch_cancel.clone()), )); + #[cfg(feature = "k2v")] + let k2v_api_server = { + info!("Initializing K2V API server..."); + tokio::spawn(K2VApiServer::run( + garage.clone(), + wait_from(watch_cancel.clone()), + )) + }; + info!("Initializing web server..."); let web_server = tokio::spawn(run_web_server( garage.clone(), @@ -80,8 +92,12 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { // Stuff runs // When a cancel signal is sent, stuff stops - if let Err(e) = api_server.await? { - warn!("API server exited with error: {}", e); + if let Err(e) = s3_api_server.await? { + warn!("S3 API server exited with error: {}", e); + } + #[cfg(feature = "k2v")] + if let Err(e) = k2v_api_server.await? { + warn!("K2V API server exited with error: {}", e); } if let Err(e) = web_server.await? { warn!("Web server exited with error: {}", e); diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs index c5ddc6e5..212588b5 100644 --- a/src/garage/tests/common/client.rs +++ b/src/garage/tests/common/client.rs @@ -10,7 +10,7 @@ pub fn build_client(instance: &Instance) -> Client { None, "garage-integ-test", ); - let endpoint = Endpoint::immutable(instance.uri()); + let endpoint = Endpoint::immutable(instance.s3_uri()); let config = Config::builder() .region(super::REGION) diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs index 580691a1..1700cc90 100644 --- a/src/garage/tests/common/custom_requester.rs +++ b/src/garage/tests/common/custom_requester.rs @@ -17,14 +17,25 @@ use garage_api::signature; pub struct CustomRequester { key: Key, uri: Uri, + service: &'static str, client: Client<HttpConnector>, } impl CustomRequester { - pub fn new(instance: &Instance) -> Self { + pub fn new_s3(instance: &Instance) -> Self { CustomRequester { key: instance.key.clone(), - uri: instance.uri(), + uri: instance.s3_uri(), + service: "s3", + client: Client::new(), + } + } + + pub fn new_k2v(instance: &Instance) -> Self { + CustomRequester { + key: instance.key.clone(), + uri: instance.k2v_uri(), + service: "k2v", client: Client::new(), } } @@ -32,6 +43,7 @@ impl CustomRequester { pub fn builder(&self, bucket: String) -> RequestBuilder<'_> { RequestBuilder { requester: self, + service: self.service, bucket, method: Method::GET, path: String::new(), @@ -47,6 +59,7 @@ impl CustomRequester { pub struct RequestBuilder<'a> { requester: &'a CustomRequester, + service: &'static str, bucket: String, method: Method, path: String, @@ -59,13 +72,17 @@ pub struct RequestBuilder<'a> { } impl<'a> RequestBuilder<'a> { + pub fn service(&mut self, service: &'static str) -> &mut Self { + self.service = service; + self + } pub fn method(&mut self, method: Method) -> &mut Self { self.method = method; self } - pub fn path(&mut self, path: String) -> &mut Self { - self.path = path; + pub fn path(&mut self, path: impl ToString) -> &mut Self { + self.path = path.to_string(); self } @@ -74,16 +91,38 @@ impl<'a> RequestBuilder<'a> { self } + pub fn query_param<T, U>(&mut self, param: T, value: Option<U>) -> &mut Self + where + T: ToString, + U: ToString, + { + self.query_params + .insert(param.to_string(), value.as_ref().map(ToString::to_string)); + self + } + pub fn signed_headers(&mut self, signed_headers: HashMap<String, String>) -> &mut Self { self.signed_headers = signed_headers; self } + pub fn signed_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self { + self.signed_headers + .insert(name.to_string(), value.to_string()); + self + } + pub fn unsigned_headers(&mut self, unsigned_headers: HashMap<String, String>) -> &mut Self { self.unsigned_headers = unsigned_headers; self } + pub fn unsigned_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self { + self.unsigned_headers + .insert(name.to_string(), value.to_string()); + self + } + pub fn body(&mut self, body: Vec<u8>) -> &mut Self { self.body = body; self @@ -106,24 +145,24 @@ impl<'a> RequestBuilder<'a> { let query = query_param_to_string(&self.query_params); let (host, path) = if self.vhost_style { ( - format!("{}.s3.garage", self.bucket), + format!("{}.{}.garage", self.bucket, self.service), format!("{}{}", self.path, query), ) } else { ( - "s3.garage".to_owned(), + format!("{}.garage", self.service), format!("{}/{}{}", self.bucket, self.path, query), ) }; let uri = format!("{}{}", self.requester.uri, path); let now = Utc::now(); - let scope = signature::compute_scope(&now, super::REGION.as_ref()); + let scope = signature::compute_scope(&now, super::REGION.as_ref(), self.service); let mut signer = signature::signing_hmac( &now, &self.requester.key.secret, super::REGION.as_ref(), - "s3", + self.service, ) .unwrap(); let streaming_signer = signer.clone(); diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs index 88c51501..44d727f9 100644 --- a/src/garage/tests/common/garage.rs +++ b/src/garage/tests/common/garage.rs @@ -22,7 +22,9 @@ pub struct Instance { process: process::Child, pub path: PathBuf, pub key: Key, - pub api_port: u16, + pub s3_port: u16, + pub k2v_port: u16, + pub web_port: u16, } impl Instance { @@ -58,9 +60,12 @@ rpc_secret = "{secret}" [s3_api] s3_region = "{region}" -api_bind_addr = "127.0.0.1:{api_port}" +api_bind_addr = "127.0.0.1:{s3_port}" root_domain = ".s3.garage" +[k2v_api] +api_bind_addr = "127.0.0.1:{k2v_port}" + [s3_web] bind_addr = "127.0.0.1:{web_port}" root_domain = ".web.garage" @@ -72,10 +77,11 @@ api_bind_addr = "127.0.0.1:{admin_port}" path = path.display(), secret = GARAGE_TEST_SECRET, region = super::REGION, - api_port = port, - rpc_port = port + 1, - web_port = port + 2, - admin_port = port + 3, + s3_port = port, + k2v_port = port + 1, + rpc_port = port + 2, + web_port = port + 3, + admin_port = port + 4, ); fs::write(path.join("config.toml"), config).expect("Could not write garage config file"); @@ -88,7 +94,7 @@ api_bind_addr = "127.0.0.1:{admin_port}" .arg("server") .stdout(stdout) .stderr(stderr) - .env("RUST_LOG", "garage=info,garage_api=debug") + .env("RUST_LOG", "garage=info,garage_api=trace") .spawn() .expect("Could not start garage"); @@ -96,7 +102,9 @@ api_bind_addr = "127.0.0.1:{admin_port}" process: child, path, key: Key::default(), - api_port: port, + s3_port: port, + k2v_port: port + 1, + web_port: port + 3, } } @@ -147,8 +155,14 @@ api_bind_addr = "127.0.0.1:{admin_port}" String::from_utf8(output.stdout).unwrap() } - pub fn uri(&self) -> http::Uri { - format!("http://127.0.0.1:{api_port}", api_port = self.api_port) + pub fn s3_uri(&self) -> http::Uri { + format!("http://127.0.0.1:{s3_port}", s3_port = self.s3_port) + .parse() + .expect("Could not build garage endpoint URI") + } + + pub fn k2v_uri(&self) -> http::Uri { + format!("http://127.0.0.1:{k2v_port}", k2v_port = self.k2v_port) .parse() .expect("Could not build garage endpoint URI") } diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs index 8f88c731..28874b02 100644 --- a/src/garage/tests/common/mod.rs +++ b/src/garage/tests/common/mod.rs @@ -17,18 +17,27 @@ pub struct Context { pub garage: &'static garage::Instance, pub client: Client, pub custom_request: CustomRequester, + pub k2v: K2VContext, +} + +pub struct K2VContext { + pub request: CustomRequester, } impl Context { fn new() -> Self { let garage = garage::instance(); let client = client::build_client(garage); - let custom_request = CustomRequester::new(garage); + let custom_request = CustomRequester::new_s3(garage); + let k2v_request = CustomRequester::new_k2v(garage); Context { garage, client, custom_request, + k2v: K2VContext { + request: k2v_request, + }, } } diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs new file mode 100644 index 00000000..1182a298 --- /dev/null +++ b/src/garage/tests/k2v/batch.rs @@ -0,0 +1,525 @@ +use std::collections::HashMap; + +use crate::common; + +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; +use hyper::Method; + +#[tokio::test] +async fn test_batch() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-batch"); + + let mut values = HashMap::new(); + values.insert("a", "initial test 1"); + values.insert("b", "initial test 2"); + values.insert("c", "initial test 3"); + values.insert("d.1", "initial test 4"); + values.insert("d.2", "initial test 5"); + values.insert("e", "initial test 6"); + let mut ct = HashMap::new(); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + format!( + r#"[ + {{"pk": "root", "sk": "a", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "b", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "d.1", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}} + ]"#, + base64::encode(values.get(&"a").unwrap()), + base64::encode(values.get(&"b").unwrap()), + base64::encode(values.get(&"c").unwrap()), + base64::encode(values.get(&"d.1").unwrap()), + base64::encode(values.get(&"d.2").unwrap()), + base64::encode(values.get(&"e").unwrap()), + ) + .into_bytes(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + for sk in ["a", "b", "c", "d.1", "d.2", "e"] { + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + ct.insert( + sk, + res.headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(), + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, values.get(sk).unwrap().as_bytes()); + } + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root"}, + {"partitionKey": "root", "start": "c"}, + {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"}, + {"partitionKey": "root", "limit": 1}, + {"partitionKey": "root", "prefix": "d"} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": "a", + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": 1, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]} + ], + "more": true, + "nextStart": "b", + }, + { + "partitionKey": "root", + "prefix": "d", + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + ]) + ); + + // Insert some new values + values.insert("c'", "new test 3"); + values.insert("d.1'", "new test 4"); + values.insert("d.2'", "new test 5"); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + format!( + r#"[ + {{"pk": "root", "sk": "b", "ct": "{}", "v": null}}, + {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}}, + {{"pk": "root", "sk": "d.1", "ct": "{}", "v": "{}"}}, + {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}} + ]"#, + ct.get(&"b").unwrap(), + base64::encode(values.get(&"c'").unwrap()), + ct.get(&"d.1").unwrap(), + base64::encode(values.get(&"d.1'").unwrap()), + base64::encode(values.get(&"d.2'").unwrap()), + ) + .into_bytes(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + for sk in ["b", "c", "d.1", "d.2"] { + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + if sk == "b" { + assert_eq!(res.status(), 204); + } else { + assert_eq!(res.status(), 200); + } + ct.insert( + sk, + res.headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(), + ); + } + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root"}, + {"partitionKey": "root", "prefix": "d"}, + {"partitionKey": "root", "prefix": "d.", "end": "d.2"}, + {"partitionKey": "root", "prefix": "d.", "limit": 1}, + {"partitionKey": "root", "prefix": "d.", "start": "d.2", "limit": 1}, + {"partitionKey": "root", "prefix": "d.", "reverse": true}, + {"partitionKey": "root", "prefix": "d.", "start": "d.2", "reverse": true}, + {"partitionKey": "root", "prefix": "d.", "limit": 2} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d", + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": "d.2", + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": null, + "limit": 1, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": true, + "nextStart": "d.2", + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": "d.2", + "end": null, + "limit": 1, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": null, + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": "d.2", + "end": null, + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": "d.", + "start": null, + "end": null, + "limit": 2, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + ]) + ); + + // Test DeleteBatch + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("delete", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root", "start": "a", "end": "c"}, + {"partitionKey": "root", "prefix": "d"} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": "a", + "end": "c", + "singleItem": false, + "deletedItems": 1, + }, + { + "partitionKey": "root", + "prefix": "d", + "start": null, + "end": null, + "singleItem": false, + "deletedItems": 2, + }, + ]) + ); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partitionKey": "root"}, + {"partitionKey": "root", "reverse": true} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let json_res = json_body(res).await; + assert_json_eq!( + json_res, + json!([ + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + ], + "more": false, + "nextStart": null, + }, + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, + ]) + ); +} diff --git a/src/garage/tests/k2v/errorcodes.rs b/src/garage/tests/k2v/errorcodes.rs new file mode 100644 index 00000000..2fcc45bc --- /dev/null +++ b/src/garage/tests/k2v/errorcodes.rs @@ -0,0 +1,141 @@ +use crate::common; + +use hyper::Method; + +#[tokio::test] +async fn test_error_codes() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-error-codes"); + + // Regular insert should work (code 200) + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Hello, world!".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Insert with trash causality token: invalid request + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", "tra$sh") + .body(b"Hello, world!".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Search without partition key: invalid request + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {}, + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Search with start that is not in prefix: invalid request + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partition_key": "root", "prefix": "a", "start": "bx"}, + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Search with invalid json: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .query_param("search", Option::<&str>::None) + .body( + br#"[ + {"partition_key": "root" + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Batch insert with invalid causality token: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + br#"[ + {"pk": "root", "sk": "a", "ct": "tra$h", "v": "aGVsbG8sIHdvcmxkCg=="} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Batch insert with invalid data: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .body( + br#"[ + {"pk": "root", "sk": "a", "ct": null, "v": "aGVsbG8sIHdvcmx$Cg=="} + ]"# + .to_vec(), + ) + .method(Method::POST) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); + + // Poll with invalid causality token: 400 + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .query_param("causality_token", Some("tra$h")) + .query_param("timeout", Some("10")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 400); +} diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs new file mode 100644 index 00000000..bf2b01f8 --- /dev/null +++ b/src/garage/tests/k2v/item.rs @@ -0,0 +1,719 @@ +use crate::common; + +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; +use hyper::Method; + +#[tokio::test] +async fn test_items_and_indices() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-item-and-index"); + + // ReadIndex -- there should be nothing + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [], + "more": false, + "nextStart": null + }) + ); + + let content2_len = "_: hello universe".len(); + let content3_len = "_: concurrent value".len(); + + for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { + let content = format!("{}: hello world", sk).into_bytes(); + let content2 = format!("{}: hello universe", sk).into_bytes(); + let content3 = format!("{}: concurrent value", sk).into_bytes(); + + // Put initially, no causality token + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .body(content.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, content); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i, + "values": i+i+1, + "bytes": i*(content2.len() + content3.len()) + content.len(), + } + ], + "more": false, + "nextStart": null + }) + ); + + // Put again, this time with causality token + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct.clone()) + .body(content2.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, content2); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i, + "values": i+i+1, + "bytes": i*content3.len() + (i+1)*content2.len(), + } + ], + "more": false, + "nextStart": null + }) + ); + + // Put again with same CT, now we have concurrent values + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct.clone()) + .body(content3.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_json = json_body(res).await; + assert_json_eq!( + res_json, + [base64::encode(&content2), base64::encode(&content3)] + ); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i+1, + "values": 2*(i+1), + "bytes": (i+1)*(content2.len() + content3.len()), + } + ], + "more": false, + "nextStart": null + }) + ); + } + + // Now delete things + for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { + // Get value back (we just need the CT) + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Delete it + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::DELETE) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + if i < 3 { + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": 3-i, + "conflicts": 3-i, + "values": 2*(3-i), + "bytes": (3-i)*(content2_len + content3_len), + } + ], + "more": false, + "nextStart": null + }) + ); + } else { + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [], + "more": false, + "nextStart": null + }) + ); + } + } +} + +#[tokio::test] +async fn test_item_return_format() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-item-return-format"); + + let single_value = b"A single value".to_vec(); + let concurrent_value = b"A concurrent value".to_vec(); + + // -- Test with a single value -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .body(single_value.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, single_value); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, single_value); + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + + // -- Test with a second, concurrent value -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .body(concurrent_value.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 409); // CONFLICT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // -- Delete first value, concurrently with second insert -- + // -- (we now have a concurrent value and a deletion) -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .method(Method::DELETE) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 409); // CONFLICT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // -- Delete everything -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .method(Method::DELETE) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); // NO CONTENT + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([null])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); // NO CONTENT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([null])); +} diff --git a/src/garage/tests/k2v/mod.rs b/src/garage/tests/k2v/mod.rs new file mode 100644 index 00000000..a009460e --- /dev/null +++ b/src/garage/tests/k2v/mod.rs @@ -0,0 +1,18 @@ +pub mod batch; +pub mod errorcodes; +pub mod item; +pub mod poll; +pub mod simple; + +use hyper::{Body, Response}; + +pub async fn json_body(res: Response<Body>) -> serde_json::Value { + let res_body: serde_json::Value = serde_json::from_slice( + &hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec()[..], + ) + .unwrap(); + res_body +} diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs new file mode 100644 index 00000000..70dc0410 --- /dev/null +++ b/src/garage/tests/k2v/poll.rs @@ -0,0 +1,98 @@ +use hyper::Method; +use std::time::Duration; + +use crate::common; + +#[tokio::test] +async fn test_poll() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-poll"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), 200); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + let res2_body = hyper::body::to_bytes(res2.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res2_body, b"Initial value"); + + // Start poll operation + let poll = { + let bucket = bucket.clone(); + let ct = ct.clone(); + tokio::spawn(async move { + let ctx = common::context(); + ctx.k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .query_param("causality_token", Some(ct)) + .query_param("timeout", Some("10")) + .signed_header("accept", "application/octet-stream") + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), 200); + + let poll_res_body = hyper::body::to_bytes(poll_res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(poll_res_body, b"New value"); +} diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs new file mode 100644 index 00000000..ae9a8674 --- /dev/null +++ b/src/garage/tests/k2v/simple.rs @@ -0,0 +1,40 @@ +use crate::common; + +use hyper::Method; + +#[tokio::test] +async fn test_simple() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-simple"); + + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Hello, world!".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), 200); + + let res2_body = hyper::body::to_bytes(res2.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res2_body, b"Hello, world!"); +} diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs index 8799c395..0106ad10 100644 --- a/src/garage/tests/lib.rs +++ b/src/garage/tests/lib.rs @@ -3,9 +3,5 @@ mod common; mod admin; mod bucket; -mod list; -mod multipart; -mod objects; -mod simple; -mod streaming_signature; -mod website; +mod k2v; +mod s3; diff --git a/src/garage/tests/list.rs b/src/garage/tests/s3/list.rs index bb03f250..bb03f250 100644 --- a/src/garage/tests/list.rs +++ b/src/garage/tests/s3/list.rs diff --git a/src/garage/tests/s3/mod.rs b/src/garage/tests/s3/mod.rs new file mode 100644 index 00000000..623eb665 --- /dev/null +++ b/src/garage/tests/s3/mod.rs @@ -0,0 +1,6 @@ +mod list; +mod multipart; +mod objects; +mod simple; +mod streaming_signature; +mod website; diff --git a/src/garage/tests/multipart.rs b/src/garage/tests/s3/multipart.rs index 895a2993..895a2993 100644 --- a/src/garage/tests/multipart.rs +++ b/src/garage/tests/s3/multipart.rs diff --git a/src/garage/tests/objects.rs b/src/garage/tests/s3/objects.rs index e1175b81..e1175b81 100644 --- a/src/garage/tests/objects.rs +++ b/src/garage/tests/s3/objects.rs diff --git a/src/garage/tests/simple.rs b/src/garage/tests/s3/simple.rs index f54ae9ac..f54ae9ac 100644 --- a/src/garage/tests/simple.rs +++ b/src/garage/tests/s3/simple.rs diff --git a/src/garage/tests/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs index c68f7dfc..c68f7dfc 100644 --- a/src/garage/tests/streaming_signature.rs +++ b/src/garage/tests/s3/streaming_signature.rs diff --git a/src/garage/tests/website.rs b/src/garage/tests/s3/website.rs index 963d11ea..0570ac6a 100644 --- a/src/garage/tests/website.rs +++ b/src/garage/tests/s3/website.rs @@ -35,10 +35,7 @@ async fn test_website() { let req = || { Request::builder() .method("GET") - .uri(format!( - "http://127.0.0.1:{}/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .body(Body::empty()) .unwrap() @@ -170,10 +167,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("GET") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .body(Body::empty()) @@ -198,7 +192,7 @@ async fn test_website_s3_api() { .method("GET") .uri(format!( "http://127.0.0.1:{}/wrong.html", - common::garage::DEFAULT_PORT + 2 + ctx.garage.web_port )) .header("Host", format!("{}.web.garage", BCKT_NAME)) .body(Body::empty()) @@ -217,10 +211,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("OPTIONS") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "PUT") @@ -244,10 +235,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("OPTIONS") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "DELETE") @@ -288,10 +276,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("OPTIONS") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Origin", "https://example.com") .header("Access-Control-Request-Method", "PUT") @@ -319,10 +304,7 @@ async fn test_website_s3_api() { { let req = Request::builder() .method("GET") - .uri(format!( - "http://127.0.0.1:{}/site/", - common::garage::DEFAULT_PORT + 2 - )) + .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .header("Host", format!("{}.web.garage", BCKT_NAME)) .body(Body::empty()) .unwrap(); |