aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/garage.rs30
-rw-r--r--src/model/index_counter.rs151
-rw-r--r--src/model/k2v/rpc.rs7
-rw-r--r--src/model/s3/object_table.rs52
-rw-r--r--src/model/s3/version_table.rs39
5 files changed, 80 insertions, 199 deletions
diff --git a/src/model/garage.rs b/src/model/garage.rs
index e34d034f..5bea6b4f 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -39,8 +39,6 @@ pub struct Garage {
/// The local database
pub db: db::Db,
- /// A background job runner
- pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>,
/// The block manager
@@ -78,7 +76,7 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
- pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ pub fn new(config: Config) -> Result<Arc<Self>, Error> {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
@@ -167,7 +165,7 @@ impl Garage {
.expect("Invalid replication_mode in config file.");
info!("Initialize membership management system...");
- let system = System::new(network_key, background.clone(), replication_mode, &config)?;
+ let system = System::new(network_key, replication_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
@@ -225,7 +223,6 @@ impl Garage {
info!("Initialize version_table...");
let version_table = Table::new(
VersionTable {
- background: background.clone(),
block_ref_table: block_ref_table.clone(),
},
meta_rep_param.clone(),
@@ -240,7 +237,6 @@ impl Garage {
#[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
- background: background.clone(),
version_table: version_table.clone(),
object_counter_table: object_counter_table.clone(),
},
@@ -258,7 +254,6 @@ impl Garage {
config,
replication_mode,
db,
- background,
system,
block_manager,
bucket_table,
@@ -273,6 +268,22 @@ impl Garage {
}))
}
+ pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ self.block_manager.spawn_workers(bg);
+
+ self.bucket_table.spawn_workers(bg);
+ self.bucket_alias_table.spawn_workers(bg);
+ self.key_table.spawn_workers(bg);
+
+ self.object_table.spawn_workers(bg);
+ self.object_counter_table.spawn_workers(bg);
+ self.version_table.spawn_workers(bg);
+ self.block_ref_table.spawn_workers(bg);
+
+ #[cfg(feature = "k2v")]
+ self.k2v.spawn_workers(bg);
+ }
+
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
@@ -307,4 +318,9 @@ impl GarageK2V {
rpc,
}
}
+
+ pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ self.item_table.spawn_workers(bg);
+ self.counter_table.spawn_workers(bg);
+ }
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index b9594406..6303ea3e 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,17 +1,15 @@
use core::ops::Bound;
-use std::collections::{hash_map, BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, watch};
use garage_db as db;
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
-use garage_util::background::*;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -142,7 +140,6 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -152,16 +149,11 @@ impl<T: CountedItem> IndexCounter<T> {
replication: TableShardedReplication,
db: &db::Db,
) -> Arc<Self> {
- let background = system.background.clone();
-
- let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
-
- let this = Arc::new(Self {
+ Arc::new(Self {
this_node: system.id,
local_counter: db
.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
- propagate_tx,
table: Table::new(
CounterTable {
_phantom_t: Default::default(),
@@ -170,16 +162,11 @@ impl<T: CountedItem> IndexCounter<T> {
system,
db,
),
- });
-
- background.spawn_worker(IndexPropagatorWorker {
- index_counter: this.clone(),
- propagate_rx,
- buf: HashMap::new(),
- errors: 0,
- });
+ })
+ }
- this
+ pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ self.table.spawn_workers(bg);
}
pub fn count(
@@ -232,12 +219,8 @@ impl<T: CountedItem> IndexCounter<T> {
.map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
- if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
- error!(
- "Could not propagate updated counter values, failed to send to channel: {}",
- e
- );
- }
+ let dist_entry = entry.into_counter_entry(self.this_node);
+ self.table.queue_insert(tx, &dist_entry)?;
Ok(())
}
@@ -250,23 +233,6 @@ impl<T: CountedItem> IndexCounter<T> {
TS: TableSchema<E = T>,
TR: TableReplication,
{
- let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
- let entry_k = self
- .table
- .data
- .tree_key(entry.partition_key(), entry.sort_key());
- self.table
- .data
- .update_entry_with(&entry_k, |ent| match ent {
- Some(mut ent) => {
- ent.merge(&entry);
- ent
- }
- None => entry.clone(),
- })?;
- Ok(())
- };
-
// 1. Set all old local counters to zero
let now = now_msec();
let mut next_start: Option<Vec<u8>> = None;
@@ -302,7 +268,9 @@ impl<T: CountedItem> IndexCounter<T> {
.insert(&local_counter_k, &local_counter_bytes)?;
let counter_entry = local_counter.into_counter_entry(self.this_node);
- save_counter_entry(counter_entry)?;
+ self.local_counter
+ .db()
+ .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
next_start = Some(local_counter_k);
}
@@ -367,7 +335,9 @@ impl<T: CountedItem> IndexCounter<T> {
.insert(&local_counter_key, local_counter_bytes)?;
let counter_entry = local_counter.into_counter_entry(self.this_node);
- save_counter_entry(counter_entry)?;
+ self.local_counter
+ .db()
+ .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
next_start = Some(counted_entry_k);
}
@@ -378,96 +348,7 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
-struct IndexPropagatorWorker<T: CountedItem> {
- index_counter: Arc<IndexCounter<T>>,
- propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
-
- buf: HashMap<Vec<u8>, CounterEntry<T>>,
- errors: usize,
-}
-
-impl<T: CountedItem> IndexPropagatorWorker<T> {
- fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
- let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
- match self.buf.entry(tree_key) {
- hash_map::Entry::Vacant(e) => {
- e.insert(dist_entry);
- }
- hash_map::Entry::Occupied(mut e) => {
- e.get_mut().merge(&dist_entry);
- }
- }
- }
-}
-
-#[async_trait]
-impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
- fn name(&self) -> String {
- format!("{} counter", T::COUNTER_TABLE_NAME)
- }
-
- fn status(&self) -> WorkerStatus {
- WorkerStatus {
- queue_length: Some(self.buf.len() as u64),
- ..Default::default()
- }
- }
-
- async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- // This loop batches updates to counters to be sent all at once.
- // They are sent once the propagate_rx channel has been emptied (or is closed).
- let closed = loop {
- match self.propagate_rx.try_recv() {
- Ok((pk, sk, counters)) => {
- self.add_ent(pk, sk, counters);
- }
- Err(mpsc::error::TryRecvError::Empty) => break false,
- Err(mpsc::error::TryRecvError::Disconnected) => break true,
- }
- };
-
- if !self.buf.is_empty() {
- let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
- let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
- if let Err(e) = self.index_counter.table.insert_many(entries).await {
- self.errors += 1;
- if self.errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
- return Ok(WorkerState::Done);
- }
- // Propagate error up to worker manager, it will log it, increment a counter,
- // and sleep for a certain delay (with exponential backoff), waiting for
- // things to go back to normal
- return Err(e);
- } else {
- for k in entries_k {
- self.buf.remove(&k);
- }
- self.errors = 0;
- }
-
- return Ok(WorkerState::Busy);
- } else if closed {
- return Ok(WorkerState::Done);
- } else {
- return Ok(WorkerState::Idle);
- }
- }
-
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
- match self.propagate_rx.recv().await {
- Some((pk, sk, counters)) => {
- self.add_ent(pk, sk, counters);
- WorkerState::Busy
- }
- None => match self.buf.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Done,
- },
- }
- }
-}
+// ----
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index a74df277..f64a7984 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -273,14 +273,9 @@ impl K2VRpcHandler {
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
- let tree_key = self
- .item_table
- .data
- .tree_key(&item.partition, &item.sort_key);
-
self.item_table
.data
- .update_entry_with(&tree_key[..], |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |ent| {
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 26ff57f6..1b2f0014 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -4,7 +4,6 @@ use std::sync::Arc;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -221,7 +220,6 @@ impl Crdt for Object {
}
pub struct ObjectTable {
- pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
}
@@ -255,34 +253,34 @@ impl TableSchema for ObjectTable {
);
}
- // 2. Spawn threads that propagates deletions to version table
- let version_table = self.version_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of old versions
- for v in old_v.versions.iter() {
- let newly_deleted = match new_v
- .versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- {
- Err(_) => true,
- Ok(i) => {
- new_v.versions[i].state == ObjectVersionState::Aborted
- && v.state != ObjectVersionState::Aborted
- }
- };
- if newly_deleted {
- let deleted_version =
- Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
- version_table.insert(&deleted_version).await?;
+ // 2. Enqueue propagation deletions to version table
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of old versions
+ for v in old_v.versions.iter() {
+ let newly_deleted = match new_v
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ {
+ Err(_) => true,
+ Ok(i) => {
+ new_v.versions[i].state == ObjectVersionState::Aborted
+ && v.state != ObjectVersionState::Aborted
+ }
+ };
+ if newly_deleted {
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ let res = self.version_table.queue_insert(tx, &deleted_version);
+ if let Err(e) = db::unabort(res)? {
+ error!(
+ "Unable to enqueue version deletion propagation: {}. A repair will be needed.",
+ e
+ );
}
}
}
- Ok(())
- });
+ }
+
Ok(())
}
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6bc2ecd1..0486512b 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -3,7 +3,6 @@ use std::sync::Arc;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -127,7 +126,6 @@ impl Crdt for Version {
}
pub struct VersionTable {
- pub background: Arc<BackgroundRunner>,
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
}
@@ -141,33 +139,26 @@ impl TableSchema for VersionTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
- let block_ref_table = self.block_ref_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of version blocks
- if new_v.deleted.get() && !old_v.deleted.get() {
- let deleted_block_refs = old_v
- .blocks
- .items()
- .iter()
- .map(|(_k, vb)| BlockRef {
- block: vb.hash,
- version: old_v.uuid,
- deleted: true.into(),
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of version blocks
+ if new_v.deleted.get() && !old_v.deleted.get() {
+ let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef {
+ block: vb.hash,
+ version: old_v.uuid,
+ deleted: true.into(),
+ });
+ for block_ref in deleted_block_refs {
+ let res = self.block_ref_table.queue_insert(tx, &block_ref);
+ if let Err(e) = db::unabort(res)? {
+ error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e);
+ }
}
}
- Ok(())
- });
+ }
Ok(())
}