aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/block.rs109
-rw-r--r--src/model/garage.rs43
3 files changed, 78 insertions, 77 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 4d5d7f9d..a9ae5edf 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -17,6 +17,7 @@ garage_rpc = { version = "0.3.0", path = "../rpc" }
garage_table = { version = "0.3.0", path = "../table" }
garage_util = { version = "0.3.0", path = "../util" }
+async-trait = "0.1.7"
arc-swap = "1.0"
hex = "0.4"
log = "0.4"
@@ -31,3 +32,5 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
+
+netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
diff --git a/src/model/block.rs b/src/model/block.rs
index 348f0711..5574b7f6 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
+use async_trait::async_trait;
use futures::future::*;
use futures::select;
use serde::{Deserialize, Serialize};
@@ -14,9 +15,8 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::time::*;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::*;
-use garage_rpc::rpc_server::*;
+use garage_rpc::system::System;
+use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
@@ -36,8 +36,9 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
-pub enum Message {
+pub enum BlockRpc {
Ok,
+ Error(String),
/// Message to ask for a block of data, by hash
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
@@ -60,7 +61,9 @@ pub struct PutBlockMessage {
pub data: Vec<u8>,
}
-impl RpcMessage for Message {}
+impl Message for BlockRpc {
+ type Response = BlockRpc;
+}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
@@ -77,7 +80,7 @@ pub struct BlockManager {
resync_notify: Notify,
system: Arc<System>,
- rpc_client: Arc<RpcClient<Message>>,
+ endpoint: Arc<Endpoint<BlockRpc, Self>>,
pub(crate) garage: ArcSwapOption<Garage>,
}
@@ -87,7 +90,6 @@ impl BlockManager {
data_dir: PathBuf,
replication: TableShardedReplication,
system: Arc<System>,
- rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rc = db
.open_tree("block_local_rc")
@@ -97,8 +99,7 @@ impl BlockManager {
.open_tree("block_local_resync_queue")
.expect("Unable to open block_local_resync_queue tree");
- let rpc_path = "block_manager";
- let rpc_client = system.rpc_client::<Message>(rpc_path);
+ let endpoint = system.netapp.endpoint(format!("garage_model/block.rs/Rpc"));
let block_manager = Arc::new(Self {
replication,
@@ -108,35 +109,19 @@ impl BlockManager {
resync_queue,
resync_notify: Notify::new(),
system,
- rpc_client,
+ endpoint,
garage: ArcSwapOption::from(None),
});
- block_manager
- .clone()
- .register_handler(rpc_server, rpc_path.into());
- block_manager
- }
-
- fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
- let self2 = self.clone();
- rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
+ block_manager.endpoint.set_handler(block_manager.clone());
- let self2 = self.clone();
- self.rpc_client
- .set_local_handler(self.system.id, move |msg| {
- let self2 = self2.clone();
- async move { self2.handle(&msg).await }
- });
+ block_manager
}
- async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
+ async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> {
match msg {
- Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
- Message::GetBlock(h) => self.read_block(h).await,
- Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
+ BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
+ BlockRpc::GetBlock(h) => self.read_block(h).await,
+ BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
}
@@ -157,7 +142,7 @@ impl BlockManager {
}
/// Write a block to disk
- async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
+ async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> {
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
@@ -165,18 +150,18 @@ impl BlockManager {
path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() {
- return Ok(Message::Ok);
+ return Ok(BlockRpc::Ok);
}
let mut f = fs::File::create(path).await?;
f.write_all(data).await?;
drop(f);
- Ok(Message::Ok)
+ Ok(BlockRpc::Ok)
}
/// Read block from disk, verifying it's integrity
- async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
+ async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
@@ -204,7 +189,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
- Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
+ Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data }))
}
/// Check if this node should have a block, but don't actually have it
@@ -346,17 +331,22 @@ impl BlockManager {
}
who.retain(|id| *id != self.system.id);
- let msg = Arc::new(Message::NeedBlockQuery(*hash));
+ let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
let who_needs_fut = who.iter().map(|to| {
- self.rpc_client
- .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
+ self.system.rpc.call_arc(
+ &self.endpoint,
+ *to,
+ msg.clone(),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
+ )
});
let who_needs_resps = join_all(who_needs_fut).await;
let mut need_nodes = vec![];
for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
match needed? {
- Message::NeedBlockReply(needed) => {
+ BlockRpc::NeedBlockReply(needed) => {
if needed {
need_nodes.push(*node);
}
@@ -377,11 +367,14 @@ impl BlockManager {
);
let put_block_message = self.read_block(hash).await?;
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&need_nodes[..],
put_block_message,
- RequestStrategy::with_quorum(need_nodes.len())
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(need_nodes.len())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -413,18 +406,21 @@ impl BlockManager {
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(&hash);
let resps = self
- .rpc_client
+ .system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
- Message::GetBlock(*hash),
- RequestStrategy::with_quorum(1)
+ BlockRpc::GetBlock(*hash),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(1)
.with_timeout(BLOCK_RW_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
for resp in resps {
- if let Message::PutBlock(msg) = resp {
+ if let BlockRpc::PutBlock(msg) = resp {
return Ok(msg.data);
}
}
@@ -437,11 +433,14 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- self.rpc_client
+ self.system
+ .rpc
.try_call_many(
+ &self.endpoint,
&who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ BlockRpc::PutBlock(PutBlockMessage { hash, data }),
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -531,6 +530,16 @@ impl BlockManager {
}
}
+#[async_trait]
+impl EndpointHandler<BlockRpc> for BlockManager {
+ async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc {
+ self.clone()
+ .handle_rpc(message)
+ .await
+ .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e)))
+ }
+}
+
fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 {
assert!(bytes.as_ref().len() == 8);
let mut x8 = [0u8; 8];
diff --git a/src/model/garage.rs b/src/model/garage.rs
index c3594934..d4ea6f55 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -1,11 +1,11 @@
use std::sync::Arc;
+use netapp::NetworkKey;
+
use garage_util::background::*;
use garage_util::config::*;
-use garage_rpc::membership::System;
-use garage_rpc::rpc_client::RpcHttpClient;
-use garage_rpc::rpc_server::RpcServer;
+use garage_rpc::system::System;
use garage_table::replication::ReplicationMode;
use garage_table::replication::TableFullReplication;
@@ -45,26 +45,25 @@ pub struct Garage {
impl Garage {
/// Create and run garage
- pub fn new(
- config: Config,
- db: sled::Db,
- background: Arc<BackgroundRunner>,
- rpc_server: &mut RpcServer,
- ) -> Arc<Self> {
+ pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> {
+ let network_key = NetworkKey::from_slice(
+ &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..],
+ )
+ .expect("Invalid RPC secret key");
+
let replication_mode = ReplicationMode::parse(&config.replication_mode)
.expect("Invalid replication_mode in config file.");
info!("Initialize membership management system...");
- let rpc_http_client = Arc::new(
- RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
- .expect("Could not create RPC client"),
- );
let system = System::new(
+ network_key,
config.metadata_dir.clone(),
- rpc_http_client,
background.clone(),
- rpc_server,
replication_mode.replication_factor(),
+ config.rpc_bind_addr,
+ config.bootstrap_peers.clone(),
+ config.consul_host.clone(),
+ config.consul_service_name.clone(),
);
let data_rep_param = TableShardedReplication {
@@ -87,13 +86,8 @@ impl Garage {
};
info!("Initialize block manager...");
- let block_manager = BlockManager::new(
- &db,
- config.data_dir.clone(),
- data_rep_param,
- system.clone(),
- rpc_server,
- );
+ let block_manager =
+ BlockManager::new(&db, config.data_dir.clone(), data_rep_param, system.clone());
info!("Initialize block_ref_table...");
let block_ref_table = Table::new(
@@ -104,7 +98,6 @@ impl Garage {
system.clone(),
&db,
"block_ref".to_string(),
- rpc_server,
);
info!("Initialize version_table...");
@@ -117,7 +110,6 @@ impl Garage {
system.clone(),
&db,
"version".to_string(),
- rpc_server,
);
info!("Initialize object_table...");
@@ -130,7 +122,6 @@ impl Garage {
system.clone(),
&db,
"object".to_string(),
- rpc_server,
);
info!("Initialize bucket_table...");
@@ -140,7 +131,6 @@ impl Garage {
system.clone(),
&db,
"bucket".to_string(),
- rpc_server,
);
info!("Initialize key_table_table...");
@@ -150,7 +140,6 @@ impl Garage {
system.clone(),
&db,
"key".to_string(),
- rpc_server,
);
info!("Initialize Garage...");