aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/table/table.rs21
1 files changed, 13 insertions, 8 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 2beac3f4..54a42d34 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
+use log::warn;
+
use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
@@ -185,7 +187,7 @@ where
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
- let v = Self::decode_entry(v_bytes.as_slice())?;
+ let v = self.decode_entry(v_bytes.as_slice())?;
ret = match ret {
None => Some(v),
Some(mut x) => {
@@ -241,7 +243,7 @@ where
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = Self::decode_entry(entry_bytes.as_slice())?;
+ let entry = self.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
@@ -363,7 +365,7 @@ where
let keep = match filter {
None => true,
Some(f) => {
- let entry = Self::decode_entry(value.as_ref())?;
+ let entry = self.decode_entry(value.as_ref())?;
F::matches_filter(&entry, f)
}
};
@@ -382,14 +384,14 @@ where
let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() {
- let update = Self::decode_entry(update_bytes.as_slice())?;
+ let update = self.decode_entry(update_bytes.as_slice())?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => {
- let old_entry = Self::decode_entry(&prev_bytes)
+ let old_entry = self.decode_entry(&prev_bytes)
.map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
@@ -437,7 +439,7 @@ where
break;
}
if let Some(old_val) = self.store.remove(&key)? {
- let old_entry = Self::decode_entry(&old_val)?;
+ let old_entry = self.decode_entry(&old_val)?;
self.instance.updated(Some(old_entry), None).await?;
self.system
.background
@@ -455,12 +457,15 @@ where
ret
}
- fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> {
+ fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x),
- None => Err(e.into()),
+ None => {
+ warn!("Unable to decode entry of {}: {}", self.name, e);
+ Err(e.into())
+ }
},
}
}