aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/admin/cluster.rs14
-rw-r--r--src/api/router_macros.rs1
-rw-r--r--src/block/Cargo.toml1
-rw-r--r--src/block/manager.rs30
-rw-r--r--src/block/repair.rs5
-rw-r--r--src/block/resync.rs3
-rw-r--r--src/db/lib.rs3
-rw-r--r--src/garage/Cargo.toml3
-rw-r--r--src/garage/admin.rs17
-rw-r--r--src/garage/cli/cmd.rs2
-rw-r--r--src/garage/cli/layout.rs176
-rw-r--r--src/garage/cli/structs.rs15
-rw-r--r--src/garage/main.rs3
-rw-r--r--src/garage/repair/offline.rs17
-rw-r--r--src/garage/repair/online.rs79
-rw-r--r--src/garage/server.rs11
-rw-r--r--src/garage/tests/common/garage.rs2
-rw-r--r--src/model/Cargo.toml1
-rw-r--r--src/model/bucket_alias_table.rs24
-rw-r--r--src/model/bucket_table.rs133
-rw-r--r--src/model/garage.rs30
-rw-r--r--src/model/index_counter.rs231
-rw-r--r--src/model/k2v/item_table.rs54
-rw-r--r--src/model/k2v/rpc.rs7
-rw-r--r--src/model/key_table.rs154
-rw-r--r--src/model/migrate.rs5
-rw-r--r--src/model/prev/v051/bucket_table.rs2
-rw-r--r--src/model/prev/v051/key_table.rs50
-rw-r--r--src/model/prev/v051/mod.rs3
-rw-r--r--src/model/prev/v051/object_table.rs149
-rw-r--r--src/model/prev/v051/version_table.rs79
-rw-r--r--src/model/s3/block_ref_table.rs29
-rw-r--r--src/model/s3/object_table.rs296
-rw-r--r--src/model/s3/version_table.rs219
-rw-r--r--src/rpc/Cargo.toml3
-rw-r--r--src/rpc/graph_algo.rs411
-rw-r--r--src/rpc/layout.rs1271
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/ring.rs1
-rw-r--r--src/rpc/rpc_helper.rs18
-rw-r--r--src/rpc/system.rs129
-rw-r--r--src/table/Cargo.toml2
-rw-r--r--src/table/data.rs110
-rw-r--r--src/table/gc.rs48
-rw-r--r--src/table/lib.rs8
-rw-r--r--src/table/merkle.rs45
-rw-r--r--src/table/queue.rs77
-rw-r--r--src/table/replication/parameters.rs2
-rw-r--r--src/table/schema.rs22
-rw-r--r--src/table/sync.rs77
-rw-r--r--src/table/table.rs58
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/background/job_worker.rs48
-rw-r--r--src/util/background/mod.rs58
-rw-r--r--src/util/background/worker.rs73
-rw-r--r--src/util/data.rs31
-rw-r--r--src/util/encode.rs42
-rw-r--r--src/util/error.rs1
-rw-r--r--src/util/lib.rs2
-rw-r--r--src/util/migrate.rs159
-rw-r--r--src/util/persister.rs38
61 files changed, 2702 insertions, 1882 deletions
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index 182a4f6f..540c6009 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -91,7 +91,7 @@ fn get_cluster_layout(garage: &Arc<Garage>) -> GetClusterLayoutResponse {
.map(|(k, _, v)| (hex::encode(k), v.0.clone()))
.collect(),
staged_role_changes: layout
- .staging
+ .staging_roles
.items()
.iter()
.filter(|(k, _, v)| layout.roles.get(k) != Some(v))
@@ -142,14 +142,14 @@ pub async fn handle_update_cluster_layout(
let mut layout = garage.system.get_cluster_layout();
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
for (node, role) in updates {
let node = hex::decode(node).ok_or_bad_request("Invalid node identifier")?;
let node = Uuid::try_from(&node).ok_or_bad_request("Invalid node identifier")?;
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(node, NodeRoleV(role)));
}
@@ -167,12 +167,14 @@ pub async fn handle_apply_cluster_layout(
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
let layout = garage.system.get_cluster_layout();
- let layout = layout.apply_staged_changes(Some(param.version))?;
+ let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
+
garage.system.update_cluster_layout(&layout).await?;
Ok(Response::builder()
- .status(StatusCode::NO_CONTENT)
- .body(Body::empty())?)
+ .status(StatusCode::OK)
+ .header(http::header::CONTENT_TYPE, "text/plain")
+ .body(Body::from(msg.join("\n")))?)
}
pub async fn handle_revert_cluster_layout(
diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs
index 959e69a3..07b5570c 100644
--- a/src/api/router_macros.rs
+++ b/src/api/router_macros.rs
@@ -145,6 +145,7 @@ macro_rules! generateQueryParameters {
) => {
#[derive(Debug)]
#[allow(non_camel_case_types)]
+ #[allow(clippy::upper_case_acronyms)]
enum Keyword {
EMPTY,
$( $kw_name, )*
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index cbd58d32..1e4eb64e 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -31,7 +31,6 @@ rand = "0.8"
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
zstd = { version = "0.9", default-features = false }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 28523a93..1b5a5df0 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@@ -22,6 +23,7 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@@ -87,7 +89,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
- tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
+ tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@@ -126,8 +128,6 @@ impl BlockManager {
let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
- let (scrub_tx, scrub_rx) = mpsc::channel(1);
-
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -138,21 +138,24 @@ impl BlockManager {
system,
endpoint,
metrics,
- tx_scrub_command: scrub_tx,
+ tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
+ block_manager
+ }
+
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
// Spawn a bunch of resync workers
for index in 0..MAX_RESYNC_WORKERS {
- let worker = ResyncWorker::new(index, block_manager.clone());
- block_manager.system.background.spawn_worker(worker);
+ let worker = ResyncWorker::new(index, self.clone());
+ bg.spawn_worker(worker);
}
// Spawn scrub worker
- let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx);
- block_manager.system.background.spawn_worker(scrub_worker);
-
- block_manager
+ let (scrub_tx, scrub_rx) = mpsc::channel(1);
+ self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+ bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -325,8 +328,11 @@ impl BlockManager {
}
/// Send command to start/stop/manager scrub worker
- pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
- let _ = self.tx_scrub_command.send(cmd).await;
+ pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> {
+ let tx = self.tx_scrub_command.load();
+ let tx = tx.as_ref().ok_or_message("scrub worker is not running")?;
+ tx.send(cmd).await.ok_or_message("send error")?;
+ Ok(())
}
/// Get the reference count of a block
diff --git a/src/block/repair.rs b/src/block/repair.rs
index 1878027e..a6ded65a 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -148,7 +148,7 @@ impl Worker for RepairWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
@@ -178,6 +178,7 @@ struct ScrubWorkerPersisted {
time_last_complete_scrub: u64,
corruptions_detected: u64,
}
+impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
enum ScrubWorkerState {
Running(BlockStoreIterator),
@@ -341,7 +342,7 @@ impl Worker for ScrubWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
let (wait_until, command) = match &self.work {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 8231b55d..9c7b3b0e 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -63,6 +63,7 @@ struct ResyncPersistedConfig {
n_workers: usize,
tranquility: u32,
}
+impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
enum ResyncIterResult {
BusyDidSomething,
@@ -540,7 +541,7 @@ impl Worker for ResyncWorker {
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
while self.index >= self.manager.resync.persisted.load().n_workers {
self.manager.resync.notify.notified().await
}
diff --git a/src/db/lib.rs b/src/db/lib.rs
index 11cae4e3..22bd9364 100644
--- a/src/db/lib.rs
+++ b/src/db/lib.rs
@@ -2,9 +2,6 @@
#[cfg(feature = "sqlite")]
extern crate tracing;
-#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))]
-compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
-
#[cfg(feature = "lmdb")]
pub mod lmdb_adapter;
#[cfg(feature = "sled")]
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index cee7060e..b43b0242 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -42,7 +42,6 @@ rand = "0.8"
async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false }
@@ -74,7 +73,7 @@ base64 = "0.13"
[features]
-default = [ "bundled-libs", "metrics", "sled" ]
+default = [ "bundled-libs", "metrics", "sled", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index af9e9ea9..58d645ac 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use garage_util::background::BackgroundRunner;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -74,13 +75,18 @@ impl Rpc for AdminRpc {
pub struct AdminRpcHandler {
garage: Arc<Garage>,
+ background: Arc<BackgroundRunner>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
- pub fn new(garage: Arc<Garage>) -> Arc<Self> {
+ pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
- let admin = Arc::new(Self { garage, endpoint });
+ let admin = Arc::new(Self {
+ garage,
+ background,
+ endpoint,
+ });
admin.endpoint.set_handler(admin.clone());
admin
}
@@ -759,7 +765,7 @@ impl AdminRpcHandler {
)))
}
} else {
- launch_online_repair(self.garage.clone(), opt).await;
+ launch_online_repair(&self.garage, &self.background, opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@@ -925,12 +931,11 @@ impl AdminRpcHandler {
async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
match cmd {
WorkerOperation::List { opt } => {
- let workers = self.garage.background.get_worker_info();
+ let workers = self.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, *opt))
}
WorkerOperation::Info { tid } => {
let info = self
- .garage
.background
.get_worker_info()
.get(tid)
@@ -944,7 +949,7 @@ impl AdminRpcHandler {
self.garage
.block_manager
.send_scrub_command(scrub_command)
- .await;
+ .await?;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
}
WorkerSetCmd::ResyncWorkerCount { worker_count } => {
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 6c5598b1..0d180ecd 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -74,7 +74,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
_ => {
- let new_role = match layout.staging.get(&adv.id) {
+ let new_role = match layout.staging_roles.get(&adv.id) {
Some(NodeRoleV(Some(_))) => "(pending)",
_ => "NO ROLE ASSIGNED",
};
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 3884bb92..27bb7eb8 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,3 +1,5 @@
+use bytesize::ByteSize;
+
use garage_util::crdt::Crdt;
use garage_util::error::*;
use garage_util::formater::format_table;
@@ -14,8 +16,8 @@ pub async fn cli_layout_command_dispatch(
rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
- LayoutOperation::Assign(configure_opt) => {
- cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await
+ LayoutOperation::Assign(assign_opt) => {
+ cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
}
LayoutOperation::Remove(remove_opt) => {
cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
@@ -27,6 +29,9 @@ pub async fn cli_layout_command_dispatch(
LayoutOperation::Revert(revert_opt) => {
cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
}
+ LayoutOperation::Config(config_opt) => {
+ cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
+ }
}
}
@@ -60,14 +65,14 @@ pub async fn cmd_assign_role(
.collect::<Result<Vec<_>, _>>()?;
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@@ -83,7 +88,7 @@ pub async fn cmd_assign_role(
return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
}
- if args.capacity == Some(0) {
+ if args.capacity == Some(ByteSize::b(0)) {
return Err(Error::Message("Invalid capacity value: 0".into()));
}
@@ -91,7 +96,7 @@ pub async fn cmd_assign_role(
let new_entry = match roles.get(&added_node) {
Some(NodeRoleV(Some(old))) => {
let capacity = match args.capacity {
- Some(c) => Some(c),
+ Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => old.capacity,
};
@@ -108,7 +113,7 @@ pub async fn cmd_assign_role(
}
_ => {
let capacity = match args.capacity {
- Some(c) => Some(c),
+ Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
@@ -125,7 +130,7 @@ pub async fn cmd_assign_role(
};
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@@ -145,13 +150,13 @@ pub async fn cmd_remove_role(
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut roles = layout.roles.clone();
- roles.merge(&layout.staging);
+ roles.merge(&layout.staging_roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
- .staging
+ .staging_roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -166,7 +171,7 @@ pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
- let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+ let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== CURRENT CLUSTER LAYOUT ====");
if !print_cluster_layout(&layout) {
@@ -176,30 +181,41 @@ pub async fn cmd_show_layout(
println!();
println!("Current cluster layout version: {}", layout.version);
- if print_staging_role_changes(&layout) {
- layout.roles.merge(&layout.staging);
-
- println!();
- println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
- if !print_cluster_layout(&layout) {
- println!("No nodes have a role in the new layout.");
- }
- println!();
+ let has_role_changes = print_staging_role_changes(&layout);
+ let has_param_changes = print_staging_parameters_changes(&layout);
+ if has_role_changes || has_param_changes {
+ let v = layout.version;
+ let res_apply = layout.apply_staged_changes(Some(v + 1));
// this will print the stats of what partitions
// will move around when we apply
- if layout.calculate_partition_assignation() {
- println!("To enact the staged role changes, type:");
- println!();
- println!(" garage layout apply --version {}", layout.version + 1);
- println!();
- println!(
- "You can also revert all proposed changes with: garage layout revert --version {}",
- layout.version + 1
- );
- } else {
- println!("Not enough nodes have an assigned role to maintain enough copies of data.");
- println!("This new layout cannot yet be applied.");
+ match res_apply {
+ Ok((layout, msg)) => {
+ println!();
+ println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ====");
+ if !print_cluster_layout(&layout) {
+ println!("No nodes have a role in the new layout.");
+ }
+ println!();
+
+ for line in msg.iter() {
+ println!("{}", line);
+ }
+ println!("To enact the staged role changes, type:");
+ println!();
+ println!(" garage layout apply --version {}", v + 1);
+ println!();
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ v + 1)
+ }
+ Err(e) => {
+ println!("Error while trying to compute the assignation: {}", e);
+ println!("This new layout cannot yet be applied.");
+ println!(
+ "You can also revert all proposed changes with: garage layout revert --version {}",
+ v + 1)
+ }
}
}
@@ -213,7 +229,10 @@ pub async fn cmd_apply_layout(
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
- let layout = layout.apply_staged_changes(apply_opt.version)?;
+ let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
+ for line in msg.iter() {
+ println!("{}", line);
+ }
send_layout(rpc_cli, rpc_host, layout).await?;
@@ -238,6 +257,45 @@ pub async fn cmd_revert_layout(
Ok(())
}
+pub async fn cmd_config_layout(
+ rpc_cli: &Endpoint<SystemRpc, ()>,
+ rpc_host: NodeID,
+ config_opt: ConfigLayoutOpt,
+) -> Result<(), Error> {
+ let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
+
+ let mut did_something = false;
+ match config_opt.redundancy {
+ None => (),
+ Some(r) => {
+ if r > layout.replication_factor {
+ println!(
+ "The zone redundancy must be smaller or equal to the \
+ replication factor ({}).",
+ layout.replication_factor
+ );
+ } else if r < 1 {
+ println!("The zone redundancy must be at least 1.");
+ } else {
+ layout
+ .staging_parameters
+ .update(LayoutParameters { zone_redundancy: r });
+ println!("The new zone redundancy has been saved ({}).", r);
+ }
+ did_something = true;
+ }
+ }
+
+ if !did_something {
+ return Err(Error::Message(
+ "Please specify an action for `garage layout config` to do".into(),
+ ));
+ }
+
+ send_layout(rpc_cli, rpc_host, layout).await?;
+ Ok(())
+}
+
// --- utility ---
pub async fn fetch_layout(
@@ -269,21 +327,39 @@ pub async fn send_layout(
}
pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
- let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
+ let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()];
for (id, _, role) in layout.roles.items().iter() {
let role = match &role.0 {
Some(r) => r,
_ => continue,
};
let tags = role.tags.join(",");
- table.push(format!(
- "{:?}\t{}\t{}\t{}",
- id,
- tags,
- role.zone,
- role.capacity_string()
- ));
+ let usage = layout.get_node_usage(id).unwrap_or(0);
+ let capacity = layout.get_node_capacity(id).unwrap_or(0);
+ if capacity > 0 {
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}\t{} ({:.1}%)",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string(),
+ ByteSize::b(usage as u64 * layout.partition_size).to_string_as(false),
+ (100.0 * usage as f32 * layout.partition_size as f32) / (capacity as f32)
+ ));
+ } else {
+ table.push(format!(
+ "{:?}\t{}\t{}\t{}",
+ id,
+ tags,
+ role.zone,
+ role.capacity_string()
+ ));
+ };
}
+ println!();
+ println!("Parameters of the layout computation:");
+ println!("Zone redundancy: {}", layout.parameters.zone_redundancy);
+ println!();
if table.len() == 1 {
false
} else {
@@ -292,9 +368,23 @@ pub fn print_cluster_layout(layout: &ClusterLayout) -> bool {
}
}
+pub fn print_staging_parameters_changes(layout: &ClusterLayout) -> bool {
+ let has_changes = *layout.staging_parameters.get() != layout.parameters;
+ if has_changes {
+ println!();
+ println!("==== NEW LAYOUT PARAMETERS ====");
+ println!(
+ "Zone redundancy: {}",
+ layout.staging_parameters.get().zone_redundancy
+ );
+ println!();
+ }
+ has_changes
+}
+
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
let has_changes = layout
- .staging
+ .staging_roles
.items()
.iter()
.any(|(k, _, v)| layout.roles.get(k) != Some(v));
@@ -303,7 +393,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
println!();
println!("==== STAGED ROLE CHANGES ====");
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
- for (id, _, role) in layout.staging.items().iter() {
+ for (id, _, role) in layout.staging_roles.items().iter() {
if layout.roles.get(id) == Some(role) {
continue;
}
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index fe2c2a26..531501bf 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -91,6 +91,10 @@ pub enum LayoutOperation {
#[structopt(name = "remove", version = garage_version())]
Remove(RemoveRoleOpt),
+ /// Configure parameters value for the layout computation
+ #[structopt(name = "config", version = garage_version())]
+ Config(ConfigLayoutOpt),
+
/// Show roles currently assigned to nodes and changes staged for commit
#[structopt(name = "show", version = garage_version())]
Show,
@@ -114,9 +118,9 @@ pub struct AssignRoleOpt {
#[structopt(short = "z", long = "zone")]
pub(crate) zone: Option<String>,
- /// Capacity (in relative terms, use 1 to represent your smallest server)
+ /// Storage capacity, in bytes (supported suffixes: B, KB, MB, GB, TB, PB)
#[structopt(short = "c", long = "capacity")]
- pub(crate) capacity: Option<u32>,
+ pub(crate) capacity: Option<bytesize::ByteSize>,
/// Gateway-only node
#[structopt(short = "g", long = "gateway")]
@@ -138,6 +142,13 @@ pub struct RemoveRoleOpt {
}
#[derive(StructOpt, Debug)]
+pub struct ConfigLayoutOpt {
+ /// Zone redundancy parameter
+ #[structopt(short = "r", long = "redundancy")]
+ pub(crate) redundancy: Option<usize>,
+}
+
+#[derive(StructOpt, Debug)]
pub struct ApplyLayoutOpt {
/// Version number of new configuration: this command will fail if
/// it is not exactly 1 + the previous configuration's version
diff --git a/src/garage/main.rs b/src/garage/main.rs
index 107b1389..cd1d6228 100644
--- a/src/garage/main.rs
+++ b/src/garage/main.rs
@@ -17,6 +17,9 @@ compile_error!("Either bundled-libs or system-libs Cargo feature must be enabled
#[cfg(all(feature = "bundled-libs", feature = "system-libs"))]
compile_error!("Only one of bundled-libs and system-libs Cargo features must be enabled");
+#[cfg(not(any(feature = "lmdb", feature = "sled", feature = "sqlite")))]
+compile_error!("Must activate the Cargo feature for at least one DB engine: lmdb, sled or sqlite.");
+
use std::net::SocketAddr;
use std::path::PathBuf;
diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs
index 7760a8bd..25193e4a 100644
--- a/src/garage/repair/offline.rs
+++ b/src/garage/repair/offline.rs
@@ -1,8 +1,5 @@
use std::path::PathBuf;
-use tokio::sync::watch;
-
-use garage_util::background::*;
use garage_util::config::*;
use garage_util::error::*;
@@ -20,12 +17,8 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
info!("Loading configuration...");
let config = read_config(config_file)?;
- info!("Initializing background runner...");
- let (done_tx, done_rx) = watch::channel(false);
- let (background, await_background_done) = BackgroundRunner::new(16, done_rx);
-
info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), background)?;
+ let garage = Garage::new(config)?;
info!("Launching repair operation...");
match opt.what {
@@ -43,13 +36,7 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
}
}
- info!("Repair operation finished, shutting down Garage internals...");
- done_tx.send(true).unwrap();
- drop(garage);
-
- await_background_done.await?;
-
- info!("Cleaning up...");
+ info!("Repair operation finished, shutting down...");
Ok(())
}
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index 42221c2a..627e3bf3 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -12,38 +12,37 @@ use garage_model::s3::version_table::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::error::Error;
+use garage_util::migrate::Migrate;
use crate::*;
-pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+pub async fn launch_online_repair(
+ garage: &Arc<Garage>,
+ bg: &BackgroundRunner,
+ opt: RepairOpt,
+) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
- garage.bucket_table.syncer.add_full_sync();
- garage.object_table.syncer.add_full_sync();
- garage.version_table.syncer.add_full_sync();
- garage.block_ref_table.syncer.add_full_sync();
- garage.key_table.syncer.add_full_sync();
+ garage.bucket_table.syncer.add_full_sync()?;
+ garage.object_table.syncer.add_full_sync()?;
+ garage.version_table.syncer.add_full_sync()?;
+ garage.block_ref_table.syncer.add_full_sync()?;
+ garage.key_table.syncer.add_full_sync()?;
}
RepairWhat::Versions => {
info!("Repairing the versions table");
- garage
- .background
- .spawn_worker(RepairVersionsWorker::new(garage.clone()));
+ bg.spawn_worker(RepairVersionsWorker::new(garage.clone()));
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
- garage
- .background
- .spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
+ bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
- garage
- .background
- .spawn_worker(garage_block::repair::RepairWorker::new(
- garage.block_manager.clone(),
- ));
+ bg.spawn_worker(garage_block::repair::RepairWorker::new(
+ garage.block_manager.clone(),
+ ));
}
RepairWhat::Scrub { cmd } => {
let cmd = match cmd {
@@ -56,9 +55,10 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
}
};
info!("Sending command to scrub worker: {:?}", cmd);
- garage.block_manager.send_scrub_command(cmd).await;
+ garage.block_manager.send_scrub_command(cmd).await?;
}
}
+ Ok(())
}
// ----
@@ -93,20 +93,15 @@ impl Worker for RepairVersionsWorker {
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
- Some((k, v)) => {
- self.pos = k;
- v
- }
+ let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => (v, k),
None => {
info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerState::Done);
}
};
- self.counter += 1;
-
- let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
+ let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?;
if !version.deleted.get() {
let object = self
.garage
@@ -134,10 +129,13 @@ impl Worker for RepairVersionsWorker {
}
}
+ self.counter += 1;
+ self.pos = next_pos;
+
Ok(WorkerState::Busy)
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
@@ -174,20 +172,16 @@ impl Worker for RepairBlockrefsWorker {
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
- Some((k, v)) => {
- self.pos = k;
- v
- }
- None => {
- info!("repair_block_ref: finished, done {}", self.counter);
- return Ok(WorkerState::Done);
- }
- };
-
- self.counter += 1;
+ let (item_bytes, next_pos) =
+ match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
+ Some((k, v)) => (v, k),
+ None => {
+ info!("repair_block_ref: finished, done {}", self.counter);
+ return Ok(WorkerState::Done);
+ }
+ };
- let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
+ let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?;
if !block_ref.deleted.get() {
let version = self
.garage
@@ -212,10 +206,13 @@ impl Worker for RepairBlockrefsWorker {
}
}
+ self.counter += 1;
+ self.pos = next_pos;
+
Ok(WorkerState::Busy)
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
+ async fn wait_for_work(&mut self) -> WorkerState {
unreachable!()
}
}
diff --git a/src/garage/server.rs b/src/garage/server.rs
index d4099a97..16f1b625 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -35,12 +35,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
#[cfg(feature = "metrics")]
let metrics_exporter = opentelemetry_prometheus::exporter().init();
+ info!("Initializing Garage main data store...");
+ let garage = Garage::new(config.clone())?;
+
info!("Initializing background runner...");
let watch_cancel = watch_shutdown_signal();
- let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone());
+ let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
- info!("Initializing Garage main data store...");
- let garage = Garage::new(config.clone(), background)?;
+ info!("Spawning Garage workers...");
+ garage.spawn_workers(&background);
if config.admin.trace_sink.is_some() {
info!("Initialize tracing...");
@@ -63,7 +66,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
- AdminRpcHandler::new(garage.clone());
+ AdminRpcHandler::new(garage.clone(), background.clone());
// ---- Launch public-facing API servers ----
diff --git a/src/garage/tests/common/garage.rs b/src/garage/tests/common/garage.rs
index 730d5889..dbebe5b1 100644
--- a/src/garage/tests/common/garage.rs
+++ b/src/garage/tests/common/garage.rs
@@ -126,7 +126,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
self.command()
.args(["layout", "assign"])
.arg(node_short_id)
- .args(["-c", "1", "-z", "unzonned"])
+ .args(["-c", "1G", "-z", "unzonned"])
.quiet()
.expect_success_status("Could not assign garage node layout");
self.command()
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 3d3fb693..323c2d64 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -30,7 +30,6 @@ tracing = "0.1.30"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/model/bucket_alias_table.rs b/src/model/bucket_alias_table.rs
index fcd1536e..54d7fbad 100644
--- a/src/model/bucket_alias_table.rs
+++ b/src/model/bucket_alias_table.rs
@@ -1,18 +1,26 @@
-use serde::{Deserialize, Serialize};
-
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::*;
-/// The bucket alias table holds the names given to buckets
-/// in the global namespace.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketAlias {
- name: String,
- pub state: crdt::Lww<Option<Uuid>>,
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// The bucket alias table holds the names given to buckets
+ /// in the global namespace.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketAlias {
+ pub(super) name: String,
+ pub state: crdt::Lww<Option<Uuid>>,
+ }
+
+ impl garage_util::migrate::InitialFormat for BucketAlias {}
}
+pub use v08::*;
+
impl BucketAlias {
pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> {
if !is_valid_bucket_name(&name) {
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 7be42702..ac163736 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,5 +1,3 @@
-use serde::{Deserialize, Serialize};
-
use garage_table::crdt::*;
use garage_table::*;
use garage_util::data::*;
@@ -7,72 +5,83 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm;
-/// A bucket is a collection of objects
-///
-/// Its parameters are not directly accessible as:
-/// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
-/// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Bucket {
- /// ID of the bucket
- pub id: Uuid,
- /// State, and configuration if not deleted, of the bucket
- pub state: crdt::Deletable<BucketParams>,
-}
+mod v08 {
+ use crate::permission::BucketKeyPerm;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// A bucket is a collection of objects
+ ///
+ /// Its parameters are not directly accessible as:
+ /// - It must be possible to merge paramaters, hence the use of a LWW CRDT.
+ /// - A bucket has 2 states, Present or Deleted and parameters make sense only if present.
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Bucket {
+ /// ID of the bucket
+ pub id: Uuid,
+ /// State, and configuration if not deleted, of the bucket
+ pub state: crdt::Deletable<BucketParams>,
+ }
-/// Configuration for a bucket
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketParams {
- /// Bucket's creation date
- pub creation_date: u64,
- /// Map of key with access to the bucket, and what kind of access they give
- pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
-
- /// Map of aliases that are or have been given to this bucket
- /// in the global namespace
- /// (not authoritative: this is just used as an indication to
- /// map back to aliases when doing ListBuckets)
- pub aliases: crdt::LwwMap<String, bool>,
- /// Map of aliases that are or have been given to this bucket
- /// in namespaces local to keys
- /// key = (access key id, alias name)
- pub local_aliases: crdt::LwwMap<(String, String), bool>,
-
- /// Whether this bucket is allowed for website access
- /// (under all of its global alias names),
- /// and if so, the website configuration XML document
- pub website_config: crdt::Lww<Option<WebsiteConfig>>,
- /// CORS rules
- pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
- /// Bucket quotas
- #[serde(default)]
- pub quotas: crdt::Lww<BucketQuotas>,
-}
+ /// Configuration for a bucket
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketParams {
+ /// Bucket's creation date
+ pub creation_date: u64,
+ /// Map of key with access to the bucket, and what kind of access they give
+ pub authorized_keys: crdt::Map<String, BucketKeyPerm>,
+
+ /// Map of aliases that are or have been given to this bucket
+ /// in the global namespace
+ /// (not authoritative: this is just used as an indication to
+ /// map back to aliases when doing ListBuckets)
+ pub aliases: crdt::LwwMap<String, bool>,
+ /// Map of aliases that are or have been given to this bucket
+ /// in namespaces local to keys
+ /// key = (access key id, alias name)
+ pub local_aliases: crdt::LwwMap<(String, String), bool>,
+
+ /// Whether this bucket is allowed for website access
+ /// (under all of its global alias names),
+ /// and if so, the website configuration XML document
+ pub website_config: crdt::Lww<Option<WebsiteConfig>>,
+ /// CORS rules
+ pub cors_config: crdt::Lww<Option<Vec<CorsRule>>>,
+ /// Bucket quotas
+ #[serde(default)]
+ pub quotas: crdt::Lww<BucketQuotas>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct WebsiteConfig {
- pub index_document: String,
- pub error_document: Option<String>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct WebsiteConfig {
+ pub index_document: String,
+ pub error_document: Option<String>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CorsRule {
- pub id: Option<String>,
- pub max_age_seconds: Option<u64>,
- pub allow_origins: Vec<String>,
- pub allow_methods: Vec<String>,
- pub allow_headers: Vec<String>,
- pub expose_headers: Vec<String>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CorsRule {
+ pub id: Option<String>,
+ pub max_age_seconds: Option<u64>,
+ pub allow_origins: Vec<String>,
+ pub allow_methods: Vec<String>,
+ pub allow_headers: Vec<String>,
+ pub expose_headers: Vec<String>,
+ }
-#[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct BucketQuotas {
- /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
- pub max_size: Option<u64>,
- /// Maximum number of non-deleted objects in the bucket
- pub max_objects: Option<u64>,
+ #[derive(Default, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct BucketQuotas {
+ /// Maximum size in bytes (bucket size = sum of sizes of objects in the bucket)
+ pub max_size: Option<u64>,
+ /// Maximum number of non-deleted objects in the bucket
+ pub max_objects: Option<u64>,
+ }
+
+ impl garage_util::migrate::InitialFormat for Bucket {}
}
+pub use v08::*;
+
impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true;
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index e34d034f..5bea6b4f 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -39,8 +39,6 @@ pub struct Garage {
/// The local database
pub db: db::Db,
- /// A background job runner
- pub background: Arc<BackgroundRunner>,
/// The membership manager
pub system: Arc<System>,
/// The block manager
@@ -78,7 +76,7 @@ pub struct GarageK2V {
impl Garage {
/// Create and run garage
- pub fn new(config: Config, background: Arc<BackgroundRunner>) -> Result<Arc<Self>, Error> {
+ pub fn new(config: Config) -> Result<Arc<Self>, Error> {
// Create meta dir and data dir if they don't exist already
std::fs::create_dir_all(&config.metadata_dir)
.ok_or_message("Unable to create Garage metadata directory")?;
@@ -167,7 +165,7 @@ impl Garage {
.expect("Invalid replication_mode in config file.");
info!("Initialize membership management system...");
- let system = System::new(network_key, background.clone(), replication_mode, &config)?;
+ let system = System::new(network_key, replication_mode, &config)?;
let data_rep_param = TableShardedReplication {
system: system.clone(),
@@ -225,7 +223,6 @@ impl Garage {
info!("Initialize version_table...");
let version_table = Table::new(
VersionTable {
- background: background.clone(),
block_ref_table: block_ref_table.clone(),
},
meta_rep_param.clone(),
@@ -240,7 +237,6 @@ impl Garage {
#[allow(clippy::redundant_clone)]
let object_table = Table::new(
ObjectTable {
- background: background.clone(),
version_table: version_table.clone(),
object_counter_table: object_counter_table.clone(),
},
@@ -258,7 +254,6 @@ impl Garage {
config,
replication_mode,
db,
- background,
system,
block_manager,
bucket_table,
@@ -273,6 +268,22 @@ impl Garage {
}))
}
+ pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ self.block_manager.spawn_workers(bg);
+
+ self.bucket_table.spawn_workers(bg);
+ self.bucket_alias_table.spawn_workers(bg);
+ self.key_table.spawn_workers(bg);
+
+ self.object_table.spawn_workers(bg);
+ self.object_counter_table.spawn_workers(bg);
+ self.version_table.spawn_workers(bg);
+ self.block_ref_table.spawn_workers(bg);
+
+ #[cfg(feature = "k2v")]
+ self.k2v.spawn_workers(bg);
+ }
+
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
@@ -307,4 +318,9 @@ impl GarageK2V {
rpc,
}
}
+
+ pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ self.item_table.spawn_workers(bg);
+ self.counter_table.spawn_workers(bg);
+ }
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index b9594406..35d6596d 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -1,19 +1,18 @@
use core::ops::Bound;
-use std::collections::{hash_map, BTreeMap, HashMap};
+use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, watch};
use garage_db as db;
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
-use garage_util::background::*;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_util::time::*;
use garage_table::crdt::*;
@@ -31,14 +30,44 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
fn counts(&self) -> Vec<(&'static str, i64)>;
}
-/// A counter entry in the global table
-#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
-pub struct CounterEntry<T: CountedItem> {
- pub pk: T::CP,
- pub sk: T::CS,
- pub values: BTreeMap<String, CounterValue>,
+mod v08 {
+ use super::CountedItem;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ // ---- Global part (the table everyone queries) ----
+
+ /// A counter entry in the global table
+ #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+ pub struct CounterEntry<T: CountedItem> {
+ pub pk: T::CP,
+ pub sk: T::CS,
+ pub values: BTreeMap<String, CounterValue>,
+ }
+
+ /// A counter entry in the global table
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct CounterValue {
+ pub node_values: BTreeMap<Uuid, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
+
+ // ---- Local part (the counter we maintain transactionnaly on each node) ----
+
+ #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+ pub(super) struct LocalCounterEntry<T: CountedItem> {
+ pub(super) pk: T::CP,
+ pub(super) sk: T::CS,
+ pub(super) values: BTreeMap<String, (u64, i64)>,
+ }
+
+ impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
}
+pub use v08::*;
+
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
fn partition_key(&self) -> &T::CP {
&self.pk
@@ -80,12 +109,6 @@ impl<T: CountedItem> CounterEntry<T> {
}
}
-/// A counter entry in the global table
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterValue {
- pub node_values: BTreeMap<Uuid, (u64, i64)>,
-}
-
impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
@@ -142,7 +165,6 @@ impl<T: CountedItem> TableSchema for CounterTable<T> {
pub struct IndexCounter<T: CountedItem> {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry<T>)>,
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
}
@@ -152,16 +174,11 @@ impl<T: CountedItem> IndexCounter<T> {
replication: TableShardedReplication,
db: &db::Db,
) -> Arc<Self> {
- let background = system.background.clone();
-
- let (propagate_tx, propagate_rx) = mpsc::unbounded_channel();
-
- let this = Arc::new(Self {
+ Arc::new(Self {
this_node: system.id,
local_counter: db
.open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
- propagate_tx,
table: Table::new(
CounterTable {
_phantom_t: Default::default(),
@@ -170,16 +187,11 @@ impl<T: CountedItem> IndexCounter<T> {
system,
db,
),
- });
-
- background.spawn_worker(IndexPropagatorWorker {
- index_counter: this.clone(),
- propagate_rx,
- buf: HashMap::new(),
- errors: 0,
- });
+ })
+ }
- this
+ pub fn spawn_workers(&self, bg: &BackgroundRunner) {
+ self.table.spawn_workers(bg);
}
pub fn count(
@@ -208,11 +220,9 @@ impl<T: CountedItem> IndexCounter<T> {
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
- Some(old_bytes) => {
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes)
- .map_err(Error::RmpDecode)
- .map_err(db::TxError::Abort)?
- }
+ Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")
+ .map_err(db::TxError::Abort)?,
None => LocalCounterEntry {
pk: pk.clone(),
sk: sk.clone(),
@@ -227,17 +237,14 @@ impl<T: CountedItem> IndexCounter<T> {
ent.1 += *inc;
}
- let new_entry_bytes = rmp_to_vec_all_named(&entry)
+ let new_entry_bytes = entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
- if let Err(e) = self.propagate_tx.send((pk.clone(), sk.clone(), entry)) {
- error!(
- "Could not propagate updated counter values, failed to send to channel: {}",
- e
- );
- }
+ let dist_entry = entry.into_counter_entry(self.this_node);
+ self.table.queue_insert(tx, &dist_entry)?;
Ok(())
}
@@ -250,23 +257,6 @@ impl<T: CountedItem> IndexCounter<T> {
TS: TableSchema<E = T>,
TR: TableReplication,
{
- let save_counter_entry = |entry: CounterEntry<T>| -> Result<(), Error> {
- let entry_k = self
- .table
- .data
- .tree_key(entry.partition_key(), entry.sort_key());
- self.table
- .data
- .update_entry_with(&entry_k, |ent| match ent {
- Some(mut ent) => {
- ent.merge(&entry);
- ent
- }
- None => entry.clone(),
- })?;
- Ok(())
- };
-
// 1. Set all old local counters to zero
let now = now_msec();
let mut next_start: Option<Vec<u8>> = None;
@@ -289,20 +279,22 @@ impl<T: CountedItem> IndexCounter<T> {
info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
for (local_counter_k, local_counter) in batch {
- let mut local_counter =
- rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
+ let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter)
+ .ok_or_message("Cannot decode local counter entry")?;
for (_, tv) in local_counter.values.iter_mut() {
tv.0 = std::cmp::max(tv.0 + 1, now);
tv.1 = 0;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_k, &local_counter_bytes)?;
let counter_entry = local_counter.into_counter_entry(self.this_node);
- save_counter_entry(counter_entry)?;
+ self.local_counter
+ .db()
+ .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
next_start = Some(local_counter_k);
}
@@ -343,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> {
let local_counter_key = self.table.data.tree_key(pk, sk);
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
Some(old_bytes) => {
- let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
- &old_bytes,
- )?;
+ let ent = LocalCounterEntry::<T>::decode(&old_bytes)
+ .ok_or_message("Cannot decode local counter entry")?;
assert!(ent.pk == *pk);
assert!(ent.sk == *sk);
ent
@@ -362,12 +353,14 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 += v;
}
- let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?;
+ let local_counter_bytes = local_counter.encode()?;
self.local_counter
.insert(&local_counter_key, local_counter_bytes)?;
let counter_entry = local_counter.into_counter_entry(self.this_node);
- save_counter_entry(counter_entry)?;
+ self.local_counter
+ .db()
+ .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?;
next_start = Some(counted_entry_k);
}
@@ -378,103 +371,7 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
-struct IndexPropagatorWorker<T: CountedItem> {
- index_counter: Arc<IndexCounter<T>>,
- propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
-
- buf: HashMap<Vec<u8>, CounterEntry<T>>,
- errors: usize,
-}
-
-impl<T: CountedItem> IndexPropagatorWorker<T> {
- fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
- let tree_key = self.index_counter.table.data.tree_key(&pk, &sk);
- let dist_entry = counters.into_counter_entry(self.index_counter.this_node);
- match self.buf.entry(tree_key) {
- hash_map::Entry::Vacant(e) => {
- e.insert(dist_entry);
- }
- hash_map::Entry::Occupied(mut e) => {
- e.get_mut().merge(&dist_entry);
- }
- }
- }
-}
-
-#[async_trait]
-impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
- fn name(&self) -> String {
- format!("{} counter", T::COUNTER_TABLE_NAME)
- }
-
- fn status(&self) -> WorkerStatus {
- WorkerStatus {
- queue_length: Some(self.buf.len() as u64),
- ..Default::default()
- }
- }
-
- async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- // This loop batches updates to counters to be sent all at once.
- // They are sent once the propagate_rx channel has been emptied (or is closed).
- let closed = loop {
- match self.propagate_rx.try_recv() {
- Ok((pk, sk, counters)) => {
- self.add_ent(pk, sk, counters);
- }
- Err(mpsc::error::TryRecvError::Empty) => break false,
- Err(mpsc::error::TryRecvError::Disconnected) => break true,
- }
- };
-
- if !self.buf.is_empty() {
- let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
- let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
- if let Err(e) = self.index_counter.table.insert_many(entries).await {
- self.errors += 1;
- if self.errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
- return Ok(WorkerState::Done);
- }
- // Propagate error up to worker manager, it will log it, increment a counter,
- // and sleep for a certain delay (with exponential backoff), waiting for
- // things to go back to normal
- return Err(e);
- } else {
- for k in entries_k {
- self.buf.remove(&k);
- }
- self.errors = 0;
- }
-
- return Ok(WorkerState::Busy);
- } else if closed {
- return Ok(WorkerState::Done);
- } else {
- return Ok(WorkerState::Idle);
- }
- }
-
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
- match self.propagate_rx.recv().await {
- Some((pk, sk, counters)) => {
- self.add_ent(pk, sk, counters);
- WorkerState::Busy
- }
- None => match self.buf.is_empty() {
- false => WorkerState::Busy,
- true => WorkerState::Done,
- },
- }
- }
-}
-
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-struct LocalCounterEntry<T: CountedItem> {
- pk: T::CP,
- sk: T::CS,
- values: BTreeMap<String, (u64, i64)>,
-}
+// ----
impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 7860cb17..ce3e4129 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -1,7 +1,8 @@
-use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
+use serde::{Deserialize, Serialize};
+
use garage_db as db;
use garage_util::data::*;
@@ -17,32 +18,43 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct K2VItem {
- pub partition: K2VItemPartition,
- pub sort_key: String,
+mod v08 {
+ use crate::k2v::causality::K2VNodeId;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
- items: BTreeMap<K2VNodeId, DvvsEntry>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct K2VItem {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
-pub struct K2VItemPartition {
- pub bucket_id: Uuid,
- pub partition_key: String,
-}
+ pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-struct DvvsEntry {
- t_discard: u64,
- values: Vec<(u64, DvvsValue)>,
-}
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
+ pub struct K2VItemPartition {
+ pub bucket_id: Uuid,
+ pub partition_key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct DvvsEntry {
+ pub(super) t_discard: u64,
+ pub(super) values: Vec<(u64, DvvsValue)>,
+ }
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum DvvsValue {
- Value(#[serde(with = "serde_bytes")] Vec<u8>),
- Deleted,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum DvvsValue {
+ Value(#[serde(with = "serde_bytes")] Vec<u8>),
+ Deleted,
+ }
+
+ impl garage_util::migrate::InitialFormat for K2VItem {}
}
+pub use v08::*;
+
impl K2VItem {
/// Creates a new K2VItem when no previous entry existed in the db
pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index a74df277..f64a7984 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -273,14 +273,9 @@ impl K2VRpcHandler {
}
fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
- let tree_key = self
- .item_table
- .data
- .tree_key(&item.partition, &item.sort_key);
-
self.item_table
.data
- .update_entry_with(&tree_key[..], |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |ent| {
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index 9d2fc783..bb5334a3 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -1,45 +1,121 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::*;
-use garage_table::*;
+use garage_util::crdt::{self, Crdt};
use garage_util::data::*;
+use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema};
+
use crate::permission::BucketKeyPerm;
-use crate::prev::v051::key_table as old;
+pub(crate) mod v05 {
+ use garage_util::crdt;
+ use serde::{Deserialize, Serialize};
-/// An api key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
+ /// An api key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
- /// Internal state of the key
- pub state: crdt::Deletable<KeyParams>,
-}
+ /// The secret_key associated
+ pub secret_key: String,
-/// Configuration for a key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct KeyParams {
- /// The secret_key associated (immutable)
- pub secret_key: String,
+ /// Name for the key
+ pub name: crdt::Lww<String>,
- /// Name for the key
- pub name: crdt::Lww<String>,
+ /// Is the key deleted
+ pub deleted: crdt::Bool,
+
+ /// Buckets in which the key is authorized. Empty if `Key` is deleted
+ // CRDT interaction: deleted implies authorized_buckets is empty
+ pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
+ }
- /// Flag to allow users having this key to create buckets
- pub allow_create_bucket: crdt::Lww<bool>,
+ /// Permission given to a key in a bucket
+ #[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct PermissionSet {
+ /// The key can be used to read the bucket
+ pub allow_read: bool,
+ /// The key can be used to write in the bucket
+ pub allow_write: bool,
+ }
+
+ impl crdt::AutoCrdt for PermissionSet {
+ const WARN_IF_DIFFERENT: bool = true;
+ }
- /// If the key is present: it gives some permissions,
- /// a map of bucket IDs (uuids) to permissions.
- /// Otherwise no permissions are granted to key
- pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+ impl garage_util::migrate::InitialFormat for Key {}
+}
- /// A key can have a local view of buckets names it is
- /// the only one to see, this is the namespace for these aliases
- pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
+mod v08 {
+ use super::v05;
+ use crate::permission::BucketKeyPerm;
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ /// An api key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Key {
+ /// The id of the key (immutable), used as partition key
+ pub key_id: String,
+
+ /// Internal state of the key
+ pub state: crdt::Deletable<KeyParams>,
+ }
+
+ /// Configuration for a key
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct KeyParams {
+ /// The secret_key associated (immutable)
+ pub secret_key: String,
+
+ /// Name for the key
+ pub name: crdt::Lww<String>,
+
+ /// Flag to allow users having this key to create buckets
+ pub allow_create_bucket: crdt::Lww<bool>,
+
+ /// If the key is present: it gives some permissions,
+ /// a map of bucket IDs (uuids) to permissions.
+ /// Otherwise no permissions are granted to key
+ pub authorized_buckets: crdt::Map<Uuid, BucketKeyPerm>,
+
+ /// A key can have a local view of buckets names it is
+ /// the only one to see, this is the namespace for these aliases
+ pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
+ }
+
+ impl garage_util::migrate::Migrate for Key {
+ type Previous = v05::Key;
+
+ fn migrate(old_k: v05::Key) -> Key {
+ let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
+
+ let state = if old_k.deleted.get() {
+ crdt::Deletable::Deleted
+ } else {
+ // Authorized buckets is ignored here,
+ // migration is performed in specific migration code in
+ // garage/migrate.rs
+ crdt::Deletable::Present(KeyParams {
+ secret_key: old_k.secret_key,
+ name,
+ allow_create_bucket: crdt::Lww::new(false),
+ authorized_buckets: crdt::Map::new(),
+ local_aliases: crdt::LwwMap::new(),
+ })
+ };
+ Key {
+ key_id: old_k.key_id,
+ state,
+ }
+ }
+ }
}
+pub use v08::*;
+
impl KeyParams {
fn new(secret_key: &str, name: &str) -> Self {
KeyParams {
@@ -173,28 +249,4 @@ impl TableSchema for KeyTable {
}
}
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?;
- let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
-
- let state = if old_k.deleted.get() {
- crdt::Deletable::Deleted
- } else {
- // Authorized buckets is ignored here,
- // migration is performed in specific migration code in
- // garage/migrate.rs
- crdt::Deletable::Present(KeyParams {
- secret_key: old_k.secret_key,
- name,
- allow_create_bucket: crdt::Lww::new(false),
- authorized_buckets: crdt::Map::new(),
- local_aliases: crdt::LwwMap::new(),
- })
- };
- Some(Key {
- key_id: old_k.key_id,
- state,
- })
- }
}
diff --git a/src/model/migrate.rs b/src/model/migrate.rs
index cd6ad26a..6b4c3eed 100644
--- a/src/model/migrate.rs
+++ b/src/model/migrate.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
use garage_util::crdt::*;
use garage_util::data::*;
+use garage_util::encode::nonversioned_decode;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
@@ -28,8 +29,8 @@ impl Migrate {
let mut old_buckets = vec![];
for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?;
- let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
- .map_err(GarageError::from)?;
+ let bucket =
+ nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?;
old_buckets.push(bucket);
}
diff --git a/src/model/prev/v051/bucket_table.rs b/src/model/prev/v051/bucket_table.rs
index 628a49dd..19893458 100644
--- a/src/model/prev/v051/bucket_table.rs
+++ b/src/model/prev/v051/bucket_table.rs
@@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt;
use garage_table::*;
-use super::key_table::PermissionSet;
+use crate::key_table::v05::PermissionSet;
/// A bucket is a collection of objects
///
diff --git a/src/model/prev/v051/key_table.rs b/src/model/prev/v051/key_table.rs
deleted file mode 100644
index 37516b1c..00000000
--- a/src/model/prev/v051/key_table.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_table::crdt::*;
-use garage_table::*;
-
-/// An api key
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Key {
- /// The id of the key (immutable), used as partition key
- pub key_id: String,
-
- /// The secret_key associated
- pub secret_key: String,
-
- /// Name for the key
- pub name: crdt::Lww<String>,
-
- /// Is the key deleted
- pub deleted: crdt::Bool,
-
- /// Buckets in which the key is authorized. Empty if `Key` is deleted
- // CRDT interaction: deleted implies authorized_buckets is empty
- pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
-}
-
-/// Permission given to a key in a bucket
-#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct PermissionSet {
- /// The key can be used to read the bucket
- pub allow_read: bool,
- /// The key can be used to write in the bucket
- pub allow_write: bool,
-}
-
-impl AutoCrdt for PermissionSet {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for Key {
- fn merge(&mut self, other: &Self) {
- self.name.merge(&other.name);
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.authorized_buckets.clear();
- } else {
- self.authorized_buckets.merge(&other.authorized_buckets);
- }
- }
-}
diff --git a/src/model/prev/v051/mod.rs b/src/model/prev/v051/mod.rs
index 7a954752..8c1335a5 100644
--- a/src/model/prev/v051/mod.rs
+++ b/src/model/prev/v051/mod.rs
@@ -1,4 +1 @@
pub(crate) mod bucket_table;
-pub(crate) mod key_table;
-pub(crate) mod object_table;
-pub(crate) mod version_table;
diff --git a/src/model/prev/v051/object_table.rs b/src/model/prev/v051/object_table.rs
deleted file mode 100644
index e79e5787..00000000
--- a/src/model/prev/v051/object_table.rs
+++ /dev/null
@@ -1,149 +0,0 @@
-use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
-
-use garage_util::data::*;
-
-use garage_table::crdt::*;
-
-/// An object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
- /// The bucket in which the object is stored, used as partition key
- pub bucket: String,
-
- /// The key at which the object is stored in its bucket, used as sorting key
- pub key: String,
-
- /// The list of currenty stored versions of the object
- versions: Vec<ObjectVersion>,
-}
-
-impl Object {
- /// Get a list of currently stored versions of `Object`
- pub fn versions(&self) -> &[ObjectVersion] {
- &self.versions[..]
- }
-}
-
-/// Informations about a version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
- /// Id of the version
- pub uuid: Uuid,
- /// Timestamp of when the object was created
- pub timestamp: u64,
- /// State of the version
- pub state: ObjectVersionState,
-}
-
-/// State of an object version
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
- /// The version is being received
- Uploading(ObjectVersionHeaders),
- /// The version is fully received
- Complete(ObjectVersionData),
- /// The version uploaded containded errors or the upload was explicitly aborted
- Aborted,
-}
-
-impl Crdt for ObjectVersionState {
- fn merge(&mut self, other: &Self) {
- use ObjectVersionState::*;
- match other {
- Aborted => {
- *self = Aborted;
- }
- Complete(b) => match self {
- Aborted => {}
- Complete(a) => {
- a.merge(b);
- }
- Uploading(_) => {
- *self = Complete(b.clone());
- }
- },
- Uploading(_) => {}
- }
- }
-}
-
-/// Data stored in object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
- /// The object was deleted, this Version is a tombstone to mark it as such
- DeleteMarker,
- /// The object is short, it's stored inlined
- Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
- /// The object is not short, Hash of first block is stored here, next segments hashes are
- /// stored in the version table
- FirstBlock(ObjectVersionMeta, Hash),
-}
-
-impl AutoCrdt for ObjectVersionData {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-/// Metadata about the object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionMeta {
- /// Headers to send to the client
- pub headers: ObjectVersionHeaders,
- /// Size of the object
- pub size: u64,
- /// etag of the object
- pub etag: String,
-}
-
-/// Additional headers for an object
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionHeaders {
- /// Content type of the object
- pub content_type: String,
- /// Any other http headers to send
- pub other: BTreeMap<String, String>,
-}
-
-impl ObjectVersion {
- fn cmp_key(&self) -> (u64, Uuid) {
- (self.timestamp, self.uuid)
- }
-
- /// Is the object version completely received
- pub fn is_complete(&self) -> bool {
- matches!(self.state, ObjectVersionState::Complete(_))
- }
-}
-
-impl Crdt for Object {
- fn merge(&mut self, other: &Self) {
- // Merge versions from other into here
- for other_v in other.versions.iter() {
- match self
- .versions
- .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
- {
- Ok(i) => {
- self.versions[i].state.merge(&other_v.state);
- }
- Err(i) => {
- self.versions.insert(i, other_v.clone());
- }
- }
- }
-
- // Remove versions which are obsolete, i.e. those that come
- // before the last version which .is_complete().
- let last_complete = self
- .versions
- .iter()
- .enumerate()
- .rev()
- .find(|(_, v)| v.is_complete())
- .map(|(vi, _)| vi);
-
- if let Some(last_vi) = last_complete {
- self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
- }
- }
-}
diff --git a/src/model/prev/v051/version_table.rs b/src/model/prev/v051/version_table.rs
deleted file mode 100644
index c11c62d5..00000000
--- a/src/model/prev/v051/version_table.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use serde::{Deserialize, Serialize};
-
-use garage_util::data::*;
-
-use garage_table::crdt::*;
-use garage_table::*;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket: String,
- /// Key in which the related object is stored
- pub key: String,
-}
-
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
-impl Ord for VersionBlockKey {
- fn cmp(&self, other: &Self) -> std::cmp::Ordering {
- self.part_number
- .cmp(&other.part_number)
- .then(self.offset.cmp(&other.offset))
- }
-}
-
-impl PartialOrd for VersionBlockKey {
- fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
-impl AutoCrdt for VersionBlock {
- const WARN_IF_DIFFERENT: bool = true;
-}
-
-impl Crdt for Version {
- fn merge(&mut self, other: &Self) {
- self.deleted.merge(&other.deleted);
-
- if self.deleted.get() {
- self.blocks.clear();
- self.parts_etags.clear();
- } else {
- self.blocks.merge(&other.blocks);
- self.parts_etags.merge(&other.parts_etags);
- }
- }
-}
diff --git a/src/model/s3/block_ref_table.rs b/src/model/s3/block_ref_table.rs
index c7017409..7b023d87 100644
--- a/src/model/s3/block_ref_table.rs
+++ b/src/model/s3/block_ref_table.rs
@@ -1,4 +1,3 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
@@ -10,19 +9,29 @@ use garage_table::*;
use garage_block::manager::*;
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct BlockRef {
- /// Hash (blake2 sum) of the block, used as partition key
- pub block: Hash,
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
- /// Id of the Version for the object containing this block, used as sorting key
- pub version: Uuid,
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct BlockRef {
+ /// Hash (blake2 sum) of the block, used as partition key
+ pub block: Hash,
- // Keep track of deleted status
- /// Is the Version that contains this block deleted
- pub deleted: crdt::Bool,
+ /// Id of the Version for the object containing this block, used as sorting key
+ pub version: Uuid,
+
+ // Keep track of deleted status
+ /// Is the Version that contains this block deleted
+ pub deleted: crdt::Bool,
+ }
+
+ impl garage_util::migrate::InitialFormat for BlockRef {}
}
+pub use v08::*;
+
impl Entry<Hash, Uuid> for BlockRef {
fn partition_key(&self) -> &Hash {
&self.block
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 26ff57f6..518acc95 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -1,10 +1,8 @@
use serde::{Deserialize, Serialize};
-use std::collections::BTreeMap;
use std::sync::Arc;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -14,25 +12,126 @@ use garage_table::*;
use crate::index_counter::*;
use crate::s3::version_table::*;
-use crate::prev::v051::object_table as old;
-
pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes";
-/// An object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Object {
- /// The bucket in which the object is stored, used as partition key
- pub bucket_id: Uuid,
+mod v05 {
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+ use std::collections::BTreeMap;
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket: String,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
+
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading(ObjectVersionHeaders),
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
+ /// Data stored in object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionData {
+ /// The object was deleted, this Version is a tombstone to mark it as such
+ DeleteMarker,
+ /// The object is short, it's stored inlined
+ Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
+ /// The object is not short, Hash of first block is stored here, next segments hashes are
+ /// stored in the version table
+ FirstBlock(ObjectVersionMeta, Hash),
+ }
+
+ /// Metadata about the object version
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionMeta {
+ /// Headers to send to the client
+ pub headers: ObjectVersionHeaders,
+ /// Size of the object
+ pub size: u64,
+ /// etag of the object
+ pub etag: String,
+ }
- /// The key at which the object is stored in its bucket, used as sorting key
- pub key: String,
+ /// Additional headers for an object
+ #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersionHeaders {
+ /// Content type of the object
+ pub content_type: String,
+ /// Any other http headers to send
+ pub other: BTreeMap<String, String>,
+ }
- /// The list of currenty stored versions of the object
- versions: Vec<ObjectVersion>,
+ impl garage_util::migrate::InitialFormat for Object {}
}
+mod v08 {
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
+
+ pub use v05::{
+ ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta,
+ ObjectVersionState,
+ };
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket_id: Uuid,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec<ObjectVersion>,
+ }
+
+ impl garage_util::migrate::Migrate for Object {
+ type Previous = v05::Object;
+
+ fn migrate(old: v05::Object) -> Object {
+ use garage_util::data::blake2sum;
+
+ Object {
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ versions: old.versions,
+ }
+ }
+ }
+}
+
+pub use v08::*;
+
impl Object {
/// Initialize an Object struct from parts
pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self {
@@ -69,28 +168,6 @@ impl Object {
}
}
-/// Informations about a version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersion {
- /// Id of the version
- pub uuid: Uuid,
- /// Timestamp of when the object was created
- pub timestamp: u64,
- /// State of the version
- pub state: ObjectVersionState,
-}
-
-/// State of an object version
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionState {
- /// The version is being received
- Uploading(ObjectVersionHeaders),
- /// The version is fully received
- Complete(ObjectVersionData),
- /// The version uploaded containded errors or the upload was explicitly aborted
- Aborted,
-}
-
impl Crdt for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
@@ -112,42 +189,10 @@ impl Crdt for ObjectVersionState {
}
}
-/// Data stored in object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub enum ObjectVersionData {
- /// The object was deleted, this Version is a tombstone to mark it as such
- DeleteMarker,
- /// The object is short, it's stored inlined
- Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
- /// The object is not short, Hash of first block is stored here, next segments hashes are
- /// stored in the version table
- FirstBlock(ObjectVersionMeta, Hash),
-}
-
impl AutoCrdt for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
-/// Metadata about the object version
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionMeta {
- /// Headers to send to the client
- pub headers: ObjectVersionHeaders,
- /// Size of the object
- pub size: u64,
- /// etag of the object
- pub etag: String,
-}
-
-/// Additional headers for an object
-#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
-pub struct ObjectVersionHeaders {
- /// Content type of the object
- pub content_type: String,
- /// Any other http headers to send
- pub other: BTreeMap<String, String>,
-}
-
impl ObjectVersion {
fn cmp_key(&self) -> (u64, Uuid) {
(self.timestamp, self.uuid)
@@ -221,7 +266,6 @@ impl Crdt for Object {
}
pub struct ObjectTable {
- pub background: Arc<BackgroundRunner>,
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
}
@@ -255,34 +299,34 @@ impl TableSchema for ObjectTable {
);
}
- // 2. Spawn threads that propagates deletions to version table
- let version_table = self.version_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of old versions
- for v in old_v.versions.iter() {
- let newly_deleted = match new_v
- .versions
- .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
- {
- Err(_) => true,
- Ok(i) => {
- new_v.versions[i].state == ObjectVersionState::Aborted
- && v.state != ObjectVersionState::Aborted
- }
- };
- if newly_deleted {
- let deleted_version =
- Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
- version_table.insert(&deleted_version).await?;
+ // 2. Enqueue propagation deletions to version table
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of old versions
+ for v in old_v.versions.iter() {
+ let newly_deleted = match new_v
+ .versions
+ .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+ {
+ Err(_) => true,
+ Ok(i) => {
+ new_v.versions[i].state == ObjectVersionState::Aborted
+ && v.state != ObjectVersionState::Aborted
+ }
+ };
+ if newly_deleted {
+ let deleted_version =
+ Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ let res = self.version_table.queue_insert(tx, &deleted_version);
+ if let Err(e) = db::unabort(res)? {
+ error!(
+ "Unable to enqueue version deletion propagation: {}. A repair will be needed.",
+ e
+ );
}
}
}
- Ok(())
- });
+ }
+
Ok(())
}
@@ -292,11 +336,6 @@ impl TableSchema for ObjectTable {
ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
}
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?;
- Some(migrate_object(old_obj))
- }
}
impl CountedItem for Object {
@@ -341,64 +380,3 @@ impl CountedItem for Object {
]
}
}
-
-// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
-// (we just want to change bucket into bucket_id by hashing it)
-
-fn migrate_object(o: old::Object) -> Object {
- let versions = o
- .versions()
- .iter()
- .cloned()
- .map(migrate_object_version)
- .collect();
- Object {
- bucket_id: blake2sum(o.bucket.as_bytes()),
- key: o.key,
- versions,
- }
-}
-
-fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion {
- ObjectVersion {
- uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(),
- timestamp: v.timestamp,
- state: match v.state {
- old::ObjectVersionState::Uploading(h) => {
- ObjectVersionState::Uploading(migrate_object_version_headers(h))
- }
- old::ObjectVersionState::Complete(d) => {
- ObjectVersionState::Complete(migrate_object_version_data(d))
- }
- old::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
- },
- }
-}
-
-fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders {
- ObjectVersionHeaders {
- content_type: h.content_type,
- other: h.other,
- }
-}
-
-fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData {
- match d {
- old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
- old::ObjectVersionData::Inline(m, b) => {
- ObjectVersionData::Inline(migrate_object_version_meta(m), b)
- }
- old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock(
- migrate_object_version_meta(m),
- Hash::try_from(h.as_slice()).unwrap(),
- ),
- }
-}
-
-fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta {
- ObjectVersionMeta {
- headers: migrate_object_version_headers(m.headers),
- size: m.size,
- etag: m.etag,
- }
-}
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6bc2ecd1..6edc83f4 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -1,9 +1,7 @@
-use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_db as db;
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_table::crdt::*;
@@ -12,32 +10,108 @@ use garage_table::*;
use crate::s3::block_ref_table::*;
-use crate::prev::v051::version_table as old;
-
-/// A version of an object
-#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
-pub struct Version {
- /// UUID of the version, used as partition key
- pub uuid: Uuid,
-
- // Actual data: the blocks for this version
- // In the case of a multipart upload, also store the etags
- // of individual parts and check them when doing CompleteMultipartUpload
- /// Is this version deleted
- pub deleted: crdt::Bool,
- /// list of blocks of data composing the version
- pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
- /// Etag of each part in case of a multipart upload, empty otherwise
- pub parts_etags: crdt::Map<u64, String>,
-
- // Back link to bucket+key so that we can figure if
- // this was deleted later on
- /// Bucket in which the related object is stored
- pub bucket_id: Uuid,
- /// Key in which the related object is stored
- pub key: String,
+mod v05 {
+ use garage_util::crdt;
+ use garage_util::data::{Hash, Uuid};
+ use serde::{Deserialize, Serialize};
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket: String,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlockKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Offset of this sub-segment in its part
+ pub offset: u64,
+ }
+
+ /// Informations about a single block
+ #[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct VersionBlock {
+ /// Blake2 sum of the block
+ pub hash: Hash,
+ /// Size of the block
+ pub size: u64,
+ }
+
+ impl garage_util::migrate::InitialFormat for Version {}
}
+mod v08 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v05;
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
+ /// Etag of each part in case of a multipart upload, empty otherwise
+ pub parts_etags: crdt::Map<u64, String>,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ pub use v05::{VersionBlock, VersionBlockKey};
+
+ impl garage_util::migrate::Migrate for Version {
+ type Previous = v05::Version;
+
+ fn migrate(old: v05::Version) -> Version {
+ use garage_util::data::blake2sum;
+
+ Version {
+ uuid: old.uuid,
+ deleted: old.deleted,
+ blocks: old.blocks,
+ parts_etags: old.parts_etags,
+ bucket_id: blake2sum(old.bucket.as_bytes()),
+ key: old.key,
+ }
+ }
+ }
+}
+
+pub use v08::*;
+
impl Version {
pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self {
@@ -65,14 +139,6 @@ impl Version {
}
}
-#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlockKey {
- /// Number of the part
- pub part_number: u64,
- /// Offset of this sub-segment in its part
- pub offset: u64,
-}
-
impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
@@ -87,15 +153,6 @@ impl PartialOrd for VersionBlockKey {
}
}
-/// Informations about a single block
-#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
-pub struct VersionBlock {
- /// Blake2 sum of the block
- pub hash: Hash,
- /// Size of the block
- pub size: u64,
-}
-
impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -127,7 +184,6 @@ impl Crdt for Version {
}
pub struct VersionTable {
- pub background: Arc<BackgroundRunner>,
pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
}
@@ -141,33 +197,26 @@ impl TableSchema for VersionTable {
fn updated(
&self,
- _tx: &mut db::Transaction,
+ tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
- let block_ref_table = self.block_ref_table.clone();
- let old = old.cloned();
- let new = new.cloned();
-
- self.background.spawn(async move {
- if let (Some(old_v), Some(new_v)) = (old, new) {
- // Propagate deletion of version blocks
- if new_v.deleted.get() && !old_v.deleted.get() {
- let deleted_block_refs = old_v
- .blocks
- .items()
- .iter()
- .map(|(_k, vb)| BlockRef {
- block: vb.hash,
- version: old_v.uuid,
- deleted: true.into(),
- })
- .collect::<Vec<_>>();
- block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+ if let (Some(old_v), Some(new_v)) = (old, new) {
+ // Propagate deletion of version blocks
+ if new_v.deleted.get() && !old_v.deleted.get() {
+ let deleted_block_refs = old_v.blocks.items().iter().map(|(_k, vb)| BlockRef {
+ block: vb.hash,
+ version: old_v.uuid,
+ deleted: true.into(),
+ });
+ for block_ref in deleted_block_refs {
+ let res = self.block_ref_table.queue_insert(tx, &block_ref);
+ if let Err(e) = db::unabort(res)? {
+ error!("Unable to enqueue block ref deletion propagation: {}. A repair will be needed.", e);
+ }
}
}
- Ok(())
- });
+ }
Ok(())
}
@@ -175,42 +224,4 @@ impl TableSchema for VersionTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get())
}
-
- fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
- let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
-
- let blocks = old
- .blocks
- .items()
- .iter()
- .map(|(k, v)| {
- (
- VersionBlockKey {
- part_number: k.part_number,
- offset: k.offset,
- },
- VersionBlock {
- hash: Hash::try_from(v.hash.as_slice()).unwrap(),
- size: v.size,
- },
- )
- })
- .collect::<crdt::Map<_, _>>();
-
- let parts_etags = old
- .parts_etags
- .items()
- .iter()
- .map(|(k, v)| (*k, v.clone()))
- .collect::<crdt::Map<_, _>>();
-
- Some(Version {
- uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
- deleted: crdt::Bool::new(old.deleted.get()),
- blocks,
- parts_etags,
- bucket_id: blake2sum(old.bucket.as_bytes()),
- key: old.key,
- })
- }
}
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index b87374ad..3d4d3ff5 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -18,14 +18,15 @@ garage_util = { version = "0.8.1", path = "../util" }
arc-swap = "1.0"
bytes = "1.0"
+bytesize = "1.1"
gethostname = "0.2"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
+itertools="0.10"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
async-trait = "0.1.7"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = "1.0"
diff --git a/src/rpc/graph_algo.rs b/src/rpc/graph_algo.rs
new file mode 100644
index 00000000..f181e2ba
--- /dev/null
+++ b/src/rpc/graph_algo.rs
@@ -0,0 +1,411 @@
+//! This module deals with graph algorithms.
+//! It is used in layout.rs to build the partition to node assignation.
+
+use rand::prelude::SliceRandom;
+use std::cmp::{max, min};
+use std::collections::HashMap;
+use std::collections::VecDeque;
+
+/// Vertex data structures used in all the graphs used in layout.rs.
+/// usize parameters correspond to node/zone/partitions ids.
+/// To understand the vertex roles below, please refer to the formal description
+/// of the layout computation algorithm.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Vertex {
+ Source,
+ Pup(usize), // The vertex p+ of partition p
+ Pdown(usize), // The vertex p- of partition p
+ PZ(usize, usize), // The vertex corresponding to x_(partition p, zone z)
+ N(usize), // The vertex corresponding to node n
+ Sink,
+}
+
+/// Edge data structure for the flow algorithm.
+#[derive(Clone, Copy, Debug)]
+pub struct FlowEdge {
+ cap: u64, // flow maximal capacity of the edge
+ flow: i64, // flow value on the edge
+ dest: usize, // destination vertex id
+ rev: usize, // index of the reversed edge (v, self) in the edge list of vertex v
+}
+
+/// Edge data structure for the detection of negative cycles.
+#[derive(Clone, Copy, Debug)]
+pub struct WeightedEdge {
+ w: i64, // weight of the edge
+ dest: usize,
+}
+
+pub trait Edge: Clone + Copy {}
+impl Edge for FlowEdge {}
+impl Edge for WeightedEdge {}
+
+/// Struct for the graph structure. We do encapsulation here to be able to both
+/// provide user friendly Vertex enum to address vertices, and to use internally usize
+/// indices and Vec instead of HashMap in the graph algorithm to optimize execution speed.
+pub struct Graph<E: Edge> {
+ vertex_to_id: HashMap<Vertex, usize>,
+ id_to_vertex: Vec<Vertex>,
+
+ // The graph is stored as an adjacency list
+ graph: Vec<Vec<E>>,
+}
+
+pub type CostFunction = HashMap<(Vertex, Vertex), i64>;
+
+impl<E: Edge> Graph<E> {
+ pub fn new(vertices: &[Vertex]) -> Self {
+ let mut map = HashMap::<Vertex, usize>::new();
+ for (i, vert) in vertices.iter().enumerate() {
+ map.insert(*vert, i);
+ }
+ Graph::<E> {
+ vertex_to_id: map,
+ id_to_vertex: vertices.to_vec(),
+ graph: vec![Vec::<E>::new(); vertices.len()],
+ }
+ }
+
+ fn get_vertex_id(&self, v: &Vertex) -> Result<usize, String> {
+ self.vertex_to_id
+ .get(v)
+ .cloned()
+ .ok_or_else(|| format!("The graph does not contain vertex {:?}", v))
+ }
+}
+
+impl Graph<FlowEdge> {
+ /// This function adds a directed edge to the graph with capacity c, and the
+ /// corresponding reversed edge with capacity 0.
+ pub fn add_edge(&mut self, u: Vertex, v: Vertex, c: u64) -> Result<(), String> {
+ let idu = self.get_vertex_id(&u)?;
+ let idv = self.get_vertex_id(&v)?;
+ if idu == idv {
+ return Err("Cannot add edge from vertex to itself in flow graph".into());
+ }
+
+ let rev_u = self.graph[idu].len();
+ let rev_v = self.graph[idv].len();
+ self.graph[idu].push(FlowEdge {
+ cap: c,
+ dest: idv,
+ flow: 0,
+ rev: rev_v,
+ });
+ self.graph[idv].push(FlowEdge {
+ cap: 0,
+ dest: idu,
+ flow: 0,
+ rev: rev_u,
+ });
+ Ok(())
+ }
+
+ /// This function returns the list of vertices that receive a positive flow from
+ /// vertex v.
+ pub fn get_positive_flow_from(&self, v: Vertex) -> Result<Vec<Vertex>, String> {
+ let idv = self.get_vertex_id(&v)?;
+ let mut result = Vec::<Vertex>::new();
+ for edge in self.graph[idv].iter() {
+ if edge.flow > 0 {
+ result.push(self.id_to_vertex[edge.dest]);
+ }
+ }
+ Ok(result)
+ }
+
+ /// This function returns the value of the flow incoming to v.
+ pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> {
+ let idv = self.get_vertex_id(&v)?;
+ let mut result = 0;
+ for edge in self.graph[idv].iter() {
+ result += max(0, self.graph[edge.dest][edge.rev].flow);
+ }
+ Ok(result)
+ }
+
+ /// This function returns the value of the flow outgoing from v.
+ pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> {
+ let idv = self.get_vertex_id(&v)?;
+ let mut result = 0;
+ for edge in self.graph[idv].iter() {
+ result += max(0, edge.flow);
+ }
+ Ok(result)
+ }
+
+ /// This function computes the flow total value by computing the outgoing flow
+ /// from the source.
+ pub fn get_flow_value(&mut self) -> Result<i64, String> {
+ self.get_outflow(Vertex::Source)
+ }
+
+ /// This function shuffles the order of the edge lists. It keeps the ids of the
+ /// reversed edges consistent.
+ fn shuffle_edges(&mut self) {
+ let mut rng = rand::thread_rng();
+ for i in 0..self.graph.len() {
+ self.graph[i].shuffle(&mut rng);
+ // We need to update the ids of the reverse edges.
+ for j in 0..self.graph[i].len() {
+ let target_v = self.graph[i][j].dest;
+ let target_rev = self.graph[i][j].rev;
+ self.graph[target_v][target_rev].rev = j;
+ }
+ }
+ }
+
+ /// Computes an upper bound of the flow on the graph
+ pub fn flow_upper_bound(&self) -> Result<u64, String> {
+ let idsource = self.get_vertex_id(&Vertex::Source)?;
+ let mut flow_upper_bound = 0;
+ for edge in self.graph[idsource].iter() {
+ flow_upper_bound += edge.cap;
+ }
+ Ok(flow_upper_bound)
+ }
+
+ /// This function computes the maximal flow using Dinic's algorithm. It starts with
+ /// the flow values already present in the graph. So it is possible to add some edge to
+ /// the graph, compute a flow, add other edges, update the flow.
+ pub fn compute_maximal_flow(&mut self) -> Result<(), String> {
+ let idsource = self.get_vertex_id(&Vertex::Source)?;
+ let idsink = self.get_vertex_id(&Vertex::Sink)?;
+
+ let nb_vertices = self.graph.len();
+
+ let flow_upper_bound = self.flow_upper_bound()?;
+
+ // To ensure the dispersion of the associations generated by the
+ // assignation, we shuffle the neighbours of the nodes. Hence,
+ // the vertices do not consider their neighbours in the same order.
+ self.shuffle_edges();
+
+ // We run Dinic's max flow algorithm
+ loop {
+ // We build the level array from Dinic's algorithm.
+ let mut level = vec![None; nb_vertices];
+
+ let mut fifo = VecDeque::new();
+ fifo.push_back((idsource, 0));
+ while let Some((id, lvl)) = fifo.pop_front() {
+ if level[id] == None {
+ // it means id has not yet been reached
+ level[id] = Some(lvl);
+ for edge in self.graph[id].iter() {
+ if edge.cap as i64 - edge.flow > 0 {
+ fifo.push_back((edge.dest, lvl + 1));
+ }
+ }
+ }
+ }
+ if level[idsink] == None {
+ // There is no residual flow
+ break;
+ }
+ // Now we run DFS respecting the level array
+ let mut next_nbd = vec![0; nb_vertices];
+ let mut lifo = Vec::new();
+
+ lifo.push((idsource, flow_upper_bound));
+
+ while let Some((id, f)) = lifo.last().cloned() {
+ if id == idsink {
+ // The DFS reached the sink, we can add a
+ // residual flow.
+ lifo.pop();
+ while let Some((id, _)) = lifo.pop() {
+ let nbd = next_nbd[id];
+ self.graph[id][nbd].flow += f as i64;
+ let id_rev = self.graph[id][nbd].dest;
+ let nbd_rev = self.graph[id][nbd].rev;
+ self.graph[id_rev][nbd_rev].flow -= f as i64;
+ }
+ lifo.push((idsource, flow_upper_bound));
+ continue;
+ }
+ // else we did not reach the sink
+ let nbd = next_nbd[id];
+ if nbd >= self.graph[id].len() {
+ // There is nothing to explore from id anymore
+ lifo.pop();
+ if let Some((parent, _)) = lifo.last() {
+ next_nbd[*parent] += 1;
+ }
+ continue;
+ }
+ // else we can try to send flow from id to its nbd
+ let new_flow = min(
+ f as i64,
+ self.graph[id][nbd].cap as i64 - self.graph[id][nbd].flow,
+ ) as u64;
+ if new_flow == 0 {
+ next_nbd[id] += 1;
+ continue;
+ }
+ if let (Some(lvldest), Some(lvlid)) = (level[self.graph[id][nbd].dest], level[id]) {
+ if lvldest <= lvlid {
+ // We cannot send flow to nbd.
+ next_nbd[id] += 1;
+ continue;
+ }
+ }
+ // otherwise, we send flow to nbd.
+ lifo.push((self.graph[id][nbd].dest, new_flow));
+ }
+ }
+ Ok(())
+ }
+
+ /// This function takes a flow, and a cost function on the edges, and tries to find an
+ /// equivalent flow with a better cost, by finding improving overflow cycles. It uses
+ /// as subroutine the Bellman Ford algorithm run up to path_length.
+ /// We assume that the cost of edge (u,v) is the opposite of the cost of (v,u), and
+ /// only one needs to be present in the cost function.
+ pub fn optimize_flow_with_cost(
+ &mut self,
+ cost: &CostFunction,
+ path_length: usize,
+ ) -> Result<(), String> {
+ // We build the weighted graph g where we will look for negative cycle
+ let mut gf = self.build_cost_graph(cost)?;
+ let mut cycles = gf.list_negative_cycles(path_length);
+ while !cycles.is_empty() {
+ // we enumerate negative cycles
+ for c in cycles.iter() {
+ for i in 0..c.len() {
+ // We add one flow unit to the edge (u,v) of cycle c
+ let idu = self.vertex_to_id[&c[i]];
+ let idv = self.vertex_to_id[&c[(i + 1) % c.len()]];
+ for j in 0..self.graph[idu].len() {
+ // since idu appears at most once in the cycles, we enumerate every
+ // edge at most once.
+ let edge = self.graph[idu][j];
+ if edge.dest == idv {
+ self.graph[idu][j].flow += 1;
+ self.graph[idv][edge.rev].flow -= 1;
+ break;
+ }
+ }
+ }
+ }
+
+ gf = self.build_cost_graph(cost)?;
+ cycles = gf.list_negative_cycles(path_length);
+ }
+ Ok(())
+ }
+
+ /// Construct the weighted graph G_f from the flow and the cost function
+ fn build_cost_graph(&self, cost: &CostFunction) -> Result<Graph<WeightedEdge>, String> {
+ let mut g = Graph::<WeightedEdge>::new(&self.id_to_vertex);
+ let nb_vertices = self.id_to_vertex.len();
+ for i in 0..nb_vertices {
+ for edge in self.graph[i].iter() {
+ if edge.cap as i64 - edge.flow > 0 {
+ // It is possible to send overflow through this edge
+ let u = self.id_to_vertex[i];
+ let v = self.id_to_vertex[edge.dest];
+ if cost.contains_key(&(u, v)) {
+ g.add_edge(u, v, cost[&(u, v)])?;
+ } else if cost.contains_key(&(v, u)) {
+ g.add_edge(u, v, -cost[&(v, u)])?;
+ } else {
+ g.add_edge(u, v, 0)?;
+ }
+ }
+ }
+ }
+ Ok(g)
+ }
+}
+
+impl Graph<WeightedEdge> {
+ /// This function adds a single directed weighted edge to the graph.
+ pub fn add_edge(&mut self, u: Vertex, v: Vertex, w: i64) -> Result<(), String> {
+ let idu = self.get_vertex_id(&u)?;
+ let idv = self.get_vertex_id(&v)?;
+ self.graph[idu].push(WeightedEdge { w, dest: idv });
+ Ok(())
+ }
+
+ /// This function lists the negative cycles it manages to find after path_length
+ /// iterations of the main loop of the Bellman-Ford algorithm. For the classical
+ /// algorithm, path_length needs to be equal to the number of vertices. However,
+ /// for particular graph structures like in our case, the algorithm is still correct
+ /// when path_length is the length of the longest possible simple path.
+ /// See the formal description of the algorithm for more details.
+ fn list_negative_cycles(&self, path_length: usize) -> Vec<Vec<Vertex>> {
+ let nb_vertices = self.graph.len();
+
+ // We start with every vertex at distance 0 of some imaginary extra -1 vertex.
+ let mut distance = vec![0; nb_vertices];
+ // The prev vector collects for every vertex from where does the shortest path come
+ let mut prev = vec![None; nb_vertices];
+
+ for _ in 0..path_length + 1 {
+ for id in 0..nb_vertices {
+ for e in self.graph[id].iter() {
+ if distance[id] + e.w < distance[e.dest] {
+ distance[e.dest] = distance[id] + e.w;
+ prev[e.dest] = Some(id);
+ }
+ }
+ }
+ }
+
+ // If self.graph contains a negative cycle, then at this point the graph described
+ // by prev (which is a directed 1-forest/functional graph)
+ // must contain a cycle. We list the cycles of prev.
+ let cycles_prev = cycles_of_1_forest(&prev);
+
+ // Remark that the cycle in prev is in the reverse order compared to the cycle
+ // in the graph. Thus the .rev().
+ return cycles_prev
+ .iter()
+ .map(|cycle| {
+ cycle
+ .iter()
+ .rev()
+ .map(|id| self.id_to_vertex[*id])
+ .collect()
+ })
+ .collect();
+ }
+}
+
+/// This function returns the list of cycles of a directed 1 forest. It does not
+/// check for the consistency of the input.
+fn cycles_of_1_forest(forest: &[Option<usize>]) -> Vec<Vec<usize>> {
+ let mut cycles = Vec::<Vec<usize>>::new();
+ let mut time_of_discovery = vec![None; forest.len()];
+
+ for t in 0..forest.len() {
+ let mut id = t;
+ // while we are on a valid undiscovered node
+ while time_of_discovery[id] == None {
+ time_of_discovery[id] = Some(t);
+ if let Some(i) = forest[id] {
+ id = i;
+ } else {
+ break;
+ }
+ }
+ if forest[id] != None && time_of_discovery[id] == Some(t) {
+ // We discovered an id that we explored at this iteration t.
+ // It means we are on a cycle
+ let mut cy = vec![id; 1];
+ let mut id2 = id;
+ while let Some(id_next) = forest[id2] {
+ id2 = id_next;
+ if id2 != id {
+ cy.push(id2);
+ } else {
+ break;
+ }
+ }
+ cycles.push(cy);
+ }
+ }
+ cycles
+}
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index 2fd5acfc..d756f0aa 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -1,14 +1,28 @@
use std::cmp::Ordering;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
+use std::collections::HashSet;
+
+use bytesize::ByteSize;
+use itertools::Itertools;
use serde::{Deserialize, Serialize};
-use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
+use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap};
use garage_util::data::*;
+use garage_util::encode::nonversioned_encode;
use garage_util::error::*;
+use crate::graph_algo::*;
+
use crate::ring::*;
+use std::convert::TryInto;
+
+const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
+
+// The Message type will be used to collect information on the algorithm.
+type Message = Vec<String>;
+
/// The layout of the cluster, i.e. the list of roles
/// which are assigned to each cluster node
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -16,12 +30,21 @@ pub struct ClusterLayout {
pub version: u64,
pub replication_factor: usize,
+
+ /// This attribute is only used to retain the previously computed partition size,
+ /// to know to what extent does it change with the layout update.
+ pub partition_size: u64,
+ /// Parameters used to compute the assignation currently given by
+ /// ring_assignation_data
+ pub parameters: LayoutParameters,
+
pub roles: LwwMap<Uuid, NodeRoleV>,
/// node_id_vec: a vector of node IDs with a role assigned
/// in the system (this includes gateway nodes).
/// The order here is different than the vec stored by `roles`, because:
- /// 1. non-gateway nodes are first so that they have lower numbers
+ /// 1. non-gateway nodes are first so that they have lower numbers holding
+ /// in u8 (the number of non-gateway nodes is at most 256).
/// 2. nodes that don't have a role are excluded (but they need to
/// stay in the CRDT as tombstones)
pub node_id_vec: Vec<Uuid>,
@@ -30,10 +53,24 @@ pub struct ClusterLayout {
#[serde(with = "serde_bytes")]
pub ring_assignation_data: Vec<CompactNodeType>,
+ /// Parameters to be used in the next partition assignation computation.
+ pub staging_parameters: Lww<LayoutParameters>,
/// Role changes which are staged for the next version of the layout
- pub staging: LwwMap<Uuid, NodeRoleV>,
+ pub staging_roles: LwwMap<Uuid, NodeRoleV>,
pub staging_hash: Hash,
}
+impl garage_util::migrate::InitialFormat for ClusterLayout {}
+
+/// This struct is used to set the parameters to be used in the assignation computation
+/// algorithm. It is stored as a Crdt.
+#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
+pub struct LayoutParameters {
+ pub zone_redundancy: usize,
+}
+
+impl AutoCrdt for LayoutParameters {
+ const WARN_IF_DIFFERENT: bool = true;
+}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>);
@@ -45,13 +82,13 @@ impl AutoCrdt for NodeRoleV {
/// The user-assigned roles of cluster nodes
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRole {
- /// Datacenter at which this entry belong. This information might be used to perform a better
- /// geodistribution
+ /// Datacenter at which this entry belong. This information is used to
+ /// perform a better geodistribution
pub zone: String,
- /// The (relative) capacity of the node
+ /// The capacity of the node
/// If this is set to None, the node does not participate in storing data for the system
/// and is only active as an API gateway to other nodes
- pub capacity: Option<u32>,
+ pub capacity: Option<u64>,
/// A set of tags to recognize the node
pub tags: Vec<String>,
}
@@ -59,26 +96,47 @@ pub struct NodeRole {
impl NodeRole {
pub fn capacity_string(&self) -> String {
match self.capacity {
- Some(c) => format!("{}", c),
+ Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(),
}
}
+
+ pub fn tags_string(&self) -> String {
+ self.tags.join(",")
+ }
}
+// Implementation of the ClusterLayout methods unrelated to the assignation algorithm.
impl ClusterLayout {
pub fn new(replication_factor: usize) -> Self {
+ // We set the default zone redundancy to be equal to the replication factor,
+ // i.e. as strict as possible.
+ let parameters = LayoutParameters {
+ zone_redundancy: replication_factor,
+ };
+ let staging_parameters = Lww::<LayoutParameters>::new(parameters.clone());
+
let empty_lwwmap = LwwMap::new();
- let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
- ClusterLayout {
+ let mut ret = ClusterLayout {
version: 0,
replication_factor,
+ partition_size: 0,
roles: LwwMap::new(),
node_id_vec: Vec::new(),
ring_assignation_data: Vec::new(),
- staging: empty_lwwmap,
- staging_hash: empty_lwwmap_hash,
- }
+ parameters,
+ staging_parameters,
+ staging_roles: empty_lwwmap,
+ staging_hash: [0u8; 32].into(),
+ };
+ ret.staging_hash = ret.calculate_staging_hash();
+ ret
+ }
+
+ fn calculate_staging_hash(&self) -> Hash {
+ let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
+ blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
}
pub fn merge(&mut self, other: &ClusterLayout) -> bool {
@@ -88,9 +146,10 @@ impl ClusterLayout {
true
}
Ordering::Equal => {
- self.staging.merge(&other.staging);
+ self.staging_parameters.merge(&other.staging_parameters);
+ self.staging_roles.merge(&other.staging_roles);
- let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let new_staging_hash = self.calculate_staging_hash();
let changed = new_staging_hash != self.staging_hash;
self.staging_hash = new_staging_hash;
@@ -101,7 +160,7 @@ impl ClusterLayout {
}
}
- pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
+ pub fn apply_staged_changes(mut self, version: Option<u64>) -> Result<(Self, Message), Error> {
match version {
None => {
let error = r#"
@@ -117,19 +176,18 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- self.roles.merge(&self.staging);
+ self.roles.merge(&self.staging_roles);
self.roles.retain(|(_, _, v)| v.0.is_some());
+ self.parameters = self.staging_parameters.get().clone();
- if !self.calculate_partition_assignation() {
- return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into()));
- }
+ self.staging_roles.clear();
+ self.staging_hash = self.calculate_staging_hash();
- self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let msg = self.calculate_partition_assignation()?;
self.version += 1;
- Ok(self)
+ Ok((self, msg))
}
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
@@ -148,8 +206,9 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
- self.staging.clear();
- self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ self.staging_roles.clear();
+ self.staging_parameters.update(self.parameters.clone());
+ self.staging_hash = self.calculate_staging_hash();
self.version += 1;
@@ -174,13 +233,81 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
+ /// Returns the uuids of the non_gateway nodes in self.node_id_vec.
+ fn nongateway_nodes(&self) -> Vec<Uuid> {
+ let mut result = Vec::<Uuid>::new();
+ for uuid in self.node_id_vec.iter() {
+ match self.node_role(uuid) {
+ Some(role) if role.capacity != None => result.push(*uuid),
+ _ => (),
+ }
+ }
+ result
+ }
+
+ /// Given a node uuids, this function returns the label of its zone
+ fn get_node_zone(&self, uuid: &Uuid) -> Result<String, Error> {
+ match self.node_role(uuid) {
+ Some(role) => Ok(role.zone.clone()),
+ _ => Err(Error::Message(
+ "The Uuid does not correspond to a node present in the cluster.".into(),
+ )),
+ }
+ }
+
+ /// Given a node uuids, this function returns its capacity or fails if it does not have any
+ pub fn get_node_capacity(&self, uuid: &Uuid) -> Result<u64, Error> {
+ match self.node_role(uuid) {
+ Some(NodeRole {
+ capacity: Some(cap),
+ zone: _,
+ tags: _,
+ }) => Ok(*cap),
+ _ => Err(Error::Message(
+ "The Uuid does not correspond to a node present in the \
+ cluster or this node does not have a positive capacity."
+ .into(),
+ )),
+ }
+ }
+
+ /// Returns the number of partitions associated to this node in the ring
+ pub fn get_node_usage(&self, uuid: &Uuid) -> Result<usize, Error> {
+ for (i, id) in self.node_id_vec.iter().enumerate() {
+ if id == uuid {
+ let mut count = 0;
+ for nod in self.ring_assignation_data.iter() {
+ if i as u8 == *nod {
+ count += 1
+ }
+ }
+ return Ok(count);
+ }
+ }
+ Err(Error::Message(
+ "The Uuid does not correspond to a node present in the \
+ cluster or this node does not have a positive capacity."
+ .into(),
+ ))
+ }
+
+ /// Returns the sum of capacities of non gateway nodes in the cluster
+ fn get_total_capacity(&self) -> Result<u64, Error> {
+ let mut total_capacity = 0;
+ for uuid in self.nongateway_nodes().iter() {
+ total_capacity += self.get_node_capacity(uuid)?;
+ }
+ Ok(total_capacity)
+ }
+
/// Check a cluster layout for internal consistency
+ /// (assignation, roles, parameters, partition size)
/// returns true if consistent, false if error
- pub fn check(&self) -> bool {
+ pub fn check(&self) -> Result<(), String> {
// Check that the hash of the staging data is correct
- let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
+ let staging_hash = self.calculate_staging_hash();
if staging_hash != self.staging_hash {
- return false;
+ return Err("staging_hash is incorrect".into());
}
// Check that node_id_vec contains the correct list of nodes
@@ -195,12 +322,17 @@ To know the correct value of the new layout version, invoke `garage layout show`
let mut node_id_vec = self.node_id_vec.clone();
node_id_vec.sort();
if expected_nodes != node_id_vec {
- return false;
+ return Err(format!("node_id_vec does not contain the correct set of nodes\nnode_id_vec: {:?}\nexpected: {:?}", node_id_vec, expected_nodes));
}
// Check that the assignation data has the correct length
- if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor {
- return false;
+ let expected_assignation_data_len = (1 << PARTITION_BITS) * self.replication_factor;
+ if self.ring_assignation_data.len() != expected_assignation_data_len {
+ return Err(format!(
+ "ring_assignation_data has incorrect length {} instead of {}",
+ self.ring_assignation_data.len(),
+ expected_assignation_data_len
+ ));
}
// Check that the assigned nodes are correct identifiers
@@ -208,459 +340,776 @@ To know the correct value of the new layout version, invoke `garage layout show`
// and that role is not the role of a gateway nodes
for x in self.ring_assignation_data.iter() {
if *x as usize >= self.node_id_vec.len() {
- return false;
+ return Err(format!(
+ "ring_assignation_data contains invalid node id {}",
+ *x
+ ));
}
let node = self.node_id_vec[*x as usize];
match self.roles.get(&node) {
Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (),
- _ => return false,
+ _ => return Err("ring_assignation_data contains id of a gateway node".into()),
}
}
- true
- }
+ // Check that every partition is associated to distinct nodes
+ let rf = self.replication_factor;
+ for p in 0..(1 << PARTITION_BITS) {
+ let nodes_of_p = self.ring_assignation_data[rf * p..rf * (p + 1)].to_vec();
+ if nodes_of_p.iter().unique().count() != rf {
+ return Err(format!("partition does not contain {} unique node ids", rf));
+ }
+ // Check that every partition is spread over at least zone_redundancy zones.
+ let zones_of_p = nodes_of_p
+ .iter()
+ .map(|n| {
+ self.get_node_zone(&self.node_id_vec[*n as usize])
+ .expect("Zone not found.")
+ })
+ .collect::<Vec<_>>();
+ let redundancy = self.parameters.zone_redundancy;
+ if zones_of_p.iter().unique().count() < redundancy {
+ return Err(format!(
+ "nodes of partition are in less than {} distinct zones",
+ redundancy
+ ));
+ }
+ }
- /// Calculate an assignation of partitions to nodes
- pub fn calculate_partition_assignation(&mut self) -> bool {
- let (configured_nodes, zones) = self.configured_nodes_and_zones();
- let n_zones = zones.len();
+ // Check that the nodes capacities is consistent with the stored partitions
+ let mut node_usage = vec![0; MAX_NODE_NUMBER];
+ for n in self.ring_assignation_data.iter() {
+ node_usage[*n as usize] += 1;
+ }
+ for (n, usage) in node_usage.iter().enumerate() {
+ if *usage > 0 {
+ let uuid = self.node_id_vec[n];
+ let partusage = usage * self.partition_size;
+ let nodecap = self.get_node_capacity(&uuid).unwrap();
+ if partusage > nodecap {
+ return Err(format!(
+ "node usage ({}) is bigger than node capacity ({})",
+ usage * self.partition_size,
+ nodecap
+ ));
+ }
+ }
+ }
- println!("Calculating updated partition assignation, this may take some time...");
- println!();
+ // Check that the partition size stored is the one computed by the asignation
+ // algorithm.
+ let cl2 = self.clone();
+ let (_, zone_to_id) = cl2.generate_nongateway_zone_ids().unwrap();
+ match cl2.compute_optimal_partition_size(&zone_to_id) {
+ Ok(s) if s != self.partition_size => {
+ return Err(format!(
+ "partition_size ({}) is different than optimal value ({})",
+ self.partition_size, s
+ ))
+ }
+ Err(e) => return Err(format!("could not calculate optimal partition size: {}", e)),
+ _ => (),
+ }
- // Get old partition assignation
- let old_partitions = self.parse_assignation_data();
+ Ok(())
+ }
+}
- // Start new partition assignation with nodes from old assignation where it is relevant
- let mut partitions = old_partitions
- .iter()
- .map(|old_part| {
- let mut new_part = PartitionAss::new();
- for node in old_part.nodes.iter() {
- if let Some(role) = node.1 {
- if role.capacity.is_some() {
- new_part.add(None, n_zones, node.0, role);
- }
- }
- }
- new_part
- })
- .collect::<Vec<_>>();
+// Implementation of the ClusterLayout methods related to the assignation algorithm.
+impl ClusterLayout {
+ /// This function calculates a new partition-to-node assignation.
+ /// The computed assignation respects the node replication factor
+ /// and the zone redundancy parameter It maximizes the capacity of a
+ /// partition (assuming all partitions have the same size).
+ /// Among such optimal assignation, it minimizes the distance to
+ /// the former assignation (if any) to minimize the amount of
+ /// data to be moved.
+ /// Staged role changes must be merged with nodes roles before calling this function,
+ /// hence it must only be called from apply_staged_changes() and hence is not public.
+ fn calculate_partition_assignation(&mut self) -> Result<Message, Error> {
+ // We update the node ids, since the node role list might have changed with the
+ // changes in the layout. We retrieve the old_assignation reframed with new ids
+ let old_assignation_opt = self.update_node_id_vec()?;
+
+ let mut msg = Message::new();
+ msg.push("==== COMPUTATION OF A NEW PARTITION ASSIGNATION ====".into());
+ msg.push("".into());
+ msg.push(format!(
+ "Partitions are \
+ replicated {} times on at least {} distinct zones.",
+ self.replication_factor, self.parameters.zone_redundancy
+ ));
+
+ // We generate for once numerical ids for the zones of non gateway nodes,
+ // to use them as indices in the flow graphs.
+ let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
+
+ let nb_nongateway_nodes = self.nongateway_nodes().len();
+ if nb_nongateway_nodes < self.replication_factor {
+ return Err(Error::Message(format!(
+ "The number of nodes with positive \
+ capacity ({}) is smaller than the replication factor ({}).",
+ nb_nongateway_nodes, self.replication_factor
+ )));
+ }
+ if id_to_zone.len() < self.parameters.zone_redundancy {
+ return Err(Error::Message(format!(
+ "The number of zones with non-gateway \
+ nodes ({}) is smaller than the redundancy parameter ({})",
+ id_to_zone.len(),
+ self.parameters.zone_redundancy
+ )));
+ }
- // In various cases, not enough nodes will have been added for all partitions
- // in the step above (e.g. due to node removals, or new zones being added).
- // Here we add more nodes to make a complete (but sub-optimal) assignation,
- // using an initial partition assignation that is calculated using the multi-dc maglev trick
- match self.initial_partition_assignation() {
- Some(initial_partitions) => {
- for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) {
- for _ in 0..2 {
- for (id, info) in ipart.nodes.iter() {
- if part.nodes.len() < self.replication_factor {
- part.add(None, n_zones, id, info.unwrap());
- }
- }
- }
- assert!(part.nodes.len() == self.replication_factor);
- }
- }
- None => {
- // Not enough nodes in cluster to build a correct assignation.
- // Signal it by returning an error.
- return false;
- }
+ // We compute the optimal partition size
+ // Capacities should be given in a unit so that partition size is at least 100.
+ // In this case, integer rounding plays a marginal role in the percentages of
+ // optimality.
+ let partition_size = self.compute_optimal_partition_size(&zone_to_id)?;
+
+ if old_assignation_opt != None {
+ msg.push(format!(
+ "Optimal size of a partition: {} (was {} in the previous layout).",
+ ByteSize::b(partition_size).to_string_as(false),
+ ByteSize::b(self.partition_size).to_string_as(false)
+ ));
+ } else {
+ msg.push(format!(
+ "Given the replication and redundancy constraints, the \
+ optimal size of a partition is {}.",
+ ByteSize::b(partition_size).to_string_as(false)
+ ));
+ }
+ // We write the partition size.
+ self.partition_size = partition_size;
+
+ if partition_size < 100 {
+ msg.push(
+ "WARNING: The partition size is low (< 100), make sure the capacities of your nodes are correct and are of at least a few MB"
+ .into(),
+ );
}
- // Calculate how many partitions each node should ideally store,
- // and how many partitions they are storing with the current assignation
- // This defines our target for which we will optimize in the following loop.
- let total_capacity = configured_nodes
- .iter()
- .map(|(_, info)| info.capacity.unwrap_or(0))
- .sum::<u32>() as usize;
- let total_partitions = self.replication_factor * (1 << PARTITION_BITS);
- let target_partitions_per_node = configured_nodes
- .iter()
- .map(|(id, info)| {
- (
- *id,
- info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity,
- )
- })
- .collect::<HashMap<&Uuid, usize>>();
-
- let mut partitions_per_node = self.partitions_per_node(&partitions[..]);
-
- println!("Target number of partitions per node:");
- for (node, npart) in target_partitions_per_node.iter() {
- println!("{:?}\t{}", node, npart);
- }
- println!();
-
- // Shuffle partitions between nodes so that nodes will reach (or better approach)
- // their target number of stored partitions
- loop {
- let mut option = None;
- for (i, part) in partitions.iter_mut().enumerate() {
- for (irm, (idrm, _)) in part.nodes.iter().enumerate() {
- let errratio = |node, parts| {
- let tgt = *target_partitions_per_node.get(node).unwrap() as f32;
- (parts - tgt) / tgt
- };
- let square = |x| x * x;
-
- let partsrm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as f32;
-
- for (idadd, infoadd) in configured_nodes.iter() {
- // skip replacing a node by itself
- // and skip replacing by gateway nodes
- if idadd == idrm || infoadd.capacity.is_none() {
- continue;
- }
+ // We compute a first flow/assignation that is heuristically close to the previous
+ // assignation
+ let mut gflow = self.compute_candidate_assignation(&zone_to_id, &old_assignation_opt)?;
+ if let Some(assoc) = &old_assignation_opt {
+ // We minimize the distance to the previous assignation.
+ self.minimize_rebalance_load(&mut gflow, &zone_to_id, assoc)?;
+ }
- // We want to try replacing node idrm by node idadd
- // if that brings us close to our goal.
- let partsadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as f32;
- let oldcost = square(errratio(*idrm, partsrm) - errratio(*idadd, partsadd));
- let newcost =
- square(errratio(*idrm, partsrm - 1.) - errratio(*idadd, partsadd + 1.));
- if newcost >= oldcost {
- // not closer to our goal
- continue;
- }
- let gain = oldcost - newcost;
+ // We display statistics of the computation
+ msg.extend(self.output_stat(&gflow, &old_assignation_opt, &zone_to_id, &id_to_zone)?);
+ msg.push("".to_string());
- let mut newpart = part.clone();
+ // We update the layout structure
+ self.update_ring_from_flow(id_to_zone.len(), &gflow)?;
- newpart.nodes.remove(irm);
- if !newpart.add(None, n_zones, idadd, infoadd) {
- continue;
- }
- assert!(newpart.nodes.len() == self.replication_factor);
+ if let Err(e) = self.check() {
+ return Err(Error::Message(
+ format!("Layout check returned an error: {}\nOriginal result of computation: <<<<\n{}\n>>>>", e, msg.join("\n"))
+ ));
+ }
- if !old_partitions[i]
- .is_valid_transition_to(&newpart, self.replication_factor)
- {
- continue;
- }
+ Ok(msg)
+ }
- if option
- .as_ref()
- .map(|(old_gain, _, _, _, _)| gain > *old_gain)
- .unwrap_or(true)
- {
- option = Some((gain, i, idadd, idrm, newpart));
- }
- }
- }
- }
- if let Some((_gain, i, idadd, idrm, newpart)) = option {
- *partitions_per_node.entry(idadd).or_insert(0) += 1;
- *partitions_per_node.get_mut(idrm).unwrap() -= 1;
- partitions[i] = newpart;
- } else {
- break;
- }
+ /// The LwwMap of node roles might have changed. This function updates the node_id_vec
+ /// and returns the assignation given by ring, with the new indices of the nodes, and
+ /// None if the node is not present anymore.
+ /// We work with the assumption that only this function and calculate_new_assignation
+ /// do modify assignation_ring and node_id_vec.
+ fn update_node_id_vec(&mut self) -> Result<Option<Vec<Vec<usize>>>, Error> {
+ // (1) We compute the new node list
+ // Non gateway nodes should be coded on 8bits, hence they must be first in the list
+ // We build the new node ids
+ let new_non_gateway_nodes: Vec<Uuid> = self
+ .roles
+ .items()
+ .iter()
+ .filter(|(_, _, v)| matches!(&v.0, Some(r) if r.capacity != None))
+ .map(|(k, _, _)| *k)
+ .collect();
+
+ if new_non_gateway_nodes.len() > MAX_NODE_NUMBER {
+ return Err(Error::Message(format!(
+ "There are more than {} non-gateway nodes in the new \
+ layout. This is not allowed.",
+ MAX_NODE_NUMBER
+ )));
}
- // Check we completed the assignation correctly
- // (this is a set of checks for the algorithm's consistency)
- assert!(partitions.len() == (1 << PARTITION_BITS));
- assert!(partitions
+ let new_gateway_nodes: Vec<Uuid> = self
+ .roles
+ .items()
.iter()
- .all(|p| p.nodes.len() == self.replication_factor));
-
- let new_partitions_per_node = self.partitions_per_node(&partitions[..]);
- assert!(new_partitions_per_node == partitions_per_node);
-
- // Show statistics
- println!("New number of partitions per node:");
- for (node, npart) in partitions_per_node.iter() {
- let tgt = *target_partitions_per_node.get(node).unwrap();
- let pct = 100f32 * (*npart as f32) / (tgt as f32);
- println!("{:?}\t{}\t({}% of {})", node, npart, pct as i32, tgt);
- }
- println!();
-
- let mut diffcount = HashMap::new();
- for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) {
- let nminus = oldpart.txtplus(newpart);
- let nplus = newpart.txtplus(oldpart);
- if nminus != "[...]" || nplus != "[...]" {
- let tup = (nminus, nplus);
- *diffcount.entry(tup).or_insert(0) += 1;
- }
+ .filter(|(_, _, v)| matches!(v, NodeRoleV(Some(r)) if r.capacity == None))
+ .map(|(k, _, _)| *k)
+ .collect();
+
+ let mut new_node_id_vec = Vec::<Uuid>::new();
+ new_node_id_vec.extend(new_non_gateway_nodes);
+ new_node_id_vec.extend(new_gateway_nodes);
+
+ let old_node_id_vec = self.node_id_vec.clone();
+ self.node_id_vec = new_node_id_vec.clone();
+
+ // (2) We retrieve the old association
+ // We rewrite the old association with the new indices. We only consider partition
+ // to node assignations where the node is still in use.
+ if self.ring_assignation_data.is_empty() {
+ // This is a new association
+ return Ok(None);
}
- if diffcount.is_empty() {
- println!("No data will be moved between nodes.");
- } else {
- let mut diffcount = diffcount.into_iter().collect::<Vec<_>>();
- diffcount.sort();
- println!("Number of partitions that move:");
- for ((nminus, nplus), npart) in diffcount {
- println!("\t{}\t{} -> {}", npart, nminus, nplus);
- }
+
+ if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
+ return Err(Error::Message(
+ "The old assignation does not have a size corresponding to \
+ the old replication factor or the number of partitions."
+ .into(),
+ ));
}
- println!();
- // Calculate and save new assignation data
- let (nodes, assignation_data) =
- self.compute_assignation_data(&configured_nodes[..], &partitions[..]);
+ // We build a translation table between the uuid and new ids
+ let mut uuid_to_new_id = HashMap::<Uuid, usize>::new();
+
+ // We add the indices of only the new non-gateway nodes that can be used in the
+ // association ring
+ for (i, uuid) in new_node_id_vec.iter().enumerate() {
+ uuid_to_new_id.insert(*uuid, i);
+ }
- self.node_id_vec = nodes;
- self.ring_assignation_data = assignation_data;
+ let mut old_assignation = vec![Vec::<usize>::new(); NB_PARTITIONS];
+ let rf = self.replication_factor;
- true
+ for (p, old_assign_p) in old_assignation.iter_mut().enumerate() {
+ for old_id in &self.ring_assignation_data[p * rf..(p + 1) * rf] {
+ let uuid = old_node_id_vec[*old_id as usize];
+ if uuid_to_new_id.contains_key(&uuid) {
+ old_assign_p.push(uuid_to_new_id[&uuid]);
+ }
+ }
+ }
+
+ // We write the ring
+ self.ring_assignation_data = Vec::<CompactNodeType>::new();
+
+ Ok(Some(old_assignation))
}
- fn initial_partition_assignation(&self) -> Option<Vec<PartitionAss<'_>>> {
- let (configured_nodes, zones) = self.configured_nodes_and_zones();
- let n_zones = zones.len();
+ /// This function generates ids for the zone of the nodes appearing in
+ /// self.node_id_vec.
+ fn generate_nongateway_zone_ids(&self) -> Result<(Vec<String>, HashMap<String, usize>), Error> {
+ let mut id_to_zone = Vec::<String>::new();
+ let mut zone_to_id = HashMap::<String, usize>::new();
+
+ for uuid in self.nongateway_nodes().iter() {
+ let r = self.node_role(uuid).unwrap();
+ if !zone_to_id.contains_key(&r.zone) && r.capacity != None {
+ zone_to_id.insert(r.zone.clone(), id_to_zone.len());
+ id_to_zone.push(r.zone.clone());
+ }
+ }
+ Ok((id_to_zone, zone_to_id))
+ }
- // Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
- let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
+ /// This function computes by dichotomy the largest realizable partition size, given
+ /// the layout roles and parameters.
+ fn compute_optimal_partition_size(
+ &self,
+ zone_to_id: &HashMap<String, usize>,
+ ) -> Result<u64, Error> {
+ let empty_set = HashSet::<(usize, usize)>::new();
+ let mut g = self.generate_flow_graph(1, zone_to_id, &empty_set)?;
+ g.compute_maximal_flow()?;
+ if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 {
+ return Err(Error::Message(
+ "The storage capacity of he cluster is to small. It is \
+ impossible to store partitions of size 1."
+ .into(),
+ ));
+ }
- // Prepare ring
- let mut partitions: Vec<PartitionAss> = partitions_idx
- .iter()
- .map(|_i| PartitionAss::new())
- .collect::<Vec<_>>();
+ let mut s_down = 1;
+ let mut s_up = self.get_total_capacity()?;
+ while s_down + 1 < s_up {
+ g = self.generate_flow_graph((s_down + s_up) / 2, zone_to_id, &empty_set)?;
+ g.compute_maximal_flow()?;
+ if g.get_flow_value()? < (NB_PARTITIONS * self.replication_factor) as i64 {
+ s_up = (s_down + s_up) / 2;
+ } else {
+ s_down = (s_down + s_up) / 2;
+ }
+ }
- // Create MagLev priority queues for each node
- let mut queues = configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(node_id, node_info)| {
- let mut parts = partitions_idx
- .iter()
- .map(|i| {
- let part_data =
- [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat();
- (*i, fasthash(&part_data[..]))
- })
- .collect::<Vec<_>>();
- parts.sort_by_key(|(_i, h)| *h);
- let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>();
- (node_id, node_info, parts_i, 0)
- })
- .collect::<Vec<_>>();
+ Ok(s_down)
+ }
- let max_capacity = configured_nodes
- .iter()
- .filter_map(|(_, node_info)| node_info.capacity)
- .fold(0, std::cmp::max);
-
- // Fill up ring
- for rep in 0..self.replication_factor {
- queues.sort_by_key(|(ni, _np, _q, _p)| {
- let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
- fasthash(&queue_data[..])
- });
-
- for (_, _, _, pos) in queues.iter_mut() {
- *pos = 0;
+ fn generate_graph_vertices(nb_zones: usize, nb_nodes: usize) -> Vec<Vertex> {
+ let mut vertices = vec![Vertex::Source, Vertex::Sink];
+ for p in 0..NB_PARTITIONS {
+ vertices.push(Vertex::Pup(p));
+ vertices.push(Vertex::Pdown(p));
+ for z in 0..nb_zones {
+ vertices.push(Vertex::PZ(p, z));
}
+ }
+ for n in 0..nb_nodes {
+ vertices.push(Vertex::N(n));
+ }
+ vertices
+ }
- let mut remaining = partitions_idx.len();
- while remaining > 0 {
- let remaining0 = remaining;
- for i_round in 0..max_capacity {
- for (node_id, node_info, q, pos) in queues.iter_mut() {
- if i_round >= node_info.capacity.unwrap() {
- continue;
- }
- for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
- if partitions[qv].add(Some(rep + 1), n_zones, node_id, node_info) {
- remaining -= 1;
- *pos = pos2 + 1;
- break;
- }
- }
- }
- }
- if remaining == remaining0 {
- // No progress made, exit
- return None;
+ /// Generates the graph to compute the maximal flow corresponding to the optimal
+ /// partition assignation.
+ /// exclude_assoc is the set of (partition, node) association that we are forbidden
+ /// to use (hence we do not add the corresponding edge to the graph). This parameter
+ /// is used to compute a first flow that uses only edges appearing in the previous
+ /// assignation. This produces a solution that heuristically should be close to the
+ /// previous one.
+ fn generate_flow_graph(
+ &self,
+ partition_size: u64,
+ zone_to_id: &HashMap<String, usize>,
+ exclude_assoc: &HashSet<(usize, usize)>,
+ ) -> Result<Graph<FlowEdge>, Error> {
+ let vertices =
+ ClusterLayout::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
+ let mut g = Graph::<FlowEdge>::new(&vertices);
+ let nb_zones = zone_to_id.len();
+ let redundancy = self.parameters.zone_redundancy;
+ for p in 0..NB_PARTITIONS {
+ g.add_edge(Vertex::Source, Vertex::Pup(p), redundancy as u64)?;
+ g.add_edge(
+ Vertex::Source,
+ Vertex::Pdown(p),
+ (self.replication_factor - redundancy) as u64,
+ )?;
+ for z in 0..nb_zones {
+ g.add_edge(Vertex::Pup(p), Vertex::PZ(p, z), 1)?;
+ g.add_edge(
+ Vertex::Pdown(p),
+ Vertex::PZ(p, z),
+ self.replication_factor as u64,
+ )?;
+ }
+ }
+ for n in 0..self.nongateway_nodes().len() {
+ let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
+ let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[n])?];
+ g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
+ for p in 0..NB_PARTITIONS {
+ if !exclude_assoc.contains(&(p, n)) {
+ g.add_edge(Vertex::PZ(p, node_zone), Vertex::N(n), 1)?;
}
}
}
-
- Some(partitions)
+ Ok(g)
}
- fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) {
- let configured_nodes = self
- .roles
- .items()
- .iter()
- .filter(|(_id, _, info)| info.0.is_some())
- .map(|(id, _, info)| (id, info.0.as_ref().unwrap()))
- .collect::<Vec<(&Uuid, &NodeRole)>>();
+ /// This function computes a first optimal assignation (in the form of a flow graph).
+ fn compute_candidate_assignation(
+ &self,
+ zone_to_id: &HashMap<String, usize>,
+ prev_assign_opt: &Option<Vec<Vec<usize>>>,
+ ) -> Result<Graph<FlowEdge>, Error> {
+ // We list the (partition,node) associations that are not used in the
+ // previous assignation
+ let mut exclude_edge = HashSet::<(usize, usize)>::new();
+ if let Some(prev_assign) = prev_assign_opt {
+ let nb_nodes = self.nongateway_nodes().len();
+ for (p, prev_assign_p) in prev_assign.iter().enumerate() {
+ for n in 0..nb_nodes {
+ exclude_edge.insert((p, n));
+ }
+ for n in prev_assign_p.iter() {
+ exclude_edge.remove(&(p, *n));
+ }
+ }
+ }
- let zones = configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(_id, info)| info.zone.as_str())
- .collect::<HashSet<&str>>();
+ // We compute the best flow using only the edges used in the previous assignation
+ let mut g = self.generate_flow_graph(self.partition_size, zone_to_id, &exclude_edge)?;
+ g.compute_maximal_flow()?;
- (configured_nodes, zones)
+ // We add the excluded edges and compute the maximal flow with the full graph.
+ // The algorithm is such that it will start with the flow that we just computed
+ // and find ameliorating paths from that.
+ for (p, n) in exclude_edge.iter() {
+ let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+ g.add_edge(Vertex::PZ(*p, node_zone), Vertex::N(*n), 1)?;
+ }
+ g.compute_maximal_flow()?;
+ Ok(g)
}
- fn compute_assignation_data<'a>(
+ /// This function updates the flow graph gflow to minimize the distance between
+ /// its corresponding assignation and the previous one
+ fn minimize_rebalance_load(
&self,
- configured_nodes: &[(&'a Uuid, &'a NodeRole)],
- partitions: &[PartitionAss<'a>],
- ) -> (Vec<Uuid>, Vec<CompactNodeType>) {
- assert!(partitions.len() == (1 << PARTITION_BITS));
-
- // Make a canonical order for nodes
- let mut nodes = configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_some())
- .map(|(id, _)| **id)
- .collect::<Vec<_>>();
- let nodes_rev = nodes
- .iter()
- .enumerate()
- .map(|(i, id)| (*id, i as CompactNodeType))
- .collect::<HashMap<Uuid, CompactNodeType>>();
-
- let mut assignation_data = vec![];
- for partition in partitions.iter() {
- assert!(partition.nodes.len() == self.replication_factor);
- for (id, _) in partition.nodes.iter() {
- assignation_data.push(*nodes_rev.get(id).unwrap());
+ gflow: &mut Graph<FlowEdge>,
+ zone_to_id: &HashMap<String, usize>,
+ prev_assign: &[Vec<usize>],
+ ) -> Result<(), Error> {
+ // We define a cost function on the edges (pairs of vertices) corresponding
+ // to the distance between the two assignations.
+ let mut cost = CostFunction::new();
+ for (p, assoc_p) in prev_assign.iter().enumerate() {
+ for n in assoc_p.iter() {
+ let node_zone = zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?];
+ cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
}
}
- nodes.extend(
- configured_nodes
- .iter()
- .filter(|(_id, info)| info.capacity.is_none())
- .map(|(id, _)| **id),
- );
+ // We compute the maximal length of a simple path in gflow. It is used in the
+ // Bellman-Ford algorithm in optimize_flow_with_cost to set the number
+ // of iterations.
+ let nb_nodes = self.nongateway_nodes().len();
+ let path_length = 4 * nb_nodes;
+ gflow.optimize_flow_with_cost(&cost, path_length)?;
- (nodes, assignation_data)
+ Ok(())
}
- fn parse_assignation_data(&self) -> Vec<PartitionAss<'_>> {
- if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) {
- // If the previous assignation data is correct, use that
- let mut partitions = vec![];
- for i in 0..(1 << PARTITION_BITS) {
- let mut part = PartitionAss::new();
- for node_i in self.ring_assignation_data
- [i * self.replication_factor..(i + 1) * self.replication_factor]
- .iter()
- {
- let node_id = &self.node_id_vec[*node_i as usize];
-
- if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) {
- part.nodes.push((node_id, Some(info)));
- } else {
- part.nodes.push((node_id, None));
+ /// This function updates the assignation ring from the flow graph.
+ fn update_ring_from_flow(
+ &mut self,
+ nb_zones: usize,
+ gflow: &Graph<FlowEdge>,
+ ) -> Result<(), Error> {
+ self.ring_assignation_data = Vec::<CompactNodeType>::new();
+ for p in 0..NB_PARTITIONS {
+ for z in 0..nb_zones {
+ let assoc_vertex = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
+ for vertex in assoc_vertex.iter() {
+ if let Vertex::N(n) = vertex {
+ self.ring_assignation_data.push((*n).try_into().unwrap());
}
}
- partitions.push(part);
}
- partitions
- } else {
- // Otherwise start fresh
- (0..(1 << PARTITION_BITS))
- .map(|_| PartitionAss::new())
- .collect()
}
+
+ if self.ring_assignation_data.len() != NB_PARTITIONS * self.replication_factor {
+ return Err(Error::Message(
+ "Critical Error : the association ring we produced does not \
+ have the right size."
+ .into(),
+ ));
+ }
+ Ok(())
}
- fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> {
- let mut partitions_per_node = HashMap::<&Uuid, usize>::new();
- for p in partitions.iter() {
- for (id, _) in p.nodes.iter() {
- *partitions_per_node.entry(*id).or_insert(0) += 1;
+ /// This function returns a message summing up the partition repartition of the new
+ /// layout, and other statistics of the partition assignation computation.
+ fn output_stat(
+ &self,
+ gflow: &Graph<FlowEdge>,
+ prev_assign_opt: &Option<Vec<Vec<usize>>>,
+ zone_to_id: &HashMap<String, usize>,
+ id_to_zone: &[String],
+ ) -> Result<Message, Error> {
+ let mut msg = Message::new();
+
+ let used_cap = self.partition_size * NB_PARTITIONS as u64 * self.replication_factor as u64;
+ let total_cap = self.get_total_capacity()?;
+ let percent_cap = 100.0 * (used_cap as f32) / (total_cap as f32);
+ msg.push("".into());
+ msg.push(format!(
+ "Usable capacity / Total cluster capacity: {} / {} ({:.1} %)",
+ ByteSize::b(used_cap).to_string_as(false),
+ ByteSize::b(total_cap).to_string_as(false),
+ percent_cap
+ ));
+ msg.push("".into());
+ msg.push(
+ "If the percentage is too low, it might be that the \
+ replication/redundancy constraints force the use of nodes/zones with small \
+ storage capacities. \
+ You might want to rebalance the storage capacities or relax the constraints. \
+ See the detailed statistics below and look for saturated nodes/zones."
+ .into(),
+ );
+ msg.push(format!(
+ "Recall that because of the replication factor, the actual available \
+ storage capacity is {} / {} = {}.",
+ ByteSize::b(used_cap).to_string_as(false),
+ self.replication_factor,
+ ByteSize::b(used_cap / self.replication_factor as u64).to_string_as(false)
+ ));
+
+ // We define and fill in the following tables
+ let storing_nodes = self.nongateway_nodes();
+ let mut new_partitions = vec![0; storing_nodes.len()];
+ let mut stored_partitions = vec![0; storing_nodes.len()];
+
+ let mut new_partitions_zone = vec![0; id_to_zone.len()];
+ let mut stored_partitions_zone = vec![0; id_to_zone.len()];
+
+ for p in 0..NB_PARTITIONS {
+ for z in 0..id_to_zone.len() {
+ let pz_nodes = gflow.get_positive_flow_from(Vertex::PZ(p, z))?;
+ if !pz_nodes.is_empty() {
+ stored_partitions_zone[z] += 1;
+ if let Some(prev_assign) = prev_assign_opt {
+ let mut old_zones_of_p = Vec::<usize>::new();
+ for n in prev_assign[p].iter() {
+ old_zones_of_p
+ .push(zone_to_id[&self.get_node_zone(&self.node_id_vec[*n])?]);
+ }
+ if !old_zones_of_p.contains(&z) {
+ new_partitions_zone[z] += 1;
+ }
+ }
+ }
+ for vert in pz_nodes.iter() {
+ if let Vertex::N(n) = *vert {
+ stored_partitions[n] += 1;
+ if let Some(prev_assign) = prev_assign_opt {
+ if !prev_assign[p].contains(&n) {
+ new_partitions[n] += 1;
+ }
+ }
+ }
+ }
}
}
- partitions_per_node
+
+ if *prev_assign_opt == None {
+ new_partitions = stored_partitions.clone();
+ new_partitions_zone = stored_partitions_zone.clone();
+ }
+
+ // We display the statistics
+
+ msg.push("".into());
+ if *prev_assign_opt != None {
+ let total_new_partitions: usize = new_partitions.iter().sum();
+ msg.push(format!(
+ "A total of {} new copies of partitions need to be \
+ transferred.",
+ total_new_partitions
+ ));
+ }
+ msg.push("".into());
+ msg.push("==== DETAILED STATISTICS BY ZONES AND NODES ====".into());
+
+ for z in 0..id_to_zone.len() {
+ let mut nodes_of_z = Vec::<usize>::new();
+ for n in 0..storing_nodes.len() {
+ if self.get_node_zone(&self.node_id_vec[n])? == id_to_zone[z] {
+ nodes_of_z.push(n);
+ }
+ }
+ let replicated_partitions: usize =
+ nodes_of_z.iter().map(|n| stored_partitions[*n]).sum();
+ msg.push("".into());
+
+ msg.push(format!(
+ "Zone {}: {} distinct partitions stored ({} new, \
+ {} partition copies) ",
+ id_to_zone[z],
+ stored_partitions_zone[z],
+ new_partitions_zone[z],
+ replicated_partitions
+ ));
+
+ let available_cap_z: u64 = self.partition_size * replicated_partitions as u64;
+ let mut total_cap_z = 0;
+ for n in nodes_of_z.iter() {
+ total_cap_z += self.get_node_capacity(&self.node_id_vec[*n])?;
+ }
+ let percent_cap_z = 100.0 * (available_cap_z as f32) / (total_cap_z as f32);
+ msg.push(format!(
+ " Usable capacity / Total capacity: {} / {} ({:.1}%).",
+ ByteSize::b(available_cap_z).to_string_as(false),
+ ByteSize::b(total_cap_z).to_string_as(false),
+ percent_cap_z
+ ));
+
+ for n in nodes_of_z.iter() {
+ let available_cap_n = stored_partitions[*n] as u64 * self.partition_size;
+ let total_cap_n = self.get_node_capacity(&self.node_id_vec[*n])?;
+ let tags_n = (self
+ .node_role(&self.node_id_vec[*n])
+ .ok_or("Node not found."))?
+ .tags_string();
+ msg.push(format!(
+ " Node {:?}: {} partitions ({} new) ; \
+ usable/total capacity: {} / {} ({:.1}%) ; tags:{}",
+ self.node_id_vec[*n],
+ stored_partitions[*n],
+ new_partitions[*n],
+ ByteSize::b(available_cap_n).to_string_as(false),
+ ByteSize::b(total_cap_n).to_string_as(false),
+ (available_cap_n as f32) / (total_cap_n as f32) * 100.0,
+ tags_n
+ ));
+ }
+ }
+
+ Ok(msg)
}
}
-// ---- Internal structs for partition assignation in layout ----
+// ====================================================================================
+
+#[cfg(test)]
+mod tests {
+ use super::{Error, *};
+ use std::cmp::min;
+
+ // This function checks that the partition size S computed is at least better than the
+ // one given by a very naive algorithm. To do so, we try to run the naive algorithm
+ // assuming a partion size of S+1. If we succed, it means that the optimal assignation
+ // was not optimal. The naive algorithm is the following :
+ // - we compute the max number of partitions associated to every node, capped at the
+ // partition number. It gives the number of tokens of every node.
+ // - every zone has a number of tokens equal to the sum of the tokens of its nodes.
+ // - we cycle over the partitions and associate zone tokens while respecting the
+ // zone redundancy constraint.
+ // NOTE: the naive algorithm is not optimal. Counter example:
+ // take nb_partition = 3 ; replication_factor = 5; redundancy = 4;
+ // number of tokens by zone : (A, 4), (B,1), (C,4), (D, 4), (E, 2)
+ // With these parameters, the naive algo fails, whereas there is a solution:
+ // (A,A,C,D,E) , (A,B,C,D,D) (A,C,C,D,E)
+ fn check_against_naive(cl: &ClusterLayout) -> Result<bool, Error> {
+ let over_size = cl.partition_size + 1;
+ let mut zone_token = HashMap::<String, usize>::new();
+
+ let (zones, zone_to_id) = cl.generate_nongateway_zone_ids()?;
+
+ if zones.is_empty() {
+ return Ok(false);
+ }
-#[derive(Clone)]
-struct PartitionAss<'a> {
- nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>,
-}
+ for z in zones.iter() {
+ zone_token.insert(z.clone(), 0);
+ }
+ for uuid in cl.nongateway_nodes().iter() {
+ let z = cl.get_node_zone(uuid)?;
+ let c = cl.get_node_capacity(uuid)?;
+ zone_token.insert(
+ z.clone(),
+ zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),
+ );
+ }
-impl<'a> PartitionAss<'a> {
- fn new() -> Self {
- Self { nodes: Vec::new() }
- }
+ // For every partition, we count the number of zone already associated and
+ // the name of the last zone associated
- fn nplus(&self, other: &PartitionAss<'a>) -> usize {
- self.nodes
- .iter()
- .filter(|x| !other.nodes.contains(x))
- .count()
- }
+ let mut id_zone_token = vec![0; zones.len()];
+ for (z, t) in zone_token.iter() {
+ id_zone_token[zone_to_id[z]] = *t;
+ }
- fn txtplus(&self, other: &PartitionAss<'a>) -> String {
- let mut nodes = self
- .nodes
- .iter()
- .filter(|x| !other.nodes.contains(x))
- .map(|x| format!("{:?}", x.0))
- .collect::<Vec<_>>();
- nodes.sort();
- if self.nodes.iter().any(|x| other.nodes.contains(x)) {
- nodes.push("...".into());
+ let mut nb_token = vec![0; NB_PARTITIONS];
+ let mut last_zone = vec![zones.len(); NB_PARTITIONS];
+
+ let mut curr_zone = 0;
+
+ let redundancy = cl.parameters.zone_redundancy;
+
+ for replic in 0..cl.replication_factor {
+ for p in 0..NB_PARTITIONS {
+ while id_zone_token[curr_zone] == 0
+ || (last_zone[p] == curr_zone
+ && redundancy - nb_token[p] <= cl.replication_factor - replic)
+ {
+ curr_zone += 1;
+ if curr_zone >= zones.len() {
+ return Ok(true);
+ }
+ }
+ id_zone_token[curr_zone] -= 1;
+ if last_zone[p] != curr_zone {
+ nb_token[p] += 1;
+ last_zone[p] = curr_zone;
+ }
+ }
}
- format!("[{}]", nodes.join(" "))
- }
- fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool {
- let min_keep_nodes_per_part = (replication_factor + 1) / 2;
- let n_removed = self.nplus(other);
+ return Ok(false);
+ }
- if self.nodes.len() <= min_keep_nodes_per_part {
- n_removed == 0
- } else {
- n_removed <= self.nodes.len() - min_keep_nodes_per_part
+ fn show_msg(msg: &Message) {
+ for s in msg.iter() {
+ println!("{}", s);
}
}
- // add is a key function in creating a PartitionAss, i.e. the list of nodes
- // to which a partition is assigned. It tries to add a certain node id to the
- // assignation, but checks that doing so is compatible with the NECESSARY
- // condition that the partition assignation must be dispersed over different
- // zones (datacenters) if enough zones exist. This is why it takes a n_zones
- // parameter, which is the total number of zones that have existing nodes:
- // if nodes in the assignation already cover all n_zones zones, then any node
- // that is not yet in the assignation can be added. Otherwise, only nodes
- // that are in a new zone can be added.
- fn add(
- &mut self,
- target_len: Option<usize>,
- n_zones: usize,
- node: &'a Uuid,
- role: &'a NodeRole,
- ) -> bool {
- if let Some(tl) = target_len {
- if self.nodes.len() != tl - 1 {
- return false;
+ fn update_layout(
+ cl: &mut ClusterLayout,
+ node_id_vec: &Vec<u8>,
+ node_capacity_vec: &Vec<u64>,
+ node_zone_vec: &Vec<String>,
+ zone_redundancy: usize,
+ ) {
+ for i in 0..node_id_vec.len() {
+ if let Some(x) = FixedBytes32::try_from(&[i as u8; 32]) {
+ cl.node_id_vec.push(x);
}
- }
- let p_zns = self
- .nodes
- .iter()
- .map(|(_id, info)| info.unwrap().zone.as_str())
- .collect::<HashSet<&str>>();
- if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str()))
- || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node))
- {
- self.nodes.push((node, Some(role)));
- true
- } else {
- false
+ let update = cl.staging_roles.update_mutator(
+ cl.node_id_vec[i],
+ NodeRoleV(Some(NodeRole {
+ zone: (node_zone_vec[i].to_string()),
+ capacity: (Some(node_capacity_vec[i])),
+ tags: (vec![]),
+ })),
+ );
+ cl.staging_roles.merge(&update);
}
+ cl.staging_parameters
+ .update(LayoutParameters { zone_redundancy });
+ cl.staging_hash = cl.calculate_staging_hash();
+ }
+
+ #[test]
+ fn test_assignation() {
+ let mut node_id_vec = vec![1, 2, 3];
+ let mut node_capacity_vec = vec![4000, 1000, 2000];
+ let mut node_zone_vec = vec!["A", "B", "C"]
+ .into_iter()
+ .map(|x| x.to_string())
+ .collect();
+
+ let mut cl = ClusterLayout::new(3);
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
+ let v = cl.version;
+ let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(matches!(check_against_naive(&cl), Ok(true)));
+
+ node_id_vec = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
+ node_capacity_vec = vec![4000, 1000, 1000, 3000, 1000, 1000, 2000, 10000, 2000];
+ node_zone_vec = vec!["A", "B", "C", "C", "C", "B", "G", "H", "I"]
+ .into_iter()
+ .map(|x| x.to_string())
+ .collect();
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 2);
+ let v = cl.version;
+ let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(matches!(check_against_naive(&cl), Ok(true)));
+
+ node_capacity_vec = vec![4000, 1000, 2000, 7000, 1000, 1000, 2000, 10000, 2000];
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 3);
+ let v = cl.version;
+ let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(matches!(check_against_naive(&cl), Ok(true)));
+
+ node_capacity_vec = vec![
+ 4000000, 4000000, 2000000, 7000000, 1000000, 9000000, 2000000, 10000, 2000000,
+ ];
+ update_layout(&mut cl, &node_id_vec, &node_capacity_vec, &node_zone_vec, 1);
+ let v = cl.version;
+ let (cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
+ show_msg(&msg);
+ assert_eq!(cl.check(), Ok(()));
+ assert!(matches!(check_against_naive(&cl), Ok(true)));
}
}
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 86f63568..f734942d 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -8,6 +8,7 @@ mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;
+pub mod graph_algo;
pub mod layout;
pub mod replication_mode;
pub mod ring;
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 73a126a2..743a5cba 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -40,6 +40,7 @@ pub struct Ring {
// Type to store compactly the id of a node in the system
// Change this to u16 the day we want to have more than 256 nodes in a cluster
pub type CompactNodeType = u8;
+pub const MAX_NODE_NUMBER: usize = 256;
// The maximum number of times an object might get replicated
// This must be at least 3 because Garage supports 3-way replication
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 949aced6..1ec250c3 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -5,7 +5,6 @@ use std::time::Duration;
use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
-use futures_util::future::FutureExt;
use tokio::select;
use tokio::sync::watch;
@@ -24,7 +23,6 @@ pub use netapp::message::{
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};
-use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -94,7 +92,6 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
@@ -104,7 +101,6 @@ impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>,
- background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
rpc_timeout: Option<Duration>,
) -> Self {
@@ -113,7 +109,6 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
- background,
ring,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
@@ -377,16 +372,13 @@ impl RpcHelper {
if !resp_stream.is_empty() {
// Continue remaining requests in background.
- // Continue the remaining requests immediately using tokio::spawn
- // but enqueue a task in the background runner
- // to ensure that the process won't exit until the requests are done
- // (if we had just enqueued the resp_stream.collect directly in the background runner,
- // the requests might have been put on hold in the background runner's queue,
- // in which case they might timeout or otherwise fail)
- let wait_finished_fut = tokio::spawn(async move {
+ // Note: these requests can get interrupted on process shutdown,
+ // we must not count on them being executed for certain.
+ // For all background things that have to happen with certainty,
+ // they have to be put in a proper queue that is persisted to disk.
+ tokio::spawn(async move {
resp_stream.collect::<Vec<Result<_, _>>>().await;
});
- self.0.background.spawn(wait_finished_fut.map(|_| Ok(())));
}
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 2c6f14fd..1f4d86e7 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::BackgroundRunner;
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -50,8 +49,6 @@ pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008
/// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
-pub const CONNECT_ERROR_MESSAGE: &str = "Error establishing RPC connection to remote node. This can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret";
-
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum SystemRpc {
@@ -76,13 +73,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
+#[derive(Serialize, Deserialize)]
+pub struct PeerList(Vec<(Uuid, SocketAddr)>);
+impl garage_util::migrate::InitialFormat for PeerList {}
+
/// This node's membership manager
pub struct System {
/// The id of this node
pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>,
- persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>,
+ persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@@ -110,9 +111,6 @@ pub struct System {
pub ring: watch::Receiver<Arc<Ring>>,
update_ring: Mutex<watch::Sender<Arc<Ring>>>,
- /// The job runner of this node
- pub background: Arc<BackgroundRunner>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
}
@@ -232,7 +230,6 @@ impl System {
/// Create this node's membership manager
pub fn new(
network_key: NetworkKey,
- background: Arc<BackgroundRunner>,
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
@@ -354,7 +351,6 @@ impl System {
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
- background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
@@ -372,7 +368,6 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
- background,
metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
@@ -444,17 +439,14 @@ impl System {
))
})?;
let mut errors = vec![];
- for ip in addrs.iter() {
- match self
- .netapp
- .clone()
- .try_connect(*ip, pubkey)
- .await
- .err_context(CONNECT_ERROR_MESSAGE)
- {
+ for addr in addrs.iter() {
+ match self.netapp.clone().try_connect(*addr, pubkey).await {
Ok(()) => return Ok(()),
Err(e) => {
- errors.push((*ip, e));
+ errors.push((
+ *addr,
+ Error::Message(connect_error_message(*addr, pubkey, e)),
+ ));
}
}
}
@@ -529,56 +521,61 @@ impl System {
// ---- INTERNALS ----
#[cfg(feature = "consul-discovery")]
- async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_consul(self: Arc<Self>) {
let c = match &self.consul_discovery {
Some(c) => c,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- c.publish_consul_service(
- self.netapp.id,
- &self.local_status.load_full().hostname,
- rpc_public_addr,
- )
- .await
- .err_context("Error while publishing Consul service")
+ if let Err(e) = c
+ .publish_consul_service(
+ self.netapp.id,
+ &self.local_status.load_full().hostname,
+ rpc_public_addr,
+ )
+ .await
+ {
+ error!("Error while publishing Consul service: {}", e);
+ }
}
#[cfg(feature = "kubernetes-discovery")]
- async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
+ async fn advertise_to_kubernetes(self: Arc<Self>) {
let k = match &self.kubernetes_discovery {
Some(k) => k,
- _ => return Ok(()),
+ _ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected.");
- return Ok(());
+ return;
}
};
- publish_kubernetes_node(
+ if let Err(e) = publish_kubernetes_node(
k,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
- .err_context("Error while publishing node to kubernetes")
+ {
+ error!("Error while publishing node to Kubernetes: {}", e);
+ }
}
/// Save network configuration to disc
- async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone();
self.persist_cluster_layout
.save_async(&ring.layout)
@@ -630,11 +627,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2.pull_cluster_layout(from).await;
- Ok(())
- });
+ tokio::spawn(self.clone().pull_cluster_layout(from));
}
self.node_status
@@ -662,9 +655,9 @@ impl System {
let update_ring = self.update_ring.lock().await;
let mut layout: ClusterLayout = self.ring.borrow().layout.clone();
- let prev_layout_check = layout.check();
+ let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
- if prev_layout_check && !layout.check() {
+ if prev_layout_check && !layout.check().is_ok() {
error!("New cluster layout is invalid, discarding.");
return Err(Error::Message(
"New cluster layout is invalid, discarding.".into(),
@@ -676,18 +669,21 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- self.background.spawn_cancellable(async move {
- self2
+ tokio::spawn(async move {
+ if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await?;
- Ok(())
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
});
- self.background.spawn(self.clone().save_cluster_layout());
+
+ self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
@@ -717,7 +713,7 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = !self.ring.borrow().layout.check();
+ let not_configured = !self.ring.borrow().layout.check().is_ok();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
let expected_n_nodes = self.ring.borrow().layout.num_nodes();
let bad_peers = self
@@ -734,7 +730,7 @@ impl System {
// Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await {
- ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr)))
+ ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul
@@ -773,12 +769,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- tokio::spawn(
- self.netapp
- .clone()
- .try_connect(node_addr, node_id)
- .map(|r| r.err_context(CONNECT_ERROR_MESSAGE)),
- );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
+ error!("{}", connect_error_message(node_addr, node_id, e));
+ }
+ });
}
}
@@ -787,11 +783,10 @@ impl System {
}
#[cfg(feature = "consul-discovery")]
- self.background.spawn(self.clone().advertise_to_consul());
+ tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
- self.background
- .spawn(self.clone().advertise_to_kubernetes());
+ tokio::spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
@@ -815,12 +810,16 @@ impl System {
// and append it to the list we are about to save,
// so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
- prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
- peer_list.extend(prev_peer_list);
+ prev_peer_list
+ .0
+ .retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
+ peer_list.extend(prev_peer_list.0);
}
// Save new peer list to file
- self.persist_peer_list.save_async(&peer_list).await
+ self.persist_peer_list
+ .save_async(&PeerList(peer_list))
+ .await
}
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
@@ -881,3 +880,11 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
ret
}
+
+fn connect_error_message(
+ addr: SocketAddr,
+ pubkey: ed25519::PublicKey,
+ e: netapp::error::Error,
+) -> String {
+ format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e)
+}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 861e3843..3911c945 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -21,13 +21,13 @@ garage_util = { version = "0.8.1", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
+arc-swap = "1.0"
bytes = "1.0"
hex = "0.4"
hexdump = "0.1"
tracing = "0.1.30"
rand = "0.8"
-rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11"
diff --git a/src/table/data.rs b/src/table/data.rs
index 93da2110..5c792f1f 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
@@ -31,16 +32,16 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
+
+ pub(crate) insert_queue: db::Tree,
+ pub(crate) insert_queue_notify: Notify,
+
pub(crate) gc_todo: CountedTree,
pub(crate) metrics: TableMetrics,
}
-impl<F, R> TableData<F, R>
-where
- F: TableSchema,
- R: TableReplication,
-{
+impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(&format!("{}:table", F::TABLE_NAME))
@@ -53,9 +54,13 @@ where
.open_tree(&format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
+ let insert_queue = db
+ .open_tree(&format!("{}:insert_queue", F::TABLE_NAME))
+ .expect("Unable to open insert queue DB tree");
+
let gc_todo = db
.open_tree(&format!("{}:gc_todo_v2", F::TABLE_NAME))
- .expect("Unable to open DB tree");
+ .expect("Unable to open GC DB tree");
let gc_todo = CountedTree::new(gc_todo).expect("Cannot count gc_todo_v2");
let metrics = TableMetrics::new(
@@ -74,6 +79,8 @@ where
merkle_tree,
merkle_todo,
merkle_todo_notify: Notify::new(),
+ insert_queue,
+ insert_queue_notify: Notify::new(),
gc_todo,
metrics,
})
@@ -173,9 +180,8 @@ where
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
- self.update_entry_with(&tree_key[..], |ent| match ent {
+ self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
Some(mut ent) => {
ent.merge(&update);
ent
@@ -187,11 +193,14 @@ where
pub fn update_entry_with(
&self,
- tree_key: &[u8],
+ partition_key: &F::P,
+ sort_key: &F::S,
f: impl Fn(Option<F::E>) -> F::E,
) -> Result<Option<F::E>, Error> {
+ let tree_key = self.tree_key(partition_key, sort_key);
+
let changed = self.store.db().transaction(|mut tx| {
- let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, tree_key)? {
+ let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
let new_entry = f(Some(old_entry.clone()));
@@ -200,23 +209,24 @@ where
None => (None, None, f(None)),
};
- // Scenario 1: the value changed, so of course there is a change
- let value_changed = Some(&new_entry) != old_entry.as_ref();
-
+ // Changed can be true in two scenarios
+ // Scenario 1: the actual represented value changed,
+ // so of course the messagepack encoding changed as well
// Scenario 2: the value didn't change but due to a migration in the
- // data format, the messagepack encoding changed. In this case
- // we have to write the migrated value in the table and update
- // the associated Merkle tree entry.
- let new_bytes = rmp_to_vec_all_named(&new_entry)
+ // data format, the messagepack encoding changed. In this case,
+ // we also have to write the migrated value in the table and update
+ // the associated Merkle tree entry.
+ let new_bytes = new_entry
+ .encode()
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
- let encoding_changed = Some(&new_bytes[..]) != old_bytes.as_ref().map(|x| &x[..]);
+ let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
drop(old_bytes);
- if value_changed || encoding_changed {
- let new_bytes_hash = blake2sum(&new_bytes[..]);
- tx.insert(&self.merkle_todo, tree_key, new_bytes_hash.as_slice())?;
- tx.insert(&self.store, tree_key, new_bytes)?;
+ if changed {
+ let new_bytes_hash = blake2sum(&new_bytes);
+ tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
+ tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
.updated(&mut tx, old_entry.as_ref(), Some(&new_entry))?;
@@ -242,7 +252,7 @@ where
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
let nodes = self.replication.write_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
- GcTodoEntry::new(tree_key.to_vec(), new_bytes_hash).save(&self.gc_todo)?;
+ GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
}
@@ -258,10 +268,11 @@ where
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
+ let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
+
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
@@ -285,10 +296,11 @@ where
.db()
.transaction(|mut tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
+ let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
+
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
- let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
self.instance.updated(&mut tx, Some(&old_entry), None)?;
Ok(true)
}
@@ -302,6 +314,32 @@ where
Ok(removed)
}
+ // ---- Insert queue functions ----
+
+ pub(crate) fn queue_insert(
+ &self,
+ tx: &mut db::Transaction,
+ ins: &F::E,
+ ) -> db::TxResult<(), Error> {
+ let tree_key = self.tree_key(ins.partition_key(), ins.sort_key());
+
+ let new_entry = match tx.get(&self.insert_queue, &tree_key)? {
+ Some(old_v) => {
+ let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
+ entry.merge(ins);
+ entry.encode()
+ }
+ None => ins.encode(),
+ };
+ let new_entry = new_entry
+ .map_err(Error::RmpEncode)
+ .map_err(db::TxError::Abort)?;
+ tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+ self.insert_queue_notify.notify_one();
+
+ Ok(())
+ }
+
// ---- Utility functions ----
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
@@ -311,18 +349,18 @@ where
}
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
- match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
- Ok(x) => Ok(x),
- Err(e) => match F::try_migrate(bytes) {
- Some(x) => Ok(x),
- None => {
- warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e);
- for line in hexdump::hexdump_iter(bytes) {
- debug!("{}", line);
- }
- Err(e.into())
+ match F::E::decode(bytes) {
+ Some(x) => Ok(x),
+ None => {
+ error!("Unable to decode entry of {}", F::TABLE_NAME);
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
}
- },
+ Err(Error::Message(format!(
+ "Unable to decode entry of {}",
+ F::TABLE_NAME
+ )))
+ }
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index cfdc9d2d..5b9124a7 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -31,7 +31,7 @@ const TABLE_GC_BATCH_SIZE: usize = 1024;
// and the moment the garbage collection actually happens)
const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600);
-pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -49,29 +49,26 @@ impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>;
}
-impl<F, R> TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
+impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
+ pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
- system: system.clone(),
+ system,
data,
endpoint,
});
-
gc.endpoint.set_handler(gc.clone());
- system.background.spawn_worker(GcWorker::new(gc.clone()));
-
gc
}
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ bg.spawn_worker(GcWorker::new(self.clone()));
+ }
+
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -276,11 +273,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<GcRpc> for TableGc<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message {
GcRpc::Update(items) => {
@@ -298,20 +291,12 @@ where
}
}
-struct GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+struct GcWorker<F: TableSchema, R: TableReplication> {
gc: Arc<TableGc<F, R>>,
wait_delay: Duration,
}
-impl<F, R> GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> GcWorker<F, R> {
fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self {
gc,
@@ -321,11 +306,7 @@ where
}
#[async_trait]
-impl<F, R> Worker for GcWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn name(&self) -> String {
format!("{} GC", F::TABLE_NAME)
}
@@ -347,10 +328,7 @@ where
}
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
- }
+ async fn wait_for_work(&mut self) -> WorkerState {
tokio::time::sleep(self.wait_delay).await;
WorkerState::Busy
}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index b0153e9a..fdf114a6 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -4,16 +4,18 @@
#[macro_use]
extern crate tracing;
-mod metrics;
pub mod schema;
pub mod util;
pub mod data;
+pub mod replication;
+pub mod table;
+
mod gc;
mod merkle;
-pub mod replication;
+mod metrics;
+mod queue;
mod sync;
-pub mod table;
pub use schema::*;
pub use table::*;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e977bfb5..e86d0251 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -3,12 +3,14 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use tokio::select;
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
use garage_rpc::ring::*;
@@ -64,22 +66,18 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl<F, R> MerkleUpdater<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
- let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
+ pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
- let ret = Arc::new(Self {
+ Arc::new(Self {
data,
empty_node_hash,
- });
-
- background.spawn_worker(MerkleWorker(ret.clone()));
+ })
+ }
- ret
+ pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+ background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
@@ -276,7 +274,7 @@ where
tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
+ let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
@@ -302,17 +300,10 @@ where
}
}
-struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
#[async_trait]
-impl<F, R> Worker for MerkleWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn name(&self) -> String {
format!("{} Merkle", F::TABLE_NAME)
}
@@ -339,11 +330,11 @@ where
.unwrap()
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
+ async fn wait_for_work(&mut self) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+ _ = self.0.data.merkle_todo_notify.notified() => (),
}
- tokio::time::sleep(Duration::from_secs(10)).await;
WorkerState::Busy
}
}
@@ -374,7 +365,7 @@ impl MerkleNode {
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
- Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
+ Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?),
}
}
diff --git a/src/table/queue.rs b/src/table/queue.rs
new file mode 100644
index 00000000..0857209b
--- /dev/null
+++ b/src/table/queue.rs
@@ -0,0 +1,77 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use tokio::select;
+use tokio::sync::watch;
+
+use garage_util::background::*;
+use garage_util::error::Error;
+
+use crate::replication::*;
+use crate::schema::*;
+use crate::table::*;
+
+const BATCH_SIZE: usize = 100;
+
+pub(crate) struct InsertQueueWorker<F, R>(pub(crate) Arc<Table<F, R>>)
+where
+ F: TableSchema,
+ R: TableReplication;
+
+#[async_trait]
+impl<F: TableSchema, R: TableReplication> Worker for InsertQueueWorker<F, R> {
+ fn name(&self) -> String {
+ format!("{} queue", F::TABLE_NAME)
+ }
+
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.data.insert_queue.len().unwrap_or(0) as u64),
+ ..Default::default()
+ }
+ }
+
+ async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
+ let mut kv_pairs = vec![];
+ let mut values = vec![];
+
+ for entry_kv in self.0.data.insert_queue.iter()? {
+ let (k, v) = entry_kv?;
+
+ values.push(self.0.data.decode_entry(&v)?);
+ kv_pairs.push((k, v));
+
+ if kv_pairs.len() > BATCH_SIZE {
+ break;
+ }
+ }
+
+ if kv_pairs.is_empty() {
+ return Ok(WorkerState::Idle);
+ }
+
+ self.0.insert_many(values).await?;
+
+ self.0.data.insert_queue.db().transaction(|mut tx| {
+ for (k, v) in kv_pairs.iter() {
+ if let Some(v2) = tx.get(&self.0.data.insert_queue, k)? {
+ if &v2 == v {
+ tx.remove(&self.0.data.insert_queue, k)?;
+ }
+ }
+ }
+ Ok(())
+ })?;
+
+ Ok(WorkerState::Busy)
+ }
+
+ async fn wait_for_work(&mut self) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(600)) => (),
+ _ = self.0.data.insert_queue_notify.notified() => (),
+ }
+ WorkerState::Busy
+ }
+}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 3740d947..f00815a2 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -2,7 +2,7 @@ use garage_rpc::ring::*;
use garage_util::data::*;
/// Trait to describe how a table shall be replicated
-pub trait TableReplication: Send + Sync {
+pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
diff --git a/src/table/schema.rs b/src/table/schema.rs
index f37e98d8..5cbf6c95 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,11 +2,14 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_util::data::*;
+use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
/// Trait for field used to partition data
-pub trait PartitionKey {
+pub trait PartitionKey:
+ Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
+{
/// Get the key used to partition
fn hash(&self) -> Hash;
}
@@ -27,7 +30,7 @@ impl PartitionKey for FixedBytes32 {
}
/// Trait for field used to sort data
-pub trait SortKey {
+pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
fn sort_key(&self) -> &[u8];
}
@@ -46,7 +49,7 @@ impl SortKey for FixedBytes32 {
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
- Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
@@ -65,23 +68,16 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str;
/// The partition key used in that table
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type P: PartitionKey;
/// The sort key used int that table
- type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type S: SortKey;
/// They type for an entry in that table
type E: Entry<Self::P, Self::S>;
/// The type for a filter that can be applied to select entries
/// (e.g. filter out deleted entries)
- type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
-
- // Action to take if not able to decode current version:
- // try loading from an older version
- /// Try migrating an entry from an older version
- fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
- None
- }
+ type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
/// Actions triggered by data changing in a table. If such actions
/// include updates to the local database that should be applied
diff --git a/src/table/sync.rs b/src/table/sync.rs
index af7aa640..92a353c6 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
@@ -13,7 +14,8 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::encode::{debug_serialize, nonversioned_encode};
+use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
use garage_rpc::system::System;
@@ -27,12 +29,12 @@ use crate::*;
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
-pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct TableSyncer<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
- add_full_sync_tx: mpsc::UnboundedSender<()>,
+ add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -60,12 +62,8 @@ struct TodoPartition {
retain: bool,
}
-impl<F, R> TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(
+impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
+ pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@@ -74,34 +72,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
- let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
-
let syncer = Arc::new(Self {
- system: system.clone(),
+ system,
data,
merkle,
- add_full_sync_tx,
+ add_full_sync_tx: ArcSwapOption::new(None),
endpoint,
});
-
syncer.endpoint.set_handler(syncer.clone());
- system.background.spawn_worker(SyncWorker {
- syncer: syncer.clone(),
- ring_recv: system.ring.clone(),
- ring: system.ring.borrow().clone(),
+ syncer
+ }
+
+ pub(crate) fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
+ self.add_full_sync_tx
+ .store(Some(Arc::new(add_full_sync_tx)));
+
+ bg.spawn_worker(SyncWorker {
+ syncer: self.clone(),
+ ring_recv: self.system.ring.clone(),
+ ring: self.system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
-
- syncer
}
- pub fn add_full_sync(&self) {
- if self.add_full_sync_tx.send(()).is_err() {
- error!("({}) Could not add full sync", F::TABLE_NAME);
- }
+ pub fn add_full_sync(&self) -> Result<(), Error> {
+ let tx = self.add_full_sync_tx.load();
+ let tx = tx
+ .as_ref()
+ .ok_or_message("table sync worker is not running")?;
+ tx.send(()).ok_or_message("send error")?;
+ Ok(())
}
// ----
@@ -295,7 +299,7 @@ where
);
return Ok(());
}
- let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
+ let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum
// If so, do nothing.
@@ -452,16 +456,12 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
-impl<F, R> EndpointHandler<SyncRpc> for TableSyncer<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message {
SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
- let hash = hash_of::<MerkleNode>(&root_ck)?;
+ let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h))
}
SyncRpc::GetNode(k) => {
@@ -490,7 +490,7 @@ where
// -------- Sync Worker ---------
-struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
+struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
@@ -499,7 +499,7 @@ struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
next_full_sync: Instant,
}
-impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
@@ -565,7 +565,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
}
#[async_trait]
-impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
+impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("{} sync", F::TABLE_NAME)
}
@@ -586,10 +586,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
}
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
- }
+ async fn wait_for_work(&mut self) -> WorkerState {
select! {
s = self.add_full_sync_rx.recv() => {
if let Some(()) = s {
@@ -618,8 +615,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ----
-fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
- Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
+fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
+ Ok(blake2sum(&nonversioned_encode(x)?[..]))
}
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
diff --git a/src/table/table.rs b/src/table/table.rs
index 8a66c420..7ad79677 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,9 +14,11 @@ use opentelemetry::{
use garage_db as db;
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
+use garage_util::migrate::Migrate;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -25,16 +27,18 @@ use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
+use crate::queue::InsertQueueWorker;
use crate::replication::*;
use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
+ gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@@ -61,11 +65,7 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
type Response = Result<TableRpc<F>, Error>;
}
-impl<F, R> Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
@@ -75,15 +75,16 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
- let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
+ let merkle_updater = MerkleUpdater::new(data.clone());
- let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
- TableGc::launch(system.clone(), data.clone());
+ let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
+ let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
+ gc,
syncer,
endpoint,
});
@@ -93,6 +94,13 @@ where
table
}
+ pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
+ self.merkle_updater.spawn_workers(bg);
+ self.syncer.spawn_workers(bg);
+ self.gc.spawn_workers(bg);
+ bg.spawn_worker(InsertQueueWorker(self.clone()));
+ }
+
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));
@@ -111,7 +119,7 @@ where
let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
+ let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
@@ -128,6 +136,11 @@ where
Ok(())
}
+ /// Insert item locally
+ pub fn queue_insert(&self, tx: &mut db::Transaction, e: &F::E) -> db::TxResult<(), Error> {
+ self.data.queue_insert(tx, e)
+ }
+
pub async fn insert_many<I, IE>(&self, entries: I) -> Result<(), Error>
where
I: IntoIterator<Item = IE> + Send + Sync,
@@ -157,7 +170,7 @@ where
let entry = entry.borrow();
let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
- let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
+ let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who {
call_list.entry(node).or_default().push(e_enc.clone());
}
@@ -259,9 +272,11 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- self.system
- .background
- .spawn_cancellable(async move { self2.repair_on_read(&who[..], ent2).await });
+ tokio::spawn(async move {
+ if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
+ warn!("Error doing repair on read: {}", e);
+ }
+ });
}
}
@@ -358,11 +373,12 @@ where
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
- self.system.background.spawn_cancellable(async move {
+ tokio::spawn(async move {
for v in to_repair {
- self2.repair_on_read(&who[..], v).await?;
+ if let Err(e) = self2.repair_on_read(&who[..], v).await {
+ warn!("Error doing repair on read: {}", e);
+ }
}
- Ok(())
});
}
@@ -393,7 +409,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
- let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
+ let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
.rpc
.try_call_many(
@@ -408,11 +424,7 @@ where
}
#[async_trait]
-impl<F, R> EndpointHandler<TableRpc<F>> for Table<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle(
self: &Arc<Self>,
msg: &TableRpc<F>,
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 11640027..32e9c851 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -23,6 +23,7 @@ bytes = "1.0"
digest = "0.10"
err-derive = "0.3"
git-version = "0.3.4"
+hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4"
lazy_static = "1.4"
diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs
deleted file mode 100644
index 2568ea11..00000000
--- a/src/util/background/job_worker.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-//! Job worker: a generic worker that just processes incoming
-//! jobs one by one
-
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use tokio::sync::{mpsc, Mutex};
-
-use crate::background::worker::*;
-use crate::background::*;
-
-pub(crate) struct JobWorker {
- pub(crate) index: usize,
- pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
- pub(crate) next_job: Option<Job>,
-}
-
-#[async_trait]
-impl Worker for JobWorker {
- fn name(&self) -> String {
- format!("Job worker #{}", self.index)
- }
-
- async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
- match self.next_job.take() {
- None => return Ok(WorkerState::Idle),
- Some(job) => {
- job.await?;
- Ok(WorkerState::Busy)
- }
- }
- }
-
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- loop {
- match self.job_chan.lock().await.recv().await {
- Some((job, cancellable)) => {
- if cancellable && *must_exit.borrow() {
- continue;
- }
- self.next_job = Some(job);
- return WorkerState::Busy;
- }
- None => return WorkerState::Done,
- }
- }
- }
-}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index fd9258b8..41b48e93 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -1,27 +1,18 @@
//! Job runner for futures and async functions
-pub mod job_worker;
pub mod worker;
-use core::future::Future;
-
use std::collections::HashMap;
-use std::pin::Pin;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
-use tokio::sync::{mpsc, watch, Mutex};
+use tokio::sync::{mpsc, watch};
-use crate::error::Error;
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};
-pub(crate) type JobOutput = Result<(), Error>;
-pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
-
/// Job runner for futures and async functions
pub struct BackgroundRunner {
- send_job: mpsc::UnboundedSender<(Job, bool)>,
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
@@ -49,10 +40,7 @@ pub struct WorkerStatus {
impl BackgroundRunner {
/// Create a new BackgroundRunner
- pub fn new(
- n_runners: usize,
- stop_signal: watch::Receiver<bool>,
- ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
+ pub fn new(stop_signal: watch::Receiver<bool>) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
@@ -63,24 +51,7 @@ impl BackgroundRunner {
worker_processor.run().await;
});
- let (send_job, queue_out) = mpsc::unbounded_channel();
- let queue_out = Arc::new(Mutex::new(queue_out));
-
- for i in 0..n_runners {
- let queue_out = queue_out.clone();
-
- send_worker
- .send(Box::new(job_worker::JobWorker {
- index: i,
- job_chan: queue_out.clone(),
- next_job: None,
- }))
- .ok()
- .unwrap();
- }
-
let bgrunner = Arc::new(Self {
- send_job,
send_worker,
worker_info,
});
@@ -91,31 +62,6 @@ impl BackgroundRunner {
self.worker_info.lock().unwrap().clone()
}
- /// Spawn a task to be run in background
- pub fn spawn<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, false))
- .ok()
- .expect("Could not put job in queue");
- }
-
- /// Spawn a task to be run in background. It may get discarded before running if spawned while
- /// the runner is stopping
- pub fn spawn_cancellable<T>(&self, job: T)
- where
- T: Future<Output = JobOutput> + Send + 'static,
- {
- let boxed: Job = Box::pin(job);
- self.send_job
- .send((boxed, true))
- .ok()
- .expect("Could not put job in queue");
- }
-
pub fn spawn_worker<W>(&self, worker: W)
where
W: Worker + 'static,
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index 7e9da7f8..8165e2cb 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use async_trait::async_trait;
use futures::future::*;
@@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus};
use crate::error::Error;
use crate::time::now_msec;
+// All workers that haven't exited for this time after an exit signal was recieved
+// will be interrupted in the middle of whatever they are doing.
+const EXIT_DEADLINE: Duration = Duration::from_secs(8);
+
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerState {
Busy,
@@ -50,10 +54,8 @@ pub trait Worker: Send {
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in
- /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
- /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows
- /// it to check if we are exiting.
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
+ /// the middle for any reason, for example if an interrupt signal was recieved.
+ async fn wait_for_work(&mut self) -> WorkerState;
}
pub(crate) struct WorkerProcessor {
@@ -93,11 +95,9 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
- let stop_signal_worker = self.stop_signal.clone();
let mut worker = WorkerHandler {
task_id,
stop_signal,
- stop_signal_worker,
worker: new_worker,
state: WorkerState::Busy,
errors: 0,
@@ -153,26 +153,14 @@ impl WorkerProcessor {
}
// We are exiting, drain everything
- let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
- while let Some(mut worker) = workers.next().await {
- if worker.state == WorkerState::Done {
- info!(
- "Worker {} (TID {}) exited",
- worker.worker.name(),
- worker.task_id
- );
- } else if Instant::now() > drain_half_time {
- warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
- } else {
- workers.push(
- async move {
- worker.step().await;
- worker
- }
- .boxed(),
- );
- }
+ while let Some(worker) = workers.next().await {
+ info!(
+ "Worker {} (TID {}) exited (last state: {:?})",
+ worker.worker.name(),
+ worker.task_id,
+ worker.state
+ );
}
};
@@ -180,7 +168,7 @@ impl WorkerProcessor {
_ = drain_everything => {
info!("All workers exited peacefully \\o/");
}
- _ = tokio::time::sleep(Duration::from_secs(9)) => {
+ _ = tokio::time::sleep(EXIT_DEADLINE) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
@@ -190,7 +178,6 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
- stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
state: WorkerState,
errors: usize,
@@ -225,33 +212,19 @@ impl WorkerHandler {
},
WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state
- if !*self.stop_signal.borrow() {
- select! {
- _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
- _ = self.stop_signal.changed() => (),
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => {
+ self.state = WorkerState::Busy;
}
+ _ = self.stop_signal.changed() => (),
}
- self.state = WorkerState::Busy;
}
WorkerState::Idle => {
- if *self.stop_signal.borrow() {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = tokio::time::sleep(Duration::from_secs(1)) => {
- // stay in Idle state
- }
- }
- } else {
- select! {
- new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
- self.state = new_st;
- }
- _ = self.stop_signal.changed() => {
- // stay in Idle state
- }
+ select! {
+ new_st = self.worker.wait_for_work() => {
+ self.state = new_st;
}
+ _ = self.stop_signal.changed() => (),
}
}
WorkerState::Done => unreachable!(),
diff --git a/src/util/data.rs b/src/util/data.rs
index 7715c2cc..3f61e301 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -140,34 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash {
pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-
-// RMP serialization with names of fields and variants
-
-/// Serialize to MessagePack
-pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
-where
- T: Serialize + ?Sized,
-{
- let mut wr = Vec::with_capacity(128);
- let mut se = rmp_serde::Serializer::new(&mut wr)
- .with_struct_map()
- .with_string_variants();
- val.serialize(&mut se)?;
- Ok(wr)
-}
-
-/// Serialize to JSON, truncating long result
-pub fn debug_serialize<T: Serialize>(x: T) -> String {
- match serde_json::to_string(&x) {
- Ok(ss) => {
- if ss.len() > 100 {
- // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
- // (or more) codepoint
- ss[..100].to_string()
- } else {
- ss
- }
- }
- Err(e) => format!("<JSON serialization error: {}>", e),
- }
-}
diff --git a/src/util/encode.rs b/src/util/encode.rs
new file mode 100644
index 00000000..1cd3198f
--- /dev/null
+++ b/src/util/encode.rs
@@ -0,0 +1,42 @@
+use serde::{Deserialize, Serialize};
+
+/// Serialize to MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
+where
+ T: Serialize + ?Sized,
+{
+ let mut wr = Vec::with_capacity(128);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ val.serialize(&mut se)?;
+ Ok(wr)
+}
+
+/// Deserialize from MessagePacki, without versionning
+/// (see garage_util::migrate for functions that manage versionned
+/// data formats)
+pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
+where
+ T: for<'de> Deserialize<'de> + ?Sized,
+{
+ rmp_serde::decode::from_read_ref::<_, T>(bytes)
+}
+
+/// Serialize to JSON, truncating long result
+pub fn debug_serialize<T: Serialize>(x: T) -> String {
+ match serde_json::to_string(&x) {
+ Ok(ss) => {
+ if ss.len() > 100 {
+ // TODO this can panic if 100 is not a codepoint boundary, but inside a 2 Bytes
+ // (or more) codepoint
+ ss[..100].to_string()
+ } else {
+ ss
+ }
+ }
+ Err(e) => format!("<JSON serialization error: {}>", e),
+ }
+}
diff --git a/src/util/error.rs b/src/util/error.rs
index 9995c746..3fcee71d 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,6 +7,7 @@ use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use crate::data::*;
+use crate::encode::debug_serialize;
/// Regroup all Garage errors
#[derive(Debug, Error)]
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 264cc192..be82061f 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -8,9 +8,11 @@ pub mod background;
pub mod config;
pub mod crdt;
pub mod data;
+pub mod encode;
pub mod error;
pub mod formater;
pub mod metrics;
+pub mod migrate;
pub mod persister;
pub mod time;
pub mod token_bucket;
diff --git a/src/util/migrate.rs b/src/util/migrate.rs
new file mode 100644
index 00000000..1229fd9c
--- /dev/null
+++ b/src/util/migrate.rs
@@ -0,0 +1,159 @@
+use serde::{Deserialize, Serialize};
+
+/// Indicates that this type has an encoding that can be migrated from
+/// a previous version upon upgrades of Garage.
+pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+
+ /// The previous version of this data type, from which items of this version
+ /// can be migrated.
+ type Previous: Migrate;
+
+ /// The migration function that transforms a value decoded in the old format
+ /// to an up-to-date value.
+ fn migrate(previous: Self::Previous) -> Self;
+
+ /// Decode an encoded version of this type, going through a migration if necessary.
+ fn decode(bytes: &[u8]) -> Option<Self> {
+ let marker_len = Self::VERSION_MARKER.len();
+ if bytes.get(..marker_len) == Some(Self::VERSION_MARKER) {
+ if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
+ return Some(value);
+ }
+ }
+
+ Self::Previous::decode(bytes).map(Self::migrate)
+ }
+
+ /// Encode this type with optionnal version marker
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ let mut wr = Vec::with_capacity(128);
+ wr.extend_from_slice(Self::VERSION_MARKER);
+ let mut se = rmp_serde::Serializer::new(&mut wr)
+ .with_struct_map()
+ .with_string_variants();
+ self.serialize(&mut se)?;
+ Ok(wr)
+ }
+}
+
+/// Indicates that this type has no previous encoding version to be migrated from.
+pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
+ /// A sequence of bytes to add at the beginning of the serialized
+ /// string, to identify that the data is of this version.
+ const VERSION_MARKER: &'static [u8] = b"";
+}
+
+impl<T: InitialFormat> Migrate for T {
+ const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER;
+
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+}
+
+/// Internal type used by InitialFormat, not meant for general use.
+#[derive(Serialize, Deserialize)]
+pub enum NoPrevious {}
+
+impl Migrate for NoPrevious {
+ type Previous = NoPrevious;
+
+ fn migrate(_previous: Self::Previous) -> Self {
+ unreachable!();
+ }
+
+ fn decode(_bytes: &[u8]) -> Option<Self> {
+ None
+ }
+
+ fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
+ unreachable!()
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V1 {
+ a: usize,
+ b: String,
+ }
+ impl InitialFormat for V1 {}
+
+ #[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
+ struct V2 {
+ a: usize,
+ b: Vec<String>,
+ c: String,
+ }
+ impl Migrate for V2 {
+ const VERSION_MARKER: &'static [u8] = b"GtestV2";
+ type Previous = V1;
+ fn migrate(prev: V1) -> V2 {
+ V2 {
+ a: prev.a,
+ b: vec![prev.b],
+ c: String::new(),
+ }
+ }
+ }
+
+ #[test]
+ fn test_v1() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ let y = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_v2() {
+ let x = V2 {
+ a: 12,
+ b: vec!["hello".into(), "world".into()],
+ c: "plop".into(),
+ };
+ let x_enc = x.encode().unwrap();
+ assert_eq!(&x_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(x, y);
+ }
+
+ #[test]
+ fn test_migrate() {
+ let x = V1 {
+ a: 12,
+ b: "hello".into(),
+ };
+ let x_enc = x.encode().unwrap();
+
+ let xx = V1::decode(&x_enc).unwrap();
+ assert_eq!(x, xx);
+
+ let y = V2::decode(&x_enc).unwrap();
+ assert_eq!(
+ y,
+ V2 {
+ a: 12,
+ b: vec!["hello".into()],
+ c: "".into(),
+ }
+ );
+
+ let y_enc = y.encode().unwrap();
+ assert_eq!(&y_enc[..V2::VERSION_MARKER.len()], V2::VERSION_MARKER);
+
+ let z = V2::decode(&y_enc).unwrap();
+ assert_eq!(y, z);
+ }
+}
diff --git a/src/util/persister.rs b/src/util/persister.rs
index 9e1a1910..4b9adf51 100644
--- a/src/util/persister.rs
+++ b/src/util/persister.rs
@@ -3,21 +3,16 @@ use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use serde::{Deserialize, Serialize};
-
-use crate::data::*;
use crate::error::Error;
+use crate::migrate::Migrate;
-pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
+pub struct Persister<T: Migrate> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
-impl<T> Persister<T>
-where
- T: Serialize + for<'de> Deserialize<'de>,
-{
+impl<T: Migrate> Persister<T> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf();
path.push(file_name);
@@ -27,18 +22,37 @@ where
}
}
+ fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
+ match T::decode(bytes) {
+ Some(v) => Ok(v),
+ None => {
+ error!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ );
+ for line in hexdump::hexdump_iter(bytes) {
+ debug!("{}", line);
+ }
+ Err(Error::Message(format!(
+ "Unable to decode persisted data file {}",
+ self.path.display()
+ )))
+ }
+ }
+ }
+
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new()
.write(true)
@@ -57,12 +71,12 @@ where
let mut bytes = vec![];
file.read_to_end(&mut bytes).await?;
- let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
+ let value = self.decode(&bytes[..])?;
Ok(value)
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
- let bytes = rmp_to_vec_all_named(t)?;
+ let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;