aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/k2v/batch.rs6
-rw-r--r--src/api/k2v/item.rs10
-rw-r--r--src/model/k2v/causality.rs23
-rw-r--r--src/model/k2v/rpc.rs13
-rw-r--r--src/model/k2v/seen.rs16
5 files changed, 39 insertions, 29 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index be3fba07..844faf89 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -24,11 +24,7 @@ pub async fn handle_insert_batch(
let mut items2 = vec![];
for it in items {
- let ct = it
- .ct
- .map(|s| CausalContext::parse(&s))
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
let v = match it.v {
Some(vs) => {
DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?)
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index ebf34723..e7385bcc 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -133,9 +133,8 @@ pub async fn handle_insert_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
- .map(CausalContext::parse)
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ .map(CausalContext::parse_helper)
+ .transpose()?;
let body = hyper::body::to_bytes(req.into_body()).await?;
let value = DvvsValue::Value(body.to_vec());
@@ -169,9 +168,8 @@ pub async fn handle_delete_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
- .map(CausalContext::parse)
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ .map(CausalContext::parse_helper)
+ .transpose()?;
let value = DvvsValue::Deleted;
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 665b02b2..b1ec8035 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -15,6 +15,8 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+
/// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;
@@ -50,6 +52,7 @@ impl CausalContext {
pub fn new() -> Self {
Self::default()
}
+
/// Make binary representation and encode in base64
pub fn serialize(&self) -> String {
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@@ -66,12 +69,13 @@ impl CausalContext {
base64::encode_config(bytes, base64::URL_SAFE_NO_PAD)
}
- /// Parse from base64-encoded binary representation
- pub fn parse(s: &str) -> Result<Self, String> {
- let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD)
- .map_err(|e| format!("bad causality token base64: {}", e))?;
+
+ /// Parse from base64-encoded binary representation.
+ /// Returns None on error.
+ pub fn parse(s: &str) -> Option<Self> {
+ let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD).ok()?;
if bytes.len() % 16 != 8 || bytes.len() < 8 {
- return Err("bad causality token length".into());
+ return None;
}
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@@ -88,11 +92,16 @@ impl CausalContext {
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
if check != checksum {
- return Err("bad causality token checksum".into());
+ return None;
}
- Ok(ret)
+ Some(ret)
}
+
+ pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
+ Self::parse(s).ok_or_bad_request("Invalid causality token")
+ }
+
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
vclock_gt(&self.vector_clock, &other.vector_clock)
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 04801ebf..3b4d7465 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -29,6 +29,7 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
+use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::seen::*;
@@ -212,7 +213,7 @@ impl K2VRpcHandler {
sort_key: String,
causal_context: CausalContext,
timeout_msec: u64,
- ) -> Result<Option<K2VItem>, Error> {
+ ) -> Result<Option<K2VItem>, HelperError> {
let poll_key = PollKey {
partition: K2VItemPartition {
bucket_id,
@@ -255,7 +256,7 @@ impl K2VRpcHandler {
}
}
K2VRpc::PollItemResponse(None) => (),
- v => return Err(Error::unexpected_rpc_message(v)),
+ v => return Err(Error::unexpected_rpc_message(v).into()),
}
}
@@ -267,12 +268,12 @@ impl K2VRpcHandler {
range: PollRange,
seen_str: Option<String>,
timeout_msec: u64,
- ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> {
+ ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
let has_seen_marker = seen_str.is_some();
let mut seen = seen_str
.as_deref()
- .map(RangeSeenMarker::decode)
+ .map(RangeSeenMarker::decode_helper)
.transpose()?
.unwrap_or_default();
seen.restrict(&range);
@@ -316,7 +317,7 @@ impl K2VRpcHandler {
}
}
} else {
- return Err(Error::unexpected_rpc_message(v));
+ return Err(Error::unexpected_rpc_message(v).into());
}
}
@@ -435,7 +436,7 @@ impl K2VRpcHandler {
seen_str: &Option<String>,
) -> Result<Vec<K2VItem>, Error> {
if let Some(seen_str) = seen_str {
- let seen = RangeSeenMarker::decode(seen_str)?;
+ let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
// Subscribe now to all changes on that partition,
// so that new items that are inserted while we are reading the range
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
index d2cd54c7..314d0f9e 100644
--- a/src/model/k2v/seen.rs
+++ b/src/model/k2v/seen.rs
@@ -13,8 +13,9 @@ use serde::{Deserialize, Serialize};
use garage_util::data::Uuid;
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
-use garage_util::error::{Error, OkOrMessage};
+use garage_util::error::Error;
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
use crate::k2v::sub::*;
@@ -80,10 +81,15 @@ impl RangeSeenMarker {
Ok(base64::encode(&bytes))
}
- pub fn decode(s: &str) -> Result<Self, Error> {
- let bytes = base64::decode(&s).ok_or_message("invalid base64")?;
- let bytes = zstd::stream::decode_all(&mut &bytes[..])?;
- Ok(nonversioned_decode(&bytes)?)
+ /// Decode from msgpack+zstd+b64 representation, returns None on error.
+ pub fn decode(s: &str) -> Option<Self> {
+ let bytes = base64::decode(&s).ok()?;
+ let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
+ nonversioned_decode(&bytes).ok()
+ }
+
+ pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
+ Self::decode(s).ok_or_bad_request("Invalid causality token")
}
pub fn is_new_item(&self, item: &K2VItem) -> bool {