diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api_server.rs | 28 | ||||
-rw-r--r-- | src/block.rs | 13 | ||||
-rw-r--r-- | src/data.rs | 2 | ||||
-rw-r--r-- | src/main.rs | 6 | ||||
-rw-r--r-- | src/membership.rs | 42 | ||||
-rw-r--r-- | src/object_table.rs | 2 | ||||
-rw-r--r-- | src/rpc_client.rs | 4 | ||||
-rw-r--r-- | src/table.rs | 2 | ||||
-rw-r--r-- | src/table_fullcopy.rs | 6 | ||||
-rw-r--r-- | src/table_sharded.rs | 2 | ||||
-rw-r--r-- | src/table_sync.rs | 33 | ||||
-rw-r--r-- | src/version_table.rs | 4 |
12 files changed, 64 insertions, 80 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index 4ee28f57..f4bb4177 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -123,7 +123,7 @@ async fn handle_put( versions: Vec::new(), }; object.versions.push(Box::new(ObjectVersion { - uuid: version_uuid.clone(), + uuid: version_uuid, timestamp: now_msec(), mime_type: mime_type.to_string(), size: first_block.len() as u64, @@ -139,7 +139,7 @@ async fn handle_put( } let version = Version { - uuid: version_uuid.clone(), + uuid: version_uuid, deleted: false, blocks: Vec::new(), bucket: bucket.into(), @@ -147,12 +147,11 @@ async fn handle_put( }; let first_block_hash = hash(&first_block[..]); - object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash.clone()); + object.versions[0].data = ObjectVersionData::FirstBlock(first_block_hash); garage.object_table.insert(&object).await?; let mut next_offset = first_block.len(); - let mut put_curr_version_block = - put_block_meta(garage.clone(), &version, 0, first_block_hash.clone()); + let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash); let mut put_curr_block = garage .block_manager .rpc_put_block(first_block_hash, first_block); @@ -163,12 +162,8 @@ async fn handle_put( if let Some(block) = next_block { let block_hash = hash(&block[..]); let block_len = block.len(); - put_curr_version_block = put_block_meta( - garage.clone(), - &version, - next_offset as u64, - block_hash.clone(), - ); + put_curr_version_block = + put_block_meta(garage.clone(), &version, next_offset as u64, block_hash); put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); next_offset += block_len; } else { @@ -191,14 +186,11 @@ async fn put_block_meta( hash: Hash, ) -> Result<(), Error> { let mut version = version.clone(); - version.blocks.push(VersionBlock { - offset, - hash: hash.clone(), - }); + version.blocks.push(VersionBlock { offset, hash: hash }); let block_ref = BlockRef { block: hash, - version: version.uuid.clone(), + version: version.uuid, deleted: false, }; @@ -279,7 +271,7 @@ async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<U versions: Vec::new(), }; object.versions.push(Box::new(ObjectVersion { - uuid: version_uuid.clone(), + uuid: version_uuid, timestamp: now_msec(), mime_type: "application/x-delete-marker".into(), size: 0, @@ -339,7 +331,7 @@ async fn handle_get( let mut blocks = version .blocks .iter() - .map(|vb| (vb.hash.clone(), None)) + .map(|vb| (vb.hash, None)) .collect::<Vec<_>>(); blocks[0].1 = Some(first_block); diff --git a/src/block.rs b/src/block.rs index ec29db12..46abcf02 100644 --- a/src/block.rs +++ b/src/block.rs @@ -156,13 +156,10 @@ impl BlockManager { warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); fs::remove_file(path).await?; self.put_to_resync(&hash, 0)?; - return Err(Error::CorruptData(hash.clone())); + return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { - hash: hash.clone(), - data, - })) + Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) } pub async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { @@ -273,7 +270,7 @@ impl BlockManager { if needed_by_others { let ring = garage.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, garage.system.config.data_replication_factor); - let msg = Arc::new(Message::NeedBlockQuery(hash.clone())); + let msg = Arc::new(Message::NeedBlockQuery(*hash)); let who_needs_fut = who.iter().map(|to| { self.rpc_client .call(to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) @@ -329,7 +326,7 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.system.config.data_replication_factor); - let msg = Arc::new(Message::GetBlock(hash.clone())); + let msg = Arc::new(Message::GetBlock(*hash)); let mut resp_stream = who .iter() .map(|to| self.rpc_client.call(to, msg.clone(), BLOCK_RW_TIMEOUT)) @@ -374,7 +371,7 @@ impl BlockManager { continue; } if !block_ref.deleted { - last_hash = Some(block_ref.block.clone()); + last_hash = Some(block_ref.block); self.put_to_resync(&block_ref.block, 0)?; } i += 1; diff --git a/src/data.rs b/src/data.rs index 7db715cb..8f976f71 100644 --- a/src/data.rs +++ b/src/data.rs @@ -5,7 +5,7 @@ use sha2::{Digest, Sha256}; use std::fmt; use std::time::{SystemTime, UNIX_EPOCH}; -#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq)] +#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)] pub struct FixedBytes32([u8; 32]); impl From<[u8; 32]> for FixedBytes32 { diff --git a/src/main.rs b/src/main.rs index 6ecf1024..e0ae7db7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -299,7 +299,7 @@ async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Re } } - let status_keys = status.iter().map(|x| x.id.clone()).collect::<HashSet<_>>(); + let status_keys = status.iter().map(|x| x.id).collect::<HashSet<_>>(); if config .members .iter() @@ -347,7 +347,7 @@ async fn cmd_configure( let mut candidates = vec![]; for adv in status.iter() { if hex::encode(&adv.id).starts_with(&args.node_id) { - candidates.push(adv.id.clone()); + candidates.push(adv.id); } } if candidates.len() != 1 { @@ -401,7 +401,7 @@ async fn cmd_remove( let mut candidates = vec![]; for (key, _) in config.members.iter() { if hex::encode(key).starts_with(&args.node_id) { - candidates.push(key.clone()); + candidates.push(*key); } } if candidates.len() != 1 { diff --git a/src/membership.rs b/src/membership.rs index 89b0fd67..78c1dbe3 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -125,9 +125,9 @@ impl Status { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { let addr = SocketAddr::new(ip, info.rpc_port); let old_status = self.nodes.insert( - info.id.clone(), + info.id, StatusEntry { - addr: addr.clone(), + addr, remaining_ping_attempts: MAX_FAILED_PINGS, state_info: info.state_info.clone(), }, @@ -177,7 +177,7 @@ impl Ring { new_ring.push(RingEntry { location: location.into(), - node: id.clone(), + node: *id, datacenter, }) } @@ -227,10 +227,10 @@ impl Ring { delta += 1; if !datacenters.contains(&self.ring[i].datacenter) { - ret.push(self.ring[i].node.clone()); + ret.push(self.ring[i].node); datacenters.push(self.ring[i].datacenter); } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { - ret.push(self.ring[i].node.clone()); + ret.push(self.ring[i].node); } } @@ -363,9 +363,9 @@ impl System { let status = self.status.borrow().clone(); let ring = self.ring.borrow().clone(); Message::Ping(PingMessage { - id: self.id.clone(), + id: self.id, rpc_port: self.config.rpc_bind_addr.port(), - status_hash: status.hash.clone(), + status_hash: status.hash, config_version: ring.config.version, state_info: self.state_info.clone(), }) @@ -387,7 +387,7 @@ impl System { .config .bootstrap_peers .iter() - .map(|ip| (ip.clone(), None)) + .map(|ip| (*ip, None)) .collect::<Vec<_>>(); self.clone().ping_nodes(bootstrap_peers).await; @@ -407,7 +407,7 @@ impl System { async move { ( id_option, - addr.clone(), + addr, sys.rpc_client .by_addr() .call(&addr, ping_msg_ref, PING_TIMEOUT) @@ -430,18 +430,18 @@ impl System { if is_new { has_changes = true; to_advertise.push(AdvertisedNode { - id: info.id.clone(), - addr: addr.clone(), + id: info.id, + addr: *addr, state_info: info.state_info.clone(), }); } if is_new || status.hash != info.status_hash { self.background - .spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok)); + .spawn_cancellable(self.clone().pull_status(info.id).map(Ok)); } if is_new || ring.config.version < info.config_version { self.background - .spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok)); + .spawn_cancellable(self.clone().pull_config(info.id).map(Ok)); } } else if let Some(id) = id_option { let remaining_attempts = status @@ -489,7 +489,7 @@ impl System { if is_new { status.recalculate_hash(); } - let status_hash = status.hash.clone(); + let status_hash = status.hash; let config_version = self.ring.borrow().config.version; update_locked.0.broadcast(Arc::new(status))?; @@ -497,11 +497,11 @@ impl System { if is_new || status_hash != ping.status_hash { self.background - .spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok)); + .spawn_cancellable(self.clone().pull_status(ping.id).map(Ok)); } if is_new || config_version < ping.config_version { self.background - .spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok)); + .spawn_cancellable(self.clone().pull_config(ping.id).map(Ok)); } Ok(self.make_ping()) @@ -517,8 +517,8 @@ impl System { status.state_info.clone() }; mem.push(AdvertisedNode { - id: node.clone(), - addr: status.addr.clone(), + id: *node, + addr: status.addr, state_info, }); } @@ -545,7 +545,7 @@ impl System { // learn our own ip address let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_bind_addr.port()); let old_self = status.nodes.insert( - node.id.clone(), + node.id, StatusEntry { addr: self_addr, remaining_ping_attempts: MAX_FAILED_PINGS, @@ -557,7 +557,7 @@ impl System { Some(x) => x.addr != self_addr, }; } else if !status.nodes.contains_key(&node.id) { - to_ping.push((node.addr.clone(), Some(node.id.clone()))); + to_ping.push((node.addr, Some(node.id))); } } if has_changed { @@ -607,7 +607,7 @@ impl System { .nodes .iter() .filter(|(id, _)| **id != self.id) - .map(|(id, status)| (status.addr.clone(), Some(id.clone()))) + .map(|(id, status)| (status.addr, Some(*id))) .collect::<Vec<_>>(); self.clone().ping_nodes(ping_addrs).await; diff --git a/src/object_table.rs b/src/object_table.rs index 0d0de146..edad4925 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -113,7 +113,7 @@ impl TableSchema for ObjectTable { .is_err() { let deleted_version = Version { - uuid: v.uuid.clone(), + uuid: v.uuid, deleted: true, blocks: vec![], bucket: old_v.bucket.clone(), diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 2b994402..4a065264 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -53,7 +53,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { let addr = { let status = self.status.borrow().clone(); match status.nodes.get(to.borrow()) { - Some(status) => status.addr.clone(), + Some(status) => status.addr, None => { return Err(Error::Message(format!( "Peer ID not found: {:?}", @@ -93,7 +93,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { .map(|to| { let self2 = self.clone(); let msg = msg.clone(); - async move { self2.call(to.clone(), msg, timeout).await } + async move { self2.call(to, msg, timeout).await } }) .collect::<FuturesUnordered<_>>(); diff --git a/src/table.rs b/src/table.rs index 8bcb3c66..3a21dfc7 100644 --- a/src/table.rs +++ b/src/table.rs @@ -195,7 +195,7 @@ where let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { - call_list.insert(node.clone(), vec![]); + call_list.insert(node, vec![]); } call_list.get_mut(&node).unwrap().push(e_enc.clone()); } diff --git a/src/table_fullcopy.rs b/src/table_fullcopy.rs index d5194d55..2fcf56db 100644 --- a/src/table_fullcopy.rs +++ b/src/table_fullcopy.rs @@ -38,12 +38,12 @@ impl TableFullReplication { // Recalculate neighbors let ring = system.ring.borrow().clone(); - let my_id = system.id.clone(); + let my_id = system.id; let mut nodes = vec![]; for (node, _) in ring.config.members.iter() { let node_ranking = hash(&[node.as_slice(), my_id.as_slice()].concat()); - nodes.push((node.clone(), node_ranking)); + nodes.push((*node, node_ranking)); } nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2)); let mut neighbors = nodes @@ -69,7 +69,7 @@ impl TableReplication for TableFullReplication { // Inconvenient: only suitable to reasonably small tables fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> { - vec![system.id.clone()] + vec![system.id] } fn read_quorum(&self) -> usize { 1 diff --git a/src/table_sharded.rs b/src/table_sharded.rs index 6a174d05..c17ea0d4 100644 --- a/src/table_sharded.rs +++ b/src/table_sharded.rs @@ -47,7 +47,7 @@ impl TableReplication for TableShardedReplication { ret.push([0u8; 32].into()); for entry in ring.ring.iter() { - ret.push(entry.location.clone()); + ret.push(entry.location); } ret.push([0xFFu8; 32].into()); ret diff --git a/src/table_sync.rs b/src/table_sync.rs index aa0610e5..6442841d 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -228,7 +228,7 @@ where partition: &TodoPartition, must_exit: &mut watch::Receiver<bool>, ) -> Result<(), Error> { - let my_id = self.table.system.id.clone(); + let my_id = self.table.system.id; let nodes = self .table .replication @@ -251,7 +251,7 @@ where self.clone().do_sync_with( partition.clone(), root_cks.clone(), - node.clone(), + *node, partition.retain, must_exit.clone(), ) @@ -361,8 +361,8 @@ where .range_checksum_cached_hash(&sub_range, must_exit) .await?; - if let Some(hash) = &sub_ck.hash { - children.push((sub_range.clone(), hash.clone())); + if let Some(hash) = sub_ck.hash { + children.push((sub_range.clone(), hash)); if sub_ck.time < time { time = sub_ck.time; } @@ -527,7 +527,7 @@ where self.table.handle_update(diff_items).await?; } if items_to_send.len() > 0 { - self.send_items(who.clone(), items_to_send).await?; + self.send_items(who, items_to_send).await?; } } else { return Err(Error::BadRequest(format!( @@ -688,7 +688,7 @@ where impl SyncTodo { fn add_full_scan<F: TableSchema, R: TableReplication>(&mut self, table: &Table<F, R>) { - let my_id = table.system.id.clone(); + let my_id = table.system.id; self.todo.clear(); @@ -696,19 +696,14 @@ impl SyncTodo { let split_points = table.replication.split_points(&ring); for i in 0..split_points.len() - 1 { - let begin = split_points[i].clone(); - let end = split_points[i + 1].clone(); + let begin = split_points[i]; + let end = split_points[i + 1]; let nodes = table.replication.replication_nodes(&begin, &ring); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if table - .store - .range(begin.clone()..end.clone()) - .next() - .is_none() - { + if table.store.range(begin..end).next().is_none() { continue; } } @@ -723,7 +718,7 @@ impl SyncTodo { old_ring: &Ring, new_ring: &Ring, ) { - let my_id = table.system.id.clone(); + let my_id = table.system.id; // If it is us who are entering or leaving the system, // initiate a full sync instead of incremental sync @@ -738,8 +733,8 @@ impl SyncTodo { .into_iter() .chain(table.replication.split_points(old_ring).drain(..)) .chain(table.replication.split_points(new_ring).drain(..)) - .chain(self.todo.iter().map(|x| x.begin.clone())) - .chain(self.todo.iter().map(|x| x.end.clone())) + .chain(self.todo.iter().map(|x| x.begin)) + .chain(self.todo.iter().map(|x| x.end)) .collect::<Vec<_>>(); all_points.sort(); all_points.dedup(); @@ -749,8 +744,8 @@ impl SyncTodo { let mut new_todo = vec![]; for i in 0..all_points.len() - 1 { - let begin = all_points[i].clone(); - let end = all_points[i + 1].clone(); + let begin = all_points[i]; + let end = all_points[i + 1]; let was_ours = table .replication .replication_nodes(&begin, &old_ring) diff --git a/src/version_table.rs b/src/version_table.rs index 230b7f1c..74174dce 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -77,8 +77,8 @@ impl TableSchema for VersionTable { .blocks .iter() .map(|vb| BlockRef { - block: vb.hash.clone(), - version: old_v.uuid.clone(), + block: vb.hash, + version: old_v.uuid, deleted: true, }) .collect::<Vec<_>>(); |