diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-07 17:50:10 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-07 17:50:10 +0200 |
commit | 0543cb345320a15280a5af7db941bb9fbffb4cd6 (patch) | |
tree | ff5b054bc30b3ca826d5e1eece05198ae3c16f57 | |
parent | 1bbe0794f363eb59c56548cca672013fd78f361a (diff) | |
download | garage-0543cb345320a15280a5af7db941bb9fbffb4cd6.tar.gz garage-0543cb345320a15280a5af7db941bb9fbffb4cd6.zip |
Cleaner error management (less error-prone api)
-rw-r--r-- | src/block/manager.rs | 12 | ||||
-rw-r--r-- | src/block/rc.rs | 12 | ||||
-rw-r--r-- | src/db/lib.rs | 57 | ||||
-rw-r--r-- | src/db/lmdb_adapter.rs | 45 | ||||
-rw-r--r-- | src/db/sled_adapter.rs | 40 | ||||
-rw-r--r-- | src/db/sqlite_adapter.rs | 47 | ||||
-rw-r--r-- | src/model/index_counter.rs | 2 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 25 | ||||
-rw-r--r-- | src/model/s3/block_ref_table.rs | 2 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 2 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 2 | ||||
-rw-r--r-- | src/table/schema.rs | 2 |
12 files changed, 149 insertions, 99 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index aa0969f4..32ba0431 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -325,7 +325,11 @@ impl BlockManager { /// Increment the number of time a block is used, putting it to resynchronization if it is /// required, but not known - pub fn block_incref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> { + pub fn block_incref( + self: &Arc<Self>, + tx: &mut db::Transaction, + hash: Hash, + ) -> db::TxOpResult<()> { if self.rc.block_incref(tx, &hash)? { // When the reference counter is incremented, there is // normally a node that is responsible for sending us the @@ -344,7 +348,11 @@ impl BlockManager { } /// Decrement the number of time a block is used - pub fn block_decref(self: &Arc<Self>, tx: &mut db::Transaction, hash: Hash) -> db::Result<()> { + pub fn block_decref( + self: &Arc<Self>, + tx: &mut db::Transaction, + hash: Hash, + ) -> db::TxOpResult<()> { if self.rc.block_decref(tx, &hash)? { // When the RC is decremented, it might drop to zero, // indicating that we don't need the block. diff --git a/src/block/rc.rs b/src/block/rc.rs index f82595b7..ce6defad 100644 --- a/src/block/rc.rs +++ b/src/block/rc.rs @@ -19,7 +19,11 @@ impl BlockRc { /// Increment the reference counter associated to a hash. /// Returns true if the RC goes from zero to nonzero. - pub(crate) fn block_incref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> { + pub(crate) fn block_incref( + &self, + tx: &mut db::Transaction, + hash: &Hash, + ) -> db::TxOpResult<bool> { let old_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?); match old_rc.increment().serialize() { Some(x) => tx.insert(&self.rc, &hash, x)?, @@ -30,7 +34,11 @@ impl BlockRc { /// Decrement the reference counter associated to a hash. /// Returns true if the RC is now zero. - pub(crate) fn block_decref(&self, tx: &mut db::Transaction, hash: &Hash) -> db::Result<bool> { + pub(crate) fn block_decref( + &self, + tx: &mut db::Transaction, + hash: &Hash, + ) -> db::TxOpResult<bool> { let new_rc = RcEntry::parse_opt(tx.get(&self.rc, &hash)?).decrement(); match new_rc.serialize() { Some(x) => tx.insert(&self.rc, &hash, x)?, diff --git a/src/db/lib.rs b/src/db/lib.rs index a7b6197c..a4da4086 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -34,6 +34,9 @@ pub struct Error(pub Cow<'static, str>); pub type Result<T> = std::result::Result<T, Error>; +pub struct TxOpError(pub(crate) Error); +pub type TxOpResult<T> = std::result::Result<T, TxOpError>; + #[derive(Debug)] pub enum TxError<E> { Abort(E), @@ -41,9 +44,17 @@ pub enum TxError<E> { } pub type TxResult<R, E> = std::result::Result<R, TxError<E>>; -impl<E> From<Error> for TxError<E> { - fn from(e: Error) -> TxError<E> { - TxError::Db(e) +impl<E> From<TxOpError> for TxError<E> { + fn from(e: TxOpError) -> TxError<E> { + TxError::Db(e.0) + } +} + +pub fn unabort<R, E>(res: TxResult<R, E>) -> TxOpResult<std::result::Result<R, E>> { + match res { + Ok(v) => Ok(Ok(v)), + Err(TxError::Abort(e)) => Ok(Err(e)), + Err(TxError::Db(e)) => Err(TxOpError(e)), } } @@ -117,19 +128,19 @@ impl Db { let tx_res = self.transaction(|mut tx| { let mut i = 0; - for item in ex_tree.iter()? { - let (k, v) = item?; + for item in ex_tree.iter().map_err(TxError::Abort)? { + let (k, v) = item.map_err(TxError::Abort)?; tx.insert(&tree, k, v)?; i += 1; if i % 1000 == 0 { println!("{}: imported {}", name, i); } } - Ok::<_, TxError<()>>(i) + tx.commit(i) }); let total = match tx_res { Err(TxError::Db(e)) => return Err(e), - Err(TxError::Abort(_)) => unreachable!(), + Err(TxError::Abort(e)) => return Err(e), Ok(x) => x, }; @@ -215,11 +226,11 @@ impl Tree { #[allow(clippy::len_without_is_empty)] impl<'a> Transaction<'a> { #[inline] - pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> Result<Option<Value>> { + pub fn get<T: AsRef<[u8]>>(&self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { self.0.get(tree.1, key.as_ref()) } #[inline] - pub fn len(&self, tree: &Tree) -> Result<usize> { + pub fn len(&self, tree: &Tree) -> TxOpResult<usize> { self.0.len(tree.1) } @@ -230,26 +241,26 @@ impl<'a> Transaction<'a> { tree: &Tree, key: T, value: U, - ) -> Result<Option<Value>> { + ) -> TxOpResult<Option<Value>> { self.0.insert(tree.1, key.as_ref(), value.as_ref()) } /// Returns the old value if there was one #[inline] - pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> Result<Option<Value>> { + pub fn remove<T: AsRef<[u8]>>(&mut self, tree: &Tree, key: T) -> TxOpResult<Option<Value>> { self.0.remove(tree.1, key.as_ref()) } #[inline] - pub fn iter(&self, tree: &Tree) -> Result<ValueIter<'_>> { + pub fn iter(&self, tree: &Tree) -> TxOpResult<ValueIter<'_>> { self.0.iter(tree.1) } #[inline] - pub fn iter_rev(&self, tree: &Tree) -> Result<ValueIter<'_>> { + pub fn iter_rev(&self, tree: &Tree) -> TxOpResult<ValueIter<'_>> { self.0.iter_rev(tree.1) } #[inline] - pub fn range<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>> + pub fn range<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<ValueIter<'_>> where K: AsRef<[u8]>, R: RangeBounds<K>, @@ -259,7 +270,7 @@ impl<'a> Transaction<'a> { self.0.range(tree.1, get_bound(sb), get_bound(eb)) } #[inline] - pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> Result<ValueIter<'_>> + pub fn range_rev<K, R>(&self, tree: &Tree, range: R) -> TxOpResult<ValueIter<'_>> where K: AsRef<[u8]>, R: RangeBounds<K>, @@ -314,27 +325,27 @@ pub(crate) trait IDb: Send + Sync { } pub(crate) trait ITx { - fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>>; - fn len(&self, tree: usize) -> Result<usize>; + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>; + fn len(&self, tree: usize) -> TxOpResult<usize>; - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>>; - fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>>; + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>>; + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>>; - fn iter(&self, tree: usize) -> Result<ValueIter<'_>>; - fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>>; + fn iter(&self, tree: usize) -> TxOpResult<ValueIter<'_>>; + fn iter_rev(&self, tree: usize) -> TxOpResult<ValueIter<'_>>; fn range<'r>( &self, tree: usize, low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>>; + ) -> TxOpResult<ValueIter<'_>>; fn range_rev<'r>( &self, tree: usize, low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>>; + ) -> TxOpResult<ValueIter<'_>>; } pub(crate) trait ITxFn { diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index 9e4306c8..d1efd216 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -8,7 +8,10 @@ use std::sync::{Arc, RwLock}; use heed::types::ByteSlice; use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; -use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; +use crate::{ + Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, + Value, ValueIter, +}; pub use heed; @@ -20,9 +23,9 @@ impl From<heed::Error> for Error { } } -impl<T> From<heed::Error> for TxError<T> { - fn from(e: heed::Error) -> TxError<T> { - TxError::Db(e.into()) +impl From<heed::Error> for TxOpError { + fn from(e: heed::Error) -> TxOpError { + TxOpError(e.into()) } } @@ -171,21 +174,25 @@ impl IDb for LmdbDb { let trees = self.trees.read().unwrap(); let mut tx = LmdbTx { trees: &trees.0[..], - tx: self.db.write_txn()?, + tx: self + .db + .write_txn() + .map_err(Error::from) + .map_err(TxError::Db)?, }; let res = f.try_on(&mut tx); match res { TxFnResult::Ok => { - tx.tx.commit()?; + tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; Ok(()) } TxFnResult::Abort => { - tx.tx.abort()?; + tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?; Err(TxError::Abort(())) } TxFnResult::DbErr => { - tx.tx.abort()?; + tx.tx.abort().map_err(Error::from).map_err(TxError::Db)?; Err(TxError::Db(Error( "(this message will be discarded)".into(), ))) @@ -202,44 +209,44 @@ struct LmdbTx<'a, 'db> { } impl<'a, 'db> LmdbTx<'a, 'db> { - fn get_tree(&self, i: usize) -> Result<&Database> { + fn get_tree(&self, i: usize) -> TxOpResult<&Database> { self.trees.get(i).ok_or_else(|| { - Error( + TxOpError(Error( "invalid tree id (it might have been openned after the transaction started)".into(), - ) + )) }) } } impl<'a, 'db> ITx for LmdbTx<'a, 'db> { - fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; match tree.get(&self.tx, key)? { Some(v) => Ok(Some(v.to_vec())), None => Ok(None), } } - fn len(&self, _tree: usize) -> Result<usize> { + fn len(&self, _tree: usize) -> TxOpResult<usize> { unimplemented!(".len() in transaction not supported with LMDB backend") } - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> { let tree = *self.get_tree(tree)?; let old_val = tree.get(&self.tx, key)?.map(Vec::from); tree.put(&mut self.tx, key, value)?; Ok(old_val) } - fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { let tree = *self.get_tree(tree)?; let old_val = tree.get(&self.tx, key)?.map(Vec::from); tree.delete(&mut self.tx, key)?; Ok(old_val) } - fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> { + fn iter(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with LMDB backend"); } - fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> { + fn iter_rev(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with LMDB backend"); } @@ -248,7 +255,7 @@ impl<'a, 'db> ITx for LmdbTx<'a, 'db> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { + ) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with LMDB backend"); } fn range_rev<'r>( @@ -256,7 +263,7 @@ impl<'a, 'db> ITx for LmdbTx<'a, 'db> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { + ) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with LMDB backend"); } } diff --git a/src/db/sled_adapter.rs b/src/db/sled_adapter.rs index b07401c9..d0d9e9c0 100644 --- a/src/db/sled_adapter.rs +++ b/src/db/sled_adapter.rs @@ -9,7 +9,10 @@ use sled::transaction::{ UnabortableTransactionError, }; -use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; +use crate::{ + Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, + Value, ValueIter, +}; pub use sled; @@ -21,6 +24,12 @@ impl From<sled::Error> for Error { } } +impl From<sled::Error> for TxOpError { + fn from(e: sled::Error) -> TxOpError { + TxOpError(e.into()) + } +} + // -- db pub struct SledDb { @@ -177,51 +186,54 @@ struct SledTx<'a> { } impl<'a> SledTx<'a> { - fn get_tree(&self, i: usize) -> Result<&TransactionalTree> { + fn get_tree(&self, i: usize) -> TxOpResult<&TransactionalTree> { self.trees.get(i).ok_or_else(|| { - Error( + TxOpError(Error( "invalid tree id (it might have been openned after the transaction started)".into(), - ) + )) }) } - fn save_error<R>(&self, v: std::result::Result<R, UnabortableTransactionError>) -> Result<R> { + fn save_error<R>( + &self, + v: std::result::Result<R, UnabortableTransactionError>, + ) -> TxOpResult<R> { match v { Ok(x) => Ok(x), Err(e) => { let txt = format!("{}", e); self.err.set(Some(e)); - Err(Error(txt.into())) + Err(TxOpError(Error(txt.into()))) } } } } impl<'a> ITx for SledTx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; let tmp = self.save_error(tree.get(key))?; Ok(tmp.map(|x| x.to_vec())) } - fn len(&self, _tree: usize) -> Result<usize> { + fn len(&self, _tree: usize) -> TxOpResult<usize> { unimplemented!(".len() in transaction not supported with Sled backend") } - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; let old_val = self.save_error(tree.insert(key, value))?; Ok(old_val.map(|x| x.to_vec())) } - fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; let old_val = self.save_error(tree.remove(key))?; Ok(old_val.map(|x| x.to_vec())) } - fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> { + fn iter(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with Sled backend"); } - fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> { + fn iter_rev(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with Sled backend"); } @@ -230,7 +242,7 @@ impl<'a> ITx for SledTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { + ) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with Sled backend"); } fn range_rev<'r>( @@ -238,7 +250,7 @@ impl<'a> ITx for SledTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { + ) -> TxOpResult<ValueIter<'_>> { unimplemented!("Iterators in transactions not supported with Sled backend"); } } diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index b23885bd..8d6bd714 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -10,7 +10,10 @@ use log::trace; use rusqlite::{params, Connection, Rows, Statement, Transaction}; -use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; +use crate::{ + Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, + Value, ValueIter, +}; pub use rusqlite; @@ -22,9 +25,9 @@ impl From<rusqlite::Error> for Error { } } -impl<T> From<rusqlite::Error> for TxError<T> { - fn from(e: rusqlite::Error) -> TxError<T> { - TxError::Db(e.into()) +impl From<rusqlite::Error> for TxOpError { + fn from(e: rusqlite::Error) -> TxOpError { + TxOpError(e.into()) } } @@ -260,20 +263,24 @@ impl IDb for SqliteDb { let this_mut_ref: &mut SqliteDbInner = this.borrow_mut(); let mut tx = SqliteTx { - tx: this_mut_ref.db.transaction()?, + tx: this_mut_ref + .db + .transaction() + .map_err(Error::from) + .map_err(TxError::Db)?, trees: &this_mut_ref.trees, }; let res = match f.try_on(&mut tx) { TxFnResult::Ok => { - tx.tx.commit()?; + tx.tx.commit().map_err(Error::from).map_err(TxError::Db)?; Ok(()) } TxFnResult::Abort => { - tx.tx.rollback()?; + tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?; Err(TxError::Abort(())) } TxFnResult::DbErr => { - tx.tx.rollback()?; + tx.tx.rollback().map_err(Error::from).map_err(TxError::Db)?; Err(TxError::Db(Error( "(this message will be discarded)".into(), ))) @@ -293,15 +300,15 @@ struct SqliteTx<'a> { } impl<'a> SqliteTx<'a> { - fn get_tree(&self, i: usize) -> Result<&'_ str> { + fn get_tree(&self, i: usize) -> TxOpResult<&'_ str> { self.trees.get(i).map(String::as_ref).ok_or_else(|| { - Error( + TxOpError(Error( "invalid tree id (it might have been openned after the transaction started)".into(), - ) + )) }) } - fn internal_get(&self, tree: &str, key: &[u8]) -> Result<Option<Value>> { + fn internal_get(&self, tree: &str, key: &[u8]) -> TxOpResult<Option<Value>> { let mut stmt = self .tx .prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; @@ -314,11 +321,11 @@ impl<'a> SqliteTx<'a> { } impl<'a> ITx for SqliteTx<'a> { - fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + fn get(&self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; self.internal_get(tree, key) } - fn len(&self, tree: usize) -> Result<usize> { + fn len(&self, tree: usize) -> TxOpResult<usize> { let tree = self.get_tree(tree)?; let mut stmt = self.tx.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut res_iter = stmt.query([])?; @@ -328,7 +335,7 @@ impl<'a> ITx for SqliteTx<'a> { } } - fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> Result<Option<Value>> { + fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; let old_val = self.internal_get(tree, key)?; @@ -351,7 +358,7 @@ impl<'a> ITx for SqliteTx<'a> { Ok(old_val) } - fn remove(&mut self, tree: usize, key: &[u8]) -> Result<Option<Value>> { + fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<Option<Value>> { let tree = self.get_tree(tree)?; let old_val = self.internal_get(tree, key)?; @@ -365,10 +372,10 @@ impl<'a> ITx for SqliteTx<'a> { Ok(old_val) } - fn iter(&self, _tree: usize) -> Result<ValueIter<'_>> { + fn iter(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> { unimplemented!(); } - fn iter_rev(&self, _tree: usize) -> Result<ValueIter<'_>> { + fn iter_rev(&self, _tree: usize) -> TxOpResult<ValueIter<'_>> { unimplemented!(); } @@ -377,7 +384,7 @@ impl<'a> ITx for SqliteTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { + ) -> TxOpResult<ValueIter<'_>> { unimplemented!(); } fn range_rev<'r>( @@ -385,7 +392,7 @@ impl<'a> ITx for SqliteTx<'a> { _tree: usize, _low: Bound<&'r [u8]>, _high: Bound<&'r [u8]>, - ) -> Result<ValueIter<'_>> { + ) -> TxOpResult<ValueIter<'_>> { unimplemented!(); } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 4fec1138..48f616f7 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -121,7 +121,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { _tx: &mut db::Transaction, _old: Option<&Self::E>, _new: Option<&Self::E>, - ) -> db::Result<()> { + ) -> db::TxOpResult<()> { // nothing for now Ok(()) } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 77446f64..991fe66d 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -227,7 +227,7 @@ impl TableSchema for K2VItemTable { tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, - ) -> db::Result<()> { + ) -> db::TxOpResult<()> { // 1. Count let (old_entries, old_conflicts, old_values, old_bytes) = match old { None => (0, 0, 0, 0), @@ -245,7 +245,7 @@ impl TableSchema for K2VItemTable { .map(|e| &e.partition.partition_key) .unwrap_or_else(|| &new.unwrap().partition.partition_key); - match self.counter_table.count( + let counter_res = self.counter_table.count( tx, &count_pk, count_sk, @@ -255,18 +255,15 @@ impl TableSchema for K2VItemTable { (VALUES, new_values - old_values), (BYTES, new_bytes - old_bytes), ], - ) { - Ok(()) => (), - Err(db::TxError::Db(e)) => return Err(e), - Err(db::TxError::Abort(e)) => { - // This result can be returned by `counter_table.count()` for instance - // if messagepack serialization or deserialization fails at some step. - // Warn admin but ignore this error for now, that's all we can do. - error!( - "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", - count_pk, count_sk, e - ); - } + ); + if let Err(e) = db::unabort(counter_res)? { + // This result can be returned by `counter_table.count()` for instance + // if messagepack serialization or deserialization fails at some step. + // Warn admin but ignore this error for now, that's all we can do. + error!( + "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", + count_pk, count_sk, e + ); } // 2. Notify diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 2c06bc96..1134922e 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -58,7 +58,7 @@ impl TableSchema for BlockRefTable { tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, - ) -> db::Result<()> { + ) -> db::TxOpResult<()> { #[allow(clippy::or_fun_call)] let block = old.or(new).unwrap().block; let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index f3bd9892..62f5d8d9 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -239,7 +239,7 @@ impl TableSchema for ObjectTable { _tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, - ) -> db::Result<()> { + ) -> db::TxOpResult<()> { let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index d168c2c2..881c245a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -144,7 +144,7 @@ impl TableSchema for VersionTable { _tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, - ) -> db::Result<()> { + ) -> db::TxOpResult<()> { let block_ref_table = self.block_ref_table.clone(); let old = old.cloned(); let new = new.cloned(); diff --git a/src/table/schema.rs b/src/table/schema.rs index 393c7388..74f57798 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -93,7 +93,7 @@ pub trait TableSchema: Send + Sync { _tx: &mut db::Transaction, _old: Option<&Self::E>, _new: Option<&Self::E>, - ) -> db::Result<()> { + ) -> db::TxOpResult<()> { Ok(()) } |