aboutsummaryrefslogtreecommitdiff
path: root/src/garage/tests/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/garage/tests/k2v')
-rw-r--r--src/garage/tests/k2v/batch.rs103
-rw-r--r--src/garage/tests/k2v/item.rs37
-rw-r--r--src/garage/tests/k2v/poll.rs173
3 files changed, 247 insertions, 66 deletions
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
index 6abba1c5..595d0ba8 100644
--- a/src/garage/tests/k2v/batch.rs
+++ b/src/garage/tests/k2v/batch.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use crate::common;
use assert_json_diff::assert_json_eq;
+use base64::prelude::*;
use serde_json::json;
use super::json_body;
@@ -36,12 +37,12 @@ async fn test_batch() {
{{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}},
{{"pk": "root", "sk": "e", "ct": null, "v": "{}"}}
]"#,
- base64::encode(values.get(&"a").unwrap()),
- base64::encode(values.get(&"b").unwrap()),
- base64::encode(values.get(&"c").unwrap()),
- base64::encode(values.get(&"d.1").unwrap()),
- base64::encode(values.get(&"d.2").unwrap()),
- base64::encode(values.get(&"e").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"a").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"b").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"c").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.1").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.2").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"e").unwrap()),
)
.into_bytes(),
)
@@ -120,12 +121,12 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -141,10 +142,10 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -160,9 +161,9 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -178,8 +179,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -195,8 +196,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
- {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -212,7 +213,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}
],
"more": true,
"nextStart": "b",
@@ -228,8 +229,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -255,10 +256,10 @@ async fn test_batch() {
{{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}
]"#,
ct.get(&"b").unwrap(),
- base64::encode(values.get(&"c'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"c'").unwrap()),
ct.get(&"d.1").unwrap(),
- base64::encode(values.get(&"d.1'").unwrap()),
- base64::encode(values.get(&"d.2'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.1'").unwrap()),
+ BASE64_STANDARD.encode(values.get(&"d.2'").unwrap()),
)
.into_bytes(),
)
@@ -333,11 +334,11 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -353,8 +354,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -370,7 +371,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -386,7 +387,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": true,
"nextStart": "d.2",
@@ -402,7 +403,7 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -418,8 +419,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -435,8 +436,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -452,8 +453,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
- {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -563,8 +564,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}
],
"more": false,
"nextStart": null,
@@ -580,8 +581,8 @@ async fn test_batch() {
"tombstones": false,
"singleItem": false,
"items": [
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
],
"more": false,
"nextStart": null,
@@ -599,10 +600,10 @@ async fn test_batch() {
"items": [
{"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]},
{"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]},
- {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]},
{"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]},
{"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]},
- {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]},
],
"more": false,
"nextStart": null,
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
index 2641386f..588836c7 100644
--- a/src/garage/tests/k2v/item.rs
+++ b/src/garage/tests/k2v/item.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
use crate::common;
use assert_json_diff::assert_json_eq;
+use base64::prelude::*;
use serde_json::json;
use super::json_body;
@@ -222,7 +223,10 @@ async fn test_items_and_indices() {
let res_json = json_body(res).await;
assert_json_eq!(
res_json,
- [base64::encode(&content2), base64::encode(&content3)]
+ [
+ BASE64_STANDARD.encode(&content2),
+ BASE64_STANDARD.encode(&content3)
+ ]
);
// ReadIndex -- now there should be some stuff
@@ -411,7 +415,7 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+ assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)]));
// f2: binary
let res = ctx
@@ -452,7 +456,7 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+ assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)]));
// -- Test with a second, concurrent value --
let res = ctx
@@ -488,8 +492,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -512,8 +516,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -550,8 +554,8 @@ async fn test_item_return_format() {
assert_json_eq!(
res_body,
json!([
- base64::encode(&single_value),
- base64::encode(&concurrent_value)
+ BASE64_STANDARD.encode(&single_value),
+ BASE64_STANDARD.encode(&concurrent_value)
])
);
@@ -587,7 +591,10 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// f1: not specified
let res = ctx
@@ -612,7 +619,10 @@ async fn test_item_return_format() {
.unwrap()
.to_string();
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// f2: binary
let res = ctx
@@ -644,7 +654,10 @@ async fn test_item_return_format() {
"application/json"
);
let res_body = json_body(res).await;
- assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+ assert_json_eq!(
+ res_body,
+ json!([BASE64_STANDARD.encode(&concurrent_value), null])
+ );
// -- Delete everything --
let res = ctx
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index e56705ae..dd44aed9 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,12 +1,17 @@
+use base64::prelude::*;
use hyper::{Method, StatusCode};
use std::time::Duration;
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
use crate::common;
#[tokio::test]
-async fn test_poll() {
+async fn test_poll_item() {
let ctx = common::context();
- let bucket = ctx.create_bucket("test-k2v-poll");
+ let bucket = ctx.create_bucket("test-k2v-poll-item");
// Write initial value
let res = ctx
@@ -52,8 +57,8 @@ async fn test_poll() {
let poll = {
let bucket = bucket.clone();
let ct = ct.clone();
+ let ctx = ctx.clone();
tokio::spawn(async move {
- let ctx = common::context();
ctx.k2v
.request
.builder(bucket.clone())
@@ -96,3 +101,165 @@ async fn test_poll() {
.to_vec();
assert_eq!(poll_res_body, b"New value");
}
+
+#[tokio::test]
+async fn test_poll_range() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll-range");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Initial poll range, retrieve single item and first seen_marker
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(b"{}".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let json_res = json_body(res2).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_json_eq!(
+ json_res,
+ json!(
+ {
+ "items": [
+ {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]},
+ ],
+ "seenMarker": seen_marker,
+ }
+ )
+ );
+
+ // Second poll range, which will complete later
+ let poll = {
+ let bucket = bucket.clone();
+ let ctx = ctx.clone();
+ tokio::spawn(async move {
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([BASE64_STANDARD.encode(b"New value")])
+ );
+
+ // Now we will add a value on a different key
+ // Start a new poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ let ctx = ctx.clone();
+ tokio::spawn(async move {
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write value on different key
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test2"))
+ .body(b"Other value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([BASE64_STANDARD.encode(b"Other value")])
+ );
+}