aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/admin_rpc.rs15
-rw-r--r--src/api/api_server.rs (renamed from src/api_server.rs)18
-rw-r--r--src/api/http_util.rs (renamed from src/http_util.rs)0
-rw-r--r--src/api/mod.rs2
-rw-r--r--src/config.rs66
-rw-r--r--src/error.rs2
-rw-r--r--src/main.rs28
-rw-r--r--src/rpc/membership.rs (renamed from src/membership.rs)66
-rw-r--r--src/rpc/mod.rs4
-rw-r--r--src/rpc/rpc_client.rs (renamed from src/rpc_client.rs)10
-rw-r--r--src/rpc/rpc_server.rs (renamed from src/rpc_server.rs)5
-rw-r--r--src/rpc/tls_util.rs (renamed from src/tls_util.rs)0
-rw-r--r--src/server.rs168
-rw-r--r--src/store/block.rs (renamed from src/block.rs)28
-rw-r--r--src/store/block_ref_table.rs (renamed from src/block_ref_table.rs)3
-rw-r--r--src/store/bucket_table.rs (renamed from src/bucket_table.rs)0
-rw-r--r--src/store/mod.rs5
-rw-r--r--src/store/object_table.rs (renamed from src/object_table.rs)5
-rw-r--r--src/store/version_table.rs (renamed from src/version_table.rs)5
-rw-r--r--src/table/mod.rs6
-rw-r--r--src/table/table.rs (renamed from src/table.rs)10
-rw-r--r--src/table/table_fullcopy.rs (renamed from src/table_fullcopy.rs)2
-rw-r--r--src/table/table_sharded.rs (renamed from src/table_sharded.rs)2
-rw-r--r--src/table/table_sync.rs (renamed from src/table_sync.rs)2
24 files changed, 249 insertions, 203 deletions
diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index 458df360..fe59f92e 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -5,15 +5,18 @@ use tokio::sync::watch;
use crate::data::*;
use crate::error::Error;
-use crate::rpc_client::*;
-use crate::rpc_server::*;
use crate::server::Garage;
+
use crate::table::*;
-use crate::*;
-use crate::block_ref_table::*;
-use crate::bucket_table::*;
-use crate::version_table::*;
+use crate::rpc::rpc_client::*;
+use crate::rpc::rpc_server::*;
+
+use crate::store::block_ref_table::*;
+use crate::store::bucket_table::*;
+use crate::store::version_table::*;
+
+use crate::*;
pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub const ADMIN_RPC_PATH: &str = "_admin";
diff --git a/src/api_server.rs b/src/api/api_server.rs
index f4bb4177..a80b2ea2 100644
--- a/src/api_server.rs
+++ b/src/api/api_server.rs
@@ -11,14 +11,16 @@ use hyper::{Body, Method, Request, Response, Server, StatusCode};
use crate::data::*;
use crate::error::Error;
-use crate::http_util::*;
+use crate::server::Garage;
+
use crate::table::EmptyKey;
-use crate::block::INLINE_THRESHOLD;
-use crate::block_ref_table::*;
-use crate::object_table::*;
-use crate::server::Garage;
-use crate::version_table::*;
+use crate::store::block::INLINE_THRESHOLD;
+use crate::store::block_ref_table::*;
+use crate::store::object_table::*;
+use crate::store::version_table::*;
+
+use crate::api::http_util::*;
type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>;
@@ -26,7 +28,7 @@ pub async fn run_api_server(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), Error> {
- let addr = &garage.system.config.api_bind_addr;
+ let addr = &garage.config.api_bind_addr;
let service = make_service_fn(|conn: &AddrStream| {
let garage = garage.clone();
@@ -111,7 +113,7 @@ async fn handle_put(
) -> Result<UUID, Error> {
let version_uuid = gen_uuid();
- let mut chunker = BodyChunker::new(body, garage.system.config.block_size);
+ let mut chunker = BodyChunker::new(body, garage.config.block_size);
let first_block = match chunker.next().await? {
Some(x) => x,
None => return Err(Error::BadRequest(format!("Empty body"))),
diff --git a/src/http_util.rs b/src/api/http_util.rs
index 228448f0..228448f0 100644
--- a/src/http_util.rs
+++ b/src/api/http_util.rs
diff --git a/src/api/mod.rs b/src/api/mod.rs
new file mode 100644
index 00000000..8e62d1e7
--- /dev/null
+++ b/src/api/mod.rs
@@ -0,0 +1,2 @@
+pub mod api_server;
+pub mod http_util;
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 00000000..7a6ae3f2
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,66 @@
+use std::io::Read;
+use std::net::SocketAddr;
+use std::path::PathBuf;
+
+use serde::Deserialize;
+
+use crate::error::Error;
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct Config {
+ pub metadata_dir: PathBuf,
+ pub data_dir: PathBuf,
+
+ pub api_bind_addr: SocketAddr,
+ pub rpc_bind_addr: SocketAddr,
+
+ pub bootstrap_peers: Vec<SocketAddr>,
+
+ #[serde(default = "default_max_concurrent_rpc_requests")]
+ pub max_concurrent_rpc_requests: usize,
+
+ #[serde(default = "default_block_size")]
+ pub block_size: usize,
+
+ #[serde(default = "default_replication_factor")]
+ pub meta_replication_factor: usize,
+
+ #[serde(default = "default_epidemic_factor")]
+ pub meta_epidemic_factor: usize,
+
+ #[serde(default = "default_replication_factor")]
+ pub data_replication_factor: usize,
+
+ pub rpc_tls: Option<TlsConfig>,
+}
+
+fn default_max_concurrent_rpc_requests() -> usize {
+ 12
+}
+fn default_block_size() -> usize {
+ 1048576
+}
+fn default_replication_factor() -> usize {
+ 3
+}
+fn default_epidemic_factor() -> usize {
+ 3
+}
+
+#[derive(Deserialize, Debug, Clone)]
+pub struct TlsConfig {
+ pub ca_cert: String,
+ pub node_cert: String,
+ pub node_key: String,
+}
+
+pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
+ let mut file = std::fs::OpenOptions::new()
+ .read(true)
+ .open(config_file.as_path())?;
+
+ let mut config = String::new();
+ file.read_to_string(&mut config)?;
+
+ Ok(toml::from_str(&config)?)
+}
diff --git a/src/error.rs b/src/error.rs
index e217f9ae..6290dc24 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -3,7 +3,7 @@ use hyper::StatusCode;
use std::io;
use crate::data::Hash;
-use crate::rpc_client::RPCError;
+use crate::rpc::rpc_client::RPCError;
#[derive(Debug, Error)]
pub enum Error {
diff --git a/src/main.rs b/src/main.rs
index 0b41805b..c693b12c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,29 +3,18 @@
#[macro_use]
extern crate log;
+mod background;
+mod config;
mod data;
mod error;
-mod background;
-mod membership;
+mod api;
+mod rpc;
+mod store;
mod table;
-mod table_fullcopy;
-mod table_sharded;
-mod table_sync;
-
-mod block;
-mod block_ref_table;
-mod bucket_table;
-mod object_table;
-mod version_table;
mod admin_rpc;
-mod api_server;
-mod http_util;
-mod rpc_client;
-mod rpc_server;
mod server;
-mod tls_util;
use std::collections::HashSet;
use std::net::SocketAddr;
@@ -36,11 +25,12 @@ use std::time::Duration;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
+use config::TlsConfig;
use data::*;
use error::Error;
-use membership::*;
-use rpc_client::*;
-use server::TlsConfig;
+
+use rpc::membership::*;
+use rpc::rpc_client::*;
use admin_rpc::*;
diff --git a/src/membership.rs b/src/rpc/membership.rs
index 87b065a7..e0509536 100644
--- a/src/membership.rs
+++ b/src/rpc/membership.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::hash::Hash as StdHash;
use std::hash::Hasher;
-use std::io::Read;
+use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -20,9 +20,9 @@ use tokio::sync::Mutex;
use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
-use crate::rpc_client::*;
-use crate::rpc_server::*;
-use crate::server::Config;
+
+use crate::rpc::rpc_client::*;
+use crate::rpc::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
@@ -78,8 +78,9 @@ pub struct NetworkConfigEntry {
}
pub struct System {
- pub config: Config,
pub id: UUID,
+ pub data_dir: PathBuf,
+ pub rpc_local_port: u16,
pub state_info: StateInfo,
@@ -251,6 +252,29 @@ impl Ring {
}
}
+fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
+ let mut id_file = metadata_dir.clone();
+ id_file.push("node_id");
+ if id_file.as_path().exists() {
+ let mut f = std::fs::File::open(id_file.as_path())?;
+ let mut d = vec![];
+ f.read_to_end(&mut d)?;
+ if d.len() != 32 {
+ return Err(Error::Message(format!("Corrupt node_id file")));
+ }
+
+ let mut id = [0u8; 32];
+ id.copy_from_slice(&d[..]);
+ Ok(id.into())
+ } else {
+ let id = gen_uuid();
+
+ let mut f = std::fs::File::create(id_file.as_path())?;
+ f.write_all(id.as_slice())?;
+ Ok(id)
+ }
+}
+
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut path = metadata_dir.clone();
path.push("network_config");
@@ -270,12 +294,15 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
impl System {
pub fn new(
- config: Config,
- id: UUID,
+ data_dir: PathBuf,
+ rpc_http_client: Arc<RpcHttpClient>,
background: Arc<BackgroundRunner>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let net_config = match read_network_config(&config.metadata_dir) {
+ let id = gen_node_id(&data_dir).expect("Unable to read or generate node ID");
+ info!("Node ID: {}", hex::encode(&id));
+
+ let net_config = match read_network_config(&data_dir) {
Ok(x) => x,
Err(e) => {
info!(
@@ -309,11 +336,6 @@ impl System {
ring.rebuild_ring();
let (update_ring, ring) = watch::channel(Arc::new(ring));
- let rpc_http_client = Arc::new(
- RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
- .expect("Could not create RPC client"),
- );
-
let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
let rpc_client = RpcClient::new(
RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.clone()),
@@ -322,8 +344,9 @@ impl System {
);
let sys = Arc::new(System {
- config,
id,
+ data_dir,
+ rpc_local_port: rpc_server.bind_addr.port(),
state_info,
rpc_http_client,
rpc_client,
@@ -363,7 +386,7 @@ impl System {
}
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
- let mut path = self.config.metadata_dir.clone();
+ let mut path = self.data_dir.clone();
path.push("network_config");
let ring = self.ring.borrow().clone();
@@ -379,7 +402,7 @@ impl System {
let ring = self.ring.borrow().clone();
Message::Ping(PingMessage {
id: self.id,
- rpc_port: self.config.rpc_bind_addr.port(),
+ rpc_port: self.rpc_local_port,
status_hash: status.hash,
config_version: ring.config.version,
state_info: self.state_info.clone(),
@@ -397,13 +420,8 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
- pub async fn bootstrap(self: Arc<Self>) {
- let bootstrap_peers = self
- .config
- .bootstrap_peers
- .iter()
- .map(|ip| (*ip, None))
- .collect::<Vec<_>>();
+ pub async fn bootstrap(self: Arc<Self>, peers: &[SocketAddr]) {
+ let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
self.clone()
@@ -557,7 +575,7 @@ impl System {
for node in adv.iter() {
if node.id == self.id {
// learn our own ip address
- let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port());
+ let self_addr = SocketAddr::new(node.addr.ip(), self.rpc_local_port);
let old_self = status.nodes.insert(
node.id,
Arc::new(StatusEntry {
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
new file mode 100644
index 00000000..83fd0aac
--- /dev/null
+++ b/src/rpc/mod.rs
@@ -0,0 +1,4 @@
+pub mod membership;
+pub mod rpc_client;
+pub mod rpc_server;
+pub mod tls_util;
diff --git a/src/rpc_client.rs b/src/rpc/rpc_client.rs
index ba036c60..027a3cde 100644
--- a/src/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -20,10 +20,12 @@ use tokio::sync::{watch, Semaphore};
use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
-use crate::membership::Status;
-use crate::rpc_server::RpcMessage;
-use crate::server::TlsConfig;
-use crate::tls_util;
+
+use crate::rpc::membership::Status;
+use crate::rpc::rpc_server::RpcMessage;
+use crate::rpc::tls_util;
+
+use crate::config::TlsConfig;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
diff --git a/src/rpc_server.rs b/src/rpc/rpc_server.rs
index bcf7496f..4ee53909 100644
--- a/src/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -16,10 +16,11 @@ use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::server::TlsStream;
use tokio_rustls::TlsAcceptor;
+use crate::config::TlsConfig;
use crate::data::*;
use crate::error::Error;
-use crate::server::TlsConfig;
-use crate::tls_util;
+
+use crate::rpc::tls_util;
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
diff --git a/src/tls_util.rs b/src/rpc/tls_util.rs
index 52c52110..52c52110 100644
--- a/src/tls_util.rs
+++ b/src/rpc/tls_util.rs
diff --git a/src/server.rs b/src/server.rs
index 3ea29105..de04615f 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,79 +1,34 @@
-use std::io::{Read, Write};
-use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use futures_util::future::*;
-use serde::Deserialize;
use tokio::sync::watch;
use crate::background::*;
-use crate::data::*;
+use crate::config::*;
use crate::error::Error;
-use crate::membership::System;
-use crate::rpc_server::RpcServer;
-use crate::table::*;
-use crate::table_fullcopy::*;
-use crate::table_sharded::*;
-
-use crate::block::*;
-use crate::block_ref_table::*;
-use crate::bucket_table::*;
-use crate::object_table::*;
-use crate::version_table::*;
-
-use crate::admin_rpc::*;
-use crate::api_server;
-
-#[derive(Deserialize, Debug, Clone)]
-pub struct Config {
- pub metadata_dir: PathBuf,
- pub data_dir: PathBuf,
-
- pub api_bind_addr: SocketAddr,
- pub rpc_bind_addr: SocketAddr,
- pub bootstrap_peers: Vec<SocketAddr>,
+use crate::rpc::membership::System;
+use crate::rpc::rpc_client::RpcHttpClient;
+use crate::rpc::rpc_server::RpcServer;
- #[serde(default = "default_max_concurrent_rpc_requests")]
- pub max_concurrent_rpc_requests: usize,
-
- #[serde(default = "default_block_size")]
- pub block_size: usize,
-
- #[serde(default = "default_replication_factor")]
- pub meta_replication_factor: usize,
-
- #[serde(default = "default_epidemic_factor")]
- pub meta_epidemic_factor: usize,
+use crate::table::table_fullcopy::*;
+use crate::table::table_sharded::*;
+use crate::table::*;
- #[serde(default = "default_replication_factor")]
- pub data_replication_factor: usize,
+use crate::store::block::*;
+use crate::store::block_ref_table::*;
+use crate::store::bucket_table::*;
+use crate::store::object_table::*;
+use crate::store::version_table::*;
- pub rpc_tls: Option<TlsConfig>,
-}
+use crate::api::api_server;
-fn default_max_concurrent_rpc_requests() -> usize {
- 12
-}
-fn default_block_size() -> usize {
- 1048576
-}
-fn default_replication_factor() -> usize {
- 3
-}
-fn default_epidemic_factor() -> usize {
- 3
-}
-
-#[derive(Deserialize, Debug, Clone)]
-pub struct TlsConfig {
- pub ca_cert: String,
- pub node_cert: String,
- pub node_key: String,
-}
+use crate::admin_rpc::*;
pub struct Garage {
+ pub config: Config,
+
pub db: sled::Db,
pub background: Arc<BackgroundRunner>,
pub system: Arc<System>,
@@ -88,33 +43,46 @@ pub struct Garage {
impl Garage {
pub async fn new(
config: Config,
- id: UUID,
db: sled::Db,
background: Arc<BackgroundRunner>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
info!("Initialize membership management system...");
- let system = System::new(config.clone(), id, background.clone(), rpc_server);
-
- info!("Initialize block manager...");
- let block_manager =
- BlockManager::new(&db, config.data_dir.clone(), system.clone(), rpc_server);
+ let rpc_http_client = Arc::new(
+ RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
+ .expect("Could not create RPC client"),
+ );
+ let system = System::new(
+ config.metadata_dir.clone(),
+ rpc_http_client,
+ background.clone(),
+ rpc_server,
+ );
let data_rep_param = TableShardedReplication {
- replication_factor: system.config.data_replication_factor,
- write_quorum: (system.config.data_replication_factor + 1) / 2,
+ replication_factor: config.data_replication_factor,
+ write_quorum: (config.data_replication_factor + 1) / 2,
read_quorum: 1,
};
let meta_rep_param = TableShardedReplication {
- replication_factor: system.config.meta_replication_factor,
- write_quorum: (system.config.meta_replication_factor + 1) / 2,
- read_quorum: (system.config.meta_replication_factor + 1) / 2,
+ replication_factor: config.meta_replication_factor,
+ write_quorum: (config.meta_replication_factor + 1) / 2,
+ read_quorum: (config.meta_replication_factor + 1) / 2,
};
let control_rep_param = TableFullReplication::new(
- system.config.meta_epidemic_factor,
- (system.config.meta_epidemic_factor + 1) / 2,
+ config.meta_epidemic_factor,
+ (config.meta_epidemic_factor + 1) / 2,
+ );
+
+ info!("Initialize block manager...");
+ let block_manager = BlockManager::new(
+ &db,
+ config.data_dir.clone(),
+ data_rep_param.clone(),
+ system.clone(),
+ rpc_server,
);
info!("Initialize block_ref_table...");
@@ -172,6 +140,7 @@ impl Garage {
info!("Initialize Garage...");
let garage = Arc::new(Self {
+ config,
db,
system: system.clone(),
block_manager,
@@ -193,40 +162,6 @@ impl Garage {
}
}
-fn read_config(config_file: PathBuf) -> Result<Config, Error> {
- let mut file = std::fs::OpenOptions::new()
- .read(true)
- .open(config_file.as_path())?;
-
- let mut config = String::new();
- file.read_to_string(&mut config)?;
-
- Ok(toml::from_str(&config)?)
-}
-
-fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
- let mut id_file = metadata_dir.clone();
- id_file.push("node_id");
- if id_file.as_path().exists() {
- let mut f = std::fs::File::open(id_file.as_path())?;
- let mut d = vec![];
- f.read_to_end(&mut d)?;
- if d.len() != 32 {
- return Err(Error::Message(format!("Corrupt node_id file")));
- }
-
- let mut id = [0u8; 32];
- id.copy_from_slice(&d[..]);
- Ok(id.into())
- } else {
- let id = gen_uuid();
-
- let mut f = std::fs::File::create(id_file.as_path())?;
- f.write_all(id.as_slice())?;
- Ok(id)
- }
-}
-
async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> {
// Wait for the CTRL+C signal
tokio::signal::ctrl_c()
@@ -249,9 +184,6 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Loading configuration...");
let config = read_config(config_file).expect("Unable to read config file");
- let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID");
- info!("Node ID: {}", hex::encode(&id));
-
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
db_path.push("db");
@@ -264,17 +196,21 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let (send_cancel, watch_cancel) = watch::channel(false);
let background = BackgroundRunner::new(16, watch_cancel.clone());
- let garage = Garage::new(config, id, db, background.clone(), &mut rpc_server).await;
+ let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await;
info!("Initializing RPC and API servers...");
let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone()));
let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
futures::try_join!(
- garage.system.clone().bootstrap().map(|rv| {
- info!("Bootstrap done");
- Ok(rv)
- }),
+ garage
+ .system
+ .clone()
+ .bootstrap(&garage.config.bootstrap_peers[..])
+ .map(|rv| {
+ info!("Bootstrap done");
+ Ok(rv)
+ }),
run_rpc_server.map(|rv| {
info!("RPC server exited");
rv
diff --git a/src/block.rs b/src/store/block.rs
index 23222a7f..e2ef32e0 100644
--- a/src/block.rs
+++ b/src/store/block.rs
@@ -14,11 +14,16 @@ use tokio::sync::{watch, Mutex, Notify};
use crate::data;
use crate::data::*;
use crate::error::Error;
-use crate::membership::System;
-use crate::rpc_client::*;
-use crate::rpc_server::*;
-use crate::block_ref_table::*;
+use crate::rpc::membership::System;
+use crate::rpc::rpc_client::*;
+use crate::rpc::rpc_server::*;
+
+use crate::table::table_sharded::TableShardedReplication;
+use crate::table::TableReplication;
+
+use crate::store::block_ref_table::*;
+
use crate::server::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
@@ -47,6 +52,7 @@ pub struct PutBlockMessage {
impl RpcMessage for Message {}
pub struct BlockManager {
+ pub replication: TableShardedReplication,
pub data_dir: PathBuf,
pub data_dir_lock: Mutex<()>,
@@ -64,6 +70,7 @@ impl BlockManager {
pub fn new(
db: &sled::Db,
data_dir: PathBuf,
+ replication: TableShardedReplication,
system: Arc<System>,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
@@ -80,6 +87,7 @@ impl BlockManager {
let rpc_client = system.rpc_client::<Message>(rpc_path);
let block_manager = Arc::new(Self {
+ replication,
data_dir,
data_dir_lock: Mutex::new(()),
rc,
@@ -302,8 +310,8 @@ impl BlockManager {
.await?;
let needed_by_others = !active_refs.is_empty();
if needed_by_others {
- let ring = garage.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor);
+ let ring = self.system.ring.borrow().clone();
+ let who = self.replication.replication_nodes(&hash, &ring);
let msg = Arc::new(Message::NeedBlockQuery(*hash));
let who_needs_fut = who.iter().map(|to| {
self.rpc_client
@@ -361,8 +369,7 @@ impl BlockManager {
}
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
- let ring = self.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
+ let who = self.replication.read_nodes(&hash, &self.system);
let resps = self
.rpc_client
.try_call_many(
@@ -386,13 +393,12 @@ impl BlockManager {
}
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let ring = self.system.ring.borrow().clone();
- let who = ring.walk_ring(&hash, self.system.config.data_replication_factor);
+ let who = self.replication.write_nodes(&hash, &self.system);
self.rpc_client
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum((self.system.config.data_replication_factor + 1) / 2)
+ RequestStrategy::with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
diff --git a/src/block_ref_table.rs b/src/store/block_ref_table.rs
index 6a256aa3..c8a2a2a1 100644
--- a/src/block_ref_table.rs
+++ b/src/store/block_ref_table.rs
@@ -5,9 +5,10 @@ use std::sync::Arc;
use crate::background::*;
use crate::data::*;
use crate::error::Error;
+
use crate::table::*;
-use crate::block::*;
+use crate::store::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
diff --git a/src/bucket_table.rs b/src/store/bucket_table.rs
index 5604049c..5604049c 100644
--- a/src/bucket_table.rs
+++ b/src/store/bucket_table.rs
diff --git a/src/store/mod.rs b/src/store/mod.rs
new file mode 100644
index 00000000..afadc9bb
--- /dev/null
+++ b/src/store/mod.rs
@@ -0,0 +1,5 @@
+pub mod block;
+pub mod block_ref_table;
+pub mod bucket_table;
+pub mod object_table;
+pub mod version_table;
diff --git a/src/object_table.rs b/src/store/object_table.rs
index edad4925..97de0cdb 100644
--- a/src/object_table.rs
+++ b/src/store/object_table.rs
@@ -5,10 +5,11 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
+
+use crate::table::table_sharded::*;
use crate::table::*;
-use crate::table_sharded::*;
-use crate::version_table::*;
+use crate::store::version_table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
diff --git a/src/version_table.rs b/src/store/version_table.rs
index 74174dce..d25a56ca 100644
--- a/src/version_table.rs
+++ b/src/store/version_table.rs
@@ -5,10 +5,11 @@ use std::sync::Arc;
use crate::background::BackgroundRunner;
use crate::data::*;
use crate::error::Error;
+
+use crate::table::table_sharded::*;
use crate::table::*;
-use crate::table_sharded::*;
-use crate::block_ref_table::*;
+use crate::store::block_ref_table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
diff --git a/src/table/mod.rs b/src/table/mod.rs
new file mode 100644
index 00000000..e03b8d0b
--- /dev/null
+++ b/src/table/mod.rs
@@ -0,0 +1,6 @@
+pub mod table;
+pub mod table_fullcopy;
+pub mod table_sharded;
+pub mod table_sync;
+
+pub use table::*;
diff --git a/src/table.rs b/src/table/table.rs
index a3d02d0c..50e8739a 100644
--- a/src/table.rs
+++ b/src/table/table.rs
@@ -10,10 +10,12 @@ use serde_bytes::ByteBuf;
use crate::data::*;
use crate::error::Error;
-use crate::membership::{Ring, System};
-use crate::rpc_client::*;
-use crate::rpc_server::*;
-use crate::table_sync::*;
+
+use crate::rpc::membership::{Ring, System};
+use crate::rpc::rpc_client::*;
+use crate::rpc::rpc_server::*;
+
+use crate::table::table_sync::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
diff --git a/src/table_fullcopy.rs b/src/table/table_fullcopy.rs
index 2fcf56db..2cd2e464 100644
--- a/src/table_fullcopy.rs
+++ b/src/table/table_fullcopy.rs
@@ -2,7 +2,7 @@ use arc_swap::ArcSwapOption;
use std::sync::Arc;
use crate::data::*;
-use crate::membership::{Ring, System};
+use crate::rpc::membership::{Ring, System};
use crate::table::*;
#[derive(Clone)]
diff --git a/src/table_sharded.rs b/src/table/table_sharded.rs
index c17ea0d4..5190f5d4 100644
--- a/src/table_sharded.rs
+++ b/src/table/table_sharded.rs
@@ -1,5 +1,5 @@
use crate::data::*;
-use crate::membership::{Ring, System};
+use crate::rpc::membership::{Ring, System};
use crate::table::*;
#[derive(Clone)]
diff --git a/src/table_sync.rs b/src/table/table_sync.rs
index 60d5c4df..8f6582a7 100644
--- a/src/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch};
use crate::data::*;
use crate::error::Error;
-use crate::membership::Ring;
+use crate::rpc::membership::Ring;
use crate::table::*;
const MAX_DEPTH: usize = 16;