diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/src/table.rs b/src/table.rs index bd26a79d..2ae70398 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,12 +1,12 @@ -use std::collections::{HashMap, BTreeMap}; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use arc_swap::ArcSwapOption; use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use arc_swap::ArcSwapOption; use crate::data::*; use crate::error::Error; @@ -122,7 +122,9 @@ pub trait TableSchema: Send + Sync { type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>); - fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { + true + } } impl<F: TableSchema + 'static> Table<F> { @@ -244,9 +246,7 @@ impl<F: TableSchema + 'static> Table<F> { let ent2 = ret_entry.clone(); self.system .background - .spawn(async move { - self2.repair_on_read(&who[..], ent2).await - }); + .spawn(async move { self2.repair_on_read(&who[..], ent2).await }); } } Ok(ret) @@ -263,7 +263,8 @@ impl<F: TableSchema + 'static> Table<F> { let ring = self.system.ring.borrow().clone(); let who = ring.walk_ring(&hash, self.param.replication_factor); - let rpc = &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); + let rpc = + &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit); let resps = self .rpc_try_call_many(&who[..], &rpc, self.param.read_quorum) .await?; @@ -273,7 +274,8 @@ impl<F: TableSchema + 'static> Table<F> { for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; + let entry = + rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -294,16 +296,18 @@ impl<F: TableSchema + 'static> Table<F> { } if !to_repair.is_empty() { let self2 = self.clone(); - self.system - .background - .spawn(async move { - for (_, v) in to_repair.iter_mut() { - self2.repair_on_read(&who[..], v.take().unwrap()).await?; - } - Ok(()) - }); + self.system.background.spawn(async move { + for (_, v) in to_repair.iter_mut() { + self2.repair_on_read(&who[..], v.take().unwrap()).await?; + } + Ok(()) + }); } - let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::<Vec<_>>(); + let ret_vec = ret + .iter_mut() + .take(limit) + .map(|(_k, v)| v.take().unwrap()) + .collect::<Vec<_>>(); Ok(ret_vec) } @@ -408,7 +412,10 @@ impl<F: TableSchema + 'static> Table<F> { } } - pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + pub async fn handle_update( + self: &Arc<Self>, + mut entries: Vec<Arc<ByteBuf>>, + ) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; |