diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 49 |
1 files changed, 20 insertions, 29 deletions
diff --git a/src/table.rs b/src/table.rs index 9f5eca33..d0f24119 100644 --- a/src/table.rs +++ b/src/table.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use futures::stream::*; use crate::data::*; use crate::error::Error; @@ -150,9 +150,9 @@ impl<F: TableFormat + 'static> Table<F> { let hash = e.partition_key().hash(); let who = self .system - .members - .read() - .await + .ring + .borrow() + .clone() .walk_ring(&hash, self.param.replication_factor); eprintln!("insert who: {:?}", who); @@ -171,9 +171,9 @@ impl<F: TableFormat + 'static> Table<F> { let hash = entry.partition_key().hash(); let who = self .system - .members - .read() - .await + .ring + .borrow() + .clone() .walk_ring(&hash, self.param.replication_factor); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { @@ -184,20 +184,14 @@ impl<F: TableFormat + 'static> Table<F> { } } - let call_futures = call_list.drain() - .map(|(node, entries)| async move { - let rpc = TableRPC::<F>::Update(entries); - let rpc_bytes = rmp_to_vec_all_named(&rpc)?; - let message = Message::TableRPC(self.name.to_string(), rpc_bytes); - - let resp = rpc_call( - self.system.clone(), - &node, - &message, - self.param.timeout - ).await?; - Ok::<_, Error>((node, resp)) - }); + let call_futures = call_list.drain().map(|(node, entries)| async move { + let rpc = TableRPC::<F>::Update(entries); + let rpc_bytes = rmp_to_vec_all_named(&rpc)?; + let message = Message::TableRPC(self.name.to_string(), rpc_bytes); + + let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?; + Ok::<_, Error>((node, resp)) + }); let mut resps = call_futures.collect::<FuturesUnordered<_>>(); let mut errors = vec![]; @@ -217,9 +211,9 @@ impl<F: TableFormat + 'static> Table<F> { let hash = partition_key.hash(); let who = self .system - .members - .read() - .await + .ring + .borrow() + .clone() .walk_ring(&hash, self.param.replication_factor); eprintln!("get who: {:?}", who); @@ -259,12 +253,9 @@ impl<F: TableFormat + 'static> Table<F> { async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?)); - self.rpc_try_call_many(&who[..], - &TableRPC::<F>::Update(vec![what_enc]), - who.len(), - ) + self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len()) .await - .map(|_|()) + .map(|_| ()) } async fn rpc_try_call_many( |