//! Utility module for retrieving ranges of items in Garage tables
//! Implements parameters (prefix, start, end, limit) as specified
//! for endpoints ReadIndex, ReadBatch and DeleteBatch
use std::sync::Arc;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::helpers::key_after_prefix;
use crate::k2v::error::*;
/// Read a range of entries from a single partition of a Garage table.
///
/// Entries are fetched from `table` in batches and accumulated until one of the
/// stop conditions below is met.
///
/// # Parameters
///
/// * `prefix` - if set, only entries whose sort key starts with this prefix are
///   returned; enumeration stops at the first entry that does not match.
/// * `start` - if set, the sort key at which enumeration begins (inclusive).
///   When `prefix` is also set, `start` must itself begin with `prefix`.
/// * `end` - if set, enumeration stops (exclusive) at the first entry whose sort
///   key is `>= end` in forward order, or `<= end` in reverse order.
/// * `limit` - if set, the maximum number of entries to return.
/// * `filter` - forwarded unchanged to `Table::get_range`.
/// * `enumeration_order` - forward or reverse enumeration of sort keys.
///
/// # Returns
///
/// `(entries, more, next_start)`. `more` is `true` only when `limit` was reached
/// while another matching entry was available; `next_start` then holds the sort
/// key of that first entry which was not returned. Otherwise the pair is
/// `(false, None)`.
///
/// # Errors
///
/// * Bad request if `start` does not begin with `prefix`.
/// * Internal error if a prefix has to be listed in reverse order without an
///   explicit `start` and `key_after_prefix` cannot produce a starting key.
/// * Any error returned by `Table::get_range`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn read_range<F>(
    table: &Arc<Table<F, TableShardedReplication>>,
    partition_key: &F::P,
    prefix: &Option<String>,
    start: &Option<String>,
    end: &Option<String>,
    limit: Option<u64>,
    filter: Option<F::Filter>,
    enumeration_order: EnumerationOrder,
) -> Result<(Vec<F::E>, bool, Option<String>), Error>
where
    F: TableSchema<S = String> + 'static,
{
    // Upper bound on the number of entries requested from the table per round.
    const MAX_BATCH: usize = 1000;

    // Convert the limit to usize once. A limit that does not fit in usize can
    // never be reached by a Vec, so clamping (instead of truncating with a bare
    // `as` cast) keeps the semantics of "no effective limit". The cast below is
    // lossless because the value has just been clamped to usize::MAX.
    let limit = limit.map(|l| l.min(usize::MAX as u64) as usize);

    // `start_ignore` means: the entry whose sort key equals `start` must be
    // skipped instead of returned.
    let (mut start, mut start_ignore) = match (prefix, start) {
        (None, None) => (None, false),
        (None, Some(s)) => (Some(s.clone()), false),
        (Some(p), Some(s)) => {
            if !s.starts_with(p) {
                return Err(Error::bad_request(format!(
                    "Start key '{}' does not start with prefix '{}'",
                    s, p
                )));
            }
            (Some(s.clone()), false)
        }
        (Some(p), None) if enumeration_order == EnumerationOrder::Reverse => {
            // In reverse order we begin from the key computed by
            // `key_after_prefix` (presumably the first key past the prefix
            // range -- confirm against the helper). That key is only a
            // starting point, so an entry with exactly that key is skipped.
            let start = key_after_prefix(p)
                .ok_or_internal_error("Sorry, can't list this prefix in reverse order")?;
            (Some(start), true)
        }
        (Some(p), None) => (Some(p.clone()), false),
    };

    let mut entries = vec![];
    loop {
        // Request what is still missing to reach the limit, plus two extra
        // entries: one because the entry at `start` may be skipped, and one so
        // that the entry following the last returned one can be observed and
        // reported as the next start key.
        //
        // Saturating arithmetic is required here: with a very large
        // caller-supplied limit, `limit - len + 2` would overflow, which
        // panics in debug builds and wraps to 0 or 1 in release builds. A batch
        // size below 2 breaks the progress guarantee of this loop.
        let n_get = limit
            .unwrap_or(usize::MAX)
            .saturating_sub(entries.len())
            .saturating_add(2)
            .min(MAX_BATCH);

        let get_ret = table
            .get_range(
                partition_key,
                start.clone(),
                filter.clone(),
                n_get,
                enumeration_order,
            )
            .await?;

        // Remember the batch size before the vector is consumed by the loop.
        let get_ret_len = get_ret.len();

        for entry in get_ret {
            // Skip the entry sitting exactly on the (exclusive) start key.
            if start_ignore && Some(entry.sort_key()) == start.as_ref() {
                continue;
            }
            // Left the requested prefix: nothing more to list.
            if let Some(p) = prefix {
                if !entry.sort_key().starts_with(p) {
                    return Ok((entries, false, None));
                }
            }
            // Reached the (exclusive) end bound in the direction of travel.
            if let Some(e) = end {
                let is_finished = match enumeration_order {
                    EnumerationOrder::Forward => entry.sort_key() >= e,
                    EnumerationOrder::Reverse => entry.sort_key() <= e,
                };
                if is_finished {
                    return Ok((entries, false, None));
                }
            }
            // Limit reached while one more matching entry exists: report it as
            // the key from which the caller can resume.
            if let Some(l) = limit {
                if entries.len() >= l {
                    return Ok((entries, true, Some(entry.sort_key().clone())));
                }
            }
            entries.push(entry);
        }

        // A short batch means the table has no further entries in this range.
        if get_ret_len < n_get {
            return Ok((entries, false, None));
        }

        // Resume from the last collected entry; it is fetched again by the next
        // call (the start bound is inclusive) and must therefore be skipped.
        let last_key = entries
            .last()
            .expect("a full batch of at least two entries retains at least one entry")
            .sort_key()
            .clone();
        start = Some(last_key);
        start_ignore = true;
    }
}