aboutsummaryrefslogtreecommitdiff
path: root/src/model/index_counter.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/index_counter.rs')
-rw-r--r--src/model/index_counter.rs62
1 files changed, 30 insertions, 32 deletions
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 123154d4..2602d5d9 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -6,6 +6,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
+use garage_db as db;
+
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -114,10 +116,6 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
- fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
- // nothing for now
- }
-
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
if filter.0 == DeletedFilter::Any {
return true;
@@ -135,7 +133,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CounterSchema> {
this_node: Uuid,
- local_counter: sled::Tree,
+ local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -144,7 +142,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
- db: &sled::Db,
+ db: &db::Db,
) -> Arc<Self> {
let background = system.background.clone();
@@ -174,36 +172,36 @@ impl<T: CounterSchema> IndexCounter<T> {
this
}
- pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+ pub fn count(
+ &self,
+ tx: &mut db::Transaction,
+ pk: &T::P,
+ sk: &T::S,
+ counts: &[(&str, i64)],
+ ) -> db::TxResult<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk);
- let new_entry = self.local_counter.transaction(|tx| {
- let mut entry = match tx.get(&tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?
- }
- None => LocalCounterEntry {
- values: BTreeMap::new(),
- },
- };
-
- for (s, inc) in counts.iter() {
- let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
- ent.1 += *inc;
- }
-
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
- .map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- tx.insert(&tree_key[..], new_entry_bytes)?;
+ let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
+ Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?,
+ None => LocalCounterEntry {
+ values: BTreeMap::new(),
+ },
+ };
+
+ for (s, inc) in counts.iter() {
+ let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ ent.0 += 1;
+ ent.1 += *inc;
+ }
- Ok(entry)
- })?;
+ let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
- if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+ if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
error!(
"Could not propagate updated counter values, failed to send to channel: {}",
e