aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs36
1 files changed, 36 insertions, 0 deletions
diff --git a/src/table.rs b/src/table.rs
index 2ae70398..9b67ad94 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -388,6 +388,10 @@ impl<F: TableSchema + 'static> Table<F> {
let value = self.handle_read_entry(&key, &sort_key)?;
Ok(TableRPC::ReadEntryResponse(value))
}
+ TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
+ let values = self.handle_read_range(&key, &begin_sort_key, &filter, limit)?;
+ Ok(TableRPC::Update(values))
+ }
TableRPC::Update(pairs) => {
self.handle_update(pairs).await?;
Ok(TableRPC::Ok)
@@ -412,6 +416,38 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
+ fn handle_read_range(
+ &self,
+ p: &F::P,
+ s: &F::S,
+ filter: &Option<F::Filter>,
+ limit: usize,
+ ) -> Result<Vec<Arc<ByteBuf>>, Error> {
+ let partition_hash = p.hash();
+ let first_key = self.tree_key(p, s);
+ let mut ret = vec![];
+ for item in self.store.range(first_key..) {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let keep = match filter {
+ None => true,
+ Some(f) => {
+ let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?;
+ F::matches_filter(&entry, f)
+ }
+ };
+ if keep {
+ ret.push(Arc::new(ByteBuf::from(value.as_ref())));
+ }
+ if ret.len() >= limit {
+ break;
+ }
+ }
+ Ok(ret)
+ }
+
pub async fn handle_update(
self: &Arc<Self>,
mut entries: Vec<Arc<ByteBuf>>,