diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/table/crdt/crdt.rs | 6 | ||||
-rw-r--r-- | src/table/crdt/lww_map.rs | 18 | ||||
-rw-r--r-- | src/table/crdt/map.rs | 15 | ||||
-rw-r--r-- | src/table/crdt/mod.rs | 1 | ||||
-rw-r--r-- | src/table/gc.rs | 8 | ||||
-rw-r--r-- | src/table/lib.rs | 1 | ||||
-rw-r--r-- | src/table/merkle.rs | 19 | ||||
-rw-r--r-- | src/table/schema.rs | 2 | ||||
-rw-r--r-- | src/table/sync.rs | 23 | ||||
-rw-r--r-- | src/table/table.rs | 13 |
10 files changed, 63 insertions, 43 deletions
diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs index 636b6df6..7abe8ba9 100644 --- a/src/table/crdt/crdt.rs +++ b/src/table/crdt/crdt.rs @@ -52,10 +52,8 @@ where *self = other.clone(); } warn!("Making an arbitrary choice: {:?}", self); - } else { - if other > self { - *self = other.clone(); - } + } else if other > self { + *self = other.clone(); } } } diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs index 7b372191..4ed26809 100644 --- a/src/table/crdt/lww_map.rs +++ b/src/table/crdt/lww_map.rs @@ -94,7 +94,7 @@ where /// put_my_crdt_value(a); /// ``` pub fn take_and_clear(&mut self) -> Self { - let vals = std::mem::replace(&mut self.vals, vec![]); + let vals = std::mem::take(&mut self.vals); Self { vals } } /// Removes all values from the map @@ -113,10 +113,16 @@ where pub fn items(&self) -> &[(K, u64, V)] { &self.vals[..] } + /// Returns the number of items in the map pub fn len(&self) -> usize { self.vals.len() } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl<K, V> CRDT for LWWMap<K, V> @@ -143,3 +149,13 @@ where } } } + +impl<K, V> Default for LWWMap<K, V> +where + K: Ord, + V: CRDT, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs index c4a30a26..c4dd1613 100644 --- a/src/table/crdt/map.rs +++ b/src/table/crdt/map.rs @@ -62,6 +62,11 @@ where pub fn len(&self) -> usize { self.vals.len() } + + /// Returns true if the map is empty + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } impl<K, V> CRDT for Map<K, V> @@ -82,3 +87,13 @@ where } } } + +impl<K, V> Default for Map<K, V> +where + K: Clone + Ord, + V: Clone + CRDT, +{ + fn default() -> Self { + Self::new() + } +} diff --git a/src/table/crdt/mod.rs b/src/table/crdt/mod.rs index eb75d061..9663a5a5 100644 --- a/src/table/crdt/mod.rs +++ b/src/table/crdt/mod.rs @@ -10,6 +10,7 @@ //! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type) mod bool; +#[allow(clippy::module_inception)] mod crdt; mod lww; mod lww_map; diff --git a/src/table/gc.rs b/src/table/gc.rs index 694a3789..2dcbcaa0 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -85,8 +85,8 @@ where } } select! { - _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } } @@ -120,7 +120,7 @@ where self.todo_remove_if_equal(&k[..], vhash)?; } - if entries.len() == 0 { + if entries.is_empty() { // Nothing to do in this iteration return Ok(false); } @@ -247,7 +247,7 @@ where } Ok(GcRPC::Ok) } - _ => Err(Error::Message(format!("Unexpected GC RPC"))), + _ => Err(Error::Message("Unexpected GC RPC".to_string())), } } } diff --git a/src/table/lib.rs b/src/table/lib.rs index c3e14ab8..7b5d0512 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -1,4 +1,5 @@ #![recursion_limit = "1024"] +#![allow(clippy::comparison_chain, clippy::upper_case_acronyms)] #[macro_use] extern crate log; diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 39b87aa1..5c5cbec7 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -111,8 +111,8 @@ where } } else { select! { - _ = self.data.merkle_todo_notify.notified().fuse() => (), - _ = must_exit.changed().fuse() => (), + _ = self.data.merkle_todo_notify.notified().fuse() => {}, + _ = must_exit.changed().fuse() => {}, } } } @@ -121,10 +121,10 @@ where fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { let khash = blake2sum(k); - let new_vhash = if vhash_by.len() == 0 { + let new_vhash = if vhash_by.is_empty() { None } else { - Some(Hash::try_from(&vhash_by[..]).unwrap()) + Some(Hash::try_from(vhash_by).unwrap()) }; let key = MerkleNodeKey { @@ -168,14 +168,7 @@ where // This update is an Option<_>, so that it is None if the update is a no-op // and we can thus skip recalculating and re-storing everything let mutate = match self.read_node_txn(tx, &key)? { - MerkleNode::Empty => { - if let Some(vhv) = new_vhash { - Some(MerkleNode::Leaf(k.to_vec(), vhv)) - } else { - // Nothing to do, keep empty node - None - } - } + MerkleNode::Empty => new_vhash.map(|vhv| MerkleNode::Leaf(k.to_vec(), vhv)), MerkleNode::Intermediate(mut children) => { let key2 = key.next_key(khash); if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? { @@ -186,7 +179,7 @@ where intermediate_set_child(&mut children, key2.prefix[i], subhash); } - if children.len() == 0 { + if children.is_empty() { // should not happen warn!( "({}) Replacing intermediate node with empty node, should not happen.", diff --git a/src/table/schema.rs b/src/table/schema.rs index 13517271..74611749 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -18,7 +18,7 @@ impl PartitionKey for String { impl PartitionKey for Hash { fn hash(&self) -> Hash { - self.clone() + *self } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 3130abe8..33b01455 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -150,14 +150,12 @@ where if let Some(busy) = busy_opt { if busy { nothing_to_do_since = None; - } else { - if nothing_to_do_since.is_none() { - nothing_to_do_since = Some(Instant::now()); - } + } else if nothing_to_do_since.is_none() { + nothing_to_do_since = Some(Instant::now()); } } } - _ = must_exit.changed().fuse() => (), + _ = must_exit.changed().fuse() => {}, _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; @@ -277,7 +275,7 @@ where } } - if items.len() > 0 { + if !items.is_empty() { let nodes = self .data .replication @@ -292,9 +290,10 @@ where break; } if nodes.len() < self.data.replication.write_quorum() { - return Err(Error::Message(format!( + return Err(Error::Message( "Not offloading as we don't have a quorum of nodes to write to." - ))); + .to_string(), + )); } counter += 1; @@ -317,14 +316,14 @@ where async fn offload_items( self: &Arc<Self>, - items: &Vec<(Vec<u8>, Arc<ByteBuf>)>, + items: &[(Vec<u8>, Arc<ByteBuf>)], nodes: &[UUID], ) -> Result<(), Error> { let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>(); self.rpc_client .try_call_many( - &nodes[..], + nodes, SyncRPC::Items(values), RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT), ) @@ -467,7 +466,7 @@ where } if todo_items.len() >= 256 { - self.send_items(who, std::mem::replace(&mut todo_items, vec![])) + self.send_items(who, std::mem::take(&mut todo_items)) .await?; } } @@ -523,7 +522,7 @@ where self.data.update_many(items)?; Ok(SyncRPC::Ok) } - _ => Err(Error::Message(format!("Unexpected sync RPC"))), + _ => Err(Error::Message("Unexpected sync RPC".to_string())), } } } diff --git a/src/table/table.rs b/src/table/table.rs index e203b178..833d5771 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -109,17 +109,14 @@ where } pub async fn insert_many(&self, entries: &[F::E]) -> Result<(), Error> { - let mut call_list = HashMap::new(); + let mut call_list: HashMap<_, Vec<_>> = HashMap::new(); for entry in entries.iter() { let hash = entry.partition_key().hash(); let who = self.data.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { - if !call_list.contains_key(&node) { - call_list.insert(node, vec![]); - } - call_list.get_mut(&node).unwrap().push(e_enc.clone()); + call_list.entry(node).or_default().push(e_enc.clone()); } } @@ -183,7 +180,7 @@ where } } } else { - return Err(Error::Message(format!("Invalid return value to read"))); + return Err(Error::Message("Invalid return value to read".to_string())); } } if let Some(ret_entry) = &ret { @@ -268,7 +265,7 @@ where let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); self.rpc_client .try_call_many( - &who[..], + who, TableRPC::<F>::Update(vec![what_enc]), RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT), ) @@ -307,7 +304,7 @@ where self.data.update_many(pairs)?; Ok(TableRPC::Ok) } - _ => Err(Error::BadRPC(format!("Unexpected table RPC"))), + _ => Err(Error::BadRPC("Unexpected table RPC".to_string())), } } } |