author | Alex Auvolat <alex@adnab.me> | 2021-10-15 11:05:09 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-10-22 16:55:24 +0200 |
commit | 1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch) |
tree | d6437f105a630fa197b67446b5c3b2902335c34a /src/table |
parent | 4067797d0142ee7860aff8da95d65820d6cc0889 (diff) |
download | garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip |
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/gc.rs | 29
-rw-r--r-- | src/table/replication/fullcopy.rs | 7
-rw-r--r-- | src/table/replication/parameters.rs | 5
-rw-r--r-- | src/table/replication/sharded.rs | 5
-rw-r--r-- | src/table/sync.rs | 36
-rw-r--r-- | src/table/table.rs | 35
6 files changed, 48 insertions, 69 deletions
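The recurring pattern in the diff below is the move from the old `Message` trait to the new `Rpc` trait in the updated netapp-based RPC layer: each RPC enum drops its `Error(String)` variant, its response type becomes `Result<_, Error>`, and the former `handle_rpc` helpers are folded directly into the `EndpointHandler` implementations, so failures propagate as `Err(_)` instead of being encoded as a message variant. Below is a minimal, non-async sketch of that shape using stand-in types, not the actual netapp/garage definitions:

```rust
// Stand-in error type; garage uses its own Error from garage_util.
#[derive(Debug)]
struct Error(String);

// Simplified analogue of the new `Rpc` trait: the response type is a
// Result, so RPC enums no longer need an Error(String) variant.
trait Rpc {
    type Response;
}

#[derive(Debug)]
enum GcRpc {
    Update(Vec<u8>),
    Ok,
}

impl Rpc for GcRpc {
    type Response = Result<GcRpc, Error>;
}

// Simplified, non-async analogue of `EndpointHandler`: the handler
// returns the Result directly instead of mapping failures into a
// message variant with `unwrap_or_else`, as the old code did.
trait EndpointHandler<M: Rpc> {
    fn handle(&self, message: &M) -> M::Response;
}

struct TableGc;

impl EndpointHandler<GcRpc> for TableGc {
    fn handle(&self, message: &GcRpc) -> Result<GcRpc, Error> {
        match message {
            GcRpc::Update(_items) => Ok(GcRpc::Ok),
            m => Err(Error(format!("unexpected GC RPC message: {:?}", m))),
        }
    }
}

fn main() {
    let gc = TableGc;
    // A successful call now yields Ok(GcRpc::Ok) rather than GcRpc::Ok,
    // and failures yield Err(..) rather than GcRpc::Error(String).
    let _resp: Result<GcRpc, Error> = gc.handle(&GcRpc::Update(vec![1, 2, 3]));
}
```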
diff --git a/src/table/gc.rs b/src/table/gc.rs
index c03648ef..9b3d60ff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -36,11 +36,10 @@ enum GcRpc {
     Update(Vec<ByteBuf>),
     DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
     Ok,
-    Error(String),
 }
 
-impl Message for GcRpc {
-    type Response = GcRpc;
+impl Rpc for GcRpc {
+    type Response = Result<GcRpc, Error>;
 }
 
 impl<F, R> TableGc<F, R>
@@ -168,7 +167,7 @@ where
 
     async fn try_send_and_delete(
         &self,
-        nodes: Vec<NodeID>,
+        nodes: Vec<Uuid>,
         items: Vec<(ByteBuf, Hash, ByteBuf)>,
     ) -> Result<(), Error> {
         let n_items = items.len();
@@ -224,8 +223,15 @@ where
             .compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
         Ok(())
     }
+}
 
-    async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
+where
+    F: TableSchema + 'static,
+    R: TableReplication + 'static,
+{
+    async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
         match message {
             GcRpc::Update(items) => {
                 self.data.update_many(items)?;
@@ -242,16 +248,3 @@ where
         }
     }
 }
-
-#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
-    async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc {
-        self.handle_rpc(message)
-            .await
-            .unwrap_or_else(|e| GcRpc::Error(format!("{}", e)))
-    }
-}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index b41c5360..ae6851fb 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
 
 use garage_rpc::ring::*;
 use garage_rpc::system::System;
-use garage_rpc::NodeID;
 use garage_util::data::*;
 
 use crate::replication::*;
@@ -20,19 +19,19 @@ pub struct TableFullReplication {
 }
 
 impl TableReplication for TableFullReplication {
-    fn read_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
+    fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
         vec![self.system.id]
     }
     fn read_quorum(&self) -> usize {
         1
     }
 
-    fn write_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
+    fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
         let ring = self.system.ring.borrow();
         ring.config
             .members
             .keys()
-            .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
+            .cloned()
             .collect::<Vec<_>>()
     }
     fn write_quorum(&self) -> usize {
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 7fdfce67..3740d947 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,5 +1,4 @@
 use garage_rpc::ring::*;
-use garage_rpc::NodeID;
 use garage_util::data::*;
 
 /// Trait to describe how a table shall be replicated
@@ -8,12 +7,12 @@ pub trait TableReplication: Send + Sync {
     // To understand various replication methods
 
     /// Which nodes to send read requests to
-    fn read_nodes(&self, hash: &Hash) -> Vec<NodeID>;
+    fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
     /// Responses needed to consider a read succesfull
     fn read_quorum(&self) -> usize;
 
     /// Which nodes to send writes to
-    fn write_nodes(&self, hash: &Hash) -> Vec<NodeID>;
+    fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
     /// Responses needed to consider a write succesfull
     fn write_quorum(&self) -> usize;
     fn max_write_errors(&self) -> usize;
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index ffe686a5..75043a17 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
 
 use garage_rpc::ring::*;
 use garage_rpc::system::System;
-use garage_rpc::NodeID;
 use garage_util::data::*;
 
 use crate::replication::*;
@@ -26,7 +25,7 @@ pub struct TableShardedReplication {
 }
 
 impl TableReplication for TableShardedReplication {
-    fn read_nodes(&self, hash: &Hash) -> Vec<NodeID> {
+    fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
         let ring = self.system.ring.borrow();
         ring.get_nodes(&hash, self.replication_factor)
     }
@@ -34,7 +33,7 @@ impl TableReplication for TableShardedReplication {
         self.read_quorum
     }
 
-    fn write_nodes(&self, hash: &Hash) -> Vec<NodeID> {
+    fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
         let ring = self.system.ring.borrow();
         ring.get_nodes(&hash, self.replication_factor)
     }
diff --git a/src/table/sync.rs b/src/table/sync.rs
index c5db0987..4fcdc528 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -45,11 +45,10 @@ pub(crate) enum SyncRpc {
     Node(MerkleNodeKey, MerkleNode),
     Items(Vec<Arc<ByteBuf>>),
     Ok,
-    Error(String),
 }
 
-impl Message for SyncRpc {
-    type Response = SyncRpc;
+impl Rpc for SyncRpc {
+    type Response = Result<SyncRpc, Error>;
 }
 
 struct SyncTodo {
@@ -305,7 +304,7 @@ where
     async fn offload_items(
         self: &Arc<Self>,
         items: &[(Vec<u8>, Arc<ByteBuf>)],
-        nodes: &[NodeID],
+        nodes: &[Uuid],
     ) -> Result<(), Error> {
         let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
 
@@ -354,7 +353,7 @@ where
     async fn do_sync_with(
         self: Arc<Self>,
         partition: TodoPartition,
-        who: NodeID,
+        who: Uuid,
         must_exit: watch::Receiver<bool>,
     ) -> Result<(), Error> {
         let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -480,7 +479,7 @@ where
         Ok(())
     }
 
-    async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+    async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
         info!(
             "({}) Sending {} items to {:?}",
             self.data.name,
@@ -513,9 +512,17 @@ where
             )))
         }
     }
+}
+
+// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
 
-    // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-    async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+    F: TableSchema + 'static,
+    R: TableReplication + 'static,
+{
+    async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
         match message {
             SyncRpc::RootCkHash(range, h) => {
                 let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -535,19 +542,6 @@ where
     }
 }
-
-#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
-    async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
-        self.handle_rpc(message)
-            .await
-            .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
-    }
-}
 
 impl SyncTodo {
     fn add_full_sync<F: TableSchema, R: TableReplication>(
         &mut self,
diff --git a/src/table/table.rs b/src/table/table.rs
index ad263343..e1357471 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -34,7 +34,6 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
 #[derive(Serialize, Deserialize)]
 pub(crate) enum TableRpc<F: TableSchema> {
     Ok,
-    Error(String),
 
     ReadEntry(F::P, F::S),
     ReadEntryResponse(Option<ByteBuf>),
@@ -45,8 +44,8 @@ pub(crate) enum TableRpc<F: TableSchema> {
     Update(Vec<Arc<ByteBuf>>),
 }
 
-impl<F: TableSchema> Message for TableRpc<F> {
-    type Response = TableRpc<F>;
+impl<F: TableSchema> Rpc for TableRpc<F> {
+    type Response = Result<TableRpc<F>, Error>;
 }
 
 impl<F, R> Table<F, R>
@@ -277,7 +276,7 @@ where
 
     // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
 
-    async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> {
+    async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
         let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
         self.system
             .rpc
@@ -292,10 +291,19 @@ where
             .await?;
         Ok(())
     }
+}
 
-    // ====== RPC HANDLER =====
-    //
-    async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
+where
+    F: TableSchema + 'static,
+    R: TableReplication + 'static,
+{
+    async fn handle(
+        self: &Arc<Self>,
+        msg: &TableRpc<F>,
+        _from: NodeID,
+    ) -> Result<TableRpc<F>, Error> {
         match msg {
             TableRpc::ReadEntry(key, sort_key) => {
                 let value = self.data.read_entry(key, sort_key)?;
@@ -313,16 +321,3 @@ where
         }
     }
 }
-
-#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
-    F: TableSchema + 'static,
-    R: TableReplication + 'static,
-{
-    async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> {
-        self.handle_rpc(msg)
-            .await
-            .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e)))
-    }
-}
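On the replication side, `read_nodes` and `write_nodes` now return garage's own `Uuid` node identifier instead of netapp's `NodeID`, which is why `TableFullReplication::write_nodes` can simply clone the ring's member keys rather than converting each one through `NodeID::from_slice`. A rough self-contained sketch of that signature change, with stand-in `Uuid`, `Hash`, and ring types rather than the real `garage_util`/`garage_rpc` definitions:

```rust
use std::collections::HashMap;

// Stand-ins for garage_util::data::{Uuid, Hash} and the garage_rpc ring.
type Uuid = [u8; 32];
type Hash = [u8; 32];

struct Ring {
    // In this sketch, member nodes are keyed directly by their Uuid.
    members: HashMap<Uuid, ()>,
}

// Mirrors the shape of TableReplication after the change:
// node lists are Vec<Uuid>, not Vec<NodeID>.
trait TableReplication {
    /// Which nodes to send read requests to
    fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
    /// Which nodes to send writes to
    fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
}

struct TableFullReplication {
    id: Uuid,
    ring: Ring,
}

impl TableReplication for TableFullReplication {
    fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
        // Full-copy tables read locally.
        vec![self.id]
    }

    fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
        // With Uuid as the shared node identifier, member keys can be
        // cloned directly; no NodeID::from_slice round-trip is needed.
        self.ring.members.keys().cloned().collect()
    }
}

fn main() {
    let repl = TableFullReplication {
        id: [0u8; 32],
        ring: Ring {
            members: HashMap::from([([0u8; 32], ()), ([1u8; 32], ())]),
        },
    };
    println!("writes go to {} node(s)", repl.write_nodes(&[0u8; 32]).len());
}
```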