aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/admin_rpc.rs146
-rw-r--r--src/api_server.rs4
-rw-r--r--src/block.rs4
-rw-r--r--src/bucket_table.rs79
-rw-r--r--src/error.rs4
-rw-r--r--src/main.rs139
-rw-r--r--src/membership.rs10
-rw-r--r--src/rpc_client.rs16
-rw-r--r--src/rpc_server.rs3
-rw-r--r--src/server.rs44
-rw-r--r--src/table.rs29
-rw-r--r--src/table_sync.rs2
-rw-r--r--src/version_table.rs8
13 files changed, 433 insertions, 55 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
new file mode 100644
index 00000000..8a274c26
--- /dev/null
+++ b/src/admin_rpc.rs
@@ -0,0 +1,146 @@
+use std::sync::Arc;
+
+use serde::{Deserialize, Serialize};
+
+use crate::data::*;
+use crate::error::Error;
+use crate::rpc_server::*;
+use crate::server::Garage;
+use crate::table::*;
+use crate::*;
+
+use crate::bucket_table::*;
+
+pub const ADMIN_RPC_PATH: &str = "_admin";
+
+#[derive(Debug, Serialize, Deserialize)]
+pub enum AdminRPC {
+ BucketOperation(BucketOperation),
+
+ // Replies
+ Ok,
+ BucketList(Vec<String>),
+ BucketInfo(Bucket),
+}
+
+impl RpcMessage for AdminRPC {}
+
+pub struct AdminRpcHandler {
+ garage: Arc<Garage>,
+}
+
+impl AdminRpcHandler {
+ pub fn new(garage: Arc<Garage>) -> Arc<Self> {
+ Arc::new(Self { garage })
+ }
+
+ pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
+ rpc_server.add_handler::<AdminRPC, _, _>(ADMIN_RPC_PATH.to_string(), move |msg, _addr| {
+ let self2 = self.clone();
+ async move {
+ match msg {
+ AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+ _ => Err(Error::Message(format!("Invalid RPC"))),
+ }
+ }
+ });
+ }
+
+ async fn handle_bucket_cmd(&self, cmd: BucketOperation) -> Result<AdminRPC, Error> {
+ match cmd {
+ BucketOperation::List => {
+ let bucket_names = self
+ .garage
+ .bucket_table
+ .get_range(&EmptyKey, None, Some(()), 10000)
+ .await?
+ .iter()
+ .map(|b| b.name.to_string())
+ .collect::<Vec<_>>();
+ Ok(AdminRPC::BucketList(bucket_names))
+ }
+ BucketOperation::Info(query) => {
+ let bucket = self
+ .garage
+ .bucket_table
+ .get(&EmptyKey, &query.name)
+ .await?
+ .filter(|b| !b.deleted);
+ match bucket {
+ Some(b) => Ok(AdminRPC::BucketInfo(b)),
+ None => Err(Error::Message(format!("Bucket {} not found", query.name))),
+ }
+ }
+ BucketOperation::Create(query) => {
+ let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
+ if bucket.as_ref().filter(|b| !b.deleted).is_some() {
+ return Err(Error::Message(format!(
+ "Bucket {} already exists",
+ query.name
+ )));
+ }
+ let new_time = match bucket {
+ Some(b) => std::cmp::max(b.timestamp + 1, now_msec()),
+ None => now_msec(),
+ };
+ self.garage
+ .bucket_table
+ .insert(&Bucket {
+ name: query.name,
+ timestamp: new_time,
+ deleted: false,
+ authorized_keys: vec![],
+ })
+ .await?;
+ Ok(AdminRPC::Ok)
+ }
+ BucketOperation::Delete(query) => {
+ let bucket = match self
+ .garage
+ .bucket_table
+ .get(&EmptyKey, &query.name)
+ .await?
+ .filter(|b| !b.deleted)
+ {
+ None => {
+ return Err(Error::Message(format!(
+ "Bucket {} does not exist",
+ query.name
+ )));
+ }
+ Some(b) => b,
+ };
+ let objects = self
+ .garage
+ .object_table
+ .get_range(&query.name, None, Some(()), 10)
+ .await?;
+ if !objects.is_empty() {
+ return Err(Error::Message(format!(
+ "Bucket {} is not empty",
+ query.name
+ )));
+ }
+ if !query.yes {
+ return Err(Error::Message(format!(
+ "Add --yes flag to really perform this operation"
+ )));
+ }
+ self.garage
+ .bucket_table
+ .insert(&Bucket {
+ name: query.name,
+ timestamp: std::cmp::max(bucket.timestamp + 1, now_msec()),
+ deleted: true,
+ authorized_keys: vec![],
+ })
+ .await?;
+ Ok(AdminRPC::Ok)
+ }
+ _ => {
+ // TODO
+ Err(Error::Message(format!("Not implemented")))
+ }
+ }
+ }
+}
diff --git a/src/api_server.rs b/src/api_server.rs
index 4070be6c..52464f07 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -12,7 +12,7 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use crate::data::*;
use crate::error::Error;
use crate::http_util::*;
-use crate::table::EmptySortKey;
+use crate::table::EmptyKey;
use crate::block::INLINE_THRESHOLD;
use crate::block_ref_table::*;
@@ -307,7 +307,7 @@ async fn handle_get(
}
ObjectVersionData::FirstBlock(first_block_hash) => {
let read_first_block = garage.block_manager.rpc_get_block(&first_block_hash);
- let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey);
+ let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
let version = match version {
diff --git a/src/block.rs b/src/block.rs
index d7daea01..c84f193b 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -98,7 +98,7 @@ impl BlockManager {
Message::NeedBlockQuery(h) => {
self2.need_block(&h).await.map(Message::NeedBlockReply)
}
- _ => Err(Error::Message(format!("Invalid RPC"))),
+ _ => Err(Error::BadRequest(format!("Unexpected RPC message"))),
}
}
});
@@ -262,7 +262,7 @@ impl BlockManager {
let garage = self.garage.load_full().unwrap();
let active_refs = garage
.block_ref_table
- .get_range(&hash, &[0u8; 32].into(), Some(()), 1)
+ .get_range(&hash, None, Some(()), 1)
.await?;
let needed_by_others = !active_refs.is_empty();
if needed_by_others {
diff --git a/src/bucket_table.rs b/src/bucket_table.rs
new file mode 100644
index 00000000..be7dd348
--- /dev/null
+++ b/src/bucket_table.rs
@@ -0,0 +1,79 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+
+use crate::table::*;
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct Bucket {
+ // Primary key
+ pub name: String,
+
+ // Timestamp and deletion
+ // Upon version increment, all info is replaced
+ pub timestamp: u64,
+ pub deleted: bool,
+
+ // Authorized keys
+ pub authorized_keys: Vec<AllowedKey>,
+}
+
+#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+pub struct AllowedKey {
+ pub access_key_id: String,
+ pub timestamp: u64,
+ pub allowed_read: bool,
+ pub allowed_write: bool,
+}
+
+impl Entry<EmptyKey, String> for Bucket {
+ fn partition_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn sort_key(&self) -> &String {
+ &self.name
+ }
+
+ fn merge(&mut self, other: &Self) {
+ if other.timestamp < self.timestamp {
+ *self = other.clone();
+ return;
+ }
+ if self.timestamp > other.timestamp {
+ return;
+ }
+ for ak in other.authorized_keys.iter() {
+ match self
+ .authorized_keys
+ .binary_search_by(|our_ak| our_ak.access_key_id.cmp(&ak.access_key_id))
+ {
+ Ok(i) => {
+ let our_ak = &mut self.authorized_keys[i];
+ if ak.timestamp > our_ak.timestamp {
+ our_ak.timestamp = ak.timestamp;
+ our_ak.allowed_read = ak.allowed_read;
+ our_ak.allowed_write = ak.allowed_write;
+ }
+ }
+ Err(i) => {
+ self.authorized_keys.insert(i, ak.clone());
+ }
+ }
+ }
+ }
+}
+
+pub struct BucketTable;
+
+#[async_trait]
+impl TableSchema for BucketTable {
+ type P = EmptyKey;
+ type S = String;
+ type E = Bucket;
+ type Filter = ();
+
+ async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+
+ fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool {
+ !entry.deleted
+ }
+}
diff --git a/src/error.rs b/src/error.rs
index 7c648fab..678ab72d 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -43,8 +43,8 @@ pub enum Error {
#[error(display = "Tokio join error: {}", _0)]
TokioJoin(#[error(source)] tokio::task::JoinError),
- #[error(display = "RPC error: {}", _0)]
- RPCError(String),
+ #[error(display = "RPC error: {} (status code {})", _0, _1)]
+ RPCError(String, StatusCode),
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
diff --git a/src/main.rs b/src/main.rs
index 89953223..08f37dd5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -10,9 +10,11 @@ mod table_sync;
mod block;
mod block_ref_table;
+mod bucket_table;
mod object_table;
mod version_table;
+mod admin_rpc;
mod api_server;
mod http_util;
mod rpc_client;
@@ -20,6 +22,7 @@ mod rpc_server;
mod server;
mod tls_util;
+use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -32,6 +35,8 @@ use membership::*;
use rpc_client::*;
use server::TlsConfig;
+use admin_rpc::*;
+
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(StructOpt, Debug)]
@@ -62,13 +67,13 @@ pub enum Command {
#[structopt(name = "status")]
Status,
- /// Configure Garage node
- #[structopt(name = "configure")]
- Configure(ConfigureOpt),
+ /// Garage node operations
+ #[structopt(name = "node")]
+ Node(NodeOperation),
- /// Remove Garage node from cluster
- #[structopt(name = "remove")]
- Remove(RemoveOpt),
+ /// Bucket operations
+ #[structopt(name = "bucket")]
+ Bucket(BucketOperation),
}
#[derive(StructOpt, Debug)]
@@ -79,7 +84,18 @@ pub struct ServerOpt {
}
#[derive(StructOpt, Debug)]
-pub struct ConfigureOpt {
+pub enum NodeOperation {
+ /// Configure Garage node
+ #[structopt(name = "configure")]
+ Configure(ConfigureNodeOpt),
+
+ /// Remove Garage node from cluster
+ #[structopt(name = "remove")]
+ Remove(RemoveNodeOpt),
+}
+
+#[derive(StructOpt, Debug)]
+pub struct ConfigureNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
@@ -91,7 +107,7 @@ pub struct ConfigureOpt {
}
#[derive(StructOpt, Debug)]
-pub struct RemoveOpt {
+pub struct RemoveNodeOpt {
/// Node to configure (prefix of hexadecimal node id)
node_id: String,
@@ -100,6 +116,67 @@ pub struct RemoveOpt {
yes: bool,
}
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub enum BucketOperation {
+ /// List buckets
+ #[structopt(name = "list")]
+ List,
+
+ /// Get bucket info
+ #[structopt(name = "info")]
+ Info(BucketOpt),
+
+ /// Create bucket
+ #[structopt(name = "create")]
+ Create(BucketOpt),
+
+ /// Delete bucket
+ #[structopt(name = "delete")]
+ Delete(DeleteBucketOpt),
+
+ /// Allow key to read or write to bucket
+ #[structopt(name = "allow")]
+ Allow(PermBucketOpt),
+
+ /// Allow key to read or write to bucket
+ #[structopt(name = "deny")]
+ Deny(PermBucketOpt),
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct BucketOpt {
+ /// Bucket name
+ pub name: String,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct DeleteBucketOpt {
+ /// Bucket name
+ pub name: String,
+
+ /// If this flag is not given, the bucket won't be deleted
+ #[structopt(long = "yes")]
+ pub yes: bool,
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct PermBucketOpt {
+ /// Access key ID
+ #[structopt(long = "key")]
+ pub key: String,
+
+ /// Allow/deny read operations
+ #[structopt(long = "read")]
+ pub read: bool,
+
+ /// Allow/deny write operations
+ #[structopt(long = "write")]
+ pub write: bool,
+
+ /// Bucket name
+ pub bucket: String,
+}
+
#[tokio::main]
async fn main() {
let opt = Opt::from_args();
@@ -119,7 +196,9 @@ async fn main() {
let rpc_http_cli =
Arc::new(RpcHttpClient::new(&tls_config).expect("Could not create RPC client"));
- let rpc_cli = RpcAddrClient::new(rpc_http_cli, "_membership".into());
+ let membership_rpc_cli =
+ RpcAddrClient::new(rpc_http_cli.clone(), MEMBERSHIP_RPC_PATH.to_string());
+ let admin_rpc_cli = RpcAddrClient::new(rpc_http_cli.clone(), ADMIN_RPC_PATH.to_string());
let resp = match opt.cmd {
Command::Server(server_opt) => {
@@ -131,11 +210,16 @@ async fn main() {
server::run_server(server_opt.config_file).await
}
- Command::Status => cmd_status(rpc_cli, opt.rpc_host).await,
- Command::Configure(configure_opt) => {
- cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await
+ Command::Status => cmd_status(membership_rpc_cli, opt.rpc_host).await,
+ Command::Node(NodeOperation::Configure(configure_opt)) => {
+ cmd_configure(membership_rpc_cli, opt.rpc_host, configure_opt).await
+ }
+ Command::Node(NodeOperation::Remove(remove_opt)) => {
+ cmd_remove(membership_rpc_cli, opt.rpc_host, remove_opt).await
+ }
+ Command::Bucket(bo) => {
+ cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
}
- Command::Remove(remove_opt) => cmd_remove(rpc_cli, opt.rpc_host, remove_opt).await,
};
if let Err(e) = resp {
@@ -201,7 +285,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re
async fn cmd_configure(
rpc_cli: RpcAddrClient<Message>,
rpc_host: SocketAddr,
- args: ConfigureOpt,
+ args: ConfigureNodeOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
@@ -254,7 +338,7 @@ async fn cmd_configure(
async fn cmd_remove(
rpc_cli: RpcAddrClient<Message>,
rpc_host: SocketAddr,
- args: RemoveOpt,
+ args: RemoveNodeOpt,
) -> Result<(), Error> {
let mut config = match rpc_cli
.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
@@ -296,3 +380,28 @@ async fn cmd_remove(
.await?;
Ok(())
}
+
+async fn cmd_admin(
+ rpc_cli: RpcAddrClient<AdminRPC>,
+ rpc_host: SocketAddr,
+ args: AdminRPC,
+) -> Result<(), Error> {
+ match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
+ AdminRPC::Ok => {
+ println!("Ok.");
+ }
+ AdminRPC::BucketList(bl) => {
+ println!("List of buckets:");
+ for bucket in bl {
+ println!("{}", bucket);
+ }
+ }
+ AdminRPC::BucketInfo(bucket) => {
+ println!("{:?}", bucket);
+ }
+ r => {
+ eprintln!("Unexpected response: {:?}", r);
+ }
+ }
+ Ok(())
+}
diff --git a/src/membership.rs b/src/membership.rs
index 08dd5f2f..99b0388d 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -27,6 +27,8 @@ const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: usize = 3;
+pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
+
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
Ok,
@@ -277,9 +279,9 @@ impl System {
let rpc_http_client =
Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client"));
- let rpc_path = "_membership";
+ let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
let rpc_client = RpcClient::new(
- RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.into()),
+ RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.clone()),
background.clone(),
status.clone(),
);
@@ -294,7 +296,7 @@ impl System {
update_lock: Mutex::new((update_status, update_ring)),
background,
});
- sys.clone().register_handler(rpc_server, rpc_path.into());
+ sys.clone().register_handler(rpc_server, rpc_path);
sys
}
@@ -310,7 +312,7 @@ impl System {
Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
- _ => Err(Error::Message(format!("Unexpected RPC message"))),
+ _ => Err(Error::BadRequest(format!("Unexpected RPC message"))),
}
}
});
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 95288269..35debb53 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -9,7 +9,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use hyper::client::{Client, HttpConnector};
-use hyper::{Body, Method, Request, StatusCode};
+use hyper::{Body, Method, Request};
use tokio::sync::watch;
use crate::background::BackgroundRunner;
@@ -228,12 +228,14 @@ impl RpcHttpClient {
e
})?;
- if resp.status() == StatusCode::OK {
- let body = hyper::body::to_bytes(resp.into_body()).await?;
- let msg = rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf())?;
- msg.map_err(Error::RPCError)
- } else {
- Err(Error::RPCError(format!("Status code {}", resp.status())))
+ let status = resp.status();
+ let body = hyper::body::to_bytes(resp.into_body()).await?;
+ match rmp_serde::decode::from_read::<_, Result<M, String>>(body.into_buf()) {
+ Err(e) =>
+ Err(Error::RPCError(format!("Invalid reply"), status)),
+ Ok(Err(e)) =>
+ Err(Error::RPCError(e, status)),
+ Ok(Ok(x)) => Ok(x),
}
}
}
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 83f8ddc9..7a6a57ee 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -106,7 +106,8 @@ impl RpcServer {
let resp_waiter = tokio::spawn(handler(req, addr));
match resp_waiter.await {
- Err(_err) => {
+ Err(err) => {
+ eprintln!("Handler await error: {}", err);
let mut ise = Response::default();
*ise.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(ise)
diff --git a/src/server.rs b/src/server.rs
index 6b4b5b6b..979d76f9 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -13,13 +13,16 @@ use crate::error::Error;
use crate::membership::System;
use crate::rpc_server::RpcServer;
use crate::table::*;
+use crate::table_fullcopy::*;
use crate::table_sharded::*;
use crate::block::*;
use crate::block_ref_table::*;
+use crate::bucket_table::*;
use crate::object_table::*;
use crate::version_table::*;
+use crate::admin_rpc::*;
use crate::api_server;
#[derive(Deserialize, Debug, Clone)]
@@ -38,12 +41,25 @@ pub struct Config {
#[serde(default = "default_replication_factor")]
pub meta_replication_factor: usize,
+ #[serde(default = "default_epidemic_factor")]
+ pub meta_epidemic_factor: usize,
+
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
pub rpc_tls: Option<TlsConfig>,
}
+fn default_block_size() -> usize {
+ 1048576
+}
+fn default_replication_factor() -> usize {
+ 3
+}
+fn default_epidemic_factor() -> usize {
+ 3
+}
+
#[derive(Deserialize, Debug, Clone)]
pub struct TlsConfig {
pub ca_cert: String,
@@ -57,6 +73,7 @@ pub struct Garage {
pub system: Arc<System>,
pub block_manager: Arc<BlockManager>,
+ pub bucket_table: Arc<Table<BucketTable, TableFullReplication>>,
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
@@ -89,6 +106,11 @@ impl Garage {
read_quorum: (system.config.meta_replication_factor + 1) / 2,
};
+ let control_rep_param = TableFullReplication::new(
+ system.config.meta_epidemic_factor,
+ (system.config.meta_epidemic_factor + 1) / 2,
+ );
+
println!("Initialize block_ref_table...");
let block_ref_table = Table::new(
BlockRefTable {
@@ -131,17 +153,32 @@ impl Garage {
)
.await;
+ println!("Initialize bucket_table...");
+ let bucket_table = Table::new(
+ BucketTable,
+ control_rep_param.clone(),
+ system.clone(),
+ &db,
+ "bucket".to_string(),
+ rpc_server,
+ )
+ .await;
+
println!("Initialize Garage...");
let garage = Arc::new(Self {
db,
system: system.clone(),
block_manager,
background,
+ bucket_table,
object_table,
version_table,
block_ref_table,
});
+ println!("Crate admin RPC handler...");
+ AdminRpcHandler::new(garage.clone()).register_handler(rpc_server);
+
println!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone()));
garage.block_manager.clone().spawn_background_worker().await;
@@ -150,13 +187,6 @@ impl Garage {
}
}
-fn default_block_size() -> usize {
- 1048576
-}
-fn default_replication_factor() -> usize {
- 3
-}
-
fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new()
.read(true)
diff --git a/src/table.rs b/src/table.rs
index 619c96d2..37fb2f51 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -36,7 +36,8 @@ pub enum TableRPC<F: TableSchema> {
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
- ReadRange(F::P, F::S, Option<F::Filter>, usize),
+ // Read range: read all keys in partition P, possibly starting at a certain sort key offset
+ ReadRange(F::P, Option<F::S>, Option<F::Filter>, usize),
Update(Vec<Arc<ByteBuf>>),
@@ -62,13 +63,18 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
-#[derive(Clone, Serialize, Deserialize)]
-pub struct EmptySortKey;
-impl SortKey for EmptySortKey {
+#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct EmptyKey;
+impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
+impl PartitionKey for EmptyKey {
+ fn hash(&self) -> Hash {
+ [0u8; 32].into()
+ }
+}
impl<T: AsRef<str>> PartitionKey for T {
fn hash(&self) -> Hash {
@@ -272,15 +278,15 @@ where
pub async fn get_range(
self: &Arc<Self>,
partition_key: &F::P,
- begin_sort_key: &F::S,
+ begin_sort_key: Option<F::S>,
filter: Option<F::Filter>,
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
let who = self.replication.read_nodes(&hash, &self.system);
- let rpc =
- TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
+ let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+
let resps = self
.rpc_client
.try_call_many(
@@ -378,7 +384,7 @@ where
.await?;
Ok(TableRPC::SyncRPC(response))
}
- _ => Err(Error::RPCError(format!("Unexpected table RPC"))),
+ _ => Err(Error::BadRequest(format!("Unexpected table RPC"))),
}
}
@@ -394,12 +400,15 @@ where
fn handle_read_range(
&self,
p: &F::P,
- s: &F::S,
+ s: &Option<F::S>,
filter: &Option<F::Filter>,
limit: usize,
) -> Result<Vec<Arc<ByteBuf>>, Error> {
let partition_hash = p.hash();
- let first_key = self.tree_key(p, s);
+ let first_key = match s {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.tree_key(p, sk),
+ };
let mut ret = vec![];
for item in self.store.range(first_key..) {
let (key, value) = item?;
diff --git a/src/table_sync.rs b/src/table_sync.rs
index 550ad0f0..0f3e90d2 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -438,7 +438,7 @@ where
.spawn(self.clone().send_items(who.clone(), items_to_send));
}
} else {
- return Err(Error::Message(format!(
+ return Err(Error::BadRequest(format!(
"Unexpected response to sync RPC checksums: {}",
debug_serialize(&rpc_resp)
)));
diff --git a/src/version_table.rs b/src/version_table.rs
index 24109981..dfd27812 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -30,12 +30,12 @@ pub struct VersionBlock {
pub hash: Hash,
}
-impl Entry<Hash, EmptySortKey> for Version {
+impl Entry<Hash, EmptyKey> for Version {
fn partition_key(&self) -> &Hash {
&self.uuid
}
- fn sort_key(&self) -> &EmptySortKey {
- &EmptySortKey
+ fn sort_key(&self) -> &EmptyKey {
+ &EmptyKey
}
fn merge(&mut self, other: &Self) {
@@ -63,7 +63,7 @@ pub struct VersionTable {
#[async_trait]
impl TableSchema for VersionTable {
type P = Hash;
- type S = EmptySortKey;
+ type S = EmptyKey;
type E = Version;
type Filter = ();