aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-10-15 11:05:09 +0200
committerAlex Auvolat <alex@adnab.me>2021-10-22 16:55:24 +0200
commit1b450c4b493dfcb2ee88acbca3ea584beac8eb4b (patch)
treed6437f105a630fa197b67446b5c3b2902335c34a /src/model
parent4067797d0142ee7860aff8da95d65820d6cc0889 (diff)
downloadgarage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.tar.gz
garage-1b450c4b493dfcb2ee88acbca3ea584beac8eb4b.zip
Improvements to CLI and various fixes for netapp version
Discovery via consul, persist peer list to file
Diffstat (limited to 'src/model')
-rw-r--r--src/model/block.rs30
-rw-r--r--src/model/garage.rs6
2 files changed, 19 insertions, 17 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 5574b7f6..a1dcf776 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -38,7 +38,6 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
Ok,
- Error(String),
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
@@ -61,8 +60,8 @@ pub struct PutBlockMessage {
pub data: Vec<u8>,
}
-impl Message for BlockRpc {
- type Response = BlockRpc;
+impl Rpc for BlockRpc {
+ type Response = Result<BlockRpc, Error>;
}
/// The block manager, handling block exchange between nodes, and block storage on local node
@@ -117,15 +116,6 @@ impl BlockManager {
block_manager
}
- async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> {
- match msg {
- BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
- BlockRpc::GetBlock(h) => self.read_block(h).await,
- BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
- _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
- }
- }
-
pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing <= TODO actually this
// launches only one worker with current value of BACKGROUND_WORKERS
@@ -532,11 +522,17 @@ impl BlockManager {
#[async_trait]
impl EndpointHandler<BlockRpc> for BlockManager {
- async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc {
- self.clone()
- .handle_rpc(message)
- .await
- .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e)))
+ async fn handle(
+ self: &Arc<Self>,
+ message: &BlockRpc,
+ _from: NodeID,
+ ) -> Result<BlockRpc, Error> {
+ match message {
+ BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
+ BlockRpc::GetBlock(h) => self.read_block(h).await,
+ BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
+ _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
+ }
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index d4ea6f55..482c4df7 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -61,6 +61,7 @@ impl Garage {
background.clone(),
replication_mode.replication_factor(),
config.rpc_bind_addr,
+ config.rpc_public_addr,
config.bootstrap_peers.clone(),
config.consul_host.clone(),
config.consul_service_name.clone(),
@@ -162,4 +163,9 @@ impl Garage {
garage
}
+
+ /// Use this for shutdown
+ pub fn break_reference_cycles(&self) {
+ self.block_manager.garage.swap(None);
+ }
}