diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 3 | ||||
-rw-r--r-- | src/model/garage.rs | 8 | ||||
-rw-r--r-- | src/model/index_counter.rs | 62 | ||||
-rw-r--r-- | src/model/k2v/item_table.rs | 24 | ||||
-rw-r--r-- | src/model/migrate.rs | 6 | ||||
-rw-r--r-- | src/model/s3/block_ref_table.rs | 21 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 12 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 13 |
8 files changed, 94 insertions, 55 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 133fe44e..d908dc01 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } garage_block = { version = "0.7.0", path = "../block" } @@ -30,8 +31,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/model/garage.rs b/src/model/garage.rs index 2f99bd68..280f3dc7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use netapp::NetworkKey; +use garage_db as db; + use garage_util::background::*; use garage_util::config::*; @@ -33,7 +35,7 @@ pub struct Garage { pub config: Config, /// The local database - pub db: sled::Db, + pub db: db::Db, /// A background job runner pub background: Arc<BackgroundRunner>, /// The membership manager @@ -71,7 +73,7 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -199,7 +201,7 @@ impl Garage { #[cfg(feature = "k2v")] impl GarageK2V { - fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { + fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); info!("Initialize K2V subscription manager..."); diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 123154d4..2602d5d9 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -6,6 +6,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use garage_db as db; + use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; @@ -114,10 +116,6 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { type E = CounterEntry<T>; type Filter = (DeletedFilter, Vec<Uuid>); - fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { - // nothing for now - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { if filter.0 == DeletedFilter::Any { return true; @@ -135,7 +133,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CounterSchema> { this_node: Uuid, - local_counter: sled::Tree, + local_counter: db::Tree, propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -144,7 +142,7 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, - db: &sled::Db, + db: &db::Db, ) -> Arc<Self> { let background = system.background.clone(); @@ -174,36 +172,36 @@ impl<T: CounterSchema> IndexCounter<T> { this } - pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { + pub fn count( + &self, + tx: &mut db::Transaction, + pk: &T::P, + sk: &T::S, + counts: &[(&str, i64)], + ) -> db::TxResult<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.transaction(|tx| { - let mut entry = match tx.get(&tree_key[..])? { - Some(old_bytes) => { - rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) - .map_err(Error::RmpDecode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)? - } - None => LocalCounterEntry { - values: BTreeMap::new(), - }, - }; - - for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); - ent.0 += 1; - ent.1 += *inc; - } - - let new_entry_bytes = rmp_to_vec_all_named(&entry) - .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - tx.insert(&tree_key[..], new_entry_bytes)?; + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { + Some(old_bytes) => rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) + .map_err(Error::RmpDecode) + .map_err(db::TxError::Abort)?, + None => LocalCounterEntry { + values: BTreeMap::new(), + }, + }; + + for (s, inc) in counts.iter() { + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); + ent.0 += 1; + ent.1 += *inc; + } - Ok(entry) - })?; + let new_entry_bytes = rmp_to_vec_all_named(&entry) + .map_err(Error::RmpEncode) + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; - if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), new_entry)) { + if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) { error!( "Could not propagate updated counter values, failed to send to channel: {}", e diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 8b7cc08a..991fe66d 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; use garage_util::data::*; use garage_table::crdt::*; @@ -221,7 +222,12 @@ impl TableSchema for K2VItemTable { type E = K2VItem; type Filter = ItemFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { // 1. Count let (old_entries, old_conflicts, old_values, old_bytes) = match old { None => (0, 0, 0, 0), @@ -239,7 +245,8 @@ impl TableSchema for K2VItemTable { .map(|e| &e.partition.partition_key) .unwrap_or_else(|| &new.unwrap().partition.partition_key); - if let Err(e) = self.counter_table.count( + let counter_res = self.counter_table.count( + tx, &count_pk, count_sk, &[ @@ -248,14 +255,23 @@ impl TableSchema for K2VItemTable { (VALUES, new_values - old_values), (BYTES, new_bytes - old_bytes), ], - ) { - error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); + ); + if let Err(e) = db::unabort(counter_res)? { + // This result can be returned by `counter_table.count()` for instance + // if messagepack serialization or deserialization fails at some step. + // Warn admin but ignore this error for now, that's all we can do. + error!( + "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!", + count_pk, count_sk, e + ); } // 2. Notify if let Some(new_ent) = new { self.subscriptions.notify(new_ent); } + + Ok(()) } #[allow(clippy::nonminimal_bool)] diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 7e61957a..25acb4b0 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -25,11 +25,15 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; - for res in tree.iter() { + let mut old_buckets = vec![]; + for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; + old_buckets.push(bucket); + } + for bucket in old_buckets { if let old_bucket::BucketState::Present(p) = bucket.state.get() { self.migrate_buckets050_do_bucket(&bucket, p).await?; } diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs index 9b3991bf..9589b4aa 100644 --- a/src/model/s3/block_ref_table.rs +++ b/src/model/s3/block_ref_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::data::*; use garage_table::crdt::Crdt; @@ -51,21 +53,22 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { - #[allow(clippy::or_fun_call)] - let block = &old.or(new).unwrap().block; + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + let block = old.or(new).unwrap().block; let was_before = old.map(|x| !x.deleted.get()).unwrap_or(false); let is_after = new.map(|x| !x.deleted.get()).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - warn!("block_incref failed for block {:?}: {}", block, e); - } + self.block_manager.block_incref(tx, block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - warn!("block_decref failed for block {:?}: {}", block, e); - } + self.block_manager.block_decref(tx, block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 3d9a89f7..62f5d8d9 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -232,7 +234,12 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = ObjectFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -259,7 +266,8 @@ impl TableSchema for ObjectTable { } } Ok(()) - }) + }); + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index ad096772..881c245a 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -1,6 +1,8 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -137,7 +139,12 @@ impl TableSchema for VersionTable { type E = Version; type Filter = DeletedFilter; - fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + fn updated( + &self, + _tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { let block_ref_table = self.block_ref_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -160,7 +167,9 @@ impl TableSchema for VersionTable { } } Ok(()) - }) + }); + + Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { |