diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-11 13:47:21 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-11 13:47:21 +0100 |
commit | 8d63738cb062e816fc01c6aa2b32936ad31ff65b (patch) | |
tree | d69bb200a86788e2d7cde822afb70d54f7e7f448 /src | |
parent | 3214dd52dd144c99353830d7340ea158e262b06f (diff) | |
download | garage-8d63738cb062e816fc01c6aa2b32936ad31ff65b.tar.gz garage-8d63738cb062e816fc01c6aa2b32936ad31ff65b.zip |
Checkpoint: add merkle tree in data table
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/admin_rpc.rs | 3 | ||||
-rw-r--r-- | src/garage/server.rs | 2 | ||||
-rw-r--r-- | src/model/block.rs | 14 | ||||
-rw-r--r-- | src/model/garage.rs | 19 | ||||
-rw-r--r-- | src/rpc/membership.rs | 6 | ||||
-rw-r--r-- | src/table/lib.rs | 1 | ||||
-rw-r--r-- | src/table/merkle.rs | 352 | ||||
-rw-r--r-- | src/table/table.rs | 91 | ||||
-rw-r--r-- | src/table/table_sync.rs | 28 | ||||
-rw-r--r-- | src/util/background.rs | 16 |
10 files changed, 450 insertions, 82 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index e1981e3a..b4a65cad 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -350,8 +350,7 @@ impl AdminRpcHandler { .background .spawn_worker("Repair worker".into(), move |must_exit| async move { repair.repair_worker(opt, must_exit).await - }) - .await; + }); Ok(AdminRPC::Ok(format!( "Repair launched on {:?}", self.garage.system.id diff --git a/src/garage/server.rs b/src/garage/server.rs index a0ab17c4..8dddd7bb 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -49,7 +49,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let (send_cancel, watch_cancel) = watch::channel(false); let background = BackgroundRunner::new(16, watch_cancel.clone()); - let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; + let garage = Garage::new(config, db, background.clone(), &mut rpc_server); info!("Crate admin RPC handler..."); AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); diff --git a/src/model/block.rs b/src/model/block.rs index 57f4c077..ba5544a3 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -127,18 +127,16 @@ impl BlockManager { } } - pub async fn spawn_background_worker(self: Arc<Self>) { + pub fn spawn_background_worker(self: Arc<Self>) { // Launch 2 simultaneous workers for background resync loop preprocessing - for i in 0..2usize { + for i in 0..2u64 { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10)).await; - background - .spawn_worker(format!("block resync worker {}", i), move |must_exit| { - bm2.resync_loop(must_exit) - }) - .await; + tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await; + background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { + bm2.resync_loop(must_exit) + }); }); } } diff --git a/src/model/garage.rs b/src/model/garage.rs index d109fdaa..aac79a85 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -35,7 +35,7 @@ pub struct Garage { } impl Garage { - pub async fn new( + pub fn new( config: Config, db: sled::Db, background: Arc<BackgroundRunner>, @@ -86,8 +86,7 @@ impl Garage { &db, "block_ref".to_string(), rpc_server, - ) - .await; + ); info!("Initialize version_table..."); let version_table = Table::new( @@ -100,8 +99,7 @@ impl Garage { &db, "version".to_string(), rpc_server, - ) - .await; + ); info!("Initialize object_table..."); let object_table = Table::new( @@ -114,8 +112,7 @@ impl Garage { &db, "object".to_string(), rpc_server, - ) - .await; + ); info!("Initialize bucket_table..."); let bucket_table = Table::new( @@ -125,8 +122,7 @@ impl Garage { &db, "bucket".to_string(), rpc_server, - ) - .await; + ); info!("Initialize key_table_table..."); let key_table = Table::new( @@ -136,8 +132,7 @@ impl Garage { &db, "key".to_string(), rpc_server, - ) - .await; + ); info!("Initialize Garage..."); let garage = Arc::new(Self { @@ -155,7 +150,7 @@ impl Garage { info!("Start block manager background thread..."); garage.block_manager.garage.swap(Some(garage.clone())); - garage.block_manager.clone().spawn_background_worker().await; + garage.block_manager.clone().spawn_background_worker(); garage } diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 44d7122a..e1dc297e 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -319,8 +319,7 @@ impl System { .background .spawn_worker(format!("ping loop"), |stop_signal| { self2.ping_loop(stop_signal).map(Ok) - }) - .await; + }); if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { let self2 = self.clone(); @@ -330,8 +329,7 @@ impl System { self2 .consul_loop(stop_signal, consul_host, consul_service_name) .map(Ok) - }) - .await; + }); } } diff --git a/src/table/lib.rs b/src/table/lib.rs index 704f8f1e..62fd30c5 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -7,6 +7,7 @@ pub mod crdt; pub mod schema; pub mod util; +pub mod merkle; pub mod table; pub mod table_fullcopy; pub mod table_sharded; diff --git a/src/table/merkle.rs b/src/table/merkle.rs new file mode 100644 index 00000000..50cb90d5 --- /dev/null +++ b/src/table/merkle.rs @@ -0,0 +1,352 @@ +use std::convert::TryInto; +use std::sync::Arc; +use std::time::Duration; + +use futures::select; +use futures_util::future::*; +use log::{info, warn}; +use serde::{Deserialize, Serialize}; +use sled::transaction::{ + ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, +}; +use tokio::sync::{watch, Notify}; + +use garage_util::background::BackgroundRunner; +use garage_util::data::*; +use garage_util::error::Error; + +// This modules partitions the data in 2**16 partitions, based on the top +// 16 bits (two bytes) of item's partition keys' hashes. +// It builds one Merkle tree for each of these 2**16 partitions. + +pub(crate) struct MerkleUpdater { + table_name: String, + background: Arc<BackgroundRunner>, + + // Content of the todo tree: items where + // - key = the key of an item in the main table, ie hash(partition_key)+sort_key + // - value = the hash of the full serialized item, if present, + // or an empty vec if item is absent (deleted) + pub(crate) todo: sled::Tree, + pub(crate) todo_notify: Notify, + + // Content of the merkle tree: items where + // - key = .bytes() for MerkleNodeKey + // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found + pub(crate) merkle_tree: sled::Tree, + empty_node_hash: Hash, +} + +#[derive(Clone)] +pub struct MerkleNodeKey { + // partition: first 16 bits (two bytes) of the partition_key's hash + pub partition: [u8; 2], + + // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) + pub prefix: Vec<u8>, +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub enum MerkleNode { + // The empty Merkle node + Empty, + + // An intermediate Merkle tree node for a prefix + // Contains the hashes of the 256 possible next prefixes + Intermediate(Vec<(u8, Hash)>), + + // A final node for an item + // Contains the full key of the item and the hash of the value + Leaf(Vec<u8>, Hash), +} + +impl MerkleUpdater { + pub(crate) fn new( + table_name: String, + background: Arc<BackgroundRunner>, + todo: sled::Tree, + merkle_tree: sled::Tree, + ) -> Arc<Self> { + let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); + + Arc::new(Self { + table_name, + background, + todo, + todo_notify: Notify::new(), + merkle_tree, + empty_node_hash, + }) + } + + pub(crate) fn launch(self: &Arc<Self>) { + let self2 = self.clone(); + self.background.spawn_worker( + format!("Merkle tree updater for {}", self.table_name), + |must_exit: watch::Receiver<bool>| self2.updater_loop(must_exit), + ); + } + + async fn updater_loop( + self: Arc<Self>, + mut must_exit: watch::Receiver<bool>, + ) -> Result<(), Error> { + while !*must_exit.borrow() { + if let Some(x) = self.todo.iter().next() { + match x { + Ok((key, valhash)) => { + if let Err(e) = self.update_item(&key[..], &valhash[..]) { + warn!("Error while updating Merkle tree item: {}", e); + } + } + Err(e) => { + warn!("Error while iterating on Merkle todo tree: {}", e); + tokio::time::delay_for(Duration::from_secs(10)).await; + } + } + } else { + select! { + _ = self.todo_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + } + Ok(()) + } + + fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { + let khash = blake2sum(k); + + let new_vhash = if vhash_by.len() == 0 { + None + } else { + let vhash_by: [u8; 32] = vhash_by + .try_into() + .map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?; + Some(Hash::from(vhash_by)) + }; + + let key = MerkleNodeKey { + partition: k[0..2].try_into().unwrap(), + prefix: vec![], + }; + self.merkle_tree + .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?; + + let deleted = self + .todo + .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? + .is_ok(); + + if !deleted { + info!( + "Item not deleted from Merkle todo because it changed: {:?}", + k + ); + } + Ok(()) + } + + fn update_item_rec( + &self, + tx: &TransactionalTree, + k: &[u8], + khash: Hash, + key: &MerkleNodeKey, + new_vhash: Option<Hash>, + ) -> ConflictableTransactionResult<Option<Hash>, Error> { + let i = key.prefix.len(); + let mutate = match self.read_node_txn(tx, &key)? { + MerkleNode::Empty => { + if let Some(vhv) = new_vhash { + Some(MerkleNode::Leaf(k.to_vec(), vhv)) + } else { + None + } + } + MerkleNode::Intermediate(mut children) => { + let key2 = key.next_key(khash); + if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? { + if subhash == self.empty_node_hash { + intermediate_rm_child(&mut children, key2.prefix[i]); + } else { + intermediate_set_child(&mut children, key2.prefix[i], subhash); + } + if children.len() == 0 { + // should not happen + warn!("Replacing intermediate node with empty node, should not happen."); + Some(MerkleNode::Empty) + } else if children.len() == 1 { + // move node down to this level + let key_sub = key.add_byte(children[0].0); + let subnode = self.read_node_txn(tx, &key_sub)?; + tx.remove(key_sub.encode())?; + Some(subnode) + } else { + Some(MerkleNode::Intermediate(children)) + } + } else { + None + } + } + MerkleNode::Leaf(exlf_key, exlf_hash) => { + if exlf_key == k { + match new_vhash { + Some(vhv) if vhv == exlf_hash => None, + Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)), + None => Some(MerkleNode::Empty), + } + } else { + if let Some(vhv) = new_vhash { + // Create two sub-nodes and replace by intermediary node + let (pos1, h1) = { + let key2 = key.next_key(blake2sum(&exlf_key[..])); + let subhash = + self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?; + (key2.prefix[i], subhash) + }; + let (pos2, h2) = { + let key2 = key.next_key(khash); + let subhash = + self.put_node_txn(tx, &key2, &MerkleNode::Leaf(k.to_vec(), vhv))?; + (key2.prefix[i], subhash) + }; + let mut int = vec![]; + intermediate_set_child(&mut int, pos1, h1); + intermediate_set_child(&mut int, pos2, h2); + Some(MerkleNode::Intermediate(int)) + } else { + None + } + } + } + }; + + if let Some(new_node) = mutate { + let hash = self.put_node_txn(tx, &key, &new_node)?; + Ok(Some(hash)) + } else { + Ok(None) + } + } + + // Merkle tree node manipulation + + fn read_node_txn( + &self, + tx: &TransactionalTree, + k: &MerkleNodeKey, + ) -> ConflictableTransactionResult<MerkleNode, Error> { + let ent = tx.get(k.encode())?; + match ent { + None => Ok(MerkleNode::Empty), + Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..]) + .map_err(|e| ConflictableTransactionError::Abort(e.into()))?), + } + } + + fn put_node_txn( + &self, + tx: &TransactionalTree, + k: &MerkleNodeKey, + v: &MerkleNode, + ) -> ConflictableTransactionResult<Hash, Error> { + if *v == MerkleNode::Empty { + tx.remove(k.encode())?; + Ok(self.empty_node_hash) + } else { + let vby = rmp_to_vec_all_named(v) + .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let rethash = blake2sum(&vby[..]); + tx.insert(k.encode(), vby)?; + Ok(rethash) + } + } + + pub(crate) fn read_node( + &self, + k: &MerkleNodeKey, + ) -> Result<MerkleNode, Error> { + let ent = self.merkle_tree.get(k.encode())?; + match ent { + None => Ok(MerkleNode::Empty), + Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?) + } + } +} + +impl MerkleNodeKey { + fn encode(&self) -> Vec<u8> { + let mut ret = Vec::with_capacity(2 + self.prefix.len()); + ret.extend(&self.partition[..]); + ret.extend(&self.prefix[..]); + ret + } + + pub fn next_key(&self, h: Hash) -> Self { + assert!(&h.as_slice()[0..self.prefix.len()] == &self.prefix[..]); + let mut s2 = self.clone(); + s2.prefix.push(h.as_slice()[self.prefix.len()]); + s2 + } + + pub fn add_byte(&self, b: u8) -> Self { + let mut s2 = self.clone(); + s2.prefix.push(b); + s2 + } +} + +fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) { + for i in 0..ch.len() { + if ch[i].0 == pos { + ch[i].1 = v; + return; + } else if ch[i].0 > pos { + ch.insert(i, (pos, v)); + return; + } + } + ch.insert(ch.len(), (pos, v)); +} + +fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) { + for i in 0..ch.len() { + if ch[i].0 == pos { + ch.remove(i); + return; + } + } +} + +#[test] +fn test_intermediate_aux() { + let mut v = vec![]; + + intermediate_set_child(&mut v, 12u8, [12u8; 32].into()); + assert!(v == vec![(12u8, [12u8; 32].into())]); + + intermediate_set_child(&mut v, 42u8, [42u8; 32].into()); + assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_set_child(&mut v, 4u8, [4u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_set_child(&mut v, 12u8, [8u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_set_child(&mut v, 6u8, [6u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]); + + intermediate_rm_child(&mut v, 42u8); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_rm_child(&mut v, 11u8); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_rm_child(&mut v, 6u8); + assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]); + + intermediate_set_child(&mut v, 6u8, [7u8; 32].into()); + assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]); +} diff --git a/src/table/table.rs b/src/table/table.rs index 366ce925..0e75754c 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; +use sled::Transactional; use garage_util::data::*; use garage_util::error::Error; @@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use crate::crdt::CRDT; +use crate::merkle::*; use crate::schema::*; use crate::table_sync::*; @@ -33,6 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> { pub system: Arc<System>, pub store: sled::Tree, pub syncer: ArcSwapOption<TableSyncer<F, R>>, + merkle_updater: Arc<MerkleUpdater>, } #[derive(Serialize, Deserialize)] @@ -77,7 +80,7 @@ where { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub async fn new( + pub fn new( instance: F, replication: R, system: Arc<System>, @@ -85,11 +88,27 @@ where name: String, rpc_server: &mut RpcServer, ) -> Arc<Self> { - let store = db.open_tree(&name).expect("Unable to open DB tree"); + let store = db + .open_tree(&format!("{}:table", name)) + .expect("Unable to open DB tree"); + + let merkle_todo_store = db + .open_tree(&format!("{}:merkle_todo", name)) + .expect("Unable to open DB Merkle TODO tree"); + let merkle_tree_store = db + .open_tree(&format!("{}:merkle_tree", name)) + .expect("Unable to open DB Merkle tree tree"); let rpc_path = format!("table_{}", name); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); + let merkle_updater = MerkleUpdater::new( + name.clone(), + system.background.clone(), + merkle_todo_store, + merkle_tree_store, + ); + let table = Arc::new(Self { instance, replication, @@ -98,12 +117,15 @@ where system, store, syncer: ArcSwapOption::from(None), + merkle_updater, }); table.clone().register_handler(rpc_server, rpc_path); - let syncer = TableSyncer::launch(table.clone()).await; + let syncer = TableSyncer::launch(table.clone()); table.syncer.swap(Some(syncer)); + table.merkle_updater.launch(); + table } @@ -322,7 +344,7 @@ where Ok(TableRPC::Update(values)) } TableRPC::Update(pairs) => { - self.handle_update(pairs).await?; + self.handle_update(pairs)?; Ok(TableRPC::Ok) } TableRPC::SyncRPC(rpc) => { @@ -380,53 +402,64 @@ where Ok(ret) } - pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); + // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================ + pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { for update_bytes in entries.iter() { - let update = self.decode_entry(update_bytes.as_slice())?; - - let tree_key = self.tree_key(update.partition_key(), update.sort_key()); - - let (old_entry, new_entry) = self.store.transaction(|db| { - let (old_entry, new_entry) = match db.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = self - .decode_entry(&prev_bytes) - .map_err(sled::transaction::ConflictableTransactionError::Abort)?; - let mut new_entry = old_entry.clone(); - new_entry.merge(&update); - (Some(old_entry), new_entry) - } - None => (None, update.clone()), - }; + self.update_entry(update_bytes.as_slice())?; + } + Ok(()) + } + pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> { + let update = self.decode_entry(update_bytes)?; + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| { + let (old_entry, new_entry) = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = self + .decode_entry(&prev_bytes) + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + let mut new_entry = old_entry.clone(); + new_entry.merge(&update); + (Some(old_entry), new_entry) + } + None => (None, update.clone()), + }; + + if Some(&new_entry) != old_entry.as_ref() { let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) .map_err(sled::transaction::ConflictableTransactionError::Abort)?; + mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?; db.insert(tree_key.clone(), new_bytes)?; - Ok((old_entry, new_entry)) - })?; - - if old_entry.as_ref() != Some(&new_entry) { - self.instance.updated(old_entry, Some(new_entry)); - syncer.invalidate(&tree_key[..]); + Ok(Some((old_entry, new_entry))) + } else { + Ok(None) } + })?; + + if let Some((old_entry, new_entry)) = changed { + self.instance.updated(old_entry, Some(new_entry)); + self.syncer.load_full().unwrap().invalidate(&tree_key[..]); } Ok(()) } pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { - let removed = self.store.transaction(|txn| { + let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| { if let Some(cur_v) = txn.get(k)? { if cur_v == v { txn.remove(k)?; + mkl_todo.insert(k, vec![])?; return Ok(true); } } Ok(false) })?; + if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index c38b6bd5..51f8cd6f 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -106,7 +106,7 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { + pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { let todo = SyncTodo { todo: Vec::new() }; let syncer = Arc::new(TableSyncer { table: table.clone(), @@ -119,24 +119,16 @@ where let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let s1 = syncer.clone(); - table - .system - .background - .spawn_worker( - format!("table sync watcher for {}", table.name), - move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), - ) - .await; + table.system.background.spawn_worker( + format!("table sync watcher for {}", table.name), + move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), + ); let s2 = syncer.clone(); - table - .system - .background - .spawn_worker( - format!("table syncer for {}", table.name), - move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), - ) - .await; + table.system.background.spawn_worker( + format!("table syncer for {}", table.name), + move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), + ); let s3 = syncer.clone(); tokio::spawn(async move { @@ -630,7 +622,7 @@ where } } if diff_items.len() > 0 { - self.table.handle_update(&diff_items[..]).await?; + self.table.handle_update(&diff_items[..])?; } if items_to_send.len() > 0 { self.send_items(who, items_to_send).await?; diff --git a/src/util/background.rs b/src/util/background.rs index 937062dd..8081f157 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,11 +1,11 @@ use core::future::Future; use std::pin::Pin; +use std::sync::Mutex; use futures::future::join_all; use futures::select; use futures_util::future::*; use std::sync::Arc; -use tokio::sync::Mutex; use tokio::sync::{mpsc, watch, Notify}; use crate::error::Error; @@ -38,7 +38,7 @@ impl BackgroundRunner { } pub async fn run(self: Arc<Self>) { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); for i in 0..self.n_runners { workers.push(tokio::spawn(self.clone().runner(i))); } @@ -47,7 +47,7 @@ impl BackgroundRunner { let mut stop_signal = self.stop_signal.clone(); while let Some(exit_now) = stop_signal.recv().await { if exit_now { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); let workers_vec = workers.drain(..).collect::<Vec<_>>(); join_all(workers_vec).await; return; @@ -73,12 +73,12 @@ impl BackgroundRunner { self.job_notify.notify(); } - pub async fn spawn_worker<F, T>(&self, name: String, worker: F) + pub fn spawn_worker<F, T>(&self, name: String, worker: F) where F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, T: Future<Output = JobOutput> + Send + 'static, { - let mut workers = self.workers.lock().await; + let mut workers = self.workers.lock().unwrap(); let stop_signal = self.stop_signal.clone(); workers.push(tokio::spawn(async move { if let Err(e) = worker(stop_signal).await { @@ -93,7 +93,7 @@ impl BackgroundRunner { let mut stop_signal = self.stop_signal.clone(); loop { let must_exit: bool = *stop_signal.borrow(); - if let Some(job) = self.dequeue_job(must_exit).await { + if let Some(job) = self.dequeue_job(must_exit) { if let Err(e) = job.await { error!("Job failed: {}", e) } @@ -110,8 +110,8 @@ impl BackgroundRunner { } } - async fn dequeue_job(&self, must_exit: bool) -> Option<Job> { - let mut queue = self.queue_out.lock().await; + fn dequeue_job(&self, must_exit: bool) -> Option<Job> { + let mut queue = self.queue_out.lock().unwrap(); while let Ok((job, cancellable)) = queue.try_recv() { if cancellable && must_exit { continue; |