aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/garage.rs8
-rw-r--r--src/model/index_counter.rs62
-rw-r--r--src/model/k2v/item_table.rs24
-rw-r--r--src/model/migrate.rs6
-rw-r--r--src/model/s3/block_ref_table.rs21
-rw-r--r--src/model/s3/object_table.rs12
-rw-r--r--src/model/s3/version_table.rs13
8 files changed, 94 insertions, 55 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 133fe44e..d908dc01 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
@@ -30,8 +31,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 2f99bd68..280f3dc7 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -2,6 +2,8 @@ use std::sync::Arc;
use netapp::NetworkKey;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
@@ -33,7 +35,7 @@ pub struct Garage {
pub config: Config,
/// The local database
- pub db: sled::Db,
+ pub db: db::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
@@ -71,7 +73,7 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -199,7 +201,7 @@ impl Garage {
#[cfg(feature = "k2v")]
impl GarageK2V {
- fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self {
+ fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
info!("Initialize K2V subscription manager...");
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 123154d4..2602d5d9 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -6,6 +6,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
+use garage_db as db;
+
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -114,10 +116,6 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
type E = CounterEntry<T>;
type Filter = (DeletedFilter, Vec<Uuid>);
- fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
- // nothing for now
- }
-
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
if filter.0 == DeletedFilter::Any {
return true;
@@ -135,7 +133,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CounterSchema> {
this_node: Uuid,
- local_counter: sled::Tree,
+ local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -144,7 +142,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
- db: &sled::Db,
+ db: &db::Db,
) -> Arc<Self> {
let background = system.background.clone();
@@ -174,36 +172,36 @@ impl<T: CounterSchema> IndexCounter<T> {
this
}
- pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
+ pub fn count(
+ &self,
+ tx: &mut db::Transaction,
+ pk: &T::P,
+ sk: &T::S,
+ counts: &[(&str, i64)],
+ ) -> db::TxResult<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk);
- let new_entry = self.local_counter.transaction(|tx| {
- let mut entry = match tx.get(&tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?
- }
- None => LocalCounterEntry {
- values: BTreeMap::new(),
- },
- };
-
- for (s, inc) in counts.iter() {
- let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
- ent.0 += 1;
- ent.1 += *inc;
- }
-
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
- .map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- tx.insert(&tree_key[..], new_entry_bytes)?;
+ let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
+ Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
+ .map_err(Error::RmpDecode)
+ .map_err(db::TxError::Abort)?,
+ None => LocalCounterEntry {
+ values: BTreeMap::new(),
+ },
+ };
+
+ for (s, inc) in counts.iter() {
+ let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
+ ent.0 += 1;
+ ent.1 += *inc;
+ }
- Ok(entry)
- })?;
+ let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
- if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) {
+ if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
error!(
"Could not propagate updated counter values, failed to send to channel: {}",
e
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 8b7cc08a..991fe66d 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use garage_db as db;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable {
type E = K2VItem;
type Filter = ItemFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
// 1. Count
let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0, 0),
@@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable {
.map(|e| &e.partition.partition_key)
.unwrap_or_else(|| &new.unwrap().partition.partition_key);
- if let Err(e) = self.counter_table.count(
+ let counter_res = self.counter_table.count(
+ tx,
&count_pk,
count_sk,
&[
@@ -248,14 +255,23 @@ impl TableSchema for K2VItemTable {
(VALUES, new_values - old_values),
(BYTES, new_bytes - old_bytes),
],
- ) {
- error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
+ );
+ if let Err(e) = db::unabort(counter_res)? {
+ // This result can be returned by `counter_table.count()` for instance
+ // if messagepack serialization or deserialization fails at some step.
+ // Warn admin but ignore this error for now, that's all we can do.
+ error!(
+ "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
+ count_pk, count_sk, e
+ );
}
// 2. Notify
if let Some(new_ent) = new {
self.subscriptions.notify(new_ent);
}
+
+ Ok(())
}
#[allow(clippy::nonminimal_bool)]
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 7e61957a..25acb4b0 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -25,11 +25,15 @@ impl Migrate {
.open_tree("bucket:table")
.map_err(GarageError::from)?;
- for res in tree.iter() {
+ let mut old_buckets = vec![];
+ for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?;
+ old_buckets.push(bucket);
+ }
+ for bucket in old_buckets {
if let old_bucket::BucketState::Present(p) = bucket.state.get() {
self.migrate_buckets050_do_bucket(&bucket, p).await?;
}
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index 9b3991bf..9589b4aa 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::data::*;
use garage_table::crdt::Crdt;
@@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable {
type E = BlockRef;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
- #[allow(clippy::or_fun_call)]
- let block = &old.or(new).unwrap().block;
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ let block = old.or(new).unwrap().block;
let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false);
let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before {
- if let Err(e) = self.block_manager.block_incref(block) {
- warn!("block_incref failed for block {:?}: {}", block, e);
- }
+ self.block_manager.block_incref(tx, block)?;
}
if was_before && !is_after {
- if let Err(e) = self.block_manager.block_decref(block) {
- warn!("block_decref failed for block {:?}: {}", block, e);
- }
+ self.block_manager.block_decref(tx, block)?;
}
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 3d9a89f7..62f5d8d9 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -232,7 +234,12 @@ impl TableSchema for ObjectTable {
type E = Object;
type Filter = ObjectFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
let version_table = self.version_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -259,7 +266,8 @@ impl TableSchema for ObjectTable {
}
}
Ok(())
- })
+ });
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index ad096772..881c245a 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,6 +1,8 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -137,7 +139,12 @@ impl TableSchema for VersionTable {
type E = Version;
type Filter = DeletedFilter;
- fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
+ fn updated(
+ &self,
+ _tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
let block_ref_table = self.block_ref_table.clone();
let old = old.cloned();
let new = new.cloned();
@@ -160,7 +167,9 @@ impl TableSchema for VersionTable {
}
}
Ok(())
- })
+ });
+
+ Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {