diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/block.rs | 30 | ||||
-rw-r--r-- | src/model/garage.rs | 6 |
2 files changed, 19 insertions, 17 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 5574b7f6..a1dcf776 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -38,7 +38,6 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { Ok, - Error(String), /// Message to ask for a block of data, by hash GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new @@ -61,8 +60,8 @@ pub struct PutBlockMessage { pub data: Vec<u8>, } -impl Message for BlockRpc { - type Response = BlockRpc; +impl Rpc for BlockRpc { + type Response = Result<BlockRpc, Error>; } /// The block manager, handling block exchange between nodes, and block storage on local node @@ -117,15 +116,6 @@ impl BlockManager { block_manager } - async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> { - match msg { - BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, - BlockRpc::GetBlock(h) => self.read_block(h).await, - BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), - _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), - } - } - pub fn spawn_background_worker(self: Arc<Self>) { // Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this // launches only one worker with current value of BACKGROUND_WORKERS @@ -532,11 +522,17 @@ impl BlockManager { #[async_trait] impl EndpointHandler<BlockRpc> for BlockManager { - async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc { - self.clone() - .handle_rpc(message) - .await - .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e))) + async fn handle( + self: &Arc<Self>, + message: &BlockRpc, + _from: NodeID, + ) -> Result<BlockRpc, Error> { + match message { + BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::GetBlock(h) => self.read_block(h).await, + BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), + _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), + } } } diff --git a/src/model/garage.rs b/src/model/garage.rs index d4ea6f55..482c4df7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -61,6 +61,7 @@ impl Garage { background.clone(), replication_mode.replication_factor(), config.rpc_bind_addr, + config.rpc_public_addr, config.bootstrap_peers.clone(), config.consul_host.clone(), config.consul_service_name.clone(), @@ -162,4 +163,9 @@ impl Garage { garage } + + /// Use this for shutdown + pub fn break_reference_cycles(&self) { + self.block_manager.garage.swap(None); + } } |