From c94406f4282d48e2e2ac82ffb57eafaad23f7edc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 9 Nov 2021 12:24:04 +0100 Subject: Improve how node roles are assigned in Garage - change the terminology: the network configuration becomes the role table, the configuration of a nodes becomes a node's role - the modification of the role table takes place in two steps: first, changes are staged in a CRDT data structure. Then, once the user is happy with the changes, they can commit them all at once (or revert them). - update documentation - fix tests - implement smarter partition assignation algorithm This patch breaks the format of the network configuration: when migrating, the cluster will be in a state where no roles are assigned. All roles must be re-assigned and commited at once. This migration should not pose an issue. --- src/api/Cargo.toml | 9 +- src/garage/Cargo.toml | 15 +- src/garage/admin.rs | 4 +- src/garage/cli/cmd.rs | 214 ++++---------- src/garage/cli/layout.rs | 340 ++++++++++++++++++++++ src/garage/cli/mod.rs | 2 + src/garage/cli/structs.rs | 78 +++-- src/garage/main.rs | 4 +- src/model/Cargo.toml | 9 +- src/rpc/Cargo.toml | 6 +- src/rpc/layout.rs | 579 ++++++++++++++++++++++++++++++++++++++ src/rpc/lib.rs | 1 + src/rpc/ring.rs | 197 ++----------- src/rpc/rpc_helper.rs | 4 +- src/rpc/system.rs | 100 ++++--- src/table/Cargo.toml | 7 +- src/table/crdt/bool.rs | 34 --- src/table/crdt/crdt.rs | 71 ----- src/table/crdt/lww.rs | 114 -------- src/table/crdt/lww_map.rs | 161 ----------- src/table/crdt/map.rs | 99 ------- src/table/crdt/mod.rs | 23 -- src/table/lib.rs | 5 +- src/table/replication/fullcopy.rs | 4 +- src/util/Cargo.toml | 3 +- src/util/crdt/bool.rs | 34 +++ src/util/crdt/crdt.rs | 71 +++++ src/util/crdt/lww.rs | 120 ++++++++ src/util/crdt/lww_map.rs | 167 +++++++++++ src/util/crdt/map.rs | 99 +++++++ src/util/crdt/mod.rs | 23 ++ src/util/lib.rs | 1 + src/web/Cargo.toml | 11 +- 33 files changed, 1682 insertions(+), 927 deletions(-) create mode 100644 src/garage/cli/layout.rs create mode 100644 src/rpc/layout.rs delete mode 100644 src/table/crdt/bool.rs delete mode 100644 src/table/crdt/crdt.rs delete mode 100644 src/table/crdt/lww.rs delete mode 100644 src/table/crdt/lww_map.rs delete mode 100644 src/table/crdt/map.rs delete mode 100644 src/table/crdt/mod.rs create mode 100644 src/util/crdt/bool.rs create mode 100644 src/util/crdt/crdt.rs create mode 100644 src/util/crdt/lww.rs create mode 100644 src/util/crdt/lww_map.rs create mode 100644 src/util/crdt/map.rs create mode 100644 src/util/crdt/mod.rs (limited to 'src') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index f06c67e4..3ca46764 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage_api" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" description = "S3 API server crate for the Garage object store" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [lib] path = "lib.rs" @@ -13,9 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_model = { version = "0.4.0", path = "../model" } -garage_table = { version = "0.4.0", path = "../table" } -garage_util = { version = "0.4.0", path = "../util" } +garage_model = { version = "0.5.0", path = "../model" } +garage_table = { version = "0.5.0", path = "../table" } +garage_util = { version = "0.5.0", path = "../util" } base64 = "0.13" bytes = "1.0" diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 0a3bc537..74a6ab0e 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" description = "Garage, an S3-compatible distributed object store for self-hosted deployments" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [[bin]] name = "garage" @@ -14,12 +15,12 @@ path = "main.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.4.0", path = "../api" } -garage_model = { version = "0.4.0", path = "../model" } -garage_rpc = { version = "0.4.0", path = "../rpc" } -garage_table = { version = "0.4.0", path = "../table" } -garage_util = { version = "0.4.0", path = "../util" } -garage_web = { version = "0.4.0", path = "../web" } +garage_api = { version = "0.5.0", path = "../api" } +garage_model = { version = "0.5.0", path = "../model" } +garage_rpc = { version = "0.5.0", path = "../rpc" } +garage_table = { version = "0.5.0", path = "../table" } +garage_util = { version = "0.5.0", path = "../util" } +garage_web = { version = "0.5.0", path = "../web" } bytes = "1.0" git-version = "0.3.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c3a83d02..f0444988 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -339,7 +339,7 @@ impl AdminRpcHandler { let mut failures = vec![]; let ring = self.garage.system.ring.borrow().clone(); - for node in ring.config.members.keys() { + for node in ring.layout.node_ids().iter() { let node = (*node).into(); let resp = self .endpoint @@ -383,7 +383,7 @@ impl AdminRpcHandler { let mut ret = String::new(); let ring = self.garage.system.ring.borrow().clone(); - for node in ring.config.members.keys() { + for node in ring.layout.node_ids().iter() { let mut opt = opt.clone(); opt.all_nodes = false; diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 2ff46088..a916974e 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use garage_util::error::*; -use garage_rpc::ring::*; +use garage_rpc::layout::*; use garage_rpc::system::*; use garage_rpc::*; @@ -20,11 +20,8 @@ pub async fn cli_command_dispatch( Command::Node(NodeOperation::Connect(connect_opt)) => { cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await } - Command::Node(NodeOperation::Configure(configure_opt)) => { - cmd_configure(system_rpc_endpoint, rpc_host, configure_opt).await - } - Command::Node(NodeOperation::Remove(remove_opt)) => { - cmd_remove(system_rpc_endpoint, rpc_host, remove_opt).await + Command::Layout(layout_opt) => { + cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await } Command::Bucket(bo) => { cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await @@ -48,56 +45,60 @@ pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> SystemRpc::ReturnKnownNodes(nodes) => nodes, resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), }; - let config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; + let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== HEALTHY NODES ===="); - let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity".to_string()]; + let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity".to_string()]; for adv in status.iter().filter(|adv| adv.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { - healthy_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}", - id = adv.id, - host = adv.status.hostname, - addr = adv.addr, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); - } else { - healthy_nodes.push(format!( - "{id:?}\t{h}\t{addr}\tNO ROLE ASSIGNED", - id = adv.id, - h = adv.status.hostname, - addr = adv.addr, - )); + match layout.roles.get(&adv.id) { + Some(NodeRoleV(Some(cfg))) => { + healthy_nodes.push(format!( + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}", + id = adv.id, + host = adv.status.hostname, + addr = adv.addr, + tags = cfg.tags.join(","), + zone = cfg.zone, + capacity = cfg.capacity_string(), + )); + } + _ => { + let new_role = match layout.staging.get(&adv.id) { + Some(NodeRoleV(Some(_))) => "(pending)", + _ => "NO ROLE ASSIGNED", + }; + healthy_nodes.push(format!( + "{id:?}\t{h}\t{addr}\t{new_role}", + id = adv.id, + h = adv.status.hostname, + addr = adv.addr, + new_role = new_role, + )); + } } } format_table(healthy_nodes); let status_keys = status.iter().map(|adv| adv.id).collect::>(); let failure_case_1 = status.iter().any(|adv| !adv.is_up); - let failure_case_2 = config - .members + let failure_case_2 = layout + .roles + .items() .iter() - .any(|(id, _)| !status_keys.contains(id)); + .filter(|(_, _, v)| v.0.is_some()) + .any(|(id, _, _)| !status_keys.contains(id)); if failure_case_1 || failure_case_2 { println!("\n==== FAILED NODES ===="); let mut failed_nodes = - vec!["ID\tHostname\tAddress\tTag\tZone\tCapacity\tLast seen".to_string()]; + vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; for adv in status.iter().filter(|adv| !adv.is_up) { - if let Some(cfg) = config.members.get(&adv.id) { + if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { failed_nodes.push(format!( - "{id:?}\t{host}\t{addr}\t[{tag}]\t{zone}\t{capacity}\t{last_seen}", + "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", id = adv.id, host = adv.status.hostname, addr = adv.addr, - tag = cfg.tag, + tags = cfg.tags.join(","), zone = cfg.zone, capacity = cfg.capacity_string(), last_seen = adv @@ -107,20 +108,28 @@ pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> )); } } - for (id, cfg) in config.members.iter() { - if !status_keys.contains(id) { - failed_nodes.push(format!( - "{id:?}\t??\t??\t[{tag}]\t{zone}\t{capacity}\tnever seen", - id = id, - tag = cfg.tag, - zone = cfg.zone, - capacity = cfg.capacity_string(), - )); + for (id, _, role_v) in layout.roles.items().iter() { + if let NodeRoleV(Some(cfg)) = role_v { + if !status_keys.contains(id) { + failed_nodes.push(format!( + "{id:?}\t??\t??\t[{tags}]\t{zone}\t{capacity}\tnever seen", + id = id, + tags = cfg.tags.join(","), + zone = cfg.zone, + capacity = cfg.capacity_string(), + )); + } } } format_table(failed_nodes); } + if print_staging_role_changes(&layout) { + println!(); + println!("Please use `garage layout show` to check the proposed new layout and apply it."); + println!(); + } + Ok(()) } @@ -141,115 +150,6 @@ pub async fn cmd_connect( } } -pub async fn cmd_configure( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: ConfigureNodeOpt, -) -> Result<(), Error> { - let status = match rpc_cli - .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => nodes, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; - - let mut config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - for replaced in args.replace.iter() { - let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; - if config.members.remove(&replaced_node).is_none() { - return Err(Error::Message(format!( - "Cannot replace node {:?} as it is not in current configuration", - replaced_node - ))); - } - } - - if args.capacity.is_some() && args.gateway { - return Err(Error::Message( - "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); - } - if args.capacity == Some(0) { - return Err(Error::Message("Invalid capacity value: 0".into())); - } - - let new_entry = match config.members.get(&added_node) { - None => { - let capacity = match args.capacity { - Some(c) => Some(c), - None if args.gateway => None, - _ => return Err(Error::Message( - "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), - }; - NetworkConfigEntry { - zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?, - capacity, - tag: args.tag.unwrap_or_default(), - } - } - Some(old) => { - let capacity = match args.capacity { - Some(c) => Some(c), - None if args.gateway => None, - _ => old.capacity, - }; - NetworkConfigEntry { - zone: args.zone.unwrap_or_else(|| old.zone.to_string()), - capacity, - tag: args.tag.unwrap_or_else(|| old.tag.to_string()), - } - } - }; - - config.members.insert(added_node, new_entry); - config.version += 1; - - rpc_cli - .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await??; - Ok(()) -} - -pub async fn cmd_remove( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: RemoveNodeOpt, -) -> Result<(), Error> { - let mut config = match rpc_cli - .call(&rpc_host, &SystemRpc::PullConfig, PRIO_NORMAL) - .await?? - { - SystemRpc::AdvertiseConfig(cfg) => cfg, - resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), - }; - - let deleted_node = find_matching_node(config.members.keys().cloned(), &args.node_id)?; - - if !args.yes { - return Err(Error::Message(format!( - "Add the flag --yes to really remove {:?} from the cluster", - deleted_node - ))); - } - - config.members.remove(&deleted_node); - config.version += 1; - - rpc_cli - .call(&rpc_host, &SystemRpc::AdvertiseConfig(config), PRIO_NORMAL) - .await??; - Ok(()) -} - pub async fn cmd_admin( rpc_cli: &Endpoint, rpc_host: NodeID, @@ -283,5 +183,3 @@ pub async fn cmd_admin( } Ok(()) } - -// --- Utility functions ---- diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs new file mode 100644 index 00000000..0d9e4fa4 --- /dev/null +++ b/src/garage/cli/layout.rs @@ -0,0 +1,340 @@ +use garage_util::crdt::Crdt; +use garage_util::data::*; +use garage_util::error::*; + +use garage_rpc::layout::*; +use garage_rpc::system::*; +use garage_rpc::*; + +use crate::cli::*; + +pub async fn cli_layout_command_dispatch( + cmd: LayoutOperation, + system_rpc_endpoint: &Endpoint, + rpc_host: NodeID, +) -> Result<(), Error> { + match cmd { + LayoutOperation::Assign(configure_opt) => { + cmd_assign_role(system_rpc_endpoint, rpc_host, configure_opt).await + } + LayoutOperation::Remove(remove_opt) => { + cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await + } + LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await, + LayoutOperation::Apply(apply_opt) => { + cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await + } + LayoutOperation::Revert(revert_opt) => { + cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await + } + } +} + +pub async fn cmd_assign_role( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: AssignRoleOpt, +) -> Result<(), Error> { + let status = match rpc_cli + .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => nodes, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let added_node = find_matching_node(status.iter().map(|adv| adv.id), &args.node_id)?; + + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + let mut roles = layout.roles.clone(); + roles.merge(&layout.staging); + + for replaced in args.replace.iter() { + let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?; + match roles.get(&replaced_node) { + Some(NodeRoleV(Some(_))) => { + layout + .staging + .merge(&roles.update_mutator(replaced_node, NodeRoleV(None))); + } + _ => { + return Err(Error::Message(format!( + "Cannot replace node {:?} as it is not currently in planned layout", + replaced_node + ))); + } + } + } + + if args.capacity.is_some() && args.gateway { + return Err(Error::Message( + "-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into())); + } + if args.capacity == Some(0) { + return Err(Error::Message("Invalid capacity value: 0".into())); + } + + let new_entry = match roles.get(&added_node) { + Some(NodeRoleV(Some(old))) => { + let capacity = match args.capacity { + Some(c) => Some(c), + None if args.gateway => None, + None => old.capacity, + }; + let tags = if args.tags.is_empty() { + old.tags.clone() + } else { + args.tags + }; + NodeRole { + zone: args.zone.unwrap_or_else(|| old.zone.to_string()), + capacity, + tags, + } + } + _ => { + let capacity = match args.capacity { + Some(c) => Some(c), + None if args.gateway => None, + None => return Err(Error::Message( + "Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())), + }; + NodeRole { + zone: args.zone.ok_or("Please specifiy a zone with the -z flag")?, + capacity, + tags: args.tags, + } + } + }; + + layout + .staging + .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); + + send_layout(rpc_cli, rpc_host, layout).await?; + + println!("Role change is staged but not yet commited."); + println!("Use `garage layout show` to view staged role changes,"); + println!("and `garage layout apply` to enact staged changes."); + Ok(()) +} + +pub async fn cmd_remove_role( + rpc_cli: &Endpoint, + rpc_host: NodeID, + args: RemoveRoleOpt, +) -> Result<(), Error> { + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + let mut roles = layout.roles.clone(); + roles.merge(&layout.staging); + + let deleted_node = + find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?; + + layout + .staging + .merge(&roles.update_mutator(deleted_node, NodeRoleV(None))); + + send_layout(rpc_cli, rpc_host, layout).await?; + + println!("Role removal is staged but not yet commited."); + println!("Use `garage layout show` to view staged role changes,"); + println!("and `garage layout apply` to enact staged changes."); + Ok(()) +} + +pub async fn cmd_show_layout( + rpc_cli: &Endpoint, + rpc_host: NodeID, +) -> Result<(), Error> { + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + println!("==== CURRENT CLUSTER LAYOUT ===="); + if !print_cluster_layout(&layout) { + println!("No nodes currently have a role in the cluster."); + println!("See `garage status` to view available nodes."); + } + println!(); + println!("Current cluster layout version: {}", layout.version); + + if print_staging_role_changes(&layout) { + layout.roles.merge(&layout.staging); + + println!(); + println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); + if !print_cluster_layout(&layout) { + println!("No nodes have a role in the new layout."); + } + println!(); + + // this will print the stats of what partitions + // will move around when we apply + if layout.calculate_partition_assignation() { + println!("To enact the staged role changes, type:"); + println!(); + println!(" garage layout apply --version {}", layout.version + 1); + println!(); + println!( + "You can also revert all proposed changes with: garage layout revert --version {}", + layout.version + 1 + ); + } else { + println!("Not enough nodes have an assigned role to maintain enough copies of data."); + println!("This new layout cannot yet be applied."); + } + } + + Ok(()) +} + +pub async fn cmd_apply_layout( + rpc_cli: &Endpoint, + rpc_host: NodeID, + apply_opt: ApplyLayoutOpt, +) -> Result<(), Error> { + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + match apply_opt.version { + None => { + println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); + println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); + return Err(Error::Message("--version flag is missing".into())); + } + Some(v) => { + if v != layout.version + 1 { + return Err(Error::Message("Invalid value of --version flag".into())); + } + } + } + + layout.roles.merge(&layout.staging); + + if !layout.calculate_partition_assignation() { + return Err(Error::Message("Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); + } + + layout.staging.clear(); + layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); + + layout.version += 1; + + send_layout(rpc_cli, rpc_host, layout).await?; + + println!("New cluster layout with updated role assignation has been applied in cluster."); + println!("Data will now be moved around between nodes accordingly."); + + Ok(()) +} + +pub async fn cmd_revert_layout( + rpc_cli: &Endpoint, + rpc_host: NodeID, + revert_opt: RevertLayoutOpt, +) -> Result<(), Error> { + let mut layout = fetch_layout(rpc_cli, rpc_host).await?; + + match revert_opt.version { + None => { + println!("Please pass the --version flag to ensure that you are writing the correct version of the cluster layout."); + println!("To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes."); + return Err(Error::Message("--version flag is missing".into())); + } + Some(v) => { + if v != layout.version + 1 { + return Err(Error::Message("Invalid value of --version flag".into())); + } + } + } + + layout.staging.clear(); + layout.staging_hash = blake2sum(&rmp_to_vec_all_named(&layout.staging).unwrap()[..]); + + layout.version += 1; + + send_layout(rpc_cli, rpc_host, layout).await?; + + println!("All proposed role changes in cluster layout have been canceled."); + Ok(()) +} + +// --- utility --- + +pub async fn fetch_layout( + rpc_cli: &Endpoint, + rpc_host: NodeID, +) -> Result { + match rpc_cli + .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL) + .await?? + { + SystemRpc::AdvertiseClusterLayout(t) => Ok(t), + resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + } +} + +pub async fn send_layout( + rpc_cli: &Endpoint, + rpc_host: NodeID, + layout: ClusterLayout, +) -> Result<(), Error> { + rpc_cli + .call( + &rpc_host, + &SystemRpc::AdvertiseClusterLayout(layout), + PRIO_NORMAL, + ) + .await??; + Ok(()) +} + +pub fn print_cluster_layout(layout: &ClusterLayout) -> bool { + let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; + for (id, _, role) in layout.roles.items().iter() { + let role = match &role.0 { + Some(r) => r, + _ => continue, + }; + let tags = role.tags.join(","); + table.push(format!( + "{:?}\t{}\t{}\t{}", + id, + tags, + role.zone, + role.capacity_string() + )); + } + if table.len() == 1 { + false + } else { + format_table(table); + true + } +} + +pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { + if !layout.staging.items().is_empty() { + println!(); + println!("==== STAGED ROLE CHANGES ===="); + let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; + for (id, _, role) in layout.staging.items().iter() { + if let Some(role) = &role.0 { + let tags = role.tags.join(","); + table.push(format!( + "{:?}\t{}\t{}\t{}", + id, + tags, + role.zone, + role.capacity_string() + )); + } else { + table.push(format!("{:?}\tREMOVED", id)); + } + } + format_table(table); + true + } else { + false + } +} diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index 1567f377..17a2d8ce 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -1,9 +1,11 @@ pub(crate) mod cmd; pub(crate) mod init; +pub(crate) mod layout; pub(crate) mod structs; pub(crate) mod util; pub(crate) use cmd::*; pub(crate) use init::*; +pub(crate) use layout::*; pub(crate) use structs::*; pub(crate) use util::*; diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index b930d8a8..b2b5375d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -8,23 +8,23 @@ pub enum Command { #[structopt(name = "server")] Server, - /// Print identifier (public key) of this Garage node - #[structopt(name = "node-id")] - NodeId(NodeIdOpt), - /// Get network status #[structopt(name = "status")] Status, - /// Garage node operations + /// Operations on individual Garage nodes #[structopt(name = "node")] Node(NodeOperation), - /// Bucket operations + /// Operations on the assignation of node roles in the cluster layout + #[structopt(name = "layout")] + Layout(LayoutOperation), + + /// Operations on buckets #[structopt(name = "bucket")] Bucket(BucketOperation), - /// Key operations + /// Operations on S3 access keys #[structopt(name = "key")] Key(KeyOperation), @@ -39,17 +39,13 @@ pub enum Command { #[derive(StructOpt, Debug)] pub enum NodeOperation { + /// Print identifier (public key) of this Garage node + #[structopt(name = "id")] + NodeId(NodeIdOpt), + /// Connect to Garage node that is currently isolated from the system #[structopt(name = "connect")] Connect(ConnectNodeOpt), - - /// Configure Garage node - #[structopt(name = "configure")] - Configure(ConfigureNodeOpt), - - /// Remove Garage node from cluster - #[structopt(name = "remove")] - Remove(RemoveNodeOpt), } #[derive(StructOpt, Debug)] @@ -67,8 +63,31 @@ pub struct ConnectNodeOpt { } #[derive(StructOpt, Debug)] -pub struct ConfigureNodeOpt { - /// Node to configure (prefix of hexadecimal node id) +pub enum LayoutOperation { + /// Assign role to Garage node + #[structopt(name = "assign")] + Assign(AssignRoleOpt), + + /// Remove role from Garage cluster node + #[structopt(name = "remove")] + Remove(RemoveRoleOpt), + + /// Show roles currently assigned to nodes and changes staged for commit + #[structopt(name = "show")] + Show, + + /// Apply staged changes to cluster layout + #[structopt(name = "apply")] + Apply(ApplyLayoutOpt), + + /// Revert staged changes to cluster layout + #[structopt(name = "revert")] + Revert(RevertLayoutOpt), +} + +#[derive(StructOpt, Debug)] +pub struct AssignRoleOpt { + /// Node to which to assign role (prefix of hexadecimal node id) pub(crate) node_id: String, /// Location (zone or datacenter) of the node @@ -83,9 +102,9 @@ pub struct ConfigureNodeOpt { #[structopt(short = "g", long = "gateway")] pub(crate) gateway: bool, - /// Optional node tag + /// Optional tags to add to node #[structopt(short = "t", long = "tag")] - pub(crate) tag: Option, + pub(crate) tags: Vec, /// Replaced node(s): list of node IDs that will be removed from the current cluster #[structopt(long = "replace")] @@ -93,13 +112,24 @@ pub struct ConfigureNodeOpt { } #[derive(StructOpt, Debug)] -pub struct RemoveNodeOpt { - /// Node to configure (prefix of hexadecimal node id) +pub struct RemoveRoleOpt { + /// Node whose role to remove (prefix of hexadecimal node id) pub(crate) node_id: String, +} - /// If this flag is not given, the node won't be removed - #[structopt(long = "yes")] - pub(crate) yes: bool, +#[derive(StructOpt, Debug)] +pub struct ApplyLayoutOpt { + /// Version number of new configuration: this command will fail if + /// it is not exactly 1 + the previous configuration's version + #[structopt(long = "version")] + pub(crate) version: Option, +} + +#[derive(StructOpt, Debug)] +pub struct RevertLayoutOpt { + /// Version number of old configuration to which to revert + #[structopt(long = "version")] + pub(crate) version: Option, } #[derive(Serialize, Deserialize, StructOpt, Debug)] diff --git a/src/garage/main.rs b/src/garage/main.rs index 70c959f8..69cd16e7 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -70,7 +70,9 @@ async fn main() { server::run_server(opt.config_file).await } - Command::NodeId(node_id_opt) => node_id_command(opt.config_file, node_id_opt.quiet), + Command::Node(NodeOperation::NodeId(node_id_opt)) => { + node_id_command(opt.config_file, node_id_opt.quiet) + } _ => cli_command(opt).await, }; diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index f4085c13..7979a79a 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage_model" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" description = "Core data model for the Garage object store" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [lib] path = "lib.rs" @@ -13,9 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.4.0", path = "../rpc" } -garage_table = { version = "0.4.0", path = "../table" } -garage_util = { version = "0.4.0", path = "../util" } +garage_rpc = { version = "0.5.0", path = "../rpc" } +garage_table = { version = "0.5.0", path = "../table" } +garage_util = { version = "0.5.0", path = "../util" } async-trait = "0.1.7" arc-swap = "1.0" diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index ac7c2a2e..d8ebb71e 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage_rpc" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" description = "Cluster membership management and RPC protocol for the Garage object store" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [lib] path = "lib.rs" @@ -13,7 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.4.0", path = "../util" } +garage_util = { version = "0.5.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" @@ -26,6 +27,7 @@ sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } async-trait = "0.1.7" rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde_bytes = "0.11" serde_json = "1.0" futures = "0.3" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs new file mode 100644 index 00000000..895dbf1c --- /dev/null +++ b/src/rpc/layout.rs @@ -0,0 +1,579 @@ +use std::cmp::Ordering; +use std::collections::{HashMap, HashSet}; + +use serde::{Deserialize, Serialize}; + +use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; +use garage_util::data::*; + +use crate::ring::*; + +/// The layout of the cluster, i.e. the list of roles +/// which are assigned to each cluster node +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + pub roles: LwwMap, + + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec, + /// the assignation of data partitions to node, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec, + + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap, + pub staging_hash: Hash, +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct NodeRoleV(pub Option); + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +/// The user-assigned roles of cluster nodes +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] +pub struct NodeRole { + /// Datacenter at which this entry belong. This information might be used to perform a better + /// geodistribution + pub zone: String, + /// The (relative) capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option, + /// A set of tags to recognize the node + pub tags: Vec, +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => format!("{}", c), + None => "gateway".to_string(), + } + } +} + +impl ClusterLayout { + pub fn new(replication_factor: usize) -> Self { + let empty_lwwmap = LwwMap::new(); + let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]); + + ClusterLayout { + version: 0, + replication_factor, + roles: LwwMap::new(), + node_id_vec: Vec::new(), + ring_assignation_data: Vec::new(), + staging: empty_lwwmap, + staging_hash: empty_lwwmap_hash, + } + } + + pub fn merge(&mut self, other: &ClusterLayout) -> bool { + match other.version.cmp(&self.version) { + Ordering::Greater => { + *self = other.clone(); + true + } + Ordering::Equal => { + self.staging.merge(&other.staging); + + let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + let changed = new_staging_hash != self.staging_hash; + + self.staging_hash = new_staging_hash; + + changed + } + Ordering::Less => false, + } + } + + /// Returns a list of IDs of nodes that currently have + /// a role in the cluster + pub fn node_ids(&self) -> &[Uuid] { + &self.node_id_vec[..] + } + + pub fn num_nodes(&self) -> usize { + self.node_id_vec.len() + } + + /// Returns the role of a node in the layout + pub fn node_role(&self, node: &Uuid) -> Option<&NodeRole> { + match self.roles.get(node) { + Some(NodeRoleV(Some(v))) => Some(v), + _ => None, + } + } + + /// Check a cluster layout for internal consistency + /// returns true if consistent, false if error + pub fn check(&self) -> bool { + // Check that the hash of the staging data is correct + let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + if staging_hash != self.staging_hash { + return false; + } + + // Check that node_id_vec contains the correct list of nodes + let mut expected_nodes = self + .roles + .items() + .iter() + .filter(|(_, _, v)| v.0.is_some()) + .map(|(id, _, _)| *id) + .collect::>(); + expected_nodes.sort(); + let mut node_id_vec = self.node_id_vec.clone(); + node_id_vec.sort(); + if expected_nodes != node_id_vec { + return false; + } + + // Check that the assignation data has the correct length + if self.ring_assignation_data.len() != (1 << PARTITION_BITS) * self.replication_factor { + return false; + } + + // Check that the assigned nodes are correct identifiers + // of nodes that are assigned a role + // and that role is not the role of a gateway nodes + for x in self.ring_assignation_data.iter() { + if *x as usize >= self.node_id_vec.len() { + return false; + } + let node = self.node_id_vec[*x as usize]; + match self.roles.get(&node) { + Some(NodeRoleV(Some(x))) if x.capacity.is_some() => (), + _ => return false, + } + } + + true + } + + /// Calculate an assignation of partitions to nodes + pub fn calculate_partition_assignation(&mut self) -> bool { + let (configured_nodes, zones) = self.configured_nodes_and_zones(); + let n_zones = zones.len(); + + println!("Calculating updated partition assignation, this may take some time..."); + println!(); + + let old_partitions = self.parse_assignation_data(); + + let mut partitions = old_partitions.clone(); + for part in partitions.iter_mut() { + part.nodes + .retain(|(_, info)| info.map(|x| x.capacity.is_some()).unwrap_or(false)); + } + + // When nodes are removed, or when bootstraping an assignation from + // scratch for a new cluster, the old partitions will have holes (or be empty). + // Here we add more nodes to make a complete (sub-optimal) assignation, + // using an initial partition assignation that is calculated using the multi-dc maglev trick + match self.initial_partition_assignation() { + Some(initial_partitions) => { + for (part, ipart) in partitions.iter_mut().zip(initial_partitions.iter()) { + for (id, info) in ipart.nodes.iter() { + if part.nodes.len() < self.replication_factor { + part.add(part.nodes.len() + 1, n_zones, id, info.unwrap()); + } + } + assert!(part.nodes.len() == self.replication_factor); + } + } + None => { + return false; + } + } + + // Calculate how many partitions each node should ideally store, + // and how many partitions they are storing with the current assignation + // This defines our target for which we will optimize in the following loop. + let total_capacity = configured_nodes + .iter() + .map(|(_, info)| info.capacity.unwrap_or(0)) + .sum::() as usize; + let total_partitions = self.replication_factor * (1 << PARTITION_BITS); + let target_partitions_per_node = configured_nodes + .iter() + .map(|(id, info)| { + ( + *id, + info.capacity.unwrap_or(0) as usize * total_partitions / total_capacity, + ) + }) + .collect::>(); + + let mut partitions_per_node = self.partitions_per_node(&partitions[..]); + + println!("Target number of partitions per node:"); + for (node, npart) in target_partitions_per_node.iter() { + println!("{:?}\t{}", node, npart); + } + println!(); + + // Shuffle partitions between nodes so that nodes will reach (or better approach) + // their target number of stored partitions + loop { + let mut option = None; + for (i, part) in partitions.iter_mut().enumerate() { + for (irm, (idrm, _)) in part.nodes.iter().enumerate() { + let suprm = partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32 + - target_partitions_per_node.get(*idrm).cloned().unwrap_or(0) as i32; + + for (idadd, infoadd) in configured_nodes.iter() { + // skip replacing a node by itself + // and skip replacing by gateway nodes + if idadd == idrm || infoadd.capacity.is_none() { + continue; + } + + let supadd = partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32 + - target_partitions_per_node.get(*idadd).cloned().unwrap_or(0) as i32; + + // We want to try replacing node idrm by node idadd + // if that brings us close to our goal. + let square = |i: i32| i * i; + let oldcost = square(suprm) + square(supadd); + let newcost = square(suprm - 1) + square(supadd + 1); + if newcost >= oldcost { + // not closer to our goal + continue; + } + let gain = oldcost - newcost; + + let mut newpart = part.clone(); + + newpart.nodes.remove(irm); + if !newpart.add(newpart.nodes.len() + 1, n_zones, idadd, infoadd) { + continue; + } + assert!(newpart.nodes.len() == self.replication_factor); + + if !old_partitions[i] + .is_valid_transition_to(&newpart, self.replication_factor) + { + continue; + } + + if option + .as_ref() + .map(|(old_gain, _, _, _, _)| gain > *old_gain) + .unwrap_or(true) + { + option = Some((gain, i, idadd, idrm, newpart)); + } + } + } + } + if let Some((_gain, i, idadd, idrm, newpart)) = option { + *partitions_per_node.entry(idadd).or_insert(0) += 1; + *partitions_per_node.get_mut(idrm).unwrap() -= 1; + partitions[i] = newpart; + } else { + break; + } + } + + // Check we completed the assignation correctly + // (this is a set of checks for the algorithm's consistency) + assert!(partitions.len() == (1 << PARTITION_BITS)); + assert!(partitions + .iter() + .all(|p| p.nodes.len() == self.replication_factor)); + + let new_partitions_per_node = self.partitions_per_node(&partitions[..]); + assert!(new_partitions_per_node == partitions_per_node); + + // Show statistics + println!("New number of partitions per node:"); + for (node, npart) in partitions_per_node.iter() { + println!("{:?}\t{}", node, npart); + } + println!(); + + let mut diffcount = HashMap::new(); + for (oldpart, newpart) in old_partitions.iter().zip(partitions.iter()) { + let nminus = oldpart.txtplus(newpart); + let nplus = newpart.txtplus(oldpart); + if nminus != "[...]" || nplus != "[...]" { + let tup = (nminus, nplus); + *diffcount.entry(tup).or_insert(0) += 1; + } + } + if diffcount.is_empty() { + println!("No data will be moved between nodes."); + } else { + let mut diffcount = diffcount.into_iter().collect::>(); + diffcount.sort(); + println!("Number of partitions that move:"); + for ((nminus, nplus), npart) in diffcount { + println!("\t{}\t{} -> {}", npart, nminus, nplus); + } + } + println!(); + + // Calculate and save new assignation data + let (nodes, assignation_data) = + self.compute_assignation_data(&configured_nodes[..], &partitions[..]); + + self.node_id_vec = nodes; + self.ring_assignation_data = assignation_data; + + true + } + + fn initial_partition_assignation(&self) -> Option>> { + let (configured_nodes, zones) = self.configured_nodes_and_zones(); + let n_zones = zones.len(); + + // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) + let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::>(); + + // Prepare ring + let mut partitions: Vec = partitions_idx + .iter() + .map(|_i| PartitionAss::new()) + .collect::>(); + + // Create MagLev priority queues for each node + let mut queues = configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(node_id, node_info)| { + let mut parts = partitions_idx + .iter() + .map(|i| { + let part_data = + [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); + (*i, fasthash(&part_data[..])) + }) + .collect::>(); + parts.sort_by_key(|(_i, h)| *h); + let parts_i = parts.iter().map(|(i, _h)| *i).collect::>(); + (node_id, node_info, parts_i, 0) + }) + .collect::>(); + + let max_capacity = configured_nodes + .iter() + .filter_map(|(_, node_info)| node_info.capacity) + .fold(0, std::cmp::max); + + // Fill up ring + for rep in 0..self.replication_factor { + queues.sort_by_key(|(ni, _np, _q, _p)| { + let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); + fasthash(&queue_data[..]) + }); + + for (_, _, _, pos) in queues.iter_mut() { + *pos = 0; + } + + let mut remaining = partitions_idx.len(); + while remaining > 0 { + let remaining0 = remaining; + for i_round in 0..max_capacity { + for (node_id, node_info, q, pos) in queues.iter_mut() { + if i_round >= node_info.capacity.unwrap() { + continue; + } + for (pos2, &qv) in q.iter().enumerate().skip(*pos) { + if partitions[qv].add(rep + 1, n_zones, node_id, node_info) { + remaining -= 1; + *pos = pos2 + 1; + break; + } + } + } + } + if remaining == remaining0 { + // No progress made, exit + return None; + } + } + } + + Some(partitions) + } + + fn configured_nodes_and_zones(&self) -> (Vec<(&Uuid, &NodeRole)>, HashSet<&str>) { + let configured_nodes = self + .roles + .items() + .iter() + .filter(|(_id, _, info)| info.0.is_some()) + .map(|(id, _, info)| (id, info.0.as_ref().unwrap())) + .collect::>(); + + let zones = configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(_id, info)| info.zone.as_str()) + .collect::>(); + + (configured_nodes, zones) + } + + fn compute_assignation_data<'a>( + &self, + configured_nodes: &[(&'a Uuid, &'a NodeRole)], + partitions: &[PartitionAss<'a>], + ) -> (Vec, Vec) { + assert!(partitions.len() == (1 << PARTITION_BITS)); + + // Make a canonical order for nodes + let mut nodes = configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_some()) + .map(|(id, _)| **id) + .collect::>(); + let nodes_rev = nodes + .iter() + .enumerate() + .map(|(i, id)| (*id, i as CompactNodeType)) + .collect::>(); + + let mut assignation_data = vec![]; + for partition in partitions.iter() { + assert!(partition.nodes.len() == self.replication_factor); + for (id, _) in partition.nodes.iter() { + assignation_data.push(*nodes_rev.get(id).unwrap()); + } + } + + nodes.extend( + configured_nodes + .iter() + .filter(|(_id, info)| info.capacity.is_none()) + .map(|(id, _)| **id), + ); + + (nodes, assignation_data) + } + + fn parse_assignation_data(&self) -> Vec> { + if self.ring_assignation_data.len() == self.replication_factor * (1 << PARTITION_BITS) { + // If the previous assignation data is correct, use that + let mut partitions = vec![]; + for i in 0..(1 << PARTITION_BITS) { + let mut part = PartitionAss::new(); + for node_i in self.ring_assignation_data + [i * self.replication_factor..(i + 1) * self.replication_factor] + .iter() + { + let node_id = &self.node_id_vec[*node_i as usize]; + + if let Some(NodeRoleV(Some(info))) = self.roles.get(node_id) { + part.nodes.push((node_id, Some(info))); + } else { + part.nodes.push((node_id, None)); + } + } + partitions.push(part); + } + partitions + } else { + // Otherwise start fresh + (0..(1 << PARTITION_BITS)) + .map(|_| PartitionAss::new()) + .collect() + } + } + + fn partitions_per_node<'a>(&self, partitions: &[PartitionAss<'a>]) -> HashMap<&'a Uuid, usize> { + let mut partitions_per_node = HashMap::<&Uuid, usize>::new(); + for p in partitions.iter() { + for (id, _) in p.nodes.iter() { + *partitions_per_node.entry(*id).or_insert(0) += 1; + } + } + partitions_per_node + } +} + +// ---- Internal structs for partition assignation in layout ---- + +#[derive(Clone)] +struct PartitionAss<'a> { + nodes: Vec<(&'a Uuid, Option<&'a NodeRole>)>, +} + +impl<'a> PartitionAss<'a> { + fn new() -> Self { + Self { nodes: Vec::new() } + } + + fn nplus(&self, other: &PartitionAss<'a>) -> usize { + self.nodes + .iter() + .filter(|x| !other.nodes.contains(x)) + .count() + } + + fn txtplus(&self, other: &PartitionAss<'a>) -> String { + let mut nodes = self + .nodes + .iter() + .filter(|x| !other.nodes.contains(x)) + .map(|x| format!("{:?}", x.0)) + .collect::>(); + nodes.sort(); + if self.nodes.iter().any(|x| other.nodes.contains(x)) { + nodes.push("...".into()); + } + format!("[{}]", nodes.join(" ")) + } + + fn is_valid_transition_to(&self, other: &PartitionAss<'a>, replication_factor: usize) -> bool { + let min_keep_nodes_per_part = (replication_factor + 1) / 2; + let n_removed = self.nplus(other); + + if self.nodes.len() <= min_keep_nodes_per_part { + n_removed == 0 + } else { + n_removed <= self.nodes.len() - min_keep_nodes_per_part + } + } + + fn add( + &mut self, + target_len: usize, + n_zones: usize, + node: &'a Uuid, + role: &'a NodeRole, + ) -> bool { + if self.nodes.len() != target_len - 1 { + return false; + } + + let p_zns = self + .nodes + .iter() + .map(|(_id, info)| info.unwrap().zone.as_str()) + .collect::>(); + if (p_zns.len() < n_zones && !p_zns.contains(&role.zone.as_str())) + || (p_zns.len() == n_zones && !self.nodes.iter().any(|(id, _)| *id == node)) + { + self.nodes.push((node, Some(role))); + true + } else { + false + } + } +} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index ea3f1139..b72392ab 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -5,6 +5,7 @@ extern crate log; mod consul; +pub mod layout; pub mod ring; pub mod system; diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 3cb0d233..73a126a2 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,12 +1,11 @@ //! Module containing types related to computing nodes which should receive a copy of data blocks //! and metadata -use std::collections::{HashMap, HashSet}; use std::convert::TryInto; -use serde::{Deserialize, Serialize}; - use garage_util::data::*; +use crate::layout::ClusterLayout; + /// A partition id, which is stored on 16 bits /// i.e. we have up to 2**16 partitions. /// (in practice we have exactly 2**PARTITION_BITS partitions) @@ -22,47 +21,6 @@ pub const PARTITION_BITS: usize = 8; const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); -/// The user-defined configuration of the cluster's nodes -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NetworkConfig { - /// Map of each node's id to it's configuration - pub members: HashMap, - /// Version of this config - pub version: u64, -} - -impl NetworkConfig { - pub(crate) fn new() -> Self { - Self { - members: HashMap::new(), - version: 0, - } - } -} - -/// The overall configuration of one (possibly remote) node -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct NetworkConfigEntry { - /// Datacenter at which this entry belong. This infromation might be used to perform a better - /// geodistribution - pub zone: String, - /// The (relative) capacity of the node - /// If this is set to None, the node does not participate in storing data for the system - /// and is only active as an API gateway to other nodes - pub capacity: Option, - /// A tag to recognize the entry, not used for other things than display - pub tag: String, -} - -impl NetworkConfigEntry { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => format!("{}", c), - None => "gateway".to_string(), - } - } -} - /// A ring distributing fairly objects to nodes #[derive(Clone)] pub struct Ring { @@ -70,7 +28,7 @@ pub struct Ring { pub replication_factor: usize, /// The network configuration used to generate this ring - pub config: NetworkConfig, + pub layout: ClusterLayout, // Internal order of nodes used to make a more compact representation of the ring nodes: Vec, @@ -81,7 +39,7 @@ pub struct Ring { // Type to store compactly the id of a node in the system // Change this to u16 the day we want to have more than 256 nodes in a cluster -type CompactNodeType = u8; +pub type CompactNodeType = u8; // The maximum number of times an object might get replicated // This must be at least 3 because Garage supports 3-way replication @@ -102,132 +60,26 @@ struct RingEntry { } impl Ring { - // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6 - // levels of imbrication. It is basically impossible to test, maintain, or understand for an - // outsider. - pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self { - // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) - let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::>(); - - let zones = config - .members - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(_id, info)| info.zone.as_str()) - .collect::>(); - let n_zones = zones.len(); - - // Prepare ring - let mut partitions: Vec> = partitions_idx - .iter() - .map(|_i| Vec::new()) - .collect::>(); - - // Create MagLev priority queues for each node - let mut queues = config - .members - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(node_id, node_info)| { - let mut parts = partitions_idx - .iter() - .map(|i| { - let part_data = - [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); - (*i, fasthash(&part_data[..])) - }) - .collect::>(); - parts.sort_by_key(|(_i, h)| *h); - let parts_i = parts.iter().map(|(i, _h)| *i).collect::>(); - (node_id, node_info, parts_i, 0) - }) - .collect::>(); - - let max_capacity = config - .members - .iter() - .filter_map(|(_, node_info)| node_info.capacity) - .fold(0, std::cmp::max); - - assert!(replication_factor <= MAX_REPLICATION); - - // Fill up ring - for rep in 0..replication_factor { - queues.sort_by_key(|(ni, _np, _q, _p)| { - let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); - fasthash(&queue_data[..]) - }); - - for (_, _, _, pos) in queues.iter_mut() { - *pos = 0; - } - - let mut remaining = partitions_idx.len(); - while remaining > 0 { - let remaining0 = remaining; - for i_round in 0..max_capacity { - for (node_id, node_info, q, pos) in queues.iter_mut() { - if i_round >= node_info.capacity.unwrap() { - continue; - } - for (pos2, &qv) in q.iter().enumerate().skip(*pos) { - if partitions[qv].len() != rep { - continue; - } - let p_zns = partitions[qv] - .iter() - .map(|(_id, info)| info.zone.as_str()) - .collect::>(); - if (p_zns.len() < n_zones && !p_zns.contains(&node_info.zone.as_str())) - || (p_zns.len() == n_zones - && !partitions[qv].iter().any(|(id, _i)| id == node_id)) - { - partitions[qv].push((node_id, node_info)); - remaining -= 1; - *pos = pos2 + 1; - break; - } - } - } - } - if remaining == remaining0 { - // No progress made, exit - warn!("Could not build ring, not enough nodes configured."); - return Self { - replication_factor, - config, - nodes: vec![], - ring: vec![], - }; - } - } + pub(crate) fn new(layout: ClusterLayout, replication_factor: usize) -> Self { + if replication_factor != layout.replication_factor { + warn!("Could not build ring: replication factor does not match between local configuration and network role assignation."); + return Self::empty(layout, replication_factor); } - // Make a canonical order for nodes - let nodes = config - .members - .iter() - .filter(|(_id, info)| info.capacity.is_some()) - .map(|(id, _)| *id) - .collect::>(); - let nodes_rev = nodes - .iter() - .enumerate() - .map(|(i, id)| (*id, i as CompactNodeType)) - .collect::>(); + if layout.ring_assignation_data.len() != replication_factor * (1 << PARTITION_BITS) { + warn!("Could not build ring: network role assignation data has invalid length"); + return Self::empty(layout, replication_factor); + } - let ring = partitions - .iter() - .enumerate() - .map(|(i, nodes)| { + let nodes = layout.node_id_vec.clone(); + let ring = (0..(1 << PARTITION_BITS)) + .map(|i| { let top = (i as u16) << (16 - PARTITION_BITS); - let nodes = nodes - .iter() - .map(|(id, _info)| *nodes_rev.get(id).unwrap()) - .collect::>(); - assert!(nodes.len() == replication_factor); let mut nodes_buf = [0u8; MAX_REPLICATION]; - nodes_buf[..replication_factor].copy_from_slice(&nodes[..]); + nodes_buf[..replication_factor].copy_from_slice( + &layout.ring_assignation_data + [replication_factor * i..replication_factor * (i + 1)], + ); RingEntry { hash_prefix: top, nodes_buf, @@ -237,12 +89,21 @@ impl Ring { Self { replication_factor, - config, + layout, nodes, ring, } } + fn empty(layout: ClusterLayout, replication_factor: usize) -> Self { + Self { + replication_factor, + layout, + nodes: vec![], + ring: vec![], + } + } + /// Get the partition in which data would fall on pub fn partition_of(&self, position: &Hash) -> Partition { let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap()); diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index df0e94f8..68bdfc4f 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -225,7 +225,7 @@ impl RpcHelper { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); let ring: Arc = self.0.ring.borrow().clone(); - let our_zone = match ring.config.members.get(&self.0.our_node_id) { + let our_zone = match ring.layout.node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, None => "", }; @@ -238,7 +238,7 @@ impl RpcHelper { // and within a same zone we priorize nodes with the lowest latency. let mut requests = requests .map(|(to, fut)| { - let peer_zone = match ring.config.members.get(&to) { + let peer_zone = match ring.layout.node_role(&to) { Some(pc) => &pc.zone, None => "", }; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 3f5f7fb1..aa8947ea 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -23,12 +23,13 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; use garage_util::config::Config; -use garage_util::data::Uuid; +use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::*; +use crate::layout::*; use crate::ring::*; use crate::rpc_helper::*; @@ -48,13 +49,13 @@ pub enum SystemRpc { Ok, /// Request to connect to a specific node (in @: format) Connect(String), - /// Ask other node its config. Answered with AdvertiseConfig - PullConfig, + /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout + PullClusterLayout, /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. AdvertiseStatus(NodeStatus), - /// Advertisement of nodes config. Sent spontanously or in response to PullConfig - AdvertiseConfig(NetworkConfig), + /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout + AdvertiseClusterLayout(ClusterLayout), /// Get known nodes states GetKnownNodes, /// Return known nodes @@ -70,7 +71,7 @@ pub struct System { /// The id of this node pub id: Uuid, - persist_config: Persister, + persist_cluster_layout: Persister, persist_peer_list: Persister>, local_status: ArcSwap, @@ -103,8 +104,10 @@ pub struct NodeStatus { pub hostname: String, /// Replication factor configured on the node pub replication_factor: usize, - /// Configuration version - pub config_version: u64, + /// Cluster layout version + pub cluster_layout_version: u64, + /// Hash of cluster layout staging data + pub cluster_layout_staging_hash: Hash, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -187,17 +190,17 @@ impl System { gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!("Node public key: {}", hex::encode(&node_key.public_key())); - let persist_config = Persister::new(&config.metadata_dir, "network_config"); + let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); - let net_config = match persist_config.load() { + let cluster_layout = match persist_cluster_layout.load() { Ok(x) => x, Err(e) => { info!( - "No valid previous network configuration stored ({}), starting fresh.", + "No valid previous cluster layout stored ({}), starting fresh.", e ); - NetworkConfig::new() + ClusterLayout::new(replication_factor) } }; @@ -206,10 +209,11 @@ impl System { .into_string() .unwrap_or_else(|_| "".to_string()), replication_factor, - config_version: net_config.version, + cluster_layout_version: cluster_layout.version, + cluster_layout_staging_hash: cluster_layout.staging_hash, }; - let ring = Ring::new(net_config, replication_factor); + let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); if let Some(addr) = config.rpc_public_addr { @@ -229,7 +233,7 @@ impl System { let sys = Arc::new(System { id: netapp.id.into(), - persist_config, + persist_cluster_layout, persist_peer_list, local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), @@ -292,12 +296,12 @@ impl System { } /// Save network configuration to disc - async fn save_network_config(self: Arc) -> Result<(), Error> { + async fn save_cluster_layout(self: Arc) -> Result<(), Error> { let ring: Arc = self.ring.borrow().clone(); - self.persist_config - .save_async(&ring.config) + self.persist_cluster_layout + .save_async(&ring.layout) .await - .expect("Cannot save current cluster configuration"); + .expect("Cannot save current cluster layout"); Ok(()) } @@ -305,7 +309,8 @@ impl System { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let ring = self.ring.borrow(); - new_si.config_version = ring.config.version; + new_si.cluster_layout_version = ring.layout.version; + new_si.cluster_layout_staging_hash = ring.layout.staging_hash; self.local_status.swap(Arc::new(new_si)); } @@ -337,9 +342,9 @@ impl System { ))); } - fn handle_pull_config(&self) -> SystemRpc { + fn handle_pull_cluster_layout(&self) -> SystemRpc { let ring = self.ring.borrow().clone(); - SystemRpc::AdvertiseConfig(ring.config.clone()) + SystemRpc::AdvertiseClusterLayout(ring.layout.clone()) } fn handle_get_known_nodes(&self) -> SystemRpc { @@ -360,7 +365,8 @@ impl System { .unwrap_or(NodeStatus { hostname: "?".to_string(), replication_factor: 0, - config_version: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), }), }) .collect::>(); @@ -381,10 +387,12 @@ impl System { std::process::exit(1); } - if info.config_version > local_info.config_version { + if info.cluster_layout_version > local_info.cluster_layout_version + || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash + { let self2 = self.clone(); self.background.spawn_cancellable(async move { - self2.pull_config(from).await; + self2.pull_cluster_layout(from).await; Ok(()) }); } @@ -397,32 +405,39 @@ impl System { Ok(SystemRpc::Ok) } - async fn handle_advertise_config( + async fn handle_advertise_cluster_layout( self: Arc, - adv: &NetworkConfig, + adv: &ClusterLayout, ) -> Result { let update_ring = self.update_ring.lock().await; - let ring: Arc = self.ring.borrow().clone(); + let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); + + let prev_layout_check = layout.check(); + if layout.merge(adv) { + if prev_layout_check && !layout.check() { + error!("New cluster layout is invalid, discarding."); + return Err(Error::Message( + "New cluster layout is invalid, discarding.".into(), + )); + } - if adv.version > ring.config.version { - let ring = Ring::new(adv.clone(), self.replication_factor); + let ring = Ring::new(layout.clone(), self.replication_factor); update_ring.send(Arc::new(ring))?; drop(update_ring); let self2 = self.clone(); - let adv = adv.clone(); self.background.spawn_cancellable(async move { self2 .rpc .broadcast( &self2.system_endpoint, - SystemRpc::AdvertiseConfig(adv), + SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) .await; Ok(()) }); - self.background.spawn(self.clone().save_network_config()); + self.background.spawn(self.clone().save_cluster_layout()); } Ok(SystemRpc::Ok) @@ -456,14 +471,15 @@ impl System { }; while !*stop_signal.borrow() { - let not_configured = self.ring.borrow().config.members.is_empty(); + let not_configured = !self.ring.borrow().layout.check(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; + let expected_n_nodes = self.ring.borrow().layout.num_nodes(); let bad_peers = self .fullmesh .get_peer_list() .iter() .filter(|p| p.is_up()) - .count() != self.ring.borrow().config.members.len(); + .count() != expected_n_nodes; if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); @@ -533,18 +549,18 @@ impl System { self.persist_peer_list.save_async(&peer_list).await } - async fn pull_config(self: Arc, peer: Uuid) { + async fn pull_cluster_layout(self: Arc, peer: Uuid) { let resp = self .rpc .call( &self.system_endpoint, peer, - SystemRpc::PullConfig, + SystemRpc::PullClusterLayout, RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), ) .await; - if let Ok(SystemRpc::AdvertiseConfig(config)) = resp { - let _: Result<_, _> = self.handle_advertise_config(&config).await; + if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { + let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; } } } @@ -554,9 +570,11 @@ impl EndpointHandler for System { async fn handle(self: &Arc, msg: &SystemRpc, from: NodeID) -> Result { match msg { SystemRpc::Connect(node) => self.handle_connect(node).await, - SystemRpc::PullConfig => Ok(self.handle_pull_config()), + SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()), SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, - SystemRpc::AdvertiseConfig(adv) => self.clone().handle_advertise_config(adv).await, + SystemRpc::AdvertiseClusterLayout(adv) => { + self.clone().handle_advertise_cluster_layout(adv).await + } SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 616bf275..dc37f12c 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage_table" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" description = "Table sharding and replication engine (DynamoDB-like) for the Garage object store" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [lib] path = "lib.rs" @@ -13,8 +14,8 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_rpc = { version = "0.4.0", path = "../rpc" } -garage_util = { version = "0.4.0", path = "../util" } +garage_rpc = { version = "0.5.0", path = "../rpc" } +garage_util = { version = "0.5.0", path = "../util" } async-trait = "0.1.7" bytes = "1.0" diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs deleted file mode 100644 index 53af8f82..00000000 --- a/src/table/crdt/bool.rs +++ /dev/null @@ -1,34 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use crate::crdt::crdt::*; - -/// Boolean, where `true` is an absorbing state -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] -pub struct Bool(bool); - -impl Bool { - /// Create a new boolean with the specified value - pub fn new(b: bool) -> Self { - Self(b) - } - /// Set the boolean to true - pub fn set(&mut self) { - self.0 = true; - } - /// Get the boolean value - pub fn get(&self) -> bool { - self.0 - } -} - -impl From for Bool { - fn from(b: bool) -> Bool { - Bool::new(b) - } -} - -impl Crdt for Bool { - fn merge(&mut self, other: &Self) { - self.0 = self.0 || other.0; - } -} diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs deleted file mode 100644 index a8f1b9aa..00000000 --- a/src/table/crdt/crdt.rs +++ /dev/null @@ -1,71 +0,0 @@ -use garage_util::data::*; - -/// Definition of a CRDT - all CRDT Rust types implement this. -/// -/// A CRDT is defined as a merge operator that respects a certain set of axioms. -/// -/// In particular, the merge operator must be commutative, associative, -/// idempotent, and monotonic. -/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, -/// the following axioms must apply: -/// -/// ```text -/// a ⊔ b = b ⊔ a (commutativity) -/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) -/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) -/// ``` -/// -/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. -/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, -/// as this would imply a cycle in the partial order. -pub trait Crdt { - /// Merge the two datastructures according to the CRDT rules. - /// `self` is modified to contain the merged CRDT value. `other` is not modified. - /// - /// # Arguments - /// - /// * `other` - the other CRDT we wish to merge with - fn merge(&mut self, other: &Self); -} - -/// All types that implement `Ord` (a total order) can also implement a trivial CRDT -/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type -/// to enable this behavior. -pub trait AutoCrdt: Ord + Clone + std::fmt::Debug { - /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if - /// different values in your application should never happen. Set this to false - /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`. - const WARN_IF_DIFFERENT: bool; -} - -impl Crdt for T -where - T: AutoCrdt, -{ - fn merge(&mut self, other: &Self) { - if Self::WARN_IF_DIFFERENT && self != other { - warn!( - "Different CRDT values should be the same (logic error!): {:?} vs {:?}", - self, other - ); - if other > self { - *self = other.clone(); - } - warn!("Making an arbitrary choice: {:?}", self); - } else if other > self { - *self = other.clone(); - } - } -} - -impl AutoCrdt for String { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for bool { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for FixedBytes32 { - const WARN_IF_DIFFERENT: bool = true; -} diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs deleted file mode 100644 index be197d88..00000000 --- a/src/table/crdt/lww.rs +++ /dev/null @@ -1,114 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_util::time::now_msec; - -use crate::crdt::crdt::*; - -/// Last Write Win (LWW) -/// -/// An LWW CRDT associates a timestamp with a value, in order to implement a -/// time-based reconciliation rule: the most recent write wins. -/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs -/// with the same timestamp but different values. -/// -/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must -/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to -/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types -/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value. -/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is -/// generally desirable in this case to never explicitly produce LWW values with the same timestamp -/// but different inner values, as the rule to keep the maximum value isn't generally the desired -/// semantics.) -/// -/// As multiple computers clocks are always desynchronized, -/// when operations are close enough, it is equivalent to -/// take one copy and drop the other one. -/// -/// Given that clocks are not too desynchronized, this assumption -/// is enough for most cases, as there is few chance that two humans -/// coordonate themself faster than the time difference between two NTP servers. -/// -/// As a more concret example, let's suppose you want to upload a file -/// with the same key (path) in the same bucket at the very same time. -/// For each request, the file will be timestamped by the receiving server -/// and may differ from what you observed with your atomic clock! -/// -/// This scheme is used by AWS S3 or Soundcloud and often without knowing -/// in enterprise when reconciliating databases with ad-hoc scripts. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct Lww { - ts: u64, - v: T, -} - -impl Lww -where - T: Crdt, -{ - /// Creates a new CRDT - /// - /// CRDT's internal timestamp is set with current node's clock. - pub fn new(value: T) -> Self { - Self { - ts: now_msec(), - v: value, - } - } - - /// Build a new CRDT from a previous non-compatible one - /// - /// Compared to new, the CRDT's timestamp is not set to now - /// but must be set to the previous, non-compatible, CRDT's timestamp. - pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { ts, v: value } - } - - /// Update the LWW CRDT while keeping some causal ordering. - /// - /// The timestamp of the LWW CRDT is updated to be the current node's clock - /// at time of update, or the previous timestamp + 1 if that's bigger, - /// so that the new timestamp is always strictly larger than the previous one. - /// This ensures that merging the update with the old value will result in keeping - /// the updated value. - pub fn update(&mut self, new_value: T) { - self.ts = std::cmp::max(self.ts + 1, now_msec()); - self.v = new_value; - } - - /// Get the CRDT value - pub fn get(&self) -> &T { - &self.v - } - - /// Get a mutable reference to the CRDT's value - /// - /// This is usefull to mutate the inside value without changing the LWW timestamp. - /// When such mutation is done, the merge between two LWW values is done using the inner - /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large - /// data type, such as a map, and we only want to change a single item in the map. - /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. - /// This delta consists in a LWW with the same timestamp, and the map - /// inside only contains the updated value. - /// The advantage of such a delta is that it is much smaller than the whole map. - /// - /// Avoid using this if the inner data type is a primitive type such as a number or a string, - /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum - /// of both values. - pub fn get_mut(&mut self) -> &mut T { - &mut self.v - } -} - -impl Crdt for Lww -where - T: Clone + Crdt, -{ - fn merge(&mut self, other: &Self) { - if other.ts > self.ts { - self.ts = other.ts; - self.v = other.v.clone(); - } else if other.ts == self.ts { - self.v.merge(&other.v); - } - } -} diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs deleted file mode 100644 index fb25fd46..00000000 --- a/src/table/crdt/lww_map.rs +++ /dev/null @@ -1,161 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use garage_util::time::now_msec; - -use crate::crdt::crdt::*; - -/// Last Write Win Map -/// -/// This types defines a CRDT for a map from keys to values. -/// The values have an associated timestamp, such that the last written value -/// takes precedence over previous ones. As for the simpler `LWW` type, the value -/// type `V` is also required to implement the CRDT trait. -/// We do not encourage mutating the values associated with a given key -/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` -/// method that would allow that. -/// -/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. -/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, -/// such that two values can be compared for equality based on their hashes). As a consequence, -/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. -/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, -/// the serialization cost `O(n)` would still have to be paid at each modification, so we are -/// actually not losing anything here. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LwwMap { - vals: Vec<(K, u64, V)>, -} - -impl LwwMap -where - K: Ord, - V: Crdt, -{ - /// Create a new empty map CRDT - pub fn new() -> Self { - Self { vals: vec![] } - } - /// Used to migrate from a map defined in an incompatible format. This produces - /// a map that contains a single item with the specified timestamp (copied from - /// the incompatible format). Do this as many times as you have items to migrate, - /// and put them all together using the CRDT merge operator. - pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self { - vals: vec![(k, ts, v)], - } - } - /// Returns a map that contains a single mapping from the specified key to the specified value. - /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, - /// the previous value will be replaced with the one specified here. - /// The timestamp in the provided mutator is set to the maximum of the current system's clock - /// and 1 + the previous value's timestamp (if there is one), so that the new value will always - /// take precedence (LWW rule). - /// - /// Typically, to update the value associated to a key in the map, you would do the following: - /// - /// ```ignore - /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); - /// my_crdt.merge(&my_update); - /// ``` - /// - /// However extracting the mutator on its own and only sending that on the network is very - /// interesting as it is much smaller than the whole map. - pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { - Ok(i) => { - let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts + 1, now_msec()); - vec![(k, new_ts, new_v)] - } - Err(_) => vec![(k, now_msec(), new_v)], - }; - Self { vals: new_vals } - } - /// Takes all of the values of the map and returns them. The current map is reset to the - /// empty map. This is very usefull to produce in-place a new map that contains only a delta - /// that modifies a certain value: - /// - /// ```ignore - /// let mut a = get_my_crdt_value(); - /// let old_a = a.take_and_clear(); - /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); - /// put_my_crdt_value(a); - /// ``` - /// - /// Of course in this simple example we could have written simply - /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, - /// but in the case where the map is a field in a struct for instance (as is always the case), - /// this becomes very handy: - /// - /// ```ignore - /// let mut a = get_my_crdt_value(); - /// let old_a_map = a.map_field.take_and_clear(); - /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); - /// put_my_crdt_value(a); - /// ``` - pub fn take_and_clear(&mut self) -> Self { - let vals = std::mem::take(&mut self.vals); - Self { vals } - } - /// Removes all values from the map - pub fn clear(&mut self) { - self.vals.clear(); - } - /// Get a reference to the value assigned to a key - pub fn get(&self, k: &K) -> Option<&V> { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { - Ok(i) => Some(&self.vals[i].2), - Err(_) => None, - } - } - /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. - /// In most case you will want to ignore the timestamp (second item of the tuple). - pub fn items(&self) -> &[(K, u64, V)] { - &self.vals[..] - } - - /// Returns the number of items in the map - pub fn len(&self) -> usize { - self.vals.len() - } - - /// Returns true if the map is empty - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl Crdt for LwwMap -where - K: Clone + Ord, - V: Clone + Crdt, -{ - fn merge(&mut self, other: &Self) { - for (k, ts2, v2) in other.vals.iter() { - match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { - Ok(i) => { - let (_, ts1, _v1) = &self.vals[i]; - if ts2 > ts1 { - self.vals[i].1 = *ts2; - self.vals[i].2 = v2.clone(); - } else if ts1 == ts2 { - self.vals[i].2.merge(v2); - } - } - Err(i) => { - self.vals.insert(i, (k.clone(), *ts2, v2.clone())); - } - } - } - } -} - -impl Default for LwwMap -where - K: Ord, - V: Crdt, -{ - fn default() -> Self { - Self::new() - } -} diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs deleted file mode 100644 index 7553cd50..00000000 --- a/src/table/crdt/map.rs +++ /dev/null @@ -1,99 +0,0 @@ -use serde::{Deserialize, Serialize}; - -use crate::crdt::crdt::*; - -/// Simple CRDT Map -/// -/// This types defines a CRDT for a map from keys to values. Values are CRDT types which -/// can have their own updating logic. -/// -/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. -/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, -/// such that two values can be compared for equality based on their hashes). As a consequence, -/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps. -/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, -/// the serialization cost `O(n)` would still have to be paid at each modification, so we are -/// actually not losing anything here. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct Map { - vals: Vec<(K, V)>, -} - -impl Map -where - K: Clone + Ord, - V: Clone + Crdt, -{ - /// Create a new empty map CRDT - pub fn new() -> Self { - Self { vals: vec![] } - } - - /// Returns a map that contains a single mapping from the specified key to the specified value. - /// This can be used to build a delta-mutator: - /// when merged with another map, the value will be added or CRDT-merged if a previous - /// value already exists. - pub fn put_mutator(k: K, v: V) -> Self { - Self { vals: vec![(k, v)] } - } - - /// Add a value to the map - pub fn put(&mut self, k: K, v: V) { - self.merge(&Self::put_mutator(k, v)); - } - - /// Removes all values from the map - pub fn clear(&mut self) { - self.vals.clear(); - } - - /// Get a reference to the value assigned to a key - pub fn get(&self, k: &K) -> Option<&V> { - match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) { - Ok(i) => Some(&self.vals[i].1), - Err(_) => None, - } - } - /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. - pub fn items(&self) -> &[(K, V)] { - &self.vals[..] - } - /// Returns the number of items in the map - pub fn len(&self) -> usize { - self.vals.len() - } - - /// Returns true if the map is empty - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl Crdt for Map -where - K: Clone + Ord, - V: Clone + Crdt, -{ - fn merge(&mut self, other: &Self) { - for (k, v2) in other.vals.iter() { - match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) { - Ok(i) => { - self.vals[i].1.merge(v2); - } - Err(i) => { - self.vals.insert(i, (k.clone(), v2.clone())); - } - } - } - } -} - -impl Default for Map -where - K: Clone + Ord, - V: Clone + Crdt, -{ - fn default() -> Self { - Self::new() - } -} diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs deleted file mode 100644 index 9663a5a5..00000000 --- a/src/table/crdt/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) -//! -//! CRDTs are a type of data structures that do not require coordination. In other words, we can -//! edit them in parallel, we will always find a way to merge it. -//! -//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the -//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, -//! it is easy to merge their counters, order does not count: we always get 4. -//! -//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) - -mod bool; -#[allow(clippy::module_inception)] -mod crdt; -mod lww; -mod lww_map; -mod map; - -pub use self::bool::*; -pub use crdt::*; -pub use lww::*; -pub use lww_map::*; -pub use map::*; diff --git a/src/table/lib.rs b/src/table/lib.rs index 53d2c93b..d6c19f1b 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -4,7 +4,6 @@ #[macro_use] extern crate log; -pub mod crdt; pub mod schema; pub mod util; @@ -18,3 +17,7 @@ pub mod table; pub use schema::*; pub use table::*; pub use util::*; + +pub mod crdt { + pub use garage_util::crdt::*; +} diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 8f01fbdd..18682ace 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -28,10 +28,10 @@ impl TableReplication for TableFullReplication { fn write_nodes(&self, _hash: &Hash) -> Vec { let ring = self.system.ring.borrow(); - ring.config.members.keys().cloned().collect::>() + ring.layout.node_ids().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.ring.borrow().config.members.len(); + let nmembers = self.system.ring.borrow().layout.node_ids().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index f2a001fa..e33f8a66 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage_util" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" description = "Utility crate for the Garage object store" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [lib] path = "lib.rs" diff --git a/src/util/crdt/bool.rs b/src/util/crdt/bool.rs new file mode 100644 index 00000000..53af8f82 --- /dev/null +++ b/src/util/crdt/bool.rs @@ -0,0 +1,34 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Boolean, where `true` is an absorbing state +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + /// Create a new boolean with the specified value + pub fn new(b: bool) -> Self { + Self(b) + } + /// Set the boolean to true + pub fn set(&mut self) { + self.0 = true; + } + /// Get the boolean value + pub fn get(&self) -> bool { + self.0 + } +} + +impl From for Bool { + fn from(b: bool) -> Bool { + Bool::new(b) + } +} + +impl Crdt for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} diff --git a/src/util/crdt/crdt.rs b/src/util/crdt/crdt.rs new file mode 100644 index 00000000..9b5f230d --- /dev/null +++ b/src/util/crdt/crdt.rs @@ -0,0 +1,71 @@ +use crate::data::*; + +/// Definition of a CRDT - all CRDT Rust types implement this. +/// +/// A CRDT is defined as a merge operator that respects a certain set of axioms. +/// +/// In particular, the merge operator must be commutative, associative, +/// idempotent, and monotonic. +/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator, +/// the following axioms must apply: +/// +/// ```text +/// a ⊔ b = b ⊔ a (commutativity) +/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity) +/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence) +/// ``` +/// +/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order. +/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`, +/// as this would imply a cycle in the partial order. +pub trait Crdt { + /// Merge the two datastructures according to the CRDT rules. + /// `self` is modified to contain the merged CRDT value. `other` is not modified. + /// + /// # Arguments + /// + /// * `other` - the other CRDT we wish to merge with + fn merge(&mut self, other: &Self); +} + +/// All types that implement `Ord` (a total order) can also implement a trivial CRDT +/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type +/// to enable this behavior. +pub trait AutoCrdt: Ord + Clone + std::fmt::Debug { + /// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if + /// different values in your application should never happen. Set this to false + /// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`. + const WARN_IF_DIFFERENT: bool; +} + +impl Crdt for T +where + T: AutoCrdt, +{ + fn merge(&mut self, other: &Self) { + if Self::WARN_IF_DIFFERENT && self != other { + warn!( + "Different CRDT values should be the same (logic error!): {:?} vs {:?}", + self, other + ); + if other > self { + *self = other.clone(); + } + warn!("Making an arbitrary choice: {:?}", self); + } else if other > self { + *self = other.clone(); + } + } +} + +impl AutoCrdt for String { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for bool { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for FixedBytes32 { + const WARN_IF_DIFFERENT: bool = true; +} diff --git a/src/util/crdt/lww.rs b/src/util/crdt/lww.rs new file mode 100644 index 00000000..43d13f27 --- /dev/null +++ b/src/util/crdt/lww.rs @@ -0,0 +1,120 @@ +use std::cmp::Ordering; + +use serde::{Deserialize, Serialize}; + +use crate::time::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win (LWW) +/// +/// An LWW CRDT associates a timestamp with a value, in order to implement a +/// time-based reconciliation rule: the most recent write wins. +/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs +/// with the same timestamp but different values. +/// +/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must +/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to +/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types +/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value. +/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is +/// generally desirable in this case to never explicitly produce LWW values with the same timestamp +/// but different inner values, as the rule to keep the maximum value isn't generally the desired +/// semantics.) +/// +/// As multiple computers clocks are always desynchronized, +/// when operations are close enough, it is equivalent to +/// take one copy and drop the other one. +/// +/// Given that clocks are not too desynchronized, this assumption +/// is enough for most cases, as there is few chance that two humans +/// coordonate themself faster than the time difference between two NTP servers. +/// +/// As a more concret example, let's suppose you want to upload a file +/// with the same key (path) in the same bucket at the very same time. +/// For each request, the file will be timestamped by the receiving server +/// and may differ from what you observed with your atomic clock! +/// +/// This scheme is used by AWS S3 or Soundcloud and often without knowing +/// in enterprise when reconciliating databases with ad-hoc scripts. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Lww { + ts: u64, + v: T, +} + +impl Lww +where + T: Crdt, +{ + /// Creates a new CRDT + /// + /// CRDT's internal timestamp is set with current node's clock. + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + + /// Build a new CRDT from a previous non-compatible one + /// + /// Compared to new, the CRDT's timestamp is not set to now + /// but must be set to the previous, non-compatible, CRDT's timestamp. + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { ts, v: value } + } + + /// Update the LWW CRDT while keeping some causal ordering. + /// + /// The timestamp of the LWW CRDT is updated to be the current node's clock + /// at time of update, or the previous timestamp + 1 if that's bigger, + /// so that the new timestamp is always strictly larger than the previous one. + /// This ensures that merging the update with the old value will result in keeping + /// the updated value. + pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + + /// Get the CRDT value + pub fn get(&self) -> &T { + &self.v + } + + /// Get a mutable reference to the CRDT's value + /// + /// This is usefull to mutate the inside value without changing the LWW timestamp. + /// When such mutation is done, the merge between two LWW values is done using the inner + /// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large + /// data type, such as a map, and we only want to change a single item in the map. + /// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification. + /// This delta consists in a LWW with the same timestamp, and the map + /// inside only contains the updated value. + /// The advantage of such a delta is that it is much smaller than the whole map. + /// + /// Avoid using this if the inner data type is a primitive type such as a number or a string, + /// as you will then rely on the merge function defined on `Ord` types by keeping the maximum + /// of both values. + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } +} + +impl Crdt for Lww +where + T: Clone + Crdt, +{ + fn merge(&mut self, other: &Self) { + match other.ts.cmp(&self.ts) { + Ordering::Greater => { + self.ts = other.ts; + self.v = other.v.clone(); + } + Ordering::Equal => { + self.v.merge(&other.v); + } + Ordering::Less => (), + } + } +} diff --git a/src/util/crdt/lww_map.rs b/src/util/crdt/lww_map.rs new file mode 100644 index 00000000..3e9aba79 --- /dev/null +++ b/src/util/crdt/lww_map.rs @@ -0,0 +1,167 @@ +use std::cmp::Ordering; + +use serde::{Deserialize, Serialize}; + +use crate::time::now_msec; + +use crate::crdt::crdt::*; + +/// Last Write Win Map +/// +/// This types defines a CRDT for a map from keys to values. +/// The values have an associated timestamp, such that the last written value +/// takes precedence over previous ones. As for the simpler `LWW` type, the value +/// type `V` is also required to implement the CRDT trait. +/// We do not encourage mutating the values associated with a given key +/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()` +/// method that would allow that. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LwwMap { + vals: Vec<(K, u64, V)>, +} + +impl LwwMap +where + K: Ord, + V: Crdt, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + /// Used to migrate from a map defined in an incompatible format. This produces + /// a map that contains a single item with the specified timestamp (copied from + /// the incompatible format). Do this as many times as you have items to migrate, + /// and put them all together using the CRDT merge operator. + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self { + vals: vec![(k, ts, v)], + } + } + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map, + /// the previous value will be replaced with the one specified here. + /// The timestamp in the provided mutator is set to the maximum of the current system's clock + /// and 1 + the previous value's timestamp (if there is one), so that the new value will always + /// take precedence (LWW rule). + /// + /// Typically, to update the value associated to a key in the map, you would do the following: + /// + /// ```ignore + /// let my_update = my_crdt.update_mutator(key_to_modify, new_value); + /// my_crdt.merge(&my_update); + /// ``` + /// + /// However extracting the mutator on its own and only sending that on the network is very + /// interesting as it is much smaller than the whole map. + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts + 1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => vec![(k, now_msec(), new_v)], + }; + Self { vals: new_vals } + } + /// Takes all of the values of the map and returns them. The current map is reset to the + /// empty map. This is very usefull to produce in-place a new map that contains only a delta + /// that modifies a certain value: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a = a.take_and_clear(); + /// a.merge(&old_a.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + /// + /// Of course in this simple example we could have written simply + /// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`, + /// but in the case where the map is a field in a struct for instance (as is always the case), + /// this becomes very handy: + /// + /// ```ignore + /// let mut a = get_my_crdt_value(); + /// let old_a_map = a.map_field.take_and_clear(); + /// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value)); + /// put_my_crdt_value(a); + /// ``` + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::take(&mut self.vals); + Self { vals } + } + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. + /// In most case you will want to ignore the timestamp (second item of the tuple). + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] + } + + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl Crdt for LwwMap +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(k)) { + Ok(i) => { + let (_, ts1, _v1) = &self.vals[i]; + match ts2.cmp(ts1) { + Ordering::Greater => { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } + Ordering::Equal => { + self.vals[i].2.merge(v2); + } + Ordering::Less => (), + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} + +impl Default for LwwMap +where + K: Ord, + V: Crdt, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/crdt/map.rs b/src/util/crdt/map.rs new file mode 100644 index 00000000..7553cd50 --- /dev/null +++ b/src/util/crdt/map.rs @@ -0,0 +1,99 @@ +use serde::{Deserialize, Serialize}; + +use crate::crdt::crdt::*; + +/// Simple CRDT Map +/// +/// This types defines a CRDT for a map from keys to values. Values are CRDT types which +/// can have their own updating logic. +/// +/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order. +/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization, +/// such that two values can be compared for equality based on their hashes). As a consequence, +/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps. +/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`, +/// the serialization cost `O(n)` would still have to be paid at each modification, so we are +/// actually not losing anything here. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Map { + vals: Vec<(K, V)>, +} + +impl Map +where + K: Clone + Ord, + V: Clone + Crdt, +{ + /// Create a new empty map CRDT + pub fn new() -> Self { + Self { vals: vec![] } + } + + /// Returns a map that contains a single mapping from the specified key to the specified value. + /// This can be used to build a delta-mutator: + /// when merged with another map, the value will be added or CRDT-merged if a previous + /// value already exists. + pub fn put_mutator(k: K, v: V) -> Self { + Self { vals: vec![(k, v)] } + } + + /// Add a value to the map + pub fn put(&mut self, k: K, v: V) { + self.merge(&Self::put_mutator(k, v)); + } + + /// Removes all values from the map + pub fn clear(&mut self) { + self.vals.clear(); + } + + /// Get a reference to the value assigned to a key + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) { + Ok(i) => Some(&self.vals[i].1), + Err(_) => None, + } + } + /// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values. + pub fn items(&self) -> &[(K, V)] { + &self.vals[..] + } + /// Returns the number of items in the map + pub fn len(&self) -> usize { + self.vals.len() + } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl Crdt for Map +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn merge(&mut self, other: &Self) { + for (k, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _)| k2.cmp(k)) { + Ok(i) => { + self.vals[i].1.merge(v2); + } + Err(i) => { + self.vals.insert(i, (k.clone(), v2.clone())); + } + } + } + } +} + +impl Default for Map +where + K: Clone + Ord, + V: Clone + Crdt, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/crdt/mod.rs b/src/util/crdt/mod.rs new file mode 100644 index 00000000..9663a5a5 --- /dev/null +++ b/src/util/crdt/mod.rs @@ -0,0 +1,23 @@ +//! This package provides a simple implementation of conflict-free replicated data types (CRDTs) +//! +//! CRDTs are a type of data structures that do not require coordination. In other words, we can +//! edit them in parallel, we will always find a way to merge it. +//! +//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the +//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now, +//! it is easy to merge their counters, order does not count: we always get 4. +//! +//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) + +mod bool; +#[allow(clippy::module_inception)] +mod crdt; +mod lww; +mod lww_map; +mod map; + +pub use self::bool::*; +pub use crdt::*; +pub use lww::*; +pub use lww_map::*; +pub use map::*; diff --git a/src/util/lib.rs b/src/util/lib.rs index 478b9ea4..64874095 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -5,6 +5,7 @@ extern crate log; pub mod background; pub mod config; +pub mod crdt; pub mod data; pub mod error; pub mod persister; diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 634ce282..72701c90 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -1,11 +1,12 @@ [package] name = "garage_web" -version = "0.4.0" +version = "0.5.0" authors = ["Alex Auvolat ", "Quentin Dufour "] edition = "2018" license = "AGPL-3.0" description = "S3-like website endpoint crate for the Garage object store" repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" [lib] path = "lib.rs" @@ -13,10 +14,10 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_api = { version = "0.4.0", path = "../api" } -garage_model = { version = "0.4.0", path = "../model" } -garage_util = { version = "0.4.0", path = "../util" } -garage_table = { version = "0.4.0", path = "../table" } +garage_api = { version = "0.5.0", path = "../api" } +garage_model = { version = "0.5.0", path = "../model" } +garage_util = { version = "0.5.0", path = "../util" } +garage_table = { version = "0.5.0", path = "../table" } err-derive = "0.3" log = "0.4" -- cgit v1.2.3