diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 3 | ||||
-rw-r--r-- | src/model/block.rs | 109 | ||||
-rw-r--r-- | src/model/garage.rs | 43 |
3 files changed, 78 insertions, 77 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 4d5d7f9d..a9ae5edf 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -17,6 +17,7 @@ garage_rpc = { version = "0.3.0", path = "../rpc" } garage_table = { version = "0.3.0", path = "../table" } garage_util = { version = "0.3.0", path = "../util" } +async-trait = "0.1.7" arc-swap = "1.0" hex = "0.4" log = "0.4" @@ -31,3 +32,5 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } + +netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } diff --git a/src/model/block.rs b/src/model/block.rs index 348f0711..5574b7f6 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::ArcSwapOption; +use async_trait::async_trait; use futures::future::*; use futures::select; use serde::{Deserialize, Serialize}; @@ -14,9 +15,8 @@ use garage_util::data::*; use garage_util::error::Error; use garage_util::time::*; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::*; -use garage_rpc::rpc_server::*; +use garage_rpc::system::System; +use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; @@ -36,8 +36,9 @@ const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] -pub enum Message { +pub enum BlockRpc { Ok, + Error(String), /// Message to ask for a block of data, by hash GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new @@ -60,7 +61,9 @@ pub struct PutBlockMessage { pub data: Vec<u8>, } -impl RpcMessage for Message {} +impl Message for BlockRpc { + type Response = BlockRpc; +} /// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { @@ -77,7 +80,7 @@ pub struct BlockManager { resync_notify: Notify, system: Arc<System>, - rpc_client: Arc<RpcClient<Message>>, + endpoint: Arc<Endpoint<BlockRpc, Self>>, pub(crate) garage: ArcSwapOption<Garage>, } @@ -87,7 +90,6 @@ impl BlockManager { data_dir: PathBuf, replication: TableShardedReplication, system: Arc<System>, - rpc_server: &mut RpcServer, ) -> Arc<Self> { let rc = db .open_tree("block_local_rc") @@ -97,8 +99,7 @@ impl BlockManager { .open_tree("block_local_resync_queue") .expect("Unable to open block_local_resync_queue tree"); - let rpc_path = "block_manager"; - let rpc_client = system.rpc_client::<Message>(rpc_path); + let endpoint = system.netapp.endpoint(format!("garage_model/block.rs/Rpc")); let block_manager = Arc::new(Self { replication, @@ -108,35 +109,19 @@ impl BlockManager { resync_queue, resync_notify: Notify::new(), system, - rpc_client, + endpoint, garage: ArcSwapOption::from(None), }); - block_manager - .clone() - .register_handler(rpc_server, rpc_path.into()); - block_manager - } - - fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { - let self2 = self.clone(); - rpc_server.add_handler::<Message, _, _>(path, move |msg, _addr| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); + block_manager.endpoint.set_handler(block_manager.clone()); - let self2 = self.clone(); - self.rpc_client - .set_local_handler(self.system.id, move |msg| { - let self2 = self2.clone(); - async move { self2.handle(&msg).await } - }); + block_manager } - async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> { + async fn handle_rpc(self: Arc<Self>, msg: &BlockRpc) -> Result<BlockRpc, Error> { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, - Message::GetBlock(h) => self.read_block(h).await, - Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), + BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::GetBlock(h) => self.read_block(h).await, + BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), } } @@ -157,7 +142,7 @@ impl BlockManager { } /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> { let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); @@ -165,18 +150,18 @@ impl BlockManager { path.push(hex::encode(hash)); if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + return Ok(BlockRpc::Ok); } let mut f = fs::File::create(path).await?; f.write_all(data).await?; drop(f); - Ok(Message::Ok) + Ok(BlockRpc::Ok) } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { let path = self.block_path(hash); let mut f = match fs::File::open(&path).await { @@ -204,7 +189,7 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data })) } /// Check if this node should have a block, but don't actually have it @@ -346,17 +331,22 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(Message::NeedBlockQuery(*hash)); + let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); let who_needs_fut = who.iter().map(|to| { - self.rpc_client - .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) + self.system.rpc.call_arc( + &self.endpoint, + *to, + msg.clone(), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), + ) }); let who_needs_resps = join_all(who_needs_fut).await; let mut need_nodes = vec![]; for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { match needed? { - Message::NeedBlockReply(needed) => { + BlockRpc::NeedBlockReply(needed) => { if needed { need_nodes.push(*node); } @@ -377,11 +367,14 @@ impl BlockManager { ); let put_block_message = self.read_block(hash).await?; - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &need_nodes[..], put_block_message, - RequestStrategy::with_quorum(need_nodes.len()) + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(need_nodes.len()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -413,18 +406,21 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let who = self.replication.read_nodes(&hash); let resps = self - .rpc_client + .system + .rpc .try_call_many( + &self.endpoint, &who[..], - Message::GetBlock(*hash), - RequestStrategy::with_quorum(1) + BlockRpc::GetBlock(*hash), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(1) .with_timeout(BLOCK_RW_TIMEOUT) .interrupt_after_quorum(true), ) .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { + if let BlockRpc::PutBlock(msg) = resp { return Ok(msg.data); } } @@ -437,11 +433,14 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - self.rpc_client + self.system + .rpc .try_call_many( + &self.endpoint, &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum()) + BlockRpc::PutBlock(PutBlockMessage { hash, data }), + RequestStrategy::with_priority(PRIO_NORMAL) + .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -531,6 +530,16 @@ impl BlockManager { } } +#[async_trait] +impl EndpointHandler<BlockRpc> for BlockManager { + async fn handle(self: &Arc<Self>, message: &BlockRpc, _from: NodeID) -> BlockRpc { + self.clone() + .handle_rpc(message) + .await + .unwrap_or_else(|e| BlockRpc::Error(format!("{}", e))) + } +} + fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { assert!(bytes.as_ref().len() == 8); let mut x8 = [0u8; 8]; diff --git a/src/model/garage.rs b/src/model/garage.rs index c3594934..d4ea6f55 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -1,11 +1,11 @@ use std::sync::Arc; +use netapp::NetworkKey; + use garage_util::background::*; use garage_util::config::*; -use garage_rpc::membership::System; -use garage_rpc::rpc_client::RpcHttpClient; -use garage_rpc::rpc_server::RpcServer; +use garage_rpc::system::System; use garage_table::replication::ReplicationMode; use garage_table::replication::TableFullReplication; @@ -45,26 +45,25 @@ pub struct Garage { impl Garage { /// Create and run garage - pub fn new( - config: Config, - db: sled::Db, - background: Arc<BackgroundRunner>, - rpc_server: &mut RpcServer, - ) -> Arc<Self> { + pub fn new(config: Config, db: sled::Db, background: Arc<BackgroundRunner>) -> Arc<Self> { + let network_key = NetworkKey::from_slice( + &hex::decode(&config.rpc_secret).expect("Invalid RPC secret key")[..], + ) + .expect("Invalid RPC secret key"); + let replication_mode = ReplicationMode::parse(&config.replication_mode) .expect("Invalid replication_mode in config file."); info!("Initialize membership management system..."); - let rpc_http_client = Arc::new( - RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls) - .expect("Could not create RPC client"), - ); let system = System::new( + network_key, config.metadata_dir.clone(), - rpc_http_client, background.clone(), - rpc_server, replication_mode.replication_factor(), + config.rpc_bind_addr, + config.bootstrap_peers.clone(), + config.consul_host.clone(), + config.consul_service_name.clone(), ); let data_rep_param = TableShardedReplication { @@ -87,13 +86,8 @@ impl Garage { }; info!("Initialize block manager..."); - let block_manager = BlockManager::new( - &db, - config.data_dir.clone(), - data_rep_param, - system.clone(), - rpc_server, - ); + let block_manager = + BlockManager::new(&db, config.data_dir.clone(), data_rep_param, system.clone()); info!("Initialize block_ref_table..."); let block_ref_table = Table::new( @@ -104,7 +98,6 @@ impl Garage { system.clone(), &db, "block_ref".to_string(), - rpc_server, ); info!("Initialize version_table..."); @@ -117,7 +110,6 @@ impl Garage { system.clone(), &db, "version".to_string(), - rpc_server, ); info!("Initialize object_table..."); @@ -130,7 +122,6 @@ impl Garage { system.clone(), &db, "object".to_string(), - rpc_server, ); info!("Initialize bucket_table..."); @@ -140,7 +131,6 @@ impl Garage { system.clone(), &db, "bucket".to_string(), - rpc_server, ); info!("Initialize key_table_table..."); @@ -150,7 +140,6 @@ impl Garage { system.clone(), &db, "key".to_string(), - rpc_server, ); info!("Initialize Garage..."); |