aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-19 23:38:42 +0200
commit65070f3c05775f6692f4a16b6a304991d0510301 (patch)
treeaad55f988f3d29e3643e4141af6c53e79fd89257 /src/table
parente6da0dc90098a7c830e14f6f0dce61e8e7132d3a (diff)
downloadgarage-65070f3c05775f6692f4a16b6a304991d0510301.tar.gz
garage-65070f3c05775f6692f4a16b6a304991d0510301.zip
Improvements to CLI and various fixes for netapp version
Diffstat (limited to 'src/table')
-rw-r--r--src/table/gc.rs29
-rw-r--r--src/table/replication/fullcopy.rs7
-rw-r--r--src/table/replication/parameters.rs5
-rw-r--r--src/table/replication/sharded.rs5
-rw-r--r--src/table/sync.rs36
-rw-r--r--src/table/table.rs35
6 files changed, 48 insertions, 69 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index c03648ef..9b3d60ff 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -36,11 +36,10 @@ enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
- Error(String),
}
-impl Message for GcRpc {
- type Response = GcRpc;
+impl Rpc for GcRpc {
+ type Response = Result<GcRpc, Error>;
}
impl<F, R> TableGc<F, R>
@@ -168,7 +167,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<NodeID>,
+ nodes: Vec<Uuid>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -224,8 +223,15 @@ where
.compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?;
Ok(())
}
+}
- async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
self.data.update_many(items)?;
@@ -242,16 +248,3 @@ where
}
}
}
-
-#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| GcRpc::Error(format!("{}", e)))
- }
-}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index b41c5360..ae6851fb 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
use garage_rpc::ring::*;
use garage_rpc::system::System;
-use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -20,19 +19,19 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
- fn read_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
+ fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
+ fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.config
.members
.keys()
- .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
+ .cloned()
.collect::<Vec<_>>()
}
fn write_quorum(&self) -> usize {
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 7fdfce67..3740d947 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,5 +1,4 @@
use garage_rpc::ring::*;
-use garage_rpc::NodeID;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
@@ -8,12 +7,12 @@ pub trait TableReplication: Send + Sync {
// To understand various replication methods
/// Which nodes to send read requests to
- fn read_nodes(&self, hash: &Hash) -> Vec<NodeID>;
+ fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<NodeID>;
+ fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index ffe686a5..75043a17 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -2,7 +2,6 @@ use std::sync::Arc;
use garage_rpc::ring::*;
use garage_rpc::system::System;
-use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -26,7 +25,7 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
- fn read_nodes(&self, hash: &Hash) -> Vec<NodeID> {
+ fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
@@ -34,7 +33,7 @@ impl TableReplication for TableShardedReplication {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<NodeID> {
+ fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index c5db0987..4fcdc528 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -45,11 +45,10 @@ pub(crate) enum SyncRpc {
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
- Error(String),
}
-impl Message for SyncRpc {
- type Response = SyncRpc;
+impl Rpc for SyncRpc {
+ type Response = Result<SyncRpc, Error>;
}
struct SyncTodo {
@@ -305,7 +304,7 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[NodeID],
+ nodes: &[Uuid],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
@@ -354,7 +353,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: NodeID,
+ who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -480,7 +479,7 @@ where
Ok(())
}
- async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -513,9 +512,17 @@ where
)))
}
}
+}
+
+// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
@@ -535,19 +542,6 @@ where
}
}
-#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
- self.handle_rpc(message)
- .await
- .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
- }
-}
-
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
diff --git a/src/table/table.rs b/src/table/table.rs
index ad263343..e1357471 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -34,7 +34,6 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
Ok,
- Error(String),
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
@@ -45,8 +44,8 @@ pub(crate) enum TableRpc<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-impl<F: TableSchema> Message for TableRpc<F> {
- type Response = TableRpc<F>;
+impl<F: TableSchema> Rpc for TableRpc<F> {
+ type Response = Result<TableRpc<F>, Error>;
}
impl<F, R> Table<F, R>
@@ -277,7 +276,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
- async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> {
+ async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.system
.rpc
@@ -292,10 +291,19 @@ where
.await?;
Ok(())
}
+}
- // ====== RPC HANDLER =====
- //
- async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
+#[async_trait]
+impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(
+ self: &Arc<Self>,
+ msg: &TableRpc<F>,
+ _from: NodeID,
+ ) -> Result<TableRpc<F>, Error> {
match msg {
TableRpc::ReadEntry(key, sort_key) => {
let value = self.data.read_entry(key, sort_key)?;
@@ -313,16 +321,3 @@ where
}
}
}
-
-#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> {
- self.handle_rpc(msg)
- .await
- .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e)))
- }
-}