aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-19 17:15:48 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-19 17:15:48 +0200
commita6129d8626f5b87462b70eadbce2db08c9761cfd (patch)
tree7e0e0d348bc8f8672db57680f8aeedf9c6c03523 /src/table.rs
parent302502f4c10b4c1cd03d3b098b3e55a3f70054f2 (diff)
downloadgarage-a6129d8626f5b87462b70eadbce2db08c9761cfd.tar.gz
garage-a6129d8626f5b87462b70eadbce2db08c9761cfd.zip
Begin implement bucket management & admin commands
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs29
1 files changed, 19 insertions, 10 deletions
diff --git a/src/table.rs b/src/table.rs
index 619c96d2..37fb2f51 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -36,7 +36,8 @@ pub enum TableRPC<F: TableSchema> {
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
- ReadRange(F::P, F::S, Option<F::Filter>, usize),
+ // Read range: read all keys in partition P, possibly starting at a certain sort key offset
+ ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>),
@@ -62,13 +63,18 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
-#[derive(Clone, Serialize, Deserialize)]
-pub struct EmptySortKey;
-impl SortKey for EmptySortKey {
+#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct EmptyKey;
+impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
+impl PartitionKey for EmptyKey {
+ fn hash(&self) -> Hash {
+ [0u8; 32].into()
+ }
+}
impl<T: AsRef<str>> PartitionKey for T {
fn hash(&self) -> Hash {
@@ -272,15 +278,15 @@ where
pub async fn get_range(
self: &Arc<Self>,
partition_key: &F::P,
- begin_sort_key: &F::S,
+ begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.replication.read_nodes(&hash, &self.system);
- let rpc =
- TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
+ let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+
let resps = self
.rpc_client
.try_call_many(
@@ -378,7 +384,7 @@ where
.await?;
Ok(TableRPC::SyncRPC(response))
}
- _ => Err(Error::RPCError(format!("Unexpected table RPC"))),
+ _ => Err(Error::BadRequest(format!("Unexpected table RPC"))),
}
}
@@ -394,12 +400,15 @@ where
fn handle_read_range(
&self,
p: &F::P,
- s: &F::S,
+ s: &Option<F::S>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash();
- let first_key = self.tree_key(p, s);
+ let first_key = match s {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(p, sk),
+ };
let mut ret = vec![];
for item in self.store.range(first_key..) {
let (key, value) = item?;