diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 109 |
1 files changed, 59 insertions, 50 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 348f0711..5574b7f6 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; +use async_trait::async_trait; use futures::future::*; use futures::select; use serde::{Deserialize, Serialize}; @@ -14,9 +15,8 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::time::*; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; @@ -36,8 +36,9 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] -pub enum Message { +pub enum BlockRpc { Ok, + Error(String), /// Message to ask for a block of data, by hash GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new @@ -60,7 +61,9 @@ pub struct PutBlockMessage { pub data: Vec<u8>, } -impl RpcMessage for Message {} +impl Message for BlockRpc { + type Response = BlockRpc; +} /// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { @@ -77,7 +80,7 @@ pub struct BlockManager { resync_notify: Notify, system: Arc<System>, - rpc_client: Arc<RpcClient<Message>>, + endpoint: Arc<Endpoint<BlockRpc, Self>>, pub(crate) garage: ArcSwapOption<Garage>, } @@ -87,7 +90,6 @@ impl BlockManager { data_dir: PathBuf, replication: TableShardedReplication, system: Arc<System>, - rpc_server: &mut RpcServer, ) -> Arc<Self> { let rc = db .open_tree("block_local_rc") @@ -97,8 +99,7 @@ impl BlockManager { .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let rpc_path = "block_manager"; - let rpc_client = system.rpc_client::<Message>(rpc_path); + let endpoint = system.netapp.endpoint(format!("garage_model/block.rs/Rpc")); let block_manager = Arc::new(Self { replication, @@ -108,35 +109,19 @@ impl BlockManager { resync_queue, resync_notify: Notify::new(), system, - rpc_client, + endpoint, garage: ArcSwapOption::from(None), }); - block_manager - .clone() - .register_handler(rpc_server, rpc_path.into()); - block_manager - } - - fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); + block_manager.endpoint.set_handler(block_manager.clone()); - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); + block_manager } - async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> { + async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, - Message::GetBlock(h) => self.read_block(h).await, - Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), + BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::GetBlock(h) => self.read_block(h).await, + BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), } } @@ -157,7 +142,7 @@ impl BlockManager { } /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> { let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); @@ -165,18 +150,18 @@ impl BlockManager { path.push(hex::encode(hash)); if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + return Ok(BlockRpc::Ok); } let mut f = fs::File::create(path).await?; f.write_all(data).await?; drop(f); - Ok(Message::Ok) + Ok(BlockRpc::Ok) } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { let path = self.block_path(hash); let mut f = match fs::File::open(&path).await { @@ -204,7 +189,7 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data })) } /// Check if this node should have a block, but don't actually have it @@ -346,17 +331,22 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(Message::NeedBlockQuery(*hash)); + let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); let who_needs_fut = who.iter().map(|to| { - self.rpc_client - .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) + self.system.rpc.call_arc( + &self.endpoint, + *to, + msg.clone(), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), + ) }); let who_needs_resps = join_all(who_needs_fut).await; let mut need_nodes = vec![]; for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { match needed? { - Message::NeedBlockReply(needed) => { + BlockRpc::NeedBlockReply(needed) => { if needed { need_nodes.push(*node); } @@ -377,11 +367,14 @@ impl BlockManager { ); let put_block_message = self.read_block(hash).await?; - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &need_nodes[..], put_block_message, - RequestStrategy::with_quorum(need_nodes.len()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(need_nodes.len()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -413,18 +406,21 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let who = self.replication.read_nodes(&hash); let resps = self - .rpc_client + .system + .rpc .try_call_many( + &self.endpoint, &who[..], - Message::GetBlock(*hash), - RequestStrategy::with_quorum(1) + BlockRpc::GetBlock(*hash), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) .with_timeout(BLOCK_RW_TIMEOUT) .interrupt_after_quorum(true), ) .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { + if let BlockRpc::PutBlock(msg) = resp { return Ok(msg.data); } } @@ -437,11 +433,14 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum()) + BlockRpc::PutBlock(PutBlockMessage { hash, data }), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -531,6 +530,16 @@ impl BlockManager { } } +#[async_trait] +impl EndpointHandler<BlockRpc> for BlockManager { + async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc { + self.clone() + .handle_rpc(message) + .await + .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e))) + } +} + fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { assert!(bytes.as_ref().len() == 8); let mut x8 = [0u8; 8]; |