diff options
author | Alex Auvolat <alex@adnab.me> | 2022-06-02 16:58:00 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-06-02 16:58:00 +0200 |
commit | 9f0f5b2e372a720a807914747fd48ddc93928e04 (patch) | |
tree | a4346766c46469758b9138a09c65fa052e9ad253 /src/model | |
parent | 04901093e7315558bdc147d27adc3f56ec2c98a1 (diff) | |
download | garage-9f0f5b2e372a720a807914747fd48ddc93928e04.tar.gz garage-9f0f5b2e372a720a807914747fd48ddc93928e04.zip |
Adapt Garage to use new DB abstraction
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 3 | ||||
-rw-r--r-- | src/model/garage.rs | 8 | ||||
-rw-r--r-- | src/model/index_counter.rs | 16 | ||||
-rw-r--r-- | src/model/migrate.rs | 2 |
4 files changed, 16 insertions, 13 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 133fe44e..d908dc01 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +garage_db = { version = "0.8.0", path = "../db" } garage_rpc = { version = "0.7.0", path = "../rpc" } garage_table = { version = "0.7.0", path = "../table" } garage_block = { version = "0.7.0", path = "../block" } @@ -30,8 +31,6 @@ tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } -sled = "0.34" - rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_bytes = "0.11" diff --git a/src/model/garage.rs b/src/model/garage.rs index 2f99bd68..280f3dc7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -2,6 +2,8 @@ use std::sync::Arc; use netapp::NetworkKey; +use garage_db as db; + use garage_util::background::*; use garage_util::config::*; @@ -33,7 +35,7 @@ pub struct Garage { pub config: Config, /// The local database - pub db: sled::Db, + pub db: db::Db, /// A background job runner pub background: Arc<BackgroundRunner>, /// The membership manager @@ -71,7 +73,7 @@ pub struct GarageK2V { impl Garage { /// Create and run garage - pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { let network_key = NetworkKey::from_slice( &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], ) @@ -199,7 +201,7 @@ impl Garage { #[cfg(feature = "k2v")] impl GarageK2V { - fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self { + fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); info!("Initialize K2V subscription manager..."); diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 123154d4..33de797d 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -6,6 +6,8 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; +use garage_db as db; + use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::data::*; @@ -135,7 +137,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> { pub struct IndexCounter<T: CounterSchema> { this_node: Uuid, - local_counter: sled::Tree, + local_counter: db::Tree, propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>, pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>, } @@ -144,7 +146,7 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn new( system: Arc<System>, replication: TableShardedReplication, - db: &sled::Db, + db: &db::Db, ) -> Arc<Self> { let background = system.background.clone(); @@ -177,12 +179,12 @@ impl<T: CounterSchema> IndexCounter<T> { pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); - let new_entry = self.local_counter.transaction(|tx| { - let mut entry = match tx.get(&tree_key[..])? { + let new_entry = self.local_counter.db().transaction(|tx| { + let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { Some(old_bytes) => { rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) .map_err(Error::RmpDecode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)? + .map_err(db::TxError::Abort)? } None => LocalCounterEntry { values: BTreeMap::new(), @@ -197,8 +199,8 @@ impl<T: CounterSchema> IndexCounter<T> { let new_entry_bytes = rmp_to_vec_all_named(&entry) .map_err(Error::RmpEncode) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - tx.insert(&tree_key[..], new_entry_bytes)?; + .map_err(db::TxError::Abort)?; + tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; Ok(entry) })?; diff --git a/src/model/migrate.rs b/src/model/migrate.rs index 7e61957a..1f063265 100644 --- a/src/model/migrate.rs +++ b/src/model/migrate.rs @@ -25,7 +25,7 @@ impl Migrate { .open_tree("bucket:table") .map_err(GarageError::from)?; - for res in tree.iter() { + for res in tree.iter().map_err(GarageError::from)? { let (_k, v) = res.map_err(GarageError::from)?; let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..]) .map_err(GarageError::from)?; |