aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-17 18:51:29 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-17 18:51:29 +0200
commitdb1c4222cefa99c6a4453da13bdb4f206b4b05a5 (patch)
tree5a3fabb3626ed15152fd500a1454c02a3c1b382e /src/table.rs
parent4bacaaf53f14ab9340795a5aa98124816d2dab9b (diff)
downloadgarage-db1c4222cefa99c6a4453da13bdb4f206b4b05a5.tar.gz
garage-db1c4222cefa99c6a4453da13bdb4f206b4b05a5.zip
Don't send items...
...if syncer doesn't need them because he's going to delete the partition anyway. Also, fix block resync queue
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs43
1 files changed, 25 insertions, 18 deletions
diff --git a/src/table.rs b/src/table.rs
index bd26a79d..2ae70398 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -1,12 +1,12 @@
-use std::collections::{HashMap, BTreeMap};
+use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
+use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use arc_swap::ArcSwapOption;
use crate::data::*;
use crate::error::Error;
@@ -122,7 +122,9 @@ pub trait TableSchema: Send + Sync {
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>);
- fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true }
+ fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
+ true
+ }
}
impl<F: TableSchema + 'static> Table<F> {
@@ -244,9 +246,7 @@ impl<F: TableSchema + 'static> Table<F> {
let ent2 = ret_entry.clone();
self.system
.background
- .spawn(async move {
- self2.repair_on_read(&who[..], ent2).await
- });
+ .spawn(async move { self2.repair_on_read(&who[..], ent2).await });
}
}
Ok(ret)
@@ -263,7 +263,8 @@ impl<F: TableSchema + 'static> Table<F> {
let ring = self.system.ring.borrow().clone();
let who = ring.walk_ring(&hash, self.param.replication_factor);
- let rpc = &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
+ let rpc =
+ &TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key.clone(), filter, limit);
let resps = self
.rpc_try_call_many(&who[..], &rpc, self.param.read_quorum)
.await?;
@@ -273,7 +274,8 @@ impl<F: TableSchema + 'static> Table<F> {
for resp in resps {
if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() {
- let entry = rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
+ let entry =
+ rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) {
None => {
@@ -294,16 +296,18 @@ impl<F: TableSchema + 'static> Table<F> {
}
if !to_repair.is_empty() {
let self2 = self.clone();
- self.system
- .background
- .spawn(async move {
- for (_, v) in to_repair.iter_mut() {
- self2.repair_on_read(&who[..], v.take().unwrap()).await?;
- }
- Ok(())
- });
+ self.system.background.spawn(async move {
+ for (_, v) in to_repair.iter_mut() {
+ self2.repair_on_read(&who[..], v.take().unwrap()).await?;
+ }
+ Ok(())
+ });
}
- let ret_vec = ret.iter_mut().take(limit).map(|(_k, v)| v.take().unwrap()).collect::<Vec<_>>();
+ let ret_vec = ret
+ .iter_mut()
+ .take(limit)
+ .map(|(_k, v)| v.take().unwrap())
+ .collect::<Vec<_>>();
Ok(ret_vec)
}
@@ -408,7 +412,10 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
+ pub async fn handle_update(
+ self: &Arc<Self>,
+ mut entries: Vec<Arc<ByteBuf>>,
+ ) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;