diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/k2v/batch.rs | 6 | ||||
-rw-r--r-- | src/api/k2v/item.rs | 10 | ||||
-rw-r--r-- | src/model/k2v/causality.rs | 23 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 13 | ||||
-rw-r--r-- | src/model/k2v/seen.rs | 16 |
5 files changed, 39 insertions, 29 deletions
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index be3fba07..844faf89 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -24,11 +24,7 @@ pub async fn handle_insert_batch( let mut items2 = vec![]; for it in items { - let ct = it - .ct - .map(|s| CausalContext::parse(&s)) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?; let v = match it.v { Some(vs) => { DvvsValue::Value(base64::decode(vs).ok_or_bad_request("Invalid base64 value")?) diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index ebf34723..e7385bcc 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -133,9 +133,8 @@ pub async fn handle_insert_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let body = hyper::body::to_bytes(req.into_body()).await?; let value = DvvsValue::Value(body.to_vec()); @@ -169,9 +168,8 @@ pub async fn handle_delete_item( .get(X_GARAGE_CAUSALITY_TOKEN) .map(|s| s.to_str()) .transpose()? - .map(CausalContext::parse) - .transpose() - .ok_or_bad_request("Invalid causality token")?; + .map(CausalContext::parse_helper) + .transpose()?; let value = DvvsValue::Deleted; diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 665b02b2..b1ec8035 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -15,6 +15,8 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; + /// Node IDs used in K2V are u64 integers that are the abbreviation /// of full Garage node IDs which are 256-bit UUIDs. pub type K2VNodeId = u64; @@ -50,6 +52,7 @@ impl CausalContext { pub fn new() -> Self { Self::default() } + /// Make binary representation and encode in base64 pub fn serialize(&self) -> String { let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); @@ -66,12 +69,13 @@ impl CausalContext { base64::encode_config(bytes, base64::URL_SAFE_NO_PAD) } - /// Parse from base64-encoded binary representation - pub fn parse(s: &str) -> Result<Self, String> { - let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD) - .map_err(|e| format!("bad causality token base64: {}", e))?; + + /// Parse from base64-encoded binary representation. + /// Returns None on error. + pub fn parse(s: &str) -> Option<Self> { + let bytes = base64::decode_config(s, base64::URL_SAFE_NO_PAD).ok()?; if bytes.len() % 16 != 8 || bytes.len() < 8 { - return Err("bad causality token length".into()); + return None; } let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); @@ -88,11 +92,16 @@ impl CausalContext { let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); if check != checksum { - return Err("bad causality token checksum".into()); + return None; } - Ok(ret) + Some(ret) } + + pub fn parse_helper(s: &str) -> Result<Self, HelperError> { + Self::parse(s).ok_or_bad_request("Invalid causality token") + } + /// Check if this causal context contains newer items than another one pub fn is_newer_than(&self, other: &Self) -> bool { vclock_gt(&self.vector_clock, &other.vector_clock) diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 04801ebf..3b4d7465 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -29,6 +29,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use garage_table::{PartitionKey, Table}; +use crate::helper::error::Error as HelperError; use crate::k2v::causality::*; use crate::k2v::item_table::*; use crate::k2v::seen::*; @@ -212,7 +213,7 @@ impl K2VRpcHandler { sort_key: String, causal_context: CausalContext, timeout_msec: u64, - ) -> Result<Option<K2VItem>, Error> { + ) -> Result<Option<K2VItem>, HelperError> { let poll_key = PollKey { partition: K2VItemPartition { bucket_id, @@ -255,7 +256,7 @@ impl K2VRpcHandler { } } K2VRpc::PollItemResponse(None) => (), - v => return Err(Error::unexpected_rpc_message(v)), + v => return Err(Error::unexpected_rpc_message(v).into()), } } @@ -267,12 +268,12 @@ impl K2VRpcHandler { range: PollRange, seen_str: Option<String>, timeout_msec: u64, - ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, Error> { + ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> { let has_seen_marker = seen_str.is_some(); let mut seen = seen_str .as_deref() - .map(RangeSeenMarker::decode) + .map(RangeSeenMarker::decode_helper) .transpose()? .unwrap_or_default(); seen.restrict(&range); @@ -316,7 +317,7 @@ impl K2VRpcHandler { } } } else { - return Err(Error::unexpected_rpc_message(v)); + return Err(Error::unexpected_rpc_message(v).into()); } } @@ -435,7 +436,7 @@ impl K2VRpcHandler { seen_str: &Option<String>, ) -> Result<Vec<K2VItem>, Error> { if let Some(seen_str) = seen_str { - let seen = RangeSeenMarker::decode(seen_str)?; + let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?; // Subscribe now to all changes on that partition, // so that new items that are inserted while we are reading the range diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs index d2cd54c7..314d0f9e 100644 --- a/src/model/k2v/seen.rs +++ b/src/model/k2v/seen.rs @@ -13,8 +13,9 @@ use serde::{Deserialize, Serialize}; use garage_util::data::Uuid; use garage_util::encode::{nonversioned_decode, nonversioned_encode}; -use garage_util::error::{Error, OkOrMessage}; +use garage_util::error::Error; +use crate::helper::error::{Error as HelperError, OkOrBadRequest}; use crate::k2v::causality::*; use crate::k2v::item_table::*; use crate::k2v::sub::*; @@ -80,10 +81,15 @@ impl RangeSeenMarker { Ok(base64::encode(&bytes)) } - pub fn decode(s: &str) -> Result<Self, Error> { - let bytes = base64::decode(&s).ok_or_message("invalid base64")?; - let bytes = zstd::stream::decode_all(&mut &bytes[..])?; - Ok(nonversioned_decode(&bytes)?) + /// Decode from msgpack+zstd+b64 representation, returns None on error. + pub fn decode(s: &str) -> Option<Self> { + let bytes = base64::decode(&s).ok()?; + let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?; + nonversioned_decode(&bytes).ok() + } + + pub fn decode_helper(s: &str) -> Result<Self, HelperError> { + Self::decode(s).ok_or_bad_request("Invalid causality token") } pub fn is_new_item(&self, item: &K2VItem) -> bool { |