aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-17 18:27:29 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-17 18:27:29 +0200
commitb780f6485ddf385b584f02b3b17b859b6c432eb9 (patch)
treea0aa4de5b5d1cc6d8c6ad2c13aba26f0f693fa2e /src/table.rs
parent69f1d8fef23149e45189c296e0c0d23e040cbb0e (diff)
downloadgarage-b780f6485ddf385b584f02b3b17b859b6c432eb9.tar.gz
garage-b780f6485ddf385b584f02b3b17b859b6c432eb9.zip
Make sync send data both ways
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs14
1 files changed, 7 insertions, 7 deletions
diff --git a/src/table.rs b/src/table.rs
index 40114aec..bd26a79d 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -6,7 +6,7 @@ use async_trait::async_trait;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
-use tokio::sync::RwLock;
+use arc_swap::ArcSwapOption;
use crate::data::*;
use crate::error::Error;
@@ -22,7 +22,7 @@ pub struct Table<F: TableSchema> {
pub system: Arc<System>,
pub store: sled::Tree,
- pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>,
+ pub syncer: ArcSwapOption<TableSyncer<F>>,
pub param: TableReplicationParams,
}
@@ -142,10 +142,10 @@ impl<F: TableSchema + 'static> Table<F> {
system,
store,
param,
- syncer: RwLock::new(None),
+ syncer: ArcSwapOption::from(None),
});
let syncer = TableSyncer::launch(table.clone()).await;
- *table.syncer.write().await = Some(syncer);
+ table.syncer.swap(Some(syncer));
table
}
@@ -389,7 +389,7 @@ impl<F: TableSchema + 'static> Table<F> {
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
- let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ let syncer = self.syncer.load_full().unwrap();
let response = syncer
.handle_rpc(&rpc, self.system.background.stop_signal.clone())
.await?;
@@ -408,7 +408,7 @@ impl<F: TableSchema + 'static> Table<F> {
}
}
- async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
+ pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> {
for update_bytes in entries.drain(..) {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?;
@@ -437,7 +437,7 @@ impl<F: TableSchema + 'static> Table<F> {
if old_entry != new_entry {
self.instance.updated(old_entry, new_entry).await;
- let syncer = self.syncer.read().await.as_ref().unwrap().clone();
+ let syncer = self.syncer.load_full().unwrap();
self.system.background.spawn(syncer.invalidate(tree_key));
}
}