aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/merkle.rs29
-rw-r--r--src/table/replication/fullcopy.rs15
-rw-r--r--src/table/replication/parameters.rs10
-rw-r--r--src/table/replication/sharded.rs22
-rw-r--r--src/table/sync.rs134
5 files changed, 65 insertions, 145 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 86fef4c5..db05cca4 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -1,4 +1,3 @@
-use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
@@ -15,22 +14,12 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_rpc::ring::*;
+
use crate::data::*;
use crate::replication::*;
use crate::schema::*;
-pub type MerklePartition = [u8; 2];
-
-pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash {
- let mut partition_pos = [0u8; 32];
- partition_pos[0..2].copy_from_slice(&p[..]);
- partition_pos.into()
-}
-
-pub fn hash_of_merkle_partition_opt(p: Option<MerklePartition>) -> Hash {
- p.map(hash_of_merkle_partition)
- .unwrap_or([0xFFu8; 32].into())
-}
// This modules partitions the data in 2**16 partitions, based on the top
// 16 bits (two bytes) of item's partition keys' hashes.
@@ -57,8 +46,8 @@ pub struct MerkleUpdater<F: TableSchema, R: TableReplication> {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MerkleNodeKey {
- // partition: first 16 bits (two bytes) of the partition_key's hash
- pub partition: [u8; 2],
+ // partition number
+ pub partition: Partition,
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
#[serde(with = "serde_bytes")]
@@ -143,7 +132,7 @@ where
};
let key = MerkleNodeKey {
- partition: k[0..2].try_into().unwrap(),
+ partition: self.data.replication.partition_of(&Hash::try_from(&k[0..32]).unwrap()),
prefix: vec![],
};
self.data
@@ -325,7 +314,7 @@ where
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());
- ret.extend(&self.partition[..]);
+ ret.extend(&u16::to_be_bytes(self.partition)[..]);
ret.extend(&self.prefix[..]);
ret
}
@@ -443,3 +432,9 @@ fn test_intermediate_aux() {
]
);
}
+
+impl MerkleNode {
+ pub fn is_empty(&self) -> bool {
+ *self == MerkleNode::Empty
+ }
+}
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index aea8c1f3..bd658f63 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_util::data::*;
use crate::replication::*;
@@ -19,10 +19,6 @@ impl TableReplication for TableFullReplication {
// Advantage: do all reads locally, extremely fast
// Inconvenient: only suitable to reasonably small tables
- fn partition_of(&self, _hash: &Hash) -> u16 {
- 0u16
- }
-
fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
vec![self.system.id]
}
@@ -46,9 +42,10 @@ impl TableReplication for TableFullReplication {
self.max_faults
}
- fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
- let mut ret = vec![];
- ret.push([0u8; 32].into());
- ret
+ fn partition_of(&self, _hash: &Hash) -> Partition {
+ 0u16
+ }
+ fn partitions(&self) -> Vec<(Partition, Hash)> {
+ vec![(0u16, [0u8; 32].into())]
}
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index ace82bd9..e46bd172 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,4 +1,4 @@
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_util::data::*;
@@ -6,9 +6,6 @@ pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
- // Partition number of data item (for Merkle tree)
- fn partition_of(&self, hash: &Hash) -> u16;
-
// Which nodes to send reads from
fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
fn read_quorum(&self) -> usize;
@@ -18,6 +15,7 @@ pub trait TableReplication: Send + Sync {
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
- // Get partition boundaries
- fn split_points(&self, ring: &Ring) -> Vec<Hash>;
+ // Accessing partitions, for Merkle tree & sync
+ fn partition_of(&self, hash: &Hash) -> Partition;
+ fn partitions(&self) -> Vec<(Partition, Hash)>;
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 966be31a..dce74b03 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,7 +1,7 @@
use std::sync::Arc;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_util::data::*;
use crate::replication::*;
@@ -22,10 +22,6 @@ impl TableReplication for TableShardedReplication {
// - reads are done on all of the nodes that replicate the data
// - writes as well
- fn partition_of(&self, hash: &Hash) -> u16 {
- self.system.ring.borrow().partition_of(hash)
- }
-
fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
@@ -45,16 +41,10 @@ impl TableReplication for TableShardedReplication {
self.replication_factor - self.write_quorum
}
- fn split_points(&self, ring: &Ring) -> Vec<Hash> {
- let mut ret = vec![];
-
- for entry in ring.ring.iter() {
- ret.push(entry.location);
- }
- if ret.len() > 0 {
- assert_eq!(ret[0], [0u8; 32].into());
- }
-
- ret
+ fn partition_of(&self, hash: &Hash) -> Partition {
+ self.system.ring.borrow().partition_of(hash)
+ }
+ fn partitions(&self) -> Vec<(Partition, Hash)> {
+ self.system.ring.borrow().partitions()
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 9c148393..f5c2ef33 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -1,5 +1,4 @@
use std::collections::VecDeque;
-use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -15,7 +14,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_rpc::membership::System;
-use garage_rpc::ring::Ring;
+use garage_rpc::ring::*;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
@@ -38,20 +37,10 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
rpc_client: Arc<RpcClient<SyncRPC>>,
}
-type RootCk = Vec<(MerklePartition, Hash)>;
-
-#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
-pub struct PartitionRange {
- begin: MerklePartition,
- // if end is None, go all the way to partition 0xFFFF included
- end: Option<MerklePartition>,
-}
-
#[derive(Serialize, Deserialize)]
pub(crate) enum SyncRPC {
- RootCkHash(PartitionRange, Hash),
- RootCkList(PartitionRange, RootCk),
- CkNoDifference,
+ RootCkHash(Partition, Hash),
+ RootCkDifferent(bool),
GetNode(MerkleNodeKey),
Node(MerkleNodeKey, MerkleNode),
Items(Vec<Arc<ByteBuf>>),
@@ -66,7 +55,9 @@ struct SyncTodo {
#[derive(Debug, Clone)]
struct TodoPartition {
- range: PartitionRange,
+ partition: Partition,
+ begin: Hash,
+ end: Hash,
// Are we a node that stores this partition or not?
retain: bool,
@@ -222,7 +213,7 @@ where
let nodes = self
.data
.replication
- .write_nodes(&hash_of_merkle_partition(partition.range.begin))
+ .write_nodes(&partition.begin)
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
@@ -254,8 +245,8 @@ where
}
} else {
self.offload_partition(
- &hash_of_merkle_partition(partition.range.begin),
- &hash_of_merkle_partition_opt(partition.range.end),
+ &partition.begin,
+ &partition.end,
must_exit,
)
.await?;
@@ -364,30 +355,13 @@ where
// side to the other will happen when the other side syncs with us,
// which they also do regularly.
- fn get_root_ck(&self, range: PartitionRange) -> Result<RootCk, Error> {
- let begin = u16::from_be_bytes(range.begin);
- let range_iter = match range.end {
- Some(end) => {
- let end = u16::from_be_bytes(end);
- begin..=(end - 1)
- }
- None => begin..=0xFFFF,
+ fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> {
+ let key = MerkleNodeKey {
+ partition,
+ prefix: vec![],
};
-
- let mut ret = vec![];
- for i in range_iter {
- let key = MerkleNodeKey {
- partition: u16::to_be_bytes(i),
- prefix: vec![],
- };
- match self.merkle.read_node(&key)? {
- MerkleNode::Empty => (),
- x => {
- ret.push((key.partition, hash_of(&x)?));
- }
- }
- }
- Ok(ret)
+ let node = self.merkle.read_node(&key)?;
+ Ok((key, node))
}
async fn do_sync_with(
@@ -396,7 +370,7 @@ where
who: UUID,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
- let root_ck = self.get_root_ck(partition.range)?;
+ let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
if root_ck.is_empty() {
debug!(
"({}) Sync {:?} with {:?}: partition is empty.",
@@ -404,51 +378,29 @@ where
);
return Ok(());
}
+ let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?;
- let root_ck_hash = hash_of(&root_ck)?;
-
- // If their root checksum has level > than us, use that as a reference
+ // Check if they have the same root checksum
+ // If so, do nothing.
let root_resp = self
.rpc_client
.call(
who,
- SyncRPC::RootCkHash(partition.range, root_ck_hash),
+ SyncRPC::RootCkHash(partition.partition, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
let mut todo = match root_resp {
- SyncRPC::CkNoDifference => {
+ SyncRPC::RootCkDifferent(false) => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who
);
return Ok(());
}
- SyncRPC::RootCkList(_, their_root_ck) => {
- let join = join_ordered(&root_ck[..], &their_root_ck[..]);
- let mut todo = VecDeque::new();
- for (p, v1, v2) in join.iter() {
- let diff = match (v1, v2) {
- (Some(_), None) | (None, Some(_)) => true,
- (Some(a), Some(b)) => a != b,
- _ => false,
- };
- if diff {
- todo.push_back(MerkleNodeKey {
- partition: **p,
- prefix: vec![],
- });
- }
- }
- debug!(
- "({}) Sync {:?} with {:?}: todo.len() = {}",
- self.data.name,
- partition,
- who,
- todo.len()
- );
- todo
+ SyncRPC::RootCkDifferent(true) => {
+ VecDeque::from(vec![root_ck_key])
}
x => {
return Err(Error::Message(format!(
@@ -565,13 +517,9 @@ where
async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
match message {
SyncRPC::RootCkHash(range, h) => {
- let root_ck = self.get_root_ck(*range)?;
- let hash = hash_of(&root_ck)?;
- if hash == *h {
- Ok(SyncRPC::CkNoDifference)
- } else {
- Ok(SyncRPC::RootCkList(*range, root_ck))
- }
+ let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
+ let hash = hash_of::<MerkleNode>(&root_ck)?;
+ Ok(SyncRPC::RootCkDifferent(hash != *h))
}
SyncRPC::GetNode(k) => {
let node = self.merkle.read_node(&k)?;
@@ -596,39 +544,31 @@ impl SyncTodo {
self.todo.clear();
- let ring = system.ring.borrow().clone();
- let split_points = data.replication.split_points(&ring);
+ let partitions = data.replication.partitions();
- for i in 0..split_points.len() {
- let begin: MerklePartition = {
- let b = split_points[i];
- assert_eq!(b.as_slice()[2..], [0u8; 30][..]);
- b.as_slice()[..2].try_into().unwrap()
- };
+ for i in 0..partitions.len() {
+ let begin = partitions[i].1;
- let end: Option<MerklePartition> = if i + 1 < split_points.len() {
- let e = split_points[i + 1];
- assert_eq!(e.as_slice()[2..], [0u8; 30][..]);
- Some(e.as_slice()[..2].try_into().unwrap())
+ let end = if i + 1 < partitions.len() {
+ partitions[i+1].1
} else {
- None
+ [0xFFu8; 32].into()
};
- let begin_hash = hash_of_merkle_partition(begin);
- let end_hash = hash_of_merkle_partition_opt(end);
-
- let nodes = data.replication.write_nodes(&begin_hash);
+ let nodes = data.replication.write_nodes(&begin);
let retain = nodes.contains(&my_id);
if !retain {
// Check if we have some data to send, otherwise skip
- if data.store.range(begin_hash..end_hash).next().is_none() {
+ if data.store.range(begin..end).next().is_none() {
continue;
}
}
self.todo.push(TodoPartition {
- range: PartitionRange { begin, end },
+ partition: partitions[i].0,
+ begin,
+ end,
retain,
});
}