aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v/rpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v/rpc.rs')
-rw-r--r--src/model/k2v/rpc.rs13
1 files changed, 7 insertions, 6 deletions
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 04801ebf..3b4d7465 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -29,6 +29,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
+use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::seen::*;
@@ -212,7 +213,7 @@ impl K2VRpcHandler {
sort_key: String,
causal_context: CausalContext,
timeout_msec: u64,
- ) -> Result<Option<K2VItem>, Error> {
+ ) -> Result<Option<K2VItem>, HelperError> {
let poll_key = PollKey {
partition: K2VItemPartition {
bucket_id,
@@ -255,7 +256,7 @@ impl K2VRpcHandler {
}
}
K2VRpc::PollItemResponse(None) => (),
- v => return Err(Error::unexpected_rpc_message(v)),
+ v => return Err(Error::unexpected_rpc_message(v).into()),
}
}
@@ -267,12 +268,12 @@ impl K2VRpcHandler {
range: PollRange,
seen_str: Option<String>,
timeout_msec: u64,
- ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
+ ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
let has_seen_marker = seen_str.is_some();
let mut seen = seen_str
.as_deref()
- .map(RangeSeenMarker::decode)
+ .map(RangeSeenMarker::decode_helper)
.transpose()?
.unwrap_or_default();
seen.restrict(&range);
@@ -316,7 +317,7 @@ impl K2VRpcHandler {
}
}
} else {
- return Err(Error::unexpected_rpc_message(v));
+ return Err(Error::unexpected_rpc_message(v).into());
}
}
@@ -435,7 +436,7 @@ impl K2VRpcHandler {
seen_str: &Option<String>,
) -> Result<Vec<K2VItem>, Error> {
if let Some(seen_str) = seen_str {
- let seen = RangeSeenMarker::decode(seen_str)?;
+ let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
// Subscribe now to all changes on that partition,
// so that new items that are inserted while we are reading the range