aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-01-11 10:04:41 +0100
committerAlex Auvolat <alex@adnab.me>2023-01-11 10:04:41 +0100
commitde1111076b28add706bded9fb738a1ef89ee0882 (patch)
tree8a0028e447624f737d6167f303b51f192473daec /src/garage
parentb83517d521b1bea7585ce45a803fad373f28225c (diff)
downloadgarage-de1111076b28add706bded9fb738a1ef89ee0882.tar.gz
garage-de1111076b28add706bded9fb738a1ef89ee0882.zip
PollRange integration test
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/tests/k2v/poll.rs170
1 files changed, 168 insertions, 2 deletions
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index e56705ae..f54cc5d4 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,12 +1,16 @@
use hyper::{Method, StatusCode};
use std::time::Duration;
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
use crate::common;
#[tokio::test]
-async fn test_poll() {
+async fn test_poll_item() {
let ctx = common::context();
- let bucket = ctx.create_bucket("test-k2v-poll");
+ let bucket = ctx.create_bucket("test-k2v-poll-item");
// Write initial value
let res = ctx
@@ -96,3 +100,165 @@ async fn test_poll() {
.to_vec();
assert_eq!(poll_res_body, b"New value");
}
+
+#[tokio::test]
+async fn test_poll_range() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll-range");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Initial poll range, retrieve single item and first seen_marker
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(b"{}".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let json_res = json_body(res2).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_json_eq!(
+ json_res,
+ json!(
+ {
+ "items": [
+ {"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]},
+ ],
+ "seenMarker": seen_marker,
+ }
+ )
+ );
+
+ // Second poll range, which will complete later
+ let poll = {
+ let bucket = bucket.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([base64::encode(b"New value")])
+ );
+
+ // Now we will add a value on a different key
+ // Start a new poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write value on different key
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test2"))
+ .body(b"Other value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([base64::encode(b"Other value")])
+ );
+}