diff options
author | Quentin <quentin@deuxfleurs.fr> | 2020-11-21 18:01:50 +0100 |
---|---|---|
committer | Quentin <quentin@deuxfleurs.fr> | 2020-11-21 18:01:50 +0100 |
commit | 28efe341cbb4d96b5f81f5fe756f1a0995461e77 (patch) | |
tree | 27664b3c8519df6cea381218d3542f19e0c04126 /src/table/table.rs | |
parent | b7a377308bbcbb7285a5b11cdcb07361eff93a28 (diff) | |
parent | b3814b15ccc233d7c4233b43816cce20db278f17 (diff) | |
download | garage-28efe341cbb4d96b5f81f5fe756f1a0995461e77.tar.gz garage-28efe341cbb4d96b5f81f5fe756f1a0995461e77.zip |
Merge branch 'master' into feature/website
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 2beac3f4..5dfee3c8 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use log::warn; + use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; @@ -185,7 +187,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = Self::decode_entry(v_bytes.as_slice())?; + let v = self.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -241,7 +243,7 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = Self::decode_entry(entry_bytes.as_slice())?; + let entry = self.decode_entry(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -363,7 +365,7 @@ where let keep = match filter { None => true, Some(f) => { - let entry = Self::decode_entry(value.as_ref())?; + let entry = self.decode_entry(value.as_ref())?; F::matches_filter(&entry, f) } }; @@ -382,14 +384,14 @@ where let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { - let update = Self::decode_entry(update_bytes.as_slice())?; + let update = self.decode_entry(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { - let old_entry = Self::decode_entry(&prev_bytes) + let old_entry = self.decode_entry(&prev_bytes) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); @@ -437,7 +439,7 @@ where break; } if let Some(old_val) = self.store.remove(&key)? { - let old_entry = Self::decode_entry(&old_val)?; + let old_entry = self.decode_entry(&old_val)?; self.instance.updated(Some(old_entry), None).await?; self.system .background @@ -455,12 +457,18 @@ where ret } - fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> { + fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { Some(x) => Ok(x), - None => Err(e.into()), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(e.into()) + } }, } } |