diff options
author | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
commit | 5768bf362262f78376af14517c4921941986192e (patch) | |
tree | b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/garage/tests/k2v/item.rs | |
parent | def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff) | |
download | garage-5768bf362262f78376af14517c4921941986192e.tar.gz garage-5768bf362262f78376af14517c4921941986192e.zip |
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch
**Testing:**
- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
- [x] Adapt testing framework
- [x] Simple test with InsertItem + ReadItem
- [x] Test with several Insert/Read/DeleteItem + ReadIndex
- [x] Test all combinations of return formats for ReadItem
- [x] Test with ReadBatch, InsertBatch, DeleteBatch
- [x] Test with PollItem
- [x] Test error codes
- [ ] Fix most broken stuff
- [x] test PollItem broken randomly
- [x] when invalid causality tokens are given, errors should be 4xx not 5xx
**Improvements:**
- [x] Descending range queries
- [x] Specify
- [x] Implement
- [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/garage/tests/k2v/item.rs')
-rw-r--r-- | src/garage/tests/k2v/item.rs | 719 |
1 files changed, 719 insertions, 0 deletions
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs new file mode 100644 index 00000000..bf2b01f8 --- /dev/null +++ b/src/garage/tests/k2v/item.rs @@ -0,0 +1,719 @@ +use crate::common; + +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; +use hyper::Method; + +#[tokio::test] +async fn test_items_and_indices() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-item-and-index"); + + // ReadIndex -- there should be nothing + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [], + "more": false, + "nextStart": null + }) + ); + + let content2_len = "_: hello universe".len(); + let content3_len = "_: concurrent value".len(); + + for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { + let content = format!("{}: hello world", sk).into_bytes(); + let content2 = format!("{}: hello universe", sk).into_bytes(); + let content3 = format!("{}: concurrent value", sk).into_bytes(); + + // Put initially, no causality token + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .body(content.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, content); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i, + "values": i+i+1, + "bytes": i*(content2.len() + content3.len()) + content.len(), + } + ], + "more": false, + "nextStart": null + }) + ); + + // Put again, this time with causality token + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct.clone()) + .body(content2.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, content2); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i, + "values": i+i+1, + "bytes": i*content3.len() + (i+1)*content2.len(), + } + ], + "more": false, + "nextStart": null + }) + ); + + // Put again with same CT, now we have concurrent values + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct.clone()) + .body(content3.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // Get value back + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_json = json_body(res).await; + assert_json_eq!( + res_json, + [base64::encode(&content2), base64::encode(&content3)] + ); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": i+1, + "conflicts": i+1, + "values": 2*(i+1), + "bytes": (i+1)*(content2.len() + content3.len()), + } + ], + "more": false, + "nextStart": null + }) + ); + } + + // Now delete things + for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() { + // Get value back (we just need the CT) + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Delete it + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::DELETE) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // ReadIndex -- now there should be some stuff + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .send() + .await + .unwrap(); + let res_body = json_body(res).await; + if i < 3 { + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [ + { + "pk": "root", + "entries": 3-i, + "conflicts": 3-i, + "values": 2*(3-i), + "bytes": (3-i)*(content2_len + content3_len), + } + ], + "more": false, + "nextStart": null + }) + ); + } else { + assert_json_eq!( + res_body, + json!({ + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "partitionKeys": [], + "more": false, + "nextStart": null + }) + ); + } + } +} + +#[tokio::test] +async fn test_item_return_format() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-item-return-format"); + + let single_value = b"A single value".to_vec(); + let concurrent_value = b"A concurrent value".to_vec(); + + // -- Test with a single value -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .body(single_value.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, single_value); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + let res_body = hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .to_vec(); + assert_eq!(res_body, single_value); + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + + // -- Test with a second, concurrent value -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .body(concurrent_value.clone()) + .method(Method::PUT) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 409); // CONFLICT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!( + res_body, + json!([ + base64::encode(&single_value), + base64::encode(&concurrent_value) + ]) + ); + + // -- Delete first value, concurrently with second insert -- + // -- (we now have a concurrent value and a deletion) -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .method(Method::DELETE) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let ct = res + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 409); // CONFLICT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + + // -- Delete everything -- + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .method(Method::DELETE) + .signed_header("x-garage-causality-token", ct) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + + // f0: either + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "*/*") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); // NO CONTENT + + // f1: not specified + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([null])); + + // f2: binary + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); // NO CONTENT + + // f3: json + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("v1")) + .signed_header("accept", "application/json") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 200); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/json" + ); + let res_body = json_body(res).await; + assert_json_eq!(res_body, json!([null])); +} |