diff options
Diffstat (limited to 'src/garage/tests/k2v/poll.rs')
-rw-r--r-- | src/garage/tests/k2v/poll.rs | 173 |
1 files changed, 170 insertions, 3 deletions
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs index e56705ae..dd44aed9 100644 --- a/src/garage/tests/k2v/poll.rs +++ b/src/garage/tests/k2v/poll.rs @@ -1,12 +1,17 @@ +use base64::prelude::*; use hyper::{Method, StatusCode}; use std::time::Duration; +use assert_json_diff::assert_json_eq; +use serde_json::json; + +use super::json_body; use crate::common; #[tokio::test] -async fn test_poll() { +async fn test_poll_item() { let ctx = common::context(); - let bucket = ctx.create_bucket("test-k2v-poll"); + let bucket = ctx.create_bucket("test-k2v-poll-item"); // Write initial value let res = ctx @@ -52,8 +57,8 @@ async fn test_poll() { let poll = { let bucket = bucket.clone(); let ct = ct.clone(); + let ctx = ctx.clone(); tokio::spawn(async move { - let ctx = common::context(); ctx.k2v .request .builder(bucket.clone()) @@ -96,3 +101,165 @@ async fn test_poll() { .to_vec(); assert_eq!(poll_res_body, b"New value"); } + +#[tokio::test] +async fn test_poll_range() { + let ctx = common::context(); + let bucket = ctx.create_bucket("test-k2v-poll-range"); + + // Write initial value + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .body(b"Initial value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Retrieve initial value to get its causality token + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let ct = res2 + .headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(); + + // Initial poll range, retrieve single item and first seen_marker + let res2 = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(b"{}".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res2.status(), StatusCode::OK); + let json_res = json_body(res2).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_json_eq!( + json_res, + json!( + { + "items": [ + {"sk": "test1", "ct": ct, "v": [BASE64_STANDARD.encode(b"Initial value")]}, + ], + "seenMarker": seen_marker, + } + ) + ); + + // Second poll range, which will complete later + let poll = { + let bucket = bucket.clone(); + let ctx = ctx.clone(); + tokio::spawn(async move { + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write new value that supersedes initial one + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test1")) + .signed_header("x-garage-causality-token", ct) + .body(b"New value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string(); + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test1")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([BASE64_STANDARD.encode(b"New value")]) + ); + + // Now we will add a value on a different key + // Start a new poll operation + let poll = { + let bucket = bucket.clone(); + let ctx = ctx.clone(); + tokio::spawn(async move { + ctx.k2v + .request + .builder(bucket.clone()) + .method(Method::POST) + .path("root") + .query_param("poll_range", None::<String>) + .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes()) + .send() + .await + }) + }; + + // Write value on different key + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .method(Method::PUT) + .path("root") + .query_param("sort_key", Some("test2")) + .body(b"Other value".to_vec()) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::NO_CONTENT); + + // Check poll finishes with correct value + let poll_res = tokio::select! { + _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"), + res = poll => res.unwrap().unwrap(), + }; + + assert_eq!(poll_res.status(), StatusCode::OK); + let json_res = json_body(poll_res).await; + assert_eq!(json_res["items"].as_array().unwrap().len(), 1); + assert_json_eq!(&json_res["items"][0]["sk"], json!("test2")); + assert_json_eq!( + &json_res["items"][0]["v"], + json!([BASE64_STANDARD.encode(b"Other value")]) + ); +} |