aboutsummaryrefslogtreecommitdiff
path: root/src/api/k2v/range.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
committerAlex <alex@adnab.me>2022-05-10 13:16:57 +0200
commit5768bf362262f78376af14517c4921941986192e (patch)
treeb4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/k2v/range.rs
parentdef78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
downloadgarage-5768bf362262f78376af14517c4921941986192e.tar.gz
garage-5768bf362262f78376af14517c4921941986192e.zip
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat <alex@adnab.me> Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex <alex@adnab.me> Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/k2v/range.rs')
-rw-r--r--src/api/k2v/range.rs96
1 files changed, 96 insertions, 0 deletions
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs
new file mode 100644
index 00000000..cd019723
--- /dev/null
+++ b/src/api/k2v/range.rs
@@ -0,0 +1,96 @@
+//! Utility module for retrieving ranges of items in Garage tables
+//! Implements parameters (prefix, start, end, limit) as specified
+//! for endpoints ReadIndex, ReadBatch and DeleteBatch
+
+use std::sync::Arc;
+
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::error::*;
+use crate::helpers::key_after_prefix;
+
+/// Read range in a Garage table.
+/// Returns (entries, more?, nextStart)
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn read_range<F>(
+ table: &Arc<Table<F, TableShardedReplication>>,
+ partition_key: &F::P,
+ prefix: &Option<String>,
+ start: &Option<String>,
+ end: &Option<String>,
+ limit: Option<u64>,
+ filter: Option<F::Filter>,
+ enumeration_order: EnumerationOrder,
+) -> Result<(Vec<F::E>, bool, Option<String>), Error>
+where
+ F: TableSchema<S = String> + 'static,
+{
+ let (mut start, mut start_ignore) = match (prefix, start) {
+ (None, None) => (None, false),
+ (None, Some(s)) => (Some(s.clone()), false),
+ (Some(p), Some(s)) => {
+ if !s.starts_with(p) {
+ return Err(Error::BadRequest(format!(
+ "Start key '{}' does not start with prefix '{}'",
+ s, p
+ )));
+ }
+ (Some(s.clone()), false)
+ }
+ (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
+ let start = key_after_prefix(p)
+ .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
+ (Some(start), true)
+ }
+ (Some(p), None) => (Some(p.clone()), false),
+ };
+
+ let mut entries = vec![];
+ loop {
+ let n_get = std::cmp::min(
+ 1000,
+ limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2,
+ );
+ let get_ret = table
+ .get_range(
+ partition_key,
+ start.clone(),
+ filter.clone(),
+ n_get,
+ enumeration_order,
+ )
+ .await?;
+
+ let get_ret_len = get_ret.len();
+
+ for entry in get_ret {
+ if start_ignore && Some(entry.sort_key()) == start.as_ref() {
+ continue;
+ }
+ if let Some(p) = prefix {
+ if !entry.sort_key().starts_with(p) {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(e) = end {
+ if entry.sort_key() == e {
+ return Ok((entries, false, None));
+ }
+ }
+ if let Some(l) = limit {
+ if entries.len() >= l as usize {
+ return Ok((entries, true, Some(entry.sort_key().clone())));
+ }
+ }
+ entries.push(entry);
+ }
+
+ if get_ret_len < n_get {
+ return Ok((entries, false, None));
+ }
+
+ start = Some(entries.last().unwrap().sort_key().clone());
+ start_ignore = true;
+ }
+}