Diffstat (limited to 'src/table')
-rw-r--r--  src/table/Cargo.toml                 |   8
-rw-r--r--  src/table/data.rs                    |   2
-rw-r--r--  src/table/gc.rs                      |  80
-rw-r--r--  src/table/replication/fullcopy.rs    |  13
-rw-r--r--  src/table/replication/parameters.rs  |   6
-rw-r--r--  src/table/replication/sharded.rs     |   7
-rw-r--r--  src/table/sync.rs                    |  97
-rw-r--r--  src/table/table.rs                   | 110
8 files changed, 186 insertions(+), 137 deletions(-)
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index ccbd1748..616bf275 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "garage_table"
-version = "0.3.0"
+version = "0.4.0"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -13,9 +13,10 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
-garage_rpc = { version = "0.3.0", path = "../rpc" }
-garage_util = { version = "0.3.0", path = "../util" }
+garage_rpc = { version = "0.4.0", path = "../rpc" }
+garage_util = { version = "0.4.0", path = "../util" }
+async-trait = "0.1.7"
bytes = "1.0"
hexdump = "0.1"
log = "0.4"
@@ -30,4 +31,3 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-
diff --git a/src/table/data.rs b/src/table/data.rs
index e7e85e65..ffd494d5 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -9,7 +9,7 @@ use tokio::sync::Notify;
use garage_util::data::*;
use garage_util::error::*;
-use garage_rpc::membership::System;
+use garage_rpc::system::System;
use crate::crdt::Crdt;
use crate::replication::*;
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 73e08827..c03648ef 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
+use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -13,9 +14,8 @@ use tokio::sync::watch;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::data::*;
use crate::replication::*;
@@ -24,11 +24,11 @@ use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub struct TableGc<F: TableSchema, R: TableReplication> {
+pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
- rpc_client: Arc<RpcClient<GcRpc>>,
+ endpoint: Arc<Endpoint<GcRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
@@ -36,30 +36,30 @@ enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
+ Error(String),
}
-impl RpcMessage for GcRpc {}
+impl Message for GcRpc {
+ type Response = GcRpc;
+}
impl<F, R> TableGc<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(
- system: Arc<System>,
- data: Arc<TableData<F, R>>,
- rpc_server: &mut RpcServer,
- ) -> Arc<Self> {
- let rpc_path = format!("table_{}/gc", data.name);
- let rpc_client = system.rpc_client::<GcRpc>(&rpc_path);
+ pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/gc.rs/Rpc:{}", data.name));
let gc = Arc::new(Self {
system: system.clone(),
data: data.clone(),
- rpc_client,
+ endpoint,
});
- gc.register_handler(rpc_server, rpc_path);
+ gc.endpoint.set_handler(gc.clone());
let gc1 = gc.clone();
system.background.spawn_worker(
@@ -168,7 +168,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<Uuid>,
+ nodes: Vec<NodeID>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -180,11 +180,15 @@ where
deletes.push((k, vhash));
}
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&nodes[..],
GcRpc::Update(updates),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -193,11 +197,15 @@ where
self.data.name, n_items
);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes.clone()),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -217,24 +225,7 @@ where
Ok(())
}
- // ---- RPC HANDLER ----
-
- fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
- }
-
- async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> {
+ async fn handle_rpc(&self, message: &GcRpc) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
self.data.update_many(items)?;
@@ -251,3 +242,16 @@ where
}
}
}
+
+#[async_trait]
+impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> GcRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| GcRpc::Error(format!("{}", e)))
+ }
+}
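
Taken together, the gc.rs changes define the idiom that every other file in this patch repeats: declare a Message type whose Response is the same enum, obtain an Endpoint from system.netapp, register the Arc as its own handler, and issue calls through system.rpc with an explicit RequestStrategy. A minimal sketch of that idiom, assuming only the API surface visible in this diff; the PingRpc message, the endpoint name, and the Pinger type are hypothetical:

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use garage_rpc::system::System;
use garage_rpc::*;
use garage_util::error::Error;

// Hypothetical message type; as in this patch, the response is the same
// enum, with an Error variant carrying handler failures as strings.
#[derive(Serialize, Deserialize)]
enum PingRpc {
    Ping,
    Pong,
    Error(String),
}

impl Message for PingRpc {
    type Response = PingRpc;
}

struct Pinger {
    system: Arc<System>,
    endpoint: Arc<Endpoint<PingRpc, Self>>,
}

impl Pinger {
    fn launch(system: Arc<System>) -> Arc<Self> {
        // Endpoint names follow the "crate/file.rs/Rpc:instance" scheme used above.
        let endpoint = system
            .netapp
            .endpoint("garage_table/example.rs/Rpc:ping".to_string());
        let pinger = Arc::new(Self {
            system: system.clone(),
            endpoint,
        });
        // One registration replaces the old register_handler/set_local_handler
        // pair: local and remote calls now reach the same handler.
        pinger.endpoint.set_handler(pinger.clone());
        pinger
    }

    async fn ping_all(&self, nodes: &[NodeID]) -> Result<(), Error> {
        self.system
            .rpc
            .try_call_many(
                &self.endpoint,
                nodes,
                PingRpc::Ping,
                RequestStrategy::with_priority(PRIO_BACKGROUND)
                    .with_quorum(nodes.len())
                    .with_timeout(Duration::from_secs(30)),
            )
            .await?;
        Ok(())
    }
}

#[async_trait]
impl EndpointHandler<PingRpc> for Pinger {
    async fn handle(self: &Arc<Self>, _message: &PingRpc, _from: NodeID) -> PingRpc {
        PingRpc::Pong
    }
}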
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 3ce7c0bf..b41c5360 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,7 +1,8 @@
use std::sync::Arc;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
+use garage_rpc::system::System;
+use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -19,16 +20,20 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
- fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ fn read_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ fn write_nodes(&self, _hash: &Hash) -> Vec<NodeID> {
let ring = self.system.ring.borrow();
- ring.config.members.keys().cloned().collect::<Vec<_>>()
+ ring.config
+ .members
+ .keys()
+ .map(|id| NodeID::from_slice(id.as_slice()).unwrap())
+ .collect::<Vec<_>>()
}
fn write_quorum(&self) -> usize {
let nmembers = self.system.ring.borrow().config.members.len();
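
The one non-mechanical change in fullcopy.rs is the identifier conversion: the ring still keys its members by Uuid, while the RPC layer now addresses peers as NodeID. A sketch isolating that step, under the same assumption the unwrap above makes, namely that both types wrap the same 32 bytes (the helper name is hypothetical):

use garage_rpc::NodeID;
use garage_util::data::*;

// Reinterpret a ring member's Uuid as a netapp NodeID. from_slice fails
// only on a length mismatch, which the 32-byte assumption rules out.
fn uuid_to_node_id(id: &Uuid) -> NodeID {
    NodeID::from_slice(id.as_slice()).unwrap()
}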
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 64996828..7fdfce67 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,5 +1,5 @@
use garage_rpc::ring::*;
-
+use garage_rpc::NodeID;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
@@ -8,12 +8,12 @@ pub trait TableReplication: Send + Sync {
// To understand various replication methods
/// Which nodes to send read requests to
- fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+ fn read_nodes(&self, hash: &Hash) -> Vec<NodeID>;
/// Responses needed to consider a read successful
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+ fn write_nodes(&self, hash: &Hash) -> Vec<NodeID>;
/// Responses needed to consider a write successful
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
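
Callers consume this trait in two steps: pick the node set for a hash, then demand the matching quorum, exactly as the table.rs hunks below do. A minimal sketch of that caller side, using only methods declared here (the helper name is hypothetical):

use garage_rpc::NodeID;
use garage_util::data::*;

use crate::replication::*;

// For any replication strategy: which nodes receive a write for this
// hash, and how many acknowledgements make the write successful.
fn write_plan<R: TableReplication>(repl: &R, hash: &Hash) -> (Vec<NodeID>, usize) {
    (repl.write_nodes(hash), repl.write_quorum())
}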
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 8081b892..ffe686a5 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,7 +1,8 @@
use std::sync::Arc;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
+use garage_rpc::system::System;
+use garage_rpc::NodeID;
use garage_util::data::*;
use crate::replication::*;
@@ -25,7 +26,7 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
- fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ fn read_nodes(&self, hash: &Hash) -> Vec<NodeID> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
@@ -33,7 +34,7 @@ impl TableReplication for TableShardedReplication {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ fn write_nodes(&self, hash: &Hash) -> Vec<NodeID> {
let ring = self.system.ring.borrow();
ring.get_nodes(&hash, self.replication_factor)
}
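
Sharded replication is where the quorum arithmetic matters: every read is guaranteed to see at least one copy of the latest successful write exactly when read_quorum + write_quorum exceeds replication_factor. A small self-contained check of that invariant (a hypothetical helper illustrating standard quorum reasoning, not code from this patch):

// Any read set of size read_quorum must intersect any write set of size
// write_quorum when both are drawn from the same replication_factor nodes.
fn quorums_intersect(replication_factor: usize, read_quorum: usize, write_quorum: usize) -> bool {
    read_quorum + write_quorum > replication_factor
}

fn main() {
    // A typical setup: 3 replicas, quorum 2 in both directions.
    assert!(quorums_intersect(3, 2, 2));
    // Quorum 1 on both sides may read stale data: the sets can be disjoint.
    assert!(!quorums_intersect(3, 1, 1));
}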
diff --git a/src/table/sync.rs b/src/table/sync.rs
index a3afbbba..c5db0987 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
+use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
@@ -13,10 +14,9 @@ use tokio::sync::{mpsc, watch};
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
use garage_rpc::ring::*;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::data::*;
use crate::merkle::*;
@@ -28,13 +28,13 @@ const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema, R: TableReplication> {
+pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
- rpc_client: Arc<RpcClient<SyncRpc>>,
+ endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
@@ -45,9 +45,12 @@ pub(crate) enum SyncRpc {
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
Ok,
+ Error(String),
}
-impl RpcMessage for SyncRpc {}
+impl Message for SyncRpc {
+ type Response = SyncRpc;
+}
struct SyncTodo {
todo: Vec<TodoPartition>,
@@ -72,10 +75,10 @@ where
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path);
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/sync.rs/Rpc:{}", data.name));
let todo = SyncTodo { todo: vec![] };
@@ -84,10 +87,10 @@ where
data: data.clone(),
merkle,
todo: Mutex::new(todo),
- rpc_client,
+ endpoint,
});
- syncer.register_handler(rpc_server, rpc_path);
+ syncer.endpoint.set_handler(syncer.clone());
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
@@ -112,21 +115,6 @@ where
syncer
}
- fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle_rpc(&msg).await }
- });
- }
-
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
@@ -317,15 +305,19 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[Uuid],
+ nodes: &[NodeID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
nodes,
SyncRpc::Items(values),
- RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_quorum(nodes.len())
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -362,7 +354,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: Uuid,
+ who: NodeID,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -378,11 +370,14 @@ where
// Check if they have the same root checksum
// If so, do nothing.
let root_resp = self
- .rpc_client
+ .system
+ .rpc
.call(
+ &self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
- TABLE_SYNC_RPC_TIMEOUT,
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -430,8 +425,15 @@ where
// Get Merkle node for this tree position at remote node
// and compare it with local node
let remote_node = match self
- .rpc_client
- .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ who,
+ SyncRpc::GetNode(key.clone()),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
.await?
{
SyncRpc::Node(_, node) => node,
@@ -478,7 +480,7 @@ where
Ok(())
}
- async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: NodeID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -492,8 +494,15 @@ where
.collect::<Vec<_>>();
let rpc_resp = self
- .rpc_client
- .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ who,
+ SyncRpc::Items(values),
+ RequestStrategy::with_priority(PRIO_BACKGROUND)
+ .with_timeout(TABLE_SYNC_RPC_TIMEOUT),
+ )
.await?;
if let SyncRpc::Ok = rpc_resp {
Ok(())
@@ -506,7 +515,6 @@ where
}
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
-
async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
@@ -527,6 +535,19 @@ where
}
}
+#[async_trait]
+impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, message: &SyncRpc, _from: NodeID) -> SyncRpc {
+ self.handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| SyncRpc::Error(format!("{}", e)))
+ }
+}
+
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
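
sync.rs also shows the point-to-point shape of the new API: call() now takes a RequestStrategy instead of a bare timeout, mirroring try_call_many. A generic sketch of that shape, with the helper name and the trait bounds assumed rather than taken from this patch:

use std::time::Duration;

use garage_rpc::system::System;
use garage_rpc::*;
use garage_util::error::Error;

// Hypothetical helper: what the old rpc_client.call(who, msg, timeout)
// becomes under the endpoint-based API.
async fn call_one<M, H>(
    system: &System,
    endpoint: &Endpoint<M, H>,
    who: NodeID,
    msg: M,
    timeout: Duration,
) -> Result<M::Response, Error>
where
    M: Message,
    H: EndpointHandler<M>,
{
    system
        .rpc
        .call(
            endpoint,
            who,
            msg,
            RequestStrategy::with_priority(PRIO_BACKGROUND).with_timeout(timeout),
        )
        .await
}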
diff --git a/src/table/table.rs b/src/table/table.rs
index eb9bd25c..ad263343 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
+use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
@@ -9,9 +10,8 @@ use serde_bytes::ByteBuf;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use crate::crdt::Crdt;
use crate::data::*;
@@ -23,17 +23,18 @@ use crate::sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
-pub struct Table<F: TableSchema, R: TableReplication> {
+pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
- rpc_client: Arc<RpcClient<TableRpc<F>>>,
+ endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
#[derive(Serialize, Deserialize)]
pub(crate) enum TableRpc<F: TableSchema> {
Ok,
+ Error(String),
ReadEntry(F::P, F::S),
ReadEntryResponse(Option<ByteBuf>),
@@ -44,7 +45,9 @@ pub(crate) enum TableRpc<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-impl<F: TableSchema> RpcMessage for TableRpc<F> {}
+impl<F: TableSchema> Message for TableRpc<F> {
+ type Response = TableRpc<F>;
+}
impl<F, R> Table<F, R>
where
@@ -59,32 +62,27 @@ where
system: Arc<System>,
db: &sled::Db,
name: String,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let rpc_path = format!("table_{}", name);
- let rpc_client = system.rpc_client::<TableRpc<F>>(&rpc_path);
+ let endpoint = system
+ .netapp
+ .endpoint(format!("garage_table/table.rs/Rpc:{}", name));
let data = TableData::new(system.clone(), name, instance, replication, db);
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
- let syncer = TableSyncer::launch(
- system.clone(),
- data.clone(),
- merkle_updater.clone(),
- rpc_server,
- );
- TableGc::launch(system.clone(), data.clone(), rpc_server);
+ let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
+ TableGc::launch(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
syncer,
- rpc_client,
+ endpoint,
});
- table.clone().register_handler(rpc_server, rpc_path);
+ table.endpoint.set_handler(table.clone());
table
}
@@ -97,11 +95,14 @@ where
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
rpc,
- RequestStrategy::with_quorum(self.data.replication.write_quorum())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -123,7 +124,16 @@ where
let call_futures = call_list.drain().map(|(node, entries)| async move {
let rpc = TableRpc::<F>::Update(entries);
- let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
+ let resp = self
+ .system
+ .rpc
+ .call(
+ &self.endpoint,
+ node,
+ rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
+ )
+ .await?;
Ok::<_, Error>((node, resp))
});
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
@@ -152,11 +162,14 @@ where
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
rpc,
- RequestStrategy::with_quorum(self.data.replication.read_quorum())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -208,11 +221,14 @@ where
let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
rpc,
- RequestStrategy::with_quorum(self.data.replication.read_quorum())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
@@ -261,36 +277,25 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
- async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
+ async fn repair_on_read(&self, who: &[NodeID], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
who,
TableRpc::<F>::Update(vec![what_enc]),
- RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(who.len())
+ .with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
Ok(())
}
- // =============== HANDLERS FOR RPC OPERATIONS (SERVER SIDE) ==============
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<TableRpc<F>, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
-
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
- }
-
- async fn handle(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
+ // ====== RPC HANDLER =====
+ //
+ async fn handle_rpc(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
match msg {
TableRpc::ReadEntry(key, sort_key) => {
let value = self.data.read_entry(key, sort_key)?;
@@ -308,3 +313,16 @@ where
}
}
}
+
+#[async_trait]
+impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
+where
+ F: TableSchema + 'static,
+ R: TableReplication + 'static,
+{
+ async fn handle(self: &Arc<Self>, msg: &TableRpc<F>, _from: NodeID) -> TableRpc<F> {
+ self.handle_rpc(msg)
+ .await
+ .unwrap_or_else(|e| TableRpc::<F>::Error(format!("{}", e)))
+ }
+}
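
A closing note on the error paths: EndpointHandler::handle returns the message type directly rather than a Result, which is why each RPC enum in this patch gains an Error(String) variant and why every handler above ends in unwrap_or_else. On the calling side that variant has to be mapped back into a real error, in the spirit of the Ok-check in send_items. A sketch of that mapping (hypothetical helper, assuming garage_util's Error::Message variant):

use garage_util::error::Error;

// Undo the wrapping done in the EndpointHandler impls above: a reply of
// the Error variant becomes an Err, anything else passes through.
fn gc_reply_to_result(resp: GcRpc) -> Result<GcRpc, Error> {
    match resp {
        GcRpc::Error(e) => Err(Error::Message(e)),
        resp => Ok(resp),
    }
}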