aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-02 16:58:00 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-02 16:58:00 +0200
commit9f0f5b2e372a720a807914747fd48ddc93928e04 (patch)
treea4346766c46469758b9138a09c65fa052e9ad253 /src/model
parent04901093e7315558bdc147d27adc3f56ec2c98a1 (diff)
downloadgarage-9f0f5b2e372a720a807914747fd48ddc93928e04.tar.gz
garage-9f0f5b2e372a720a807914747fd48ddc93928e04.zip
Adapt Garage to use new DB abstraction
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/garage.rs8
-rw-r--r--src/model/index_counter.rs16
-rw-r--r--src/model/migrate.rs2
4 files changed, 16 insertions, 13 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 133fe44e..d908dc01 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
+garage_db = { version = "0.8.0", path = "../db" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
garage_block = { version = "0.7.0", path = "../block" }
@@ -30,8 +31,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-sled = "0.34"
-
rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 2f99bd68..280f3dc7 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -2,6 +2,8 @@ use std::sync::Arc;
use netapp::NetworkKey;
+use garage_db as db;
+
use garage_util::background::*;
use garage_util::config::*;
@@ -33,7 +35,7 @@ pub struct Garage {
pub config: Config,
/// The local database
- pub db: sled::Db,
+ pub db: db::Db,
/// A background job runner
pub background: Arc<BackgroundRunner>,
/// The membership manager
@@ -71,7 +73,7 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
- pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ pub fn new(config: Config, db: db::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
let network_key = NetworkKey::from_slice(
&hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
)
@@ -199,7 +201,7 @@ impl Garage {
#[cfg(feature = "k2v")]
impl GarageK2V {
- fn new(system: Arc<System>, db: &sled::Db, meta_rep_param: TableShardedReplication) -> Self {
+ fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
info!("Initialize K2V subscription manager...");
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 123154d4..33de797d 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -6,6 +6,8 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
+use garage_db as db;
+
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::data::*;
@@ -135,7 +137,7 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CounterSchema> {
this_node: Uuid,
- local_counter: sled::Tree,
+ local_counter: db::Tree,
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -144,7 +146,7 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn new(
system: Arc<System>,
replication: TableShardedReplication,
- db: &sled::Db,
+ db: &db::Db,
) -> Arc<Self> {
let background = system.background.clone();
@@ -177,12 +179,12 @@ impl<T: CounterSchema> IndexCounter<T> {
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
let tree_key = self.table.data.tree_key(pk, sk);
- let new_entry = self.local_counter.transaction(|tx| {
- let mut entry = match tx.get(&tree_key[..])? {
+ let new_entry = self.local_counter.db().transaction(|tx| {
+ let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => {
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
.map_err(Error::RmpDecode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?
+ .map_err(db::TxError::Abort)?
}
None => LocalCounterEntry {
values: BTreeMap::new(),
@@ -197,8 +199,8 @@ impl<T: CounterSchema> IndexCounter<T> {
let new_entry_bytes = rmp_to_vec_all_named(&entry)
.map_err(Error::RmpEncode)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- tx.insert(&tree_key[..], new_entry_bytes)?;
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
Ok(entry)
})?;
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index 7e61957a..1f063265 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -25,7 +25,7 @@ impl Migrate {
.open_tree("bucket:table")
.map_err(GarageError::from)?;
- for res in tree.iter() {
+ for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?;