diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/index_counter.rs | 60 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 25 | ||||
-rw-r--r-- | src/model/s3/block_ref_table.rs | 20 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 12 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 13 |
5 files changed, 88 insertions, 42 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 6f81be3e..4fec1138 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -116,8 +116,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); - fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + _old: Option<&Self::E>, + _new: Option<&Self::E>, + ) -> db::Result<()> { // nothing for now + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { @@ -176,36 +182,36 @@ impl<T: CounterSchema> IndexCounter<T> { this } - pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { + pub fn count( + &self, + tx: &mut db::Transaction, + pk: &T::P, + sk: &T::S, + counts: &[(&str, i64)], + ) -> db::TxResult<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.db().transaction(|mut tx| { - let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(db::TxError::Abort)? - } - None => LocalCounterEntry { - values: BTreeMap::new(), - }, - }; - - for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; - ent.1 += *inc; - } - - let new_entry_bytes = rmp_to_vec_all_named(&entry) - .map_err(Error::RmpEncode) - .map_err(db::TxError::Abort)?; - tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { + Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)?, + None => LocalCounterEntry { + values: BTreeMap::new(), + }, + }; + + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 += 1; + ent.1 += *inc; + } - Ok(entry) - })?; + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { error!( "Could not propagate updated counter values, failed to send to channel: {}", e diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 8b7cc08a..77446f64 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; use garage_util::data::*; use garage_table::crdt::*; @@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable { type E = K2VItem; type Filter = ItemFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::Result<()> { // 1. Count let (old_entries, old_conflicts, old_values, old_bytes) = match old { None => (0, 0, 0, 0), @@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable { .map(|e| &e.partition.partition_key) .unwrap_or_else(|| &new.unwrap().partition.partition_key); - if let Err(e) = self.counter_table.count( + match self.counter_table.count( + tx, &count_pk, count_sk, &[ @@ -249,13 +256,25 @@ impl TableSchema for K2VItemTable { (BYTES, new_bytes - old_bytes), ], ) { - error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); + Ok(()) => (), + Err(db::TxError::Db(e)) => return Err(e), + Err(db::TxError::Abort(e)) => { + // This result can be returned by `counter_table.count()` for instance + // if messagepack serialization or deserialization fails at some step. + // Warn admin but ignore this error for now, that's all we can do. + error!( + "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", + count_pk, count_sk, e + ); + } } // 2. Notify if let Some(new_ent) = new { self.subscriptions.notify(new_ent); } + + Ok(()) } #[allow(clippy::nonminimal_bool)] diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 9b3991bf..2c06bc96 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::data::*; use garage_table::crdt::Crdt; @@ -51,21 +53,23 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::Result<()> { #[allow(clippy::or_fun_call)] - let block = &old.or(new).unwrap().block; + let block = old.or(new).unwrap().block; let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - warn!("block_incref failed for block {:?}: {}", block, e); - } + self.block_manager.block_incref(tx, block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - warn!("block_decref failed for block {:?}: {}", block, e); - } + self.block_manager.block_decref(tx, block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 3d9a89f7..f3bd9892 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -232,7 +234,12 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = ObjectFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::Result<()> { let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -259,7 +266,8 @@ impl TableSchema for ObjectTable { } } Ok(()) - }) + }); + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index ad096772..d168c2c2 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -137,7 +139,12 @@ impl TableSchema for VersionTable { type E = Version; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::Result<()> { let block_ref_table = self.block_ref_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -160,7 +167,9 @@ impl TableSchema for VersionTable { } } Ok(()) - }) + }); + + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { |