aboutsummaryrefslogtreecommitdiff
path: root/src/garage/tests/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/tests/k2v')
-rw-r--r--src/garage/tests/k2v/batch.rs525
-rw-r--r--src/garage/tests/k2v/errorcodes.rs141
-rw-r--r--src/garage/tests/k2v/item.rs719
-rw-r--r--src/garage/tests/k2v/mod.rs18
-rw-r--r--src/garage/tests/k2v/poll.rs98
-rw-r--r--src/garage/tests/k2v/simple.rs40
6 files changed, 1541 insertions, 0 deletions
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
new file mode 100644
index 00000000..1182a298
--- /dev/null
+++ b/src/garage/tests/k2v/batch.rs
@@ -0,0 +1,525 @@
+use std::collections::HashMap;
+
+use crate::common;
+
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
+use hyper::Method;
+
+#[tokio::test]
+async fn test_batch() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-batch");
+
+ let mut values = HashMap::new();
+ values.insert("a", "initial test 1");
+ values.insert("b", "initial test 2");
+ values.insert("c", "initial test 3");
+ values.insert("d.1", "initial test 4");
+ values.insert("d.2", "initial test 5");
+ values.insert("e", "initial test 6");
+ let mut ct = HashMap::new();
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ format!(
+ r#"[
+ {{"pk": "root", "sk": "a", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "b", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.1", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}}
+ ]"#,
+ base64::encode(values.get(&"a").unwrap()),
+ base64::encode(values.get(&"b").unwrap()),
+ base64::encode(values.get(&"c").unwrap()),
+ base64::encode(values.get(&"d.1").unwrap()),
+ base64::encode(values.get(&"d.2").unwrap()),
+ base64::encode(values.get(&"e").unwrap()),
+ )
+ .into_bytes(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ for sk in ["a", "b", "c", "d.1", "d.2", "e"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, values.get(sk).unwrap().as_bytes());
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "start": "c"},
+ {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"},
+ {"partitionKey": "root", "limit": 1},
+ {"partitionKey": "root", "prefix": "d"}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": "a",
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}
+ ],
+ "more": true,
+ "nextStart": "b",
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+
+ // Insert some new values
+ values.insert("c'", "new test 3");
+ values.insert("d.1'", "new test 4");
+ values.insert("d.2'", "new test 5");
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ format!(
+ r#"[
+ {{"pk": "root", "sk": "b", "ct": "{}", "v": null}},
+ {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.1", "ct": "{}", "v": "{}"}},
+ {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}
+ ]"#,
+ ct.get(&"b").unwrap(),
+ base64::encode(values.get(&"c'").unwrap()),
+ ct.get(&"d.1").unwrap(),
+ base64::encode(values.get(&"d.1'").unwrap()),
+ base64::encode(values.get(&"d.2'").unwrap()),
+ )
+ .into_bytes(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ for sk in ["b", "c", "d.1", "d.2"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ if sk == "b" {
+ assert_eq!(res.status(), 204);
+ } else {
+ assert_eq!(res.status(), 200);
+ }
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "prefix": "d"},
+ {"partitionKey": "root", "prefix": "d.", "end": "d.2"},
+ {"partitionKey": "root", "prefix": "d.", "limit": 1},
+ {"partitionKey": "root", "prefix": "d.", "start": "d.2", "limit": 1},
+ {"partitionKey": "root", "prefix": "d.", "reverse": true},
+ {"partitionKey": "root", "prefix": "d.", "start": "d.2", "reverse": true},
+ {"partitionKey": "root", "prefix": "d.", "limit": 2}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": "d.2",
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": true,
+ "nextStart": "d.2",
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": "d.2",
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": "d.2",
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": 2,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+
+ // Test DeleteBatch
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("delete", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root", "start": "a", "end": "c"},
+ {"partitionKey": "root", "prefix": "d"}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "a",
+ "end": "c",
+ "singleItem": false,
+ "deletedItems": 1,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "singleItem": false,
+ "deletedItems": 2,
+ },
+ ])
+ );
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "reverse": true}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+}
diff --git a/src/garage/tests/k2v/errorcodes.rs b/src/garage/tests/k2v/errorcodes.rs
new file mode 100644
index 00000000..2fcc45bc
--- /dev/null
+++ b/src/garage/tests/k2v/errorcodes.rs
@@ -0,0 +1,141 @@
+use crate::common;
+
+use hyper::Method;
+
+#[tokio::test]
+async fn test_error_codes() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-error-codes");
+
+ // Regular insert should work (code 200)
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Insert with trash causality token: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", "tra$sh")
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search without partition key: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {},
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search with start that is not in prefix: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partition_key": "root", "prefix": "a", "start": "bx"},
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search with invalid json: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partition_key": "root"
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Batch insert with invalid causality token: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ br#"[
+ {"pk": "root", "sk": "a", "ct": "tra$h", "v": "aGVsbG8sIHdvcmxkCg=="}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Batch insert with invalid data: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ br#"[
+ {"pk": "root", "sk": "a", "ct": null, "v": "aGVsbG8sIHdvcmx$Cg=="}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Poll with invalid causality token: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .query_param("causality_token", Some("tra$h"))
+ .query_param("timeout", Some("10"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+}
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
new file mode 100644
index 00000000..bf2b01f8
--- /dev/null
+++ b/src/garage/tests/k2v/item.rs
@@ -0,0 +1,719 @@
+use crate::common;
+
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
+use hyper::Method;
+
+#[tokio::test]
+async fn test_items_and_indices() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-item-and-index");
+
+ // ReadIndex -- there should be nothing
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ let content2_len = "_: hello universe".len();
+ let content3_len = "_: concurrent value".len();
+
+ for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
+ let content = format!("{}: hello world", sk).into_bytes();
+ let content2 = format!("{}: hello universe", sk).into_bytes();
+ let content3 = format!("{}: concurrent value", sk).into_bytes();
+
+ // Put initially, no causality token
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .body(content.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, content);
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i,
+ "values": i+i+1,
+ "bytes": i*(content2.len() + content3.len()) + content.len(),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ // Put again, this time with causality token
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct.clone())
+ .body(content2.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, content2);
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i,
+ "values": i+i+1,
+ "bytes": i*content3.len() + (i+1)*content2.len(),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ // Put again with same CT, now we have concurrent values
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct.clone())
+ .body(content3.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_json = json_body(res).await;
+ assert_json_eq!(
+ res_json,
+ [base64::encode(&content2), base64::encode(&content3)]
+ );
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i+1,
+ "values": 2*(i+1),
+ "bytes": (i+1)*(content2.len() + content3.len()),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ }
+
+ // Now delete things
+ for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
+ // Get value back (we just need the CT)
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Delete it
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::DELETE)
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ if i < 3 {
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": 3-i,
+ "conflicts": 3-i,
+ "values": 2*(3-i),
+ "bytes": (3-i)*(content2_len + content3_len),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ } else {
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ }
+ }
+}
+
+#[tokio::test]
+async fn test_item_return_format() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-item-return-format");
+
+ let single_value = b"A single value".to_vec();
+ let concurrent_value = b"A concurrent value".to_vec();
+
+ // -- Test with a single value --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .body(single_value.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, single_value);
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, single_value);
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+
+ // -- Test with a second, concurrent value --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .body(concurrent_value.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 409); // CONFLICT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // -- Delete first value, concurrently with second insert --
+ // -- (we now have a concurrent value and a deletion) --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .method(Method::DELETE)
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 409); // CONFLICT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // -- Delete everything --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .method(Method::DELETE)
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204); // NO CONTENT
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([null]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204); // NO CONTENT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([null]));
+}
diff --git a/src/garage/tests/k2v/mod.rs b/src/garage/tests/k2v/mod.rs
new file mode 100644
index 00000000..a009460e
--- /dev/null
+++ b/src/garage/tests/k2v/mod.rs
@@ -0,0 +1,18 @@
+pub mod batch;
+pub mod errorcodes;
+pub mod item;
+pub mod poll;
+pub mod simple;
+
+use hyper::{Body, Response};
+
+pub async fn json_body(res: Response<Body>) -> serde_json::Value {
+ let res_body: serde_json::Value = serde_json::from_slice(
+ &hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec()[..],
+ )
+ .unwrap();
+ res_body
+}
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
new file mode 100644
index 00000000..70dc0410
--- /dev/null
+++ b/src/garage/tests/k2v/poll.rs
@@ -0,0 +1,98 @@
+use hyper::Method;
+use std::time::Duration;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_poll() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), 200);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ let res2_body = hyper::body::to_bytes(res2.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res2_body, b"Initial value");
+
+ // Start poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ let ct = ct.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .query_param("causality_token", Some(ct))
+ .query_param("timeout", Some("10"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), 200);
+
+ let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(poll_res_body, b"New value");
+}
diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs
new file mode 100644
index 00000000..ae9a8674
--- /dev/null
+++ b/src/garage/tests/k2v/simple.rs
@@ -0,0 +1,40 @@
+use crate::common;
+
+use hyper::Method;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-simple");
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), 200);
+
+ let res2_body = hyper::body::to_bytes(res2.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res2_body, b"Hello, world!");
+}