aboutsummaryrefslogtreecommitdiff
path: root/src/model/k2v
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/k2v')
-rw-r--r--src/model/k2v/item_table.rs24
1 files changed, 20 insertions, 4 deletions
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 8b7cc08a..991fe66d 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use garage_db as db;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable {
type E = K2VItem;
type Filter = ItemFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
// 1. Count
let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0, 0),
@@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable {
.map(|e| &e.partition.partition_key)
.unwrap_or_else(|| &new.unwrap().partition.partition_key);
- if let Err(e) = self.counter_table.count(
+ let counter_res = self.counter_table.count(
+ tx,
&count_pk,
count_sk,
&[
@@ -248,14 +255,23 @@ impl TableSchema for K2VItemTable {
(VALUES, new_values - old_values),
(BYTES, new_bytes - old_bytes),
],
- ) {
- error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+ );
+ if let Err(e) = db::unabort(counter_res)? {
+ // This result can be returned by `counter_table.count()` for instance
+ // if messagepack serialization or deserialization fails at some step.
+ // Warn admin but ignore this error for now, that's all we can do.
+ error!(
+ "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
+ count_pk, count_sk, e
+ );
}
// 2. Notify
if let Some(new_ent) = new {
self.subscriptions.notify(new_ent);
}
+
+ Ok(())
}
#[allow(clippy::nonminimal_bool)]