aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-11 23:53:32 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-11 23:53:32 +0200
commit9c931f5edacbaaab746ecf180fac2dd7062d0336 (patch)
treef29cfd82f573ac871408256a33e11f9153bae1da /src/table.rs
parent5dd59e437d5af84dfa2cf5dcc2c15807b971002d (diff)
downloadgarage-9c931f5edacbaaab746ecf180fac2dd7062d0336.tar.gz
garage-9c931f5edacbaaab746ecf180fac2dd7062d0336.zip
Keep network status & ring in a tokio::sync::watch
advantages - reads don't prevent preparing writes - can be followed from other parts of the system by cloning the receiver
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs49
1 files changed, 20 insertions, 29 deletions
diff --git a/src/table.rs b/src/table.rs
index 9f5eca33..d0f24119 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -3,9 +3,9 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
+use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use futures::stream::*;
use crate::data::*;
use crate::error::Error;
@@ -150,9 +150,9 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = e.partition_key().hash();
let who = self
.system
- .members
- .read()
- .await
+ .ring
+ .borrow()
+ .clone()
.walk_ring(&hash, self.param.replication_factor);
eprintln!("insert who: {:?}", who);
@@ -171,9 +171,9 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = entry.partition_key().hash();
let who = self
.system
- .members
- .read()
- .await
+ .ring
+ .borrow()
+ .clone()
.walk_ring(&hash, self.param.replication_factor);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
@@ -184,20 +184,14 @@ impl<F: TableFormat + 'static> Table<F> {
}
}
- let call_futures = call_list.drain()
- .map(|(node, entries)| async move {
- let rpc = TableRPC::<F>::Update(entries);
- let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
- let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
-
- let resp = rpc_call(
- self.system.clone(),
- &node,
- &message,
- self.param.timeout
- ).await?;
- Ok::<_, Error>((node, resp))
- });
+ let call_futures = call_list.drain().map(|(node, entries)| async move {
+ let rpc = TableRPC::<F>::Update(entries);
+ let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
+ let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
+
+ let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?;
+ Ok::<_, Error>((node, resp))
+ });
let mut resps = call_futures.collect::<FuturesUnordered<_>>();
let mut errors = vec![];
@@ -217,9 +211,9 @@ impl<F: TableFormat + 'static> Table<F> {
let hash = partition_key.hash();
let who = self
.system
- .members
- .read()
- .await
+ .ring
+ .borrow()
+ .clone()
.walk_ring(&hash, self.param.replication_factor);
eprintln!("get who: {:?}", who);
@@ -259,12 +253,9 @@ impl<F: TableFormat + 'static> Table<F> {
async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
- self.rpc_try_call_many(&who[..],
- &TableRPC::<F>::Update(vec![what_enc]),
- who.len(),
- )
+ self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
.await
- .map(|_|())
+ .map(|_| ())
}
async fn rpc_try_call_many(