diff options
author | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
commit | 5768bf362262f78376af14517c4921941986192e (patch) | |
tree | b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/k2v/range.rs | |
parent | def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff) | |
download | garage-5768bf362262f78376af14517c4921941986192e.tar.gz garage-5768bf362262f78376af14517c4921941986192e.zip |
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch
**Testing:**
- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
- [x] Adapt testing framework
- [x] Simple test with InsertItem + ReadItem
- [x] Test with several Insert/Read/DeleteItem + ReadIndex
- [x] Test all combinations of return formats for ReadItem
- [x] Test with ReadBatch, InsertBatch, DeleteBatch
- [x] Test with PollItem
- [x] Test error codes
- [ ] Fix most broken stuff
- [x] test PollItem broken randomly
- [x] when invalid causality tokens are given, errors should be 4xx not 5xx
**Improvements:**
- [x] Descending range queries
- [x] Specify
- [x] Implement
- [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/k2v/range.rs')
-rw-r--r-- | src/api/k2v/range.rs | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/src/api/k2v/range.rs b/src/api/k2v/range.rs new file mode 100644 index 00000000..cd019723 --- /dev/null +++ b/src/api/k2v/range.rs @@ -0,0 +1,96 @@ +//! Utility module for retrieving ranges of items in Garage tables +//! Implements parameters (prefix, start, end, limit) as specified +//! for endpoints ReadIndex, ReadBatch and DeleteBatch + +use std::sync::Arc; + +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::error::*; +use crate::helpers::key_after_prefix; + +/// Read range in a Garage table. +/// Returns (entries, more?, nextStart) +#[allow(clippy::too_many_arguments)] +pub(crate) async fn read_range<F>( + table: &Arc<Table<F, TableShardedReplication>>, + partition_key: &F::P, + prefix: &Option<String>, + start: &Option<String>, + end: &Option<String>, + limit: Option<u64>, + filter: Option<F::Filter>, + enumeration_order: EnumerationOrder, +) -> Result<(Vec<F::E>, bool, Option<String>), Error> +where + F: TableSchema<S = String> + 'static, +{ + let (mut start, mut start_ignore) = match (prefix, start) { + (None, None) => (None, false), + (None, Some(s)) => (Some(s.clone()), false), + (Some(p), Some(s)) => { + if !s.starts_with(p) { + return Err(Error::BadRequest(format!( + "Start key '{}' does not start with prefix '{}'", + s, p + ))); + } + (Some(s.clone()), false) + } + (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => { + let start = key_after_prefix(p) + .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?; + (Some(start), true) + } + (Some(p), None) => (Some(p.clone()), false), + }; + + let mut entries = vec![]; + loop { + let n_get = std::cmp::min( + 1000, + limit.map(|x| x as usize).unwrap_or(usize::MAX - 10) - entries.len() + 2, + ); + let get_ret = table + .get_range( + partition_key, + start.clone(), + filter.clone(), + n_get, + enumeration_order, + ) + .await?; + + let get_ret_len = get_ret.len(); + + for entry in get_ret { + if start_ignore && Some(entry.sort_key()) == start.as_ref() { + continue; + } + if let Some(p) = prefix { + if !entry.sort_key().starts_with(p) { + return Ok((entries, false, None)); + } + } + if let Some(e) = end { + if entry.sort_key() == e { + return Ok((entries, false, None)); + } + } + if let Some(l) = limit { + if entries.len() >= l as usize { + return Ok((entries, true, Some(entry.sort_key().clone()))); + } + } + entries.push(entry); + } + + if get_ret_len < n_get { + return Ok((entries, false, None)); + } + + start = Some(entries.last().unwrap().sort_key().clone()); + start_ignore = true; + } +} |