diff options
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 58 |
1 files changed, 35 insertions, 23 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 8a66c420..7ad79677 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,9 +14,11 @@ use opentelemetry::{ use garage_db as db; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; +use garage_util::migrate::Migrate; use garage_rpc::system::System; use garage_rpc::*; @@ -25,16 +27,18 @@ use crate::crdt::Crdt; use crate::data::*; use crate::gc::*; use crate::merkle::*; +use crate::queue::InsertQueueWorker; use crate::replication::*; use crate::schema::*; use crate::sync::*; use crate::util::*; -pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> { +pub struct Table<F: TableSchema, R: TableReplication> { pub system: Arc<System>, pub data: Arc<TableData<F, R>>, pub merkle_updater: Arc<MerkleUpdater<F, R>>, pub syncer: Arc<TableSyncer<F, R>>, + gc: Arc<TableGc<F, R>>, endpoint: Arc<Endpoint<TableRpc<F>, Self>>, } @@ -61,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> { type Response = Result<TableRpc<F>, Error>; } -impl<F, R> Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Table<F, R> { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { @@ -75,15 +75,16 @@ where let data = TableData::new(system.clone(), instance, replication, db); - let merkle_updater = MerkleUpdater::launch(&system.background, data.clone()); + let merkle_updater = MerkleUpdater::new(data.clone()); - let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone()); - TableGc::launch(system.clone(), data.clone()); + let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone()); + let gc = TableGc::new(system.clone(), data.clone()); let table = Arc::new(Self { system, data, merkle_updater, + gc, syncer, endpoint, }); @@ -93,6 +94,13 @@ where table } + pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) { + self.merkle_updater.spawn_workers(bg); + self.syncer.spawn_workers(bg); + self.gc.spawn_workers(bg); + bg.spawn_worker(InsertQueueWorker(self.clone())); + } + pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage_table"); let span = tracer.start(format!("{} insert", F::TABLE_NAME)); @@ -111,7 +119,7 @@ where let hash = e.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); + let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::<F>::Update(vec![e_enc]); self.system @@ -128,6 +136,11 @@ where Ok(()) } + /// Insert item locally + pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> { + self.data.queue_insert(tx, e) + } + pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error> where I: IntoIterator<Item = IE> + Send + Sync, @@ -157,7 +170,7 @@ where let entry = entry.borrow(); let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); - let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); + let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); for node in who { call_list.entry(node).or_default().push(e_enc.clone()); } @@ -259,9 +272,11 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - self.system - .background - .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await }); + tokio::spawn(async move { + if let Err(e) = self2.repair_on_read(&who[..], ent2).await { + warn!("Error doing repair on read: {}", e); + } + }); } } @@ -358,11 +373,12 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::<Vec<_>>(); - self.system.background.spawn_cancellable(async move { + tokio::spawn(async move { for v in to_repair { - self2.repair_on_read(&who[..], v).await?; + if let Err(e) = self2.repair_on_read(&who[..], v).await { + warn!("Error doing repair on read: {}", e); + } } - Ok(()) }); } @@ -393,7 +409,7 @@ where // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { - let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); + let what_enc = Arc::new(ByteBuf::from(what.encode()?)); self.system .rpc .try_call_many( @@ -408,11 +424,7 @@ where } #[async_trait] -impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { async fn handle( self: &Arc<Self>, msg: &TableRpc<F>, |