aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-01-03 11:37:31 +0000
committerAlex <alex@adnab.me>2023-01-03 11:37:31 +0000
commit582b0761790b7958a3ba10c4b549b466997d2dcd (patch)
treeb94c84bd21ef45e2480c653dc7ed2b37fd5907fb /src/table/table.rs
parent76230f20282e73a5a5afa33af68152acaf732cf5 (diff)
parent939a6d67e8ace1aa38998281f52511a61f4b4d94 (diff)
downloadgarage-582b0761790b7958a3ba10c4b549b466997d2dcd.tar.gz
garage-582b0761790b7958a3ba10c4b549b466997d2dcd.zip
Merge pull request 'Some improvements to Garage internals' (#451) from internals-rework into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/451
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs37
1 files changed, 28 insertions, 9 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 8a66c420..bbcd5971 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,6 +14,7 @@ use opentelemetry::{
use garage_db as db;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -25,6 +26,7 @@ use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
+use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
@@ -35,6 +37,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@@ -75,15 +78,16 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
- let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+ let merkle_updater = MerkleUpdater::new(data.clone());
- let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
- TableGc::launch(system.clone(), data.clone());
+ let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+ let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
+ gc,
syncer,
endpoint,
});
@@ -93,6 +97,13 @@ where
table
}
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ self.merkle_updater.spawn_workers(bg);
+ self.syncer.spawn_workers(bg);
+ self.gc.spawn_workers(bg);
+ bg.spawn_worker(InsertQueueWorker(self.clone()));
+ }
+
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
@@ -128,6 +139,11 @@ where
Ok(())
}
+ /// Insert item locally
+ pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
+ self.data.queue_insert(tx, e)
+ }
+
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
@@ -259,9 +275,11 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+ tokio::spawn(async move {
+ if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
+ warn!("Error doing repair on read: {}", e);
+ }
+ });
}
}
@@ -358,11 +376,12 @@ where
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
- self.system.background.spawn_cancellable(async move {
+ tokio::spawn(async move {
for v in to_repair {
- self2.repair_on_read(&who[..], v).await?;
+ if let Err(e) = self2.repair_on_read(&who[..], v).await {
+ warn!("Error doing repair on read: {}", e);
+ }
}
- Ok(())
});
}