aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/table/crdt/crdt.rs6
-rw-r--r--src/table/crdt/lww_map.rs18
-rw-r--r--src/table/crdt/map.rs15
-rw-r--r--src/table/crdt/mod.rs1
-rw-r--r--src/table/gc.rs8
-rw-r--r--src/table/lib.rs1
-rw-r--r--src/table/merkle.rs19
-rw-r--r--src/table/schema.rs2
-rw-r--r--src/table/sync.rs23
-rw-r--r--src/table/table.rs13
10 files changed, 63 insertions, 43 deletions
diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs
index 636b6df6..7abe8ba9 100644
--- a/src/table/crdt/crdt.rs
+++ b/src/table/crdt/crdt.rs
@@ -52,10 +52,8 @@ where
*self = other.clone();
}
warn!("Making an arbitrary choice: {:?}", self);
- } else {
- if other > self {
- *self = other.clone();
- }
+ } else if other > self {
+ *self = other.clone();
}
}
}
diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs
index 7b372191..4ed26809 100644
--- a/src/table/crdt/lww_map.rs
+++ b/src/table/crdt/lww_map.rs
@@ -94,7 +94,7 @@ where
/// put_my_crdt_value(a);
/// ```
pub fn take_and_clear(&mut self) -> Self {
- let vals = std::mem::replace(&mut self.vals, vec![]);
+ let vals = std::mem::take(&mut self.vals);
Self { vals }
}
/// Removes all values from the map
@@ -113,10 +113,16 @@ where
pub fn items(&self) -> &[(K, u64, V)] {
&self.vals[..]
}
+
/// Returns the number of items in the map
pub fn len(&self) -> usize {
self.vals.len()
}
+
+ /// Returns true if the map is empty
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
}
impl<K, V> CRDT for LWWMap<K, V>
@@ -143,3 +149,13 @@ where
}
}
}
+
+impl<K, V> Default for LWWMap<K, V>
+where
+ K: Ord,
+ V: CRDT,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs
index c4a30a26..c4dd1613 100644
--- a/src/table/crdt/map.rs
+++ b/src/table/crdt/map.rs
@@ -62,6 +62,11 @@ where
pub fn len(&self) -> usize {
self.vals.len()
}
+
+ /// Returns true if the map is empty
+ pub fn is_empty(&self) -> bool {
+ self.len() == 0
+ }
}
impl<K, V> CRDT for Map<K, V>
@@ -82,3 +87,13 @@ where
}
}
}
+
+impl<K, V> Default for Map<K, V>
+where
+ K: Clone + Ord,
+ V: Clone + CRDT,
+{
+ fn default() -> Self {
+ Self::new()
+ }
+}
diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs
index eb75d061..9663a5a5 100644
--- a/src/table/crdt/mod.rs
+++ b/src/table/crdt/mod.rs
@@ -10,6 +10,7 @@
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
mod bool;
+#[allow(clippy::module_inception)]
mod crdt;
mod lww;
mod lww_map;
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 694a3789..2dcbcaa0 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -85,8 +85,8 @@ where
}
}
select! {
- _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
- _ = must_exit.changed().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {},
+ _ = must_exit.changed().fuse() => {},
}
}
}
@@ -120,7 +120,7 @@ where
self.todo_remove_if_equal(&k[..], vhash)?;
}
- if entries.len() == 0 {
+ if entries.is_empty() {
// Nothing to do in this iteration
return Ok(false);
}
@@ -247,7 +247,7 @@ where
}
Ok(GcRPC::Ok)
}
- _ => Err(Error::Message(format!("Unexpected GC RPC"))),
+ _ => Err(Error::Message("Unexpected GC RPC".to_string())),
}
}
}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index c3e14ab8..7b5d0512 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -1,4 +1,5 @@
#![recursion_limit = "1024"]
+#![allow(clippy::comparison_chain, clippy::upper_case_acronyms)]
#[macro_use]
extern crate log;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 39b87aa1..5c5cbec7 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -111,8 +111,8 @@ where
}
} else {
select! {
- _ = self.data.merkle_todo_notify.notified().fuse() => (),
- _ = must_exit.changed().fuse() => (),
+ _ = self.data.merkle_todo_notify.notified().fuse() => {},
+ _ = must_exit.changed().fuse() => {},
}
}
}
@@ -121,10 +121,10 @@ where
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
let khash = blake2sum(k);
- let new_vhash = if vhash_by.len() == 0 {
+ let new_vhash = if vhash_by.is_empty() {
None
} else {
- Some(Hash::try_from(&vhash_by[..]).unwrap())
+ Some(Hash::try_from(vhash_by).unwrap())
};
let key = MerkleNodeKey {
@@ -168,14 +168,7 @@ where
// This update is an Option<_>, so that it is None if the update is a no-op
// and we can thus skip recalculating and re-storing everything
let mutate = match self.read_node_txn(tx, &key)? {
- MerkleNode::Empty => {
- if let Some(vhv) = new_vhash {
- Some(MerkleNode::Leaf(k.to_vec(), vhv))
- } else {
- // Nothing to do, keep empty node
- None
- }
- }
+ MerkleNode::Empty => new_vhash.map(|vhv| MerkleNode::Leaf(k.to_vec(), vhv)),
MerkleNode::Intermediate(mut children) => {
let key2 = key.next_key(khash);
if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
@@ -186,7 +179,7 @@ where
intermediate_set_child(&mut children, key2.prefix[i], subhash);
}
- if children.len() == 0 {
+ if children.is_empty() {
// should not happen
warn!(
"({}) Replacing intermediate node with empty node, should not happen.",
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 13517271..74611749 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -18,7 +18,7 @@ impl PartitionKey for String {
impl PartitionKey for Hash {
fn hash(&self) -> Hash {
- self.clone()
+ *self
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 3130abe8..33b01455 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -150,14 +150,12 @@ where
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
- } else {
- if nothing_to_do_since.is_none() {
- nothing_to_do_since = Some(Instant::now());
- }
+ } else if nothing_to_do_since.is_none() {
+ nothing_to_do_since = Some(Instant::now());
}
}
}
- _ = must_exit.changed().fuse() => (),
+ _ = must_exit.changed().fuse() => {},
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
@@ -277,7 +275,7 @@ where
}
}
- if items.len() > 0 {
+ if !items.is_empty() {
let nodes = self
.data
.replication
@@ -292,9 +290,10 @@ where
break;
}
if nodes.len() < self.data.replication.write_quorum() {
- return Err(Error::Message(format!(
+ return Err(Error::Message(
"Not offloading as we don't have a quorum of nodes to write to."
- )));
+ .to_string(),
+ ));
}
counter += 1;
@@ -317,14 +316,14 @@ where
async fn offload_items(
self: &Arc<Self>,
- items: &Vec<(Vec<u8>, Arc<ByteBuf>)>,
+ items: &[(Vec<u8>, Arc<ByteBuf>)],
nodes: &[UUID],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
self.rpc_client
.try_call_many(
- &nodes[..],
+ nodes,
SyncRPC::Items(values),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
@@ -467,7 +466,7 @@ where
}
if todo_items.len() >= 256 {
- self.send_items(who, std::mem::replace(&mut todo_items, vec![]))
+ self.send_items(who, std::mem::take(&mut todo_items))
.await?;
}
}
@@ -523,7 +522,7 @@ where
self.data.update_many(items)?;
Ok(SyncRPC::Ok)
}
- _ => Err(Error::Message(format!("Unexpected sync RPC"))),
+ _ => Err(Error::Message("Unexpected sync RPC".to_string())),
}
}
}
diff --git a/src/table/table.rs b/src/table/table.rs
index e203b178..833d5771 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -109,17 +109,14 @@ where
}
pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> {
- let mut call_list = HashMap::new();
+ let mut call_list: HashMap<_, Vec<_>> = HashMap::new();
for entry in entries.iter() {
let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
- if !call_list.contains_key(&node) {
- call_list.insert(node, vec![]);
- }
- call_list.get_mut(&node).unwrap().push(e_enc.clone());
+ call_list.entry(node).or_default().push(e_enc.clone());
}
}
@@ -183,7 +180,7 @@ where
}
}
} else {
- return Err(Error::Message(format!("Invalid return value to read")));
+ return Err(Error::Message("Invalid return value to read".to_string()));
}
}
if let Some(ret_entry) = &ret {
@@ -268,7 +265,7 @@ where
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_client
.try_call_many(
- &who[..],
+ who,
TableRPC::<F>::Update(vec![what_enc]),
RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
)
@@ -307,7 +304,7 @@ where
self.data.update_many(pairs)?;
Ok(TableRPC::Ok)
}
- _ => Err(Error::BadRPC(format!("Unexpected table RPC"))),
+ _ => Err(Error::BadRPC("Unexpected table RPC".to_string())),
}
}
}