aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs37
1 files changed, 28 insertions, 9 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 8a66c420..bbcd5971 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,6 +14,7 @@ use opentelemetry::{
use garage_db as db;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -25,6 +26,7 @@ use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
+use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
@@ -35,6 +37,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@@ -75,15 +78,16 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
- let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+ let merkle_updater = MerkleUpdater::new(data.clone());
- let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
- TableGc::launch(system.clone(), data.clone());
+ let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+ let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
+ gc,
syncer,
endpoint,
});
@@ -93,6 +97,13 @@ where
table
}
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ self.merkle_updater.spawn_workers(bg);
+ self.syncer.spawn_workers(bg);
+ self.gc.spawn_workers(bg);
+ bg.spawn_worker(InsertQueueWorker(self.clone()));
+ }
+
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
@@ -128,6 +139,11 @@ where
Ok(())
}
+ /// Insert item locally
+ pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
+ self.data.queue_insert(tx, e)
+ }
+
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
@@ -259,9 +275,11 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+ tokio::spawn(async move {
+ if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
+ warn!("Error doing repair on read: {}", e);
+ }
+ });
}
}
@@ -358,11 +376,12 @@ where
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
- self.system.background.spawn_cancellable(async move {
+ tokio::spawn(async move {
for v in to_repair {
- self2.repair_on_read(&who[..], v).await?;
+ if let Err(e) = self2.repair_on_read(&who[..], v).await {
+ warn!("Error doing repair on read: {}", e);
+ }
}
- Ok(())
});
}