diff options
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r-- | src/model/index_counter.rs | 62 |
1 files changed, 30 insertions, 32 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 123154d4..2602d5d9 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -6,6 +6,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use garage_db as db; + use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; @@ -114,10 +116,6 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); - fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { - // nothing for now - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { if filter.0 == DeletedFilter::Any { return true; @@ -135,7 +133,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CounterSchema> { this_node: Uuid, - local_counter: sled::Tree, + local_counter: db::Tree, propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -144,7 +142,7 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, - db: &sled::Db, + db: &db::Db, ) -> Arc<Self> { let background = system.background.clone(); @@ -174,36 +172,36 @@ impl<T: CounterSchema> IndexCounter<T> { this } - pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { + pub fn count( + &self, + tx: &mut db::Transaction, + pk: &T::P, + sk: &T::S, + counts: &[(&str, i64)], + ) -> db::TxResult<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.transaction(|tx| { - let mut entry = match tx.get(&tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)? - } - None => LocalCounterEntry { - values: BTreeMap::new(), - }, - }; - - for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; - ent.1 += *inc; - } - - let new_entry_bytes = rmp_to_vec_all_named(&entry) - .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - tx.insert(&tree_key[..], new_entry_bytes)?; + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { + Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)?, + None => LocalCounterEntry { + values: BTreeMap::new(), + }, + }; + + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 += 1; + ent.1 += *inc; + } - Ok(entry) - })?; + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { error!( "Could not propagate updated counter values, failed to send to channel: {}", e |