diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/src/table.rs b/src/table.rs index 40114aec..bd26a79d 100644 --- a/src/table.rs +++ b/src/table.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; -use tokio::sync::RwLock; +use arc_swap::ArcSwapOption; use crate::data::*; use crate::error::Error; @@ -22,7 +22,7 @@ pub struct Table<F: TableSchema> { pub system: Arc<System>, pub store: sled::Tree, - pub syncer: RwLock<Option<Arc<TableSyncer<F>>>>, + pub syncer: ArcSwapOption<TableSyncer<F>>, pub param: TableReplicationParams, } @@ -142,10 +142,10 @@ impl<F: TableSchema + 'static> Table<F> { system, store, param, - syncer: RwLock::new(None), + syncer: ArcSwapOption::from(None), }); let syncer = TableSyncer::launch(table.clone()).await; - *table.syncer.write().await = Some(syncer); + table.syncer.swap(Some(syncer)); table } @@ -389,7 +389,7 @@ impl<F: TableSchema + 'static> Table<F> { Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { - let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + let syncer = self.syncer.load_full().unwrap(); let response = syncer .handle_rpc(&rpc, self.system.background.stop_signal.clone()) .await?; @@ -408,7 +408,7 @@ impl<F: TableSchema + 'static> Table<F> { } } - async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { + pub async fn handle_update(self: &Arc<Self>, mut entries: Vec<Arc<ByteBuf>>) -> Result<(), Error> { for update_bytes in entries.drain(..) { let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; @@ -437,7 +437,7 @@ impl<F: TableSchema + 'static> Table<F> { if old_entry != new_entry { self.instance.updated(old_entry, new_entry).await; - let syncer = self.syncer.read().await.as_ref().unwrap().clone(); + let syncer = self.syncer.load_full().unwrap(); self.system.background.spawn(syncer.invalidate(tree_key)); } } |