diff options
Diffstat (limited to 'src/garage/tests/k2v')
-rw-r--r-- | src/garage/tests/k2v/batch.rs | 103 | ||||
-rw-r--r-- | src/garage/tests/k2v/item.rs | 37 | ||||
-rw-r--r-- | src/garage/tests/k2v/poll.rs | 170 |
3 files changed, 245 insertions, 65 deletions
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs index 6abba1c5..595d0ba8 100644 --- a/src/garage/tests/k2v/batch.rs +++ b/src/garage/tests/k2v/batch.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use crate::common; use assert_json_diff::assert_json_eq; +use base64::prelude::*; use serde_json::json; use super::json_body; @@ -36,12 +37,12 @@ async fn test_batch() { {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}, {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}} ]"#, - base64::encode(values.get(&"a").unwrap()), - base64::encode(values.get(&"b").unwrap()), - base64::encode(values.get(&"c").unwrap()), - base64::encode(values.get(&"d.1").unwrap()), - base64::encode(values.get(&"d.2").unwrap()), - base64::encode(values.get(&"e").unwrap()), + BASE64_STANDARD.encode(values.get(&"a").unwrap()), + BASE64_STANDARD.encode(values.get(&"b").unwrap()), + BASE64_STANDARD.encode(values.get(&"c").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.1").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.2").unwrap()), + BASE64_STANDARD.encode(values.get(&"e").unwrap()), ) .into_bytes(), ) @@ -120,12 +121,12 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -141,10 +142,10 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -160,9 +161,9 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]}, ], "more": false, "nextStart": null, @@ -178,8 +179,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, ], "more": false, "nextStart": null, @@ -195,8 +196,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, - {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [BASE64_STANDARD.encode(values.get("b").unwrap())]}, ], "more": false, "nextStart": null, @@ -212,7 +213,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]} ], "more": true, "nextStart": "b", @@ -228,8 +229,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]} + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap())]} ], "more": false, "nextStart": null, @@ -255,10 +256,10 @@ async fn test_batch() { {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}} ]"#, ct.get(&"b").unwrap(), - base64::encode(values.get(&"c'").unwrap()), + BASE64_STANDARD.encode(values.get(&"c'").unwrap()), ct.get(&"d.1").unwrap(), - base64::encode(values.get(&"d.1'").unwrap()), - base64::encode(values.get(&"d.2'").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.1'").unwrap()), + BASE64_STANDARD.encode(values.get(&"d.2'").unwrap()), ) .into_bytes(), ) @@ -333,11 +334,11 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [BASE64_STANDARD.encode(values.get("a").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -353,8 +354,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -370,7 +371,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -386,7 +387,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": true, "nextStart": "d.2", @@ -402,7 +403,7 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -418,8 +419,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -435,8 +436,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, ], "more": false, "nextStart": null, @@ -452,8 +453,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]}, - {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.1'").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [BASE64_STANDARD.encode(values.get("d.2").unwrap()), BASE64_STANDARD.encode(values.get("d.2'").unwrap())]}, ], "more": false, "nextStart": null, @@ -563,8 +564,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]} + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]} ], "more": false, "nextStart": null, @@ -580,8 +581,8 @@ async fn test_batch() { "tombstones": false, "singleItem": false, "items": [ - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, ], "more": false, "nextStart": null, @@ -599,10 +600,10 @@ async fn test_batch() { "items": [ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]}, {"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]}, - {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [BASE64_STANDARD.encode(values.get("c").unwrap()), BASE64_STANDARD.encode(values.get("c'").unwrap())]}, {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]}, {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]}, - {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [BASE64_STANDARD.encode(values.get("e").unwrap())]}, ], "more": false, "nextStart": null, diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs index 2641386f..588836c7 100644 --- a/src/garage/tests/k2v/item.rs +++ b/src/garage/tests/k2v/item.rs @@ -3,6 +3,7 @@ use std::time::Duration; use crate::common; use assert_json_diff::assert_json_eq; +use base64::prelude::*; use serde_json::json; use super::json_body; @@ -222,7 +223,10 @@ async fn test_items_and_indices() { let res_json = json_body(res).await; assert_json_eq!( res_json, - [base64::encode(&content2), base64::encode(&content3)] + [ + BASE64_STANDARD.encode(&content2), + BASE64_STANDARD.encode(&content3) + ] ); // ReadIndex -- now there should be some stuff @@ -411,7 +415,7 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // f2: binary let res = ctx @@ -452,7 +456,7 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&single_value)])); + assert_json_eq!(res_body, json!([BASE64_STANDARD.encode(&single_value)])); // -- Test with a second, concurrent value -- let res = ctx @@ -488,8 +492,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -512,8 +516,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -550,8 +554,8 @@ async fn test_item_return_format() { assert_json_eq!( res_body, json!([ - base64::encode(&single_value), - base64::encode(&concurrent_value) + BASE64_STANDARD.encode(&single_value), + BASE64_STANDARD.encode(&concurrent_value) ]) ); @@ -587,7 +591,10 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // f1: not specified let res = ctx @@ -612,7 +619,10 @@ async fn test_item_return_format() { .unwrap() .to_string(); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // f2: binary let res = ctx @@ -644,7 +654,10 @@ async fn test_item_return_format() { "application/json" ); let res_body = json_body(res).await; - assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null])); + assert_json_eq!( + res_body, + json!([BASE64_STANDARD.encode(&concurrent_value), null]) + ); // -- Delete everything -- let res = ctx diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs index e56705ae..f54cc5d4 100644 --- a/src/garage/tests/k2v/poll.rs +++ b/src/garage/tests/k2v/poll.rs @@ -1,12 +1,16 @@ use hyper::{Method, StatusCode}; use std::time::Duration; +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; use crate::common; #[tokio::test] -async fn test_poll() { +async fn test_poll_item() { let ctx = common::context(); - let bucket = ctx.create_bucket("test-k2v-poll"); + let bucket = ctx.create_bucket("test-k2v-poll-item"); // Write initial value let res = ctx @@ -96,3 +100,165 @@ async fn test_poll() { .to_vec(); assert_eq!(poll_res_body, b"New value"); } + +#[tokio::test] +async fn test_poll_range() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-poll-range"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Initial poll range, retrieve single item and first seen_marker + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(b"{}".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let json_res = json_body(res2).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_json_eq!( + json_res, + json!( + { + "items": [ + {"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]}, + ], + "seenMarker": seen_marker, + } + ) + ); + + // Second poll range, which will complete later + let poll = { + let bucket = bucket.clone(); + tokio::spawn(async move { + let ctx = common::context(); + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test1")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([base64::encode(b"New value")]) + ); + + // Now we will add a value on a different key + // Start a new poll operation + let poll = { + let bucket = bucket.clone(); + tokio::spawn(async move { + let ctx = common::context(); + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write value on different key + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test2")) + .body(b"Other value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test2")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([base64::encode(b"Other value")]) + ); +} |