aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-14 11:50:12 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-22 15:55:18 +0200
commit4067797d0142ee7860aff8da95d65820d6cc0889 (patch)
treea1c91ab5043c556bc7b369f6c447686fa782a64d /src/model/block.rs
parentdc017a0cab40cb2f33a01b420bb1b04038abb875 (diff)
downloadgarage-4067797d0142ee7860aff8da95d65820d6cc0889.tar.gz
garage-4067797d0142ee7860aff8da95d65820d6cc0889.zip
First port of Garage to Netapp
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs109
1 files changed, 59 insertions, 50 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 348f0711..5574b7f6 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
+use async_trait::async_trait;
use futures::future::*;
use futures::select;
use serde::{Deserialize, Serialize};
@@ -14,9 +15,8 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
@@ -36,8 +36,9 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
-pub enum Message {
+pub enum BlockRpc {
Ok,
+ Error(String),
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
@@ -60,7 +61,9 @@ pub struct PutBlockMessage {
pub data: Vec<u8>,
}
-impl RpcMessage for Message {}
+impl Message for BlockRpc {
+ type Response = BlockRpc;
+}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
@@ -77,7 +80,7 @@ pub struct BlockManager {
resync_notify: Notify,
system: Arc<System>,
- rpc_client: Arc<RpcClient<Message>>,
+ endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
}
@@ -87,7 +90,6 @@ impl BlockManager {
data_dir: PathBuf,
replication: TableShardedReplication,
system: Arc<System>,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
@@ -97,8 +99,7 @@ impl BlockManager {
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let rpc_path = "block_manager";
- let rpc_client = system.rpc_client::<Message>(rpc_path);
+ let endpoint = system.netapp.endpoint(format!("garage_model/block.rs/Rpc"));
let block_manager = Arc::new(Self {
replication,
@@ -108,35 +109,19 @@ impl BlockManager {
resync_queue,
resync_notify: Notify::new(),
system,
- rpc_client,
+ endpoint,
garage: ArcSwapOption::from(None),
});
- block_manager
- .clone()
- .register_handler(rpc_server, rpc_path.into());
- block_manager
- }
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
+ block_manager.endpoint.set_handler(block_manager.clone());
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
+ block_manager
}
- async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
+ async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> {
match msg {
- Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
- Message::GetBlock(h) => self.read_block(h).await,
- Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
+ BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
+ BlockRpc::GetBlock(h) => self.read_block(h).await,
+ BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
}
@@ -157,7 +142,7 @@ impl BlockManager {
}
/// Write a block to disk
- async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> {
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@@ -165,18 +150,18 @@ impl BlockManager {
path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() {
- return Ok(Message::Ok);
+ return Ok(BlockRpc::Ok);
}
let mut f = fs::File::create(path).await?;
f.write_all(data).await?;
drop(f);
- Ok(Message::Ok)
+ Ok(BlockRpc::Ok)
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@@ -204,7 +189,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
- Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
+ Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data }))
}
/// Check if this node should have a block, but don't actually have it
@@ -346,17 +331,22 @@ impl BlockManager {
}
who.retain(|id| *id != self.system.id);
- let msg = Arc::new(Message::NeedBlockQuery(*hash));
+ let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
let who_needs_fut = who.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+ self.system.rpc.call_arc(
+ &self.endpoint,
+ *to,
+ msg.clone(),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
+ )
});
let who_needs_resps = join_all(who_needs_fut).await;
let mut need_nodes = vec![];
for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
match needed? {
- Message::NeedBlockReply(needed) => {
+ BlockRpc::NeedBlockReply(needed) => {
if needed {
need_nodes.push(*node);
}
@@ -377,11 +367,14 @@ impl BlockManager {
);
let put_block_message = self.read_block(hash).await?;
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&need_nodes[..],
put_block_message,
- RequestStrategy::with_quorum(need_nodes.len())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(need_nodes.len())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -413,18 +406,21 @@ impl BlockManager {
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
- Message::GetBlock(*hash),
- RequestStrategy::with_quorum(1)
+ BlockRpc::GetBlock(*hash),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
.with_timeout(BLOCK_RW_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
for resp in resps {
- if let Message::PutBlock(msg) = resp {
+ if let BlockRpc::PutBlock(msg) = resp {
return Ok(msg.data);
}
}
@@ -437,11 +433,14 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ BlockRpc::PutBlock(PutBlockMessage { hash, data }),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -531,6 +530,16 @@ impl BlockManager {
}
}
+#[async_trait]
+impl EndpointHandler<BlockRpc> for BlockManager {
+ async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc {
+ self.clone()
+ .handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e)))
+ }
+}
+
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];