From 176715c5b27ea62e3b1bf77356360b5086d671e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 16 May 2022 11:54:37 +0200 Subject: Fix ReadIndex spec and add JSON5 remark to doc --- doc/drafts/k2v-spec.md | 51 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/doc/drafts/k2v-spec.md b/doc/drafts/k2v-spec.md index 08809069..175bb02e 100644 --- a/doc/drafts/k2v-spec.md +++ b/doc/drafts/k2v-spec.md @@ -195,6 +195,10 @@ TO UNDERSTAND IN ORDER TO USE IT CORRECTLY.** ## API Endpoints +**Remark.** Example queries and responses here are given in JSON5 format +for clarity. However the actual K2V API uses basic JSON so all examples +and responses need to be translated. + ### Operations on single items **ReadItem: `GET //?sort_key=`** @@ -370,8 +374,11 @@ HTTP/1.1 204 NO CONTENT **ReadIndex: `GET /?start=&end=&limit=`** Lists all partition keys in the bucket for which some triplets exist, and gives -for each the number of triplets (or an approximation thereof, this value is - asynchronously updated, and thus eventually consistent). +for each the number of triplets, total number of values (which might be bigger +than the number of triplets in case of conflicts), total number of bytes of +these values, and number of triplets that are in a state of conflict. +The values returned are an approximation of the true counts in the bucket, +as these values are asynchronously updated, and thus eventually consistent. Query parameters: @@ -426,11 +433,41 @@ HTTP/1.1 200 OK limit: null, reverse: false, partitionKeys: [ - { pk: "keys", n: 3043 }, - { pk: "mailbox:INBOX", n: 42 }, - { pk: "mailbox:Junk", n: 2991 }, - { pk: "mailbox:Trash", n: 10 }, - { pk: "mailboxes", n: 3 }, + { + pk: "keys", + entries: 3043, + conflicts: 0, + values: 3043, + bytes: 121720, + }, + { + pk: "mailbox:INBOX", + entries: 42, + conflicts: 1, + values: 43, + bytes: 142029, + }, + { + pk: "mailbox:Junk", + entries: 2991 + conflicts: 0, + values: 2991, + bytes: 12019322, + }, + { + pk: "mailbox:Trash", + entries: 10, + conflicts: 0, + values: 10, + bytes: 32401, + }, + { + pk: "mailboxes", + entries: 3, + conflicts: 0, + values: 3, + bytes: 3019, + }, ], more: false, nextStart: null, -- cgit v1.2.3 From 7b474855e3a8491fcdde69d12d3fbae27f520383 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 5 May 2022 10:56:44 +0200 Subject: Make background runner terminate correctly --- src/util/background.rs | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/util/background.rs b/src/util/background.rs index bfdaaf1e..d35425f5 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -6,7 +6,9 @@ use std::time::Duration; use futures::future::*; use futures::select; -use tokio::sync::{mpsc, watch, Mutex}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex}; use crate::error::Error; @@ -30,26 +32,31 @@ impl BackgroundRunner { let stop_signal_2 = stop_signal.clone(); let await_all_done = tokio::spawn(async move { + let mut workers = FuturesUnordered::new(); + let mut shutdown_timer = 0; loop { - let wkr = { - select! { - item = worker_out.recv().fuse() => { - match item { - Some(x) => x, - None => break, - } + let closed = match worker_out.try_recv() { + Ok(wkr) => { + workers.push(wkr); + false + } + Err(TryRecvError::Empty) => false, + Err(TryRecvError::Disconnected) => true, + }; + select! { + res = workers.next() => { + if let Some(Err(e)) = res { + error!("Worker exited with error: {}", e); } - _ = tokio::time::sleep(Duration::from_secs(5)).fuse() => { - if *stop_signal_2.borrow() { + } + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { + if closed || *stop_signal_2.borrow() { + shutdown_timer += 1; + if shutdown_timer >= 10 { break; - } else { - continue; } } } - }; - if let Err(e) = wkr.await { - error!("Error while awaiting for worker: {}", e); } } }); -- cgit v1.2.3 From c692f55d5ce2c3ed08db7fbc4844debcc0aeb134 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 17 May 2022 11:50:23 +0200 Subject: K2V: Fix `end` parameter and add tests (fix #305) --- src/api/k2v/range.rs | 6 ++- src/garage/tests/k2v/batch.rs | 89 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 93 insertions(+), 2 deletions(-) diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs index cd019723..295c34aa 100644 --- a/src/api/k2v/range.rs +++ b/src/api/k2v/range.rs @@ -74,7 +74,11 @@ where } } if let Some(e) = end { - if entry.sort_key() == e { + let is_finished = match enumeration_order { + EnumerationOrder::Forward => entry.sort_key() >= e, + EnumerationOrder::Reverse => entry.sort_key() <= e, + }; + if is_finished { return Ok((entries, false, None)); } } diff --git a/src/garage/tests/k2v/batch.rs b/src/garage/tests/k2v/batch.rs index 1182a298..acae1910 100644 --- a/src/garage/tests/k2v/batch.rs +++ b/src/garage/tests/k2v/batch.rs @@ -92,7 +92,9 @@ async fn test_batch() { br#"[ {"partitionKey": "root"}, {"partitionKey": "root", "start": "c"}, + {"partitionKey": "root", "start": "c", "end": "dynamite"}, {"partitionKey": "root", "start": "c", "reverse": true, "end": "a"}, + {"partitionKey": "root", "start": "c", "reverse": true, "end": "azerty"}, {"partitionKey": "root", "limit": 1}, {"partitionKey": "root", "prefix": "d"} ]"# @@ -147,6 +149,24 @@ async fn test_batch() { "more": false, "nextStart": null, }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": "dynamite", + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [base64::encode(values.get("d.1").unwrap())]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [base64::encode(values.get("d.2").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, { "partitionKey": "root", "prefix": null, @@ -164,6 +184,23 @@ async fn test_batch() { "more": false, "nextStart": null, }, + { + "partitionKey": "root", + "prefix": null, + "start": "c", + "end": "azerty", + "limit": null, + "reverse": true, + "conflictsOnly": false, + "tombstones": false, + "singleItem": false, + "items": [ + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap())]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [base64::encode(values.get("b").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, { "partitionKey": "root", "prefix": null, @@ -465,6 +502,34 @@ async fn test_batch() { ]) ); + // update our known tombstones + for sk in ["a", "b", "d.1", "d.2"] { + let res = ctx + .k2v + .request + .builder(bucket.clone()) + .path("root") + .query_param("sort_key", Some(sk)) + .signed_header("accept", "application/octet-stream") + .send() + .await + .unwrap(); + assert_eq!(res.status(), 204); + assert_eq!( + res.headers().get("content-type").unwrap().to_str().unwrap(), + "application/octet-stream" + ); + ct.insert( + sk, + res.headers() + .get("x-garage-causality-token") + .unwrap() + .to_str() + .unwrap() + .to_string(), + ); + } + let res = ctx .k2v .request @@ -473,7 +538,8 @@ async fn test_batch() { .body( br#"[ {"partitionKey": "root"}, - {"partitionKey": "root", "reverse": true} + {"partitionKey": "root", "reverse": true}, + {"partitionKey": "root", "tombstones": true} ]"# .to_vec(), ) @@ -520,6 +586,27 @@ async fn test_batch() { "more": false, "nextStart": null, }, + { + "partitionKey": "root", + "prefix": null, + "start": null, + "end": null, + "limit": null, + "reverse": false, + "conflictsOnly": false, + "tombstones": true, + "singleItem": false, + "items": [ + {"sk": "a", "ct": ct.get("a").unwrap(), "v": [null]}, + {"sk": "b", "ct": ct.get("b").unwrap(), "v": [null]}, + {"sk": "c", "ct": ct.get("c").unwrap(), "v": [base64::encode(values.get("c").unwrap()), base64::encode(values.get("c'").unwrap())]}, + {"sk": "d.1", "ct": ct.get("d.1").unwrap(), "v": [null]}, + {"sk": "d.2", "ct": ct.get("d.2").unwrap(), "v": [null]}, + {"sk": "e", "ct": ct.get("e").unwrap(), "v": [base64::encode(values.get("e").unwrap())]}, + ], + "more": false, + "nextStart": null, + }, ]) ); } -- cgit v1.2.3