aboutsummaryrefslogtreecommitdiff
path: root/src/garage
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
committerAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
commit5768bf362262f78376af14517c4921941986192e (patch)
treeb4baf3051eade0f63649443278bb3a3f4c38ec25 /src/garage
parentdef78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
downloadgarage-5768bf362262f78376af14517c4921941986192e.tar.gz
garage-5768bf362262f78376af14517c4921941986192e.zip
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/garage')
-rw-r--r--src/garage/Cargo.toml8
-rw-r--r--src/garage/admin.rs19
-rw-r--r--src/garage/cli/cmd.rs7
-rw-r--r--src/garage/repair.rs6
-rw-r--r--src/garage/server.rs26
-rw-r--r--src/garage/tests/common/client.rs2
-rw-r--r--src/garage/tests/common/custom_requester.rs55
-rw-r--r--src/garage/tests/common/garage.rs34
-rw-r--r--src/garage/tests/common/mod.rs11
-rw-r--r--src/garage/tests/k2v/batch.rs525
-rw-r--r--src/garage/tests/k2v/errorcodes.rs141
-rw-r--r--src/garage/tests/k2v/item.rs719
-rw-r--r--src/garage/tests/k2v/mod.rs18
-rw-r--r--src/garage/tests/k2v/poll.rs98
-rw-r--r--src/garage/tests/k2v/simple.rs40
-rw-r--r--src/garage/tests/lib.rs8
-rw-r--r--src/garage/tests/s3/list.rs (renamed from src/garage/tests/list.rs)0
-rw-r--r--src/garage/tests/s3/mod.rs6
-rw-r--r--src/garage/tests/s3/multipart.rs (renamed from src/garage/tests/multipart.rs)0
-rw-r--r--src/garage/tests/s3/objects.rs (renamed from src/garage/tests/objects.rs)0
-rw-r--r--src/garage/tests/s3/simple.rs (renamed from src/garage/tests/simple.rs)0
-rw-r--r--src/garage/tests/s3/streaming_signature.rs (renamed from src/garage/tests/streaming_signature.rs)0
-rw-r--r--src/garage/tests/s3/website.rs (renamed from src/garage/tests/website.rs)32
23 files changed, 1690 insertions, 65 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 59f402ff..3b69d7bc 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -63,3 +63,11 @@ hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
sha2 = "0.9"
static_init = "1.0"
+assert-json-diff = "2.0"
+serde_json = "1.0"
+base64 = "0.13"
+
+
+[features]
+kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
+k2v = [ "garage_util/k2v", "garage_api/k2v" ]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 0b20bb20..af0c3f22 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -21,8 +21,8 @@ use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
-use garage_model::object_table::ObjectFilter;
use garage_model::permission::*;
+use garage_model::s3::object_table::ObjectFilter;
use crate::cli::*;
use crate::repair::Repair;
@@ -80,7 +80,13 @@ impl AdminRpcHandler {
let buckets = self
.garage
.bucket_table
- .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
+ .get_range(
+ &EmptyKey,
+ None,
+ Some(DeletedFilter::NotDeleted),
+ 10000,
+ EnumerationOrder::Forward,
+ )
.await?;
Ok(AdminRpc::BucketList(buckets))
}
@@ -210,7 +216,13 @@ impl AdminRpcHandler {
let objects = self
.garage
.object_table
- .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
+ .get_range(
+ &bucket_id,
+ None,
+ Some(ObjectFilter::IsData),
+ 10,
+ EnumerationOrder::Forward,
+ )
.await?;
if !objects.is_empty() {
return Err(Error::BadRequest(format!(
@@ -445,6 +457,7 @@ impl AdminRpcHandler {
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
10000,
+ EnumerationOrder::Forward,
)
.await?
.iter()
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index a90277a0..2a799868 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -85,13 +85,14 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
format_table(healthy_nodes);
let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>();
- let failure_case_1 = status.iter().any(|adv| !adv.is_up);
+ let failure_case_1 = status
+ .iter()
+ .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_)))));
let failure_case_2 = layout
.roles
.items()
.iter()
- .filter(|(_, _, v)| v.0.is_some())
- .any(|(id, _, _)| !status_keys.contains(id));
+ .any(|(id, _, v)| !status_keys.contains(id) && v.0.is_some());
if failure_case_1 || failure_case_2 {
println!("\n==== FAILED NODES ====");
let mut failed_nodes =
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 3666ca8f..830eac71 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -2,10 +2,10 @@ use std::sync::Arc;
use tokio::sync::watch;
-use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::*;
+use garage_model::s3::block_ref_table::*;
+use garage_model::s3::object_table::*;
+use garage_model::s3::version_table::*;
use garage_table::*;
use garage_util::error::Error;
diff --git a/src/garage/server.rs b/src/garage/server.rs
index 58c9e782..24bb25b3 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -8,10 +8,13 @@ use garage_util::error::Error;
use garage_admin::metrics::*;
use garage_admin::tracing_setup::*;
-use garage_api::run_api_server;
+use garage_api::s3::api_server::S3ApiServer;
use garage_model::garage::Garage;
use garage_web::run_web_server;
+#[cfg(feature = "k2v")]
+use garage_api::k2v::api_server::K2VApiServer;
+
use crate::admin::*;
async fn wait_from(mut chan: watch::Receiver<bool>) {
@@ -56,12 +59,21 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone());
- info!("Initializing API server...");
- let api_server = tokio::spawn(run_api_server(
+ info!("Initializing S3 API server...");
+ let s3_api_server = tokio::spawn(S3ApiServer::run(
garage.clone(),
wait_from(watch_cancel.clone()),
));
+ #[cfg(feature = "k2v")]
+ let k2v_api_server = {
+ info!("Initializing K2V API server...");
+ tokio::spawn(K2VApiServer::run(
+ garage.clone(),
+ wait_from(watch_cancel.clone()),
+ ))
+ };
+
info!("Initializing web server...");
let web_server = tokio::spawn(run_web_server(
garage.clone(),
@@ -80,8 +92,12 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
// Stuff runs
// When a cancel signal is sent, stuff stops
- if let Err(e) = api_server.await? {
- warn!("API server exited with error: {}", e);
+ if let Err(e) = s3_api_server.await? {
+ warn!("S3 API server exited with error: {}", e);
+ }
+ #[cfg(feature = "k2v")]
+ if let Err(e) = k2v_api_server.await? {
+ warn!("K2V API server exited with error: {}", e);
}
if let Err(e) = web_server.await? {
warn!("Web server exited with error: {}", e);
diff --git a/src/garage/tests/common/client.rs b/src/garage/tests/common/client.rs
index c5ddc6e5..212588b5 100644
--- a/src/garage/tests/common/client.rs
+++ b/src/garage/tests/common/client.rs
@@ -10,7 +10,7 @@ pub fn build_client(instance: &Instance) -> Client {
None,
"garage-integ-test",
);
- let endpoint = Endpoint::immutable(instance.uri());
+ let endpoint = Endpoint::immutable(instance.s3_uri());
let config = Config::builder()
.region(super::REGION)
diff --git a/src/garage/tests/common/custom_requester.rs b/src/garage/tests/common/custom_requester.rs
index 580691a1..1700cc90 100644
--- a/src/garage/tests/common/custom_requester.rs
+++ b/src/garage/tests/common/custom_requester.rs
@@ -17,14 +17,25 @@ use garage_api::signature;
pub struct CustomRequester {
key: Key,
uri: Uri,
+ service: &'static str,
client: Client<HttpConnector>,
}
impl CustomRequester {
- pub fn new(instance: &Instance) -> Self {
+ pub fn new_s3(instance: &Instance) -> Self {
CustomRequester {
key: instance.key.clone(),
- uri: instance.uri(),
+ uri: instance.s3_uri(),
+ service: "s3",
+ client: Client::new(),
+ }
+ }
+
+ pub fn new_k2v(instance: &Instance) -> Self {
+ CustomRequester {
+ key: instance.key.clone(),
+ uri: instance.k2v_uri(),
+ service: "k2v",
client: Client::new(),
}
}
@@ -32,6 +43,7 @@ impl CustomRequester {
pub fn builder(&self, bucket: String) -> RequestBuilder<'_> {
RequestBuilder {
requester: self,
+ service: self.service,
bucket,
method: Method::GET,
path: String::new(),
@@ -47,6 +59,7 @@ impl CustomRequester {
pub struct RequestBuilder<'a> {
requester: &'a CustomRequester,
+ service: &'static str,
bucket: String,
method: Method,
path: String,
@@ -59,13 +72,17 @@ pub struct RequestBuilder<'a> {
}
impl<'a> RequestBuilder<'a> {
+ pub fn service(&mut self, service: &'static str) -> &mut Self {
+ self.service = service;
+ self
+ }
pub fn method(&mut self, method: Method) -> &mut Self {
self.method = method;
self
}
- pub fn path(&mut self, path: String) -> &mut Self {
- self.path = path;
+ pub fn path(&mut self, path: impl ToString) -> &mut Self {
+ self.path = path.to_string();
self
}
@@ -74,16 +91,38 @@ impl<'a> RequestBuilder<'a> {
self
}
+ pub fn query_param<T, U>(&mut self, param: T, value: Option<U>) -> &mut Self
+ where
+ T: ToString,
+ U: ToString,
+ {
+ self.query_params
+ .insert(param.to_string(), value.as_ref().map(ToString::to_string));
+ self
+ }
+
pub fn signed_headers(&mut self, signed_headers: HashMap<String, String>) -> &mut Self {
self.signed_headers = signed_headers;
self
}
+ pub fn signed_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self {
+ self.signed_headers
+ .insert(name.to_string(), value.to_string());
+ self
+ }
+
pub fn unsigned_headers(&mut self, unsigned_headers: HashMap<String, String>) -> &mut Self {
self.unsigned_headers = unsigned_headers;
self
}
+ pub fn unsigned_header(&mut self, name: impl ToString, value: impl ToString) -> &mut Self {
+ self.unsigned_headers
+ .insert(name.to_string(), value.to_string());
+ self
+ }
+
pub fn body(&mut self, body: Vec<u8>) -> &mut Self {
self.body = body;
self
@@ -106,24 +145,24 @@ impl<'a> RequestBuilder<'a> {
let query = query_param_to_string(&self.query_params);
let (host, path) = if self.vhost_style {
(
- format!("{}.s3.garage", self.bucket),
+ format!("{}.{}.garage", self.bucket, self.service),
format!("{}{}", self.path, query),
)
} else {
(
- "s3.garage".to_owned(),
+ format!("{}.garage", self.service),
format!("{}/{}{}", self.bucket, self.path, query),
)
};
let uri = format!("{}{}", self.requester.uri, path);
let now = Utc::now();
- let scope = signature::compute_scope(&now, super::REGION.as_ref());
+ let scope = signature::compute_scope(&now, super::REGION.as_ref(), self.service);
let mut signer = signature::signing_hmac(
&now,
&self.requester.key.secret,
super::REGION.as_ref(),
- "s3",
+ self.service,
)
.unwrap();
let streaming_signer = signer.clone();
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index 88c51501..44d727f9 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -22,7 +22,9 @@ pub struct Instance {
process: process::Child,
pub path: PathBuf,
pub key: Key,
- pub api_port: u16,
+ pub s3_port: u16,
+ pub k2v_port: u16,
+ pub web_port: u16,
}
impl Instance {
@@ -58,9 +60,12 @@ rpc_secret = "{secret}"
[s3_api]
s3_region = "{region}"
-api_bind_addr = "127.0.0.1:{api_port}"
+api_bind_addr = "127.0.0.1:{s3_port}"
root_domain = ".s3.garage"
+[k2v_api]
+api_bind_addr = "127.0.0.1:{k2v_port}"
+
[s3_web]
bind_addr = "127.0.0.1:{web_port}"
root_domain = ".web.garage"
@@ -72,10 +77,11 @@ api_bind_addr = "127.0.0.1:{admin_port}"
path = path.display(),
secret = GARAGE_TEST_SECRET,
region = super::REGION,
- api_port = port,
- rpc_port = port + 1,
- web_port = port + 2,
- admin_port = port + 3,
+ s3_port = port,
+ k2v_port = port + 1,
+ rpc_port = port + 2,
+ web_port = port + 3,
+ admin_port = port + 4,
);
fs::write(path.join("config.toml"), config).expect("Could not write garage config file");
@@ -88,7 +94,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
- .env("RUST_LOG", "garage=info,garage_api=debug")
+ .env("RUST_LOG", "garage=info,garage_api=trace")
.spawn()
.expect("Could not start garage");
@@ -96,7 +102,9 @@ api_bind_addr = "127.0.0.1:{admin_port}"
process: child,
path,
key: Key::default(),
- api_port: port,
+ s3_port: port,
+ k2v_port: port + 1,
+ web_port: port + 3,
}
}
@@ -147,8 +155,14 @@ api_bind_addr = "127.0.0.1:{admin_port}"
String::from_utf8(output.stdout).unwrap()
}
- pub fn uri(&self) -> http::Uri {
- format!("http://127.0.0.1:{api_port}", api_port = self.api_port)
+ pub fn s3_uri(&self) -> http::Uri {
+ format!("http://127.0.0.1:{s3_port}", s3_port = self.s3_port)
+ .parse()
+ .expect("Could not build garage endpoint URI")
+ }
+
+ pub fn k2v_uri(&self) -> http::Uri {
+ format!("http://127.0.0.1:{k2v_port}", k2v_port = self.k2v_port)
.parse()
.expect("Could not build garage endpoint URI")
}
diff --git a/src/garage/tests/common/mod.rs b/src/garage/tests/common/mod.rs
index 8f88c731..28874b02 100644
--- a/src/garage/tests/common/mod.rs
+++ b/src/garage/tests/common/mod.rs
@@ -17,18 +17,27 @@ pub struct Context {
pub garage: &'static garage::Instance,
pub client: Client,
pub custom_request: CustomRequester,
+ pub k2v: K2VContext,
+}
+
+pub struct K2VContext {
+ pub request: CustomRequester,
}
impl Context {
fn new() -> Self {
let garage = garage::instance();
let client = client::build_client(garage);
- let custom_request = CustomRequester::new(garage);
+ let custom_request = CustomRequester::new_s3(garage);
+ let k2v_request = CustomRequester::new_k2v(garage);
Context {
garage,
client,
custom_request,
+ k2v: K2VContext {
+ request: k2v_request,
+ },
}
}
diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs
new file mode 100644
index 00000000..1182a298
--- /dev/null
+++ b/src/garage/tests/k2v/batch.rs
@@ -0,0 +1,525 @@
+use std::collections::HashMap;
+
+use crate::common;
+
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
+use hyper::Method;
+
+#[tokio::test]
+async fn test_batch() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-batch");
+
+ let mut values = HashMap::new();
+ values.insert("a", "initial test 1");
+ values.insert("b", "initial test 2");
+ values.insert("c", "initial test 3");
+ values.insert("d.1", "initial test 4");
+ values.insert("d.2", "initial test 5");
+ values.insert("e", "initial test 6");
+ let mut ct = HashMap::new();
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ format!(
+ r#"[
+ {{"pk": "root", "sk": "a", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "b", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.1", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "e", "ct": null, "v": "{}"}}
+ ]"#,
+ base64::encode(values.get(&"a").unwrap()),
+ base64::encode(values.get(&"b").unwrap()),
+ base64::encode(values.get(&"c").unwrap()),
+ base64::encode(values.get(&"d.1").unwrap()),
+ base64::encode(values.get(&"d.2").unwrap()),
+ base64::encode(values.get(&"e").unwrap()),
+ )
+ .into_bytes(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ for sk in ["a", "b", "c", "d.1", "d.2", "e"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, values.get(sk).unwrap().as_bytes());
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "start": "c"},
+ {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"},
+ {"partitionKey": "root", "limit": 1},
+ {"partitionKey": "root", "prefix": "d"}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "c",
+ "end": "a",
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]},
+ {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]}
+ ],
+ "more": true,
+ "nextStart": "b",
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+
+ // Insert some new values
+ values.insert("c'", "new test 3");
+ values.insert("d.1'", "new test 4");
+ values.insert("d.2'", "new test 5");
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ format!(
+ r#"[
+ {{"pk": "root", "sk": "b", "ct": "{}", "v": null}},
+ {{"pk": "root", "sk": "c", "ct": null, "v": "{}"}},
+ {{"pk": "root", "sk": "d.1", "ct": "{}", "v": "{}"}},
+ {{"pk": "root", "sk": "d.2", "ct": null, "v": "{}"}}
+ ]"#,
+ ct.get(&"b").unwrap(),
+ base64::encode(values.get(&"c'").unwrap()),
+ ct.get(&"d.1").unwrap(),
+ base64::encode(values.get(&"d.1'").unwrap()),
+ base64::encode(values.get(&"d.2'").unwrap()),
+ )
+ .into_bytes(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ for sk in ["b", "c", "d.1", "d.2"] {
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ if sk == "b" {
+ assert_eq!(res.status(), 204);
+ } else {
+ assert_eq!(res.status(), 200);
+ }
+ ct.insert(
+ sk,
+ res.headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string(),
+ );
+ }
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "prefix": "d"},
+ {"partitionKey": "root", "prefix": "d.", "end": "d.2"},
+ {"partitionKey": "root", "prefix": "d.", "limit": 1},
+ {"partitionKey": "root", "prefix": "d.", "start": "d.2", "limit": 1},
+ {"partitionKey": "root", "prefix": "d.", "reverse": true},
+ {"partitionKey": "root", "prefix": "d.", "start": "d.2", "reverse": true},
+ {"partitionKey": "root", "prefix": "d.", "limit": 2}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "a", "ct": ct.get("a").unwrap(), "v": [base64::encode(values.get("a").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": "d.2",
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": true,
+ "nextStart": "d.2",
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": "d.2",
+ "end": null,
+ "limit": 1,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": "d.2",
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d.",
+ "start": null,
+ "end": null,
+ "limit": 2,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1'").unwrap())]},
+ {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap()), base64::encode(values.get("d.2'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+
+ // Test DeleteBatch
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("delete", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root", "start": "a", "end": "c"},
+ {"partitionKey": "root", "prefix": "d"}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": "a",
+ "end": "c",
+ "singleItem": false,
+ "deletedItems": 1,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": "d",
+ "start": null,
+ "end": null,
+ "singleItem": false,
+ "deletedItems": 2,
+ },
+ ])
+ );
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partitionKey": "root"},
+ {"partitionKey": "root", "reverse": true}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let json_res = json_body(res).await;
+ assert_json_eq!(
+ json_res,
+ json!([
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ {
+ "partitionKey": "root",
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": true,
+ "conflictsOnly": false,
+ "tombstones": false,
+ "singleItem": false,
+ "items": [
+ {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]},
+ {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]},
+ ],
+ "more": false,
+ "nextStart": null,
+ },
+ ])
+ );
+}
diff --git a/src/garage/tests/k2v/errorcodes.rs b/src/garage/tests/k2v/errorcodes.rs
new file mode 100644
index 00000000..2fcc45bc
--- /dev/null
+++ b/src/garage/tests/k2v/errorcodes.rs
@@ -0,0 +1,141 @@
+use crate::common;
+
+use hyper::Method;
+
+#[tokio::test]
+async fn test_error_codes() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-error-codes");
+
+ // Regular insert should work (code 200)
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Insert with trash causality token: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", "tra$sh")
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search without partition key: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {},
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search with start that is not in prefix: invalid request
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partition_key": "root", "prefix": "a", "start": "bx"},
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Search with invalid json: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .query_param("search", Option::<&str>::None)
+ .body(
+ br#"[
+ {"partition_key": "root"
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Batch insert with invalid causality token: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ br#"[
+ {"pk": "root", "sk": "a", "ct": "tra$h", "v": "aGVsbG8sIHdvcmxkCg=="}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Batch insert with invalid data: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .body(
+ br#"[
+ {"pk": "root", "sk": "a", "ct": null, "v": "aGVsbG8sIHdvcmx$Cg=="}
+ ]"#
+ .to_vec(),
+ )
+ .method(Method::POST)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+
+ // Poll with invalid causality token: 400
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .query_param("causality_token", Some("tra$h"))
+ .query_param("timeout", Some("10"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 400);
+}
diff --git a/src/garage/tests/k2v/item.rs b/src/garage/tests/k2v/item.rs
new file mode 100644
index 00000000..bf2b01f8
--- /dev/null
+++ b/src/garage/tests/k2v/item.rs
@@ -0,0 +1,719 @@
+use crate::common;
+
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
+use hyper::Method;
+
+#[tokio::test]
+async fn test_items_and_indices() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-item-and-index");
+
+ // ReadIndex -- there should be nothing
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ let content2_len = "_: hello universe".len();
+ let content3_len = "_: concurrent value".len();
+
+ for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
+ let content = format!("{}: hello world", sk).into_bytes();
+ let content2 = format!("{}: hello universe", sk).into_bytes();
+ let content3 = format!("{}: concurrent value", sk).into_bytes();
+
+ // Put initially, no causality token
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .body(content.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, content);
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i,
+ "values": i+i+1,
+ "bytes": i*(content2.len() + content3.len()) + content.len(),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ // Put again, this time with causality token
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct.clone())
+ .body(content2.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, content2);
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i,
+ "values": i+i+1,
+ "bytes": i*content3.len() + (i+1)*content2.len(),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+
+ // Put again with same CT, now we have concurrent values
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct.clone())
+ .body(content3.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Get value back
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_json = json_body(res).await;
+ assert_json_eq!(
+ res_json,
+ [base64::encode(&content2), base64::encode(&content3)]
+ );
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": i+1,
+ "conflicts": i+1,
+ "values": 2*(i+1),
+ "bytes": (i+1)*(content2.len() + content3.len()),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ }
+
+ // Now delete things
+ for (i, sk) in ["a", "b", "c", "d"].iter().enumerate() {
+ // Get value back (we just need the CT)
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Delete it
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::DELETE)
+ .path("root")
+ .query_param("sort_key", Some(sk))
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // ReadIndex -- now there should be some stuff
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .send()
+ .await
+ .unwrap();
+ let res_body = json_body(res).await;
+ if i < 3 {
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [
+ {
+ "pk": "root",
+ "entries": 3-i,
+ "conflicts": 3-i,
+ "values": 2*(3-i),
+ "bytes": (3-i)*(content2_len + content3_len),
+ }
+ ],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ } else {
+ assert_json_eq!(
+ res_body,
+ json!({
+ "prefix": null,
+ "start": null,
+ "end": null,
+ "limit": null,
+ "reverse": false,
+ "partitionKeys": [],
+ "more": false,
+ "nextStart": null
+ })
+ );
+ }
+ }
+}
+
+#[tokio::test]
+async fn test_item_return_format() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-item-return-format");
+
+ let single_value = b"A single value".to_vec();
+ let concurrent_value = b"A concurrent value".to_vec();
+
+ // -- Test with a single value --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .body(single_value.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, single_value);
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/octet-stream"
+ );
+ let res_body = hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res_body, single_value);
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&single_value)]));
+
+ // -- Test with a second, concurrent value --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .body(concurrent_value.clone())
+ .method(Method::PUT)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 409); // CONFLICT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(
+ res_body,
+ json!([
+ base64::encode(&single_value),
+ base64::encode(&concurrent_value)
+ ])
+ );
+
+ // -- Delete first value, concurrently with second insert --
+ // -- (we now have a concurrent value and a deletion) --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .method(Method::DELETE)
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let ct = res
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 409); // CONFLICT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([base64::encode(&concurrent_value), null]));
+
+ // -- Delete everything --
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .method(Method::DELETE)
+ .signed_header("x-garage-causality-token", ct)
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204);
+
+ // f0: either
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "*/*")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204); // NO CONTENT
+
+ // f1: not specified
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([null]));
+
+ // f2: binary
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 204); // NO CONTENT
+
+ // f3: json
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("v1"))
+ .signed_header("accept", "application/json")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+ assert_eq!(
+ res.headers().get("content-type").unwrap().to_str().unwrap(),
+ "application/json"
+ );
+ let res_body = json_body(res).await;
+ assert_json_eq!(res_body, json!([null]));
+}
diff --git a/src/garage/tests/k2v/mod.rs b/src/garage/tests/k2v/mod.rs
new file mode 100644
index 00000000..a009460e
--- /dev/null
+++ b/src/garage/tests/k2v/mod.rs
@@ -0,0 +1,18 @@
+pub mod batch;
+pub mod errorcodes;
+pub mod item;
+pub mod poll;
+pub mod simple;
+
+use hyper::{Body, Response};
+
+pub async fn json_body(res: Response<Body>) -> serde_json::Value {
+ let res_body: serde_json::Value = serde_json::from_slice(
+ &hyper::body::to_bytes(res.into_body())
+ .await
+ .unwrap()
+ .to_vec()[..],
+ )
+ .unwrap();
+ res_body
+}
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
new file mode 100644
index 00000000..70dc0410
--- /dev/null
+++ b/src/garage/tests/k2v/poll.rs
@@ -0,0 +1,98 @@
+use hyper::Method;
+use std::time::Duration;
+
+use crate::common;
+
+#[tokio::test]
+async fn test_poll() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), 200);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ let res2_body = hyper::body::to_bytes(res2.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res2_body, b"Initial value");
+
+ // Start poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ let ct = ct.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .query_param("causality_token", Some(ct))
+ .query_param("timeout", Some("10"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), 200);
+
+ let poll_res_body = hyper::body::to_bytes(poll_res.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(poll_res_body, b"New value");
+}
diff --git a/src/garage/tests/k2v/simple.rs b/src/garage/tests/k2v/simple.rs
new file mode 100644
index 00000000..ae9a8674
--- /dev/null
+++ b/src/garage/tests/k2v/simple.rs
@@ -0,0 +1,40 @@
+use crate::common;
+
+use hyper::Method;
+
+#[tokio::test]
+async fn test_simple() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-simple");
+
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Hello, world!".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), 200);
+
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), 200);
+
+ let res2_body = hyper::body::to_bytes(res2.into_body())
+ .await
+ .unwrap()
+ .to_vec();
+ assert_eq!(res2_body, b"Hello, world!");
+}
diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs
index 8799c395..0106ad10 100644
--- a/src/garage/tests/lib.rs
+++ b/src/garage/tests/lib.rs
@@ -3,9 +3,5 @@ mod common;
mod admin;
mod bucket;
-mod list;
-mod multipart;
-mod objects;
-mod simple;
-mod streaming_signature;
-mod website;
+mod k2v;
+mod s3;
diff --git a/src/garage/tests/list.rs b/src/garage/tests/s3/list.rs
index bb03f250..bb03f250 100644
--- a/src/garage/tests/list.rs
+++ b/src/garage/tests/s3/list.rs
diff --git a/src/garage/tests/s3/mod.rs b/src/garage/tests/s3/mod.rs
new file mode 100644
index 00000000..623eb665
--- /dev/null
+++ b/src/garage/tests/s3/mod.rs
@@ -0,0 +1,6 @@
+mod list;
+mod multipart;
+mod objects;
+mod simple;
+mod streaming_signature;
+mod website;
diff --git a/src/garage/tests/multipart.rs b/src/garage/tests/s3/multipart.rs
index 895a2993..895a2993 100644
--- a/src/garage/tests/multipart.rs
+++ b/src/garage/tests/s3/multipart.rs
diff --git a/src/garage/tests/objects.rs b/src/garage/tests/s3/objects.rs
index e1175b81..e1175b81 100644
--- a/src/garage/tests/objects.rs
+++ b/src/garage/tests/s3/objects.rs
diff --git a/src/garage/tests/simple.rs b/src/garage/tests/s3/simple.rs
index f54ae9ac..f54ae9ac 100644
--- a/src/garage/tests/simple.rs
+++ b/src/garage/tests/s3/simple.rs
diff --git a/src/garage/tests/streaming_signature.rs b/src/garage/tests/s3/streaming_signature.rs
index c68f7dfc..c68f7dfc 100644
--- a/src/garage/tests/streaming_signature.rs
+++ b/src/garage/tests/s3/streaming_signature.rs
diff --git a/src/garage/tests/website.rs b/src/garage/tests/s3/website.rs
index 963d11ea..0570ac6a 100644
--- a/src/garage/tests/website.rs
+++ b/src/garage/tests/s3/website.rs
@@ -35,10 +35,7 @@ async fn test_website() {
let req = || {
Request::builder()
.method("GET")
- .uri(format!(
- "http://127.0.0.1:{}/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap()
@@ -170,10 +167,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("GET")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.body(Body::empty())
@@ -198,7 +192,7 @@ async fn test_website_s3_api() {
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/wrong.html",
- common::garage::DEFAULT_PORT + 2
+ ctx.garage.web_port
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
@@ -217,10 +211,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("OPTIONS")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
@@ -244,10 +235,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("OPTIONS")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "DELETE")
@@ -288,10 +276,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("OPTIONS")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT")
@@ -319,10 +304,7 @@ async fn test_website_s3_api() {
{
let req = Request::builder()
.method("GET")
- .uri(format!(
- "http://127.0.0.1:{}/site/",
- common::garage::DEFAULT_PORT + 2
- ))
+ .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty())
.unwrap();