aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/garage/admin_rpc.rs3
-rw-r--r--src/garage/server.rs2
-rw-r--r--src/model/block.rs14
-rw-r--r--src/model/garage.rs19
-rw-r--r--src/rpc/membership.rs6
-rw-r--r--src/table/lib.rs1
-rw-r--r--src/table/merkle.rs352
-rw-r--r--src/table/table.rs91
-rw-r--r--src/table/table_sync.rs28
-rw-r--r--src/util/background.rs16
10 files changed, 450 insertions, 82 deletions
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index e1981e3a..b4a65cad 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -350,8 +350,7 @@ impl AdminRpcHandler {
.background
.spawn_worker("Repair worker".into(), move |must_exit| async move {
repair.repair_worker(opt, must_exit).await
- })
- .await;
+ });
Ok(AdminRPC::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
diff --git a/src/garage/server.rs b/src/garage/server.rs
index a0ab17c4..8dddd7bb 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -49,7 +49,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let (send_cancel, watch_cancel) = watch::channel(false);
let background = BackgroundRunner::new(16, watch_cancel.clone());
- let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await;
+ let garage = Garage::new(config, db, background.clone(), &mut rpc_server);
info!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);
diff --git a/src/model/block.rs b/src/model/block.rs
index 57f4c077..ba5544a3 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -127,18 +127,16 @@ impl BlockManager {
}
}
- pub async fn spawn_background_worker(self: Arc<Self>) {
+ pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing
- for i in 0..2usize {
+ for i in 0..2u64 {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(10)).await;
- background
- .spawn_worker(format!("block resync worker {}", i), move |must_exit| {
- bm2.resync_loop(must_exit)
- })
- .await;
+ tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await;
+ background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
+ bm2.resync_loop(must_exit)
+ });
});
}
}
diff --git a/src/model/garage.rs b/src/model/garage.rs
index d109fdaa..aac79a85 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -35,7 +35,7 @@ pub struct Garage {
}
impl Garage {
- pub async fn new(
+ pub fn new(
config: Config,
db: sled::Db,
background: Arc<BackgroundRunner>,
@@ -86,8 +86,7 @@ impl Garage {
&db,
"block_ref".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize version_table...");
let version_table = Table::new(
@@ -100,8 +99,7 @@ impl Garage {
&db,
"version".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize object_table...");
let object_table = Table::new(
@@ -114,8 +112,7 @@ impl Garage {
&db,
"object".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize bucket_table...");
let bucket_table = Table::new(
@@ -125,8 +122,7 @@ impl Garage {
&db,
"bucket".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize key_table_table...");
let key_table = Table::new(
@@ -136,8 +132,7 @@ impl Garage {
&db,
"key".to_string(),
rpc_server,
- )
- .await;
+ );
info!("Initialize Garage...");
let garage = Arc::new(Self {
@@ -155,7 +150,7 @@ impl Garage {
info!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone()));
- garage.block_manager.clone().spawn_background_worker().await;
+ garage.block_manager.clone().spawn_background_worker();
garage
}
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 44d7122a..e1dc297e 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -319,8 +319,7 @@ impl System {
.background
.spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal).map(Ok)
- })
- .await;
+ });
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
@@ -330,8 +329,7 @@ impl System {
self2
.consul_loop(stop_signal, consul_host, consul_service_name)
.map(Ok)
- })
- .await;
+ });
}
}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 704f8f1e..62fd30c5 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -7,6 +7,7 @@ pub mod crdt;
pub mod schema;
pub mod util;
+pub mod merkle;
pub mod table;
pub mod table_fullcopy;
pub mod table_sharded;
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
new file mode 100644
index 00000000..50cb90d5
--- /dev/null
+++ b/src/table/merkle.rs
@@ -0,0 +1,352 @@
+use std::convert::TryInto;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::select;
+use futures_util::future::*;
+use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use sled::transaction::{
+ ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
+};
+use tokio::sync::{watch, Notify};
+
+use garage_util::background::BackgroundRunner;
+use garage_util::data::*;
+use garage_util::error::Error;
+
+// This modules partitions the data in 2**16 partitions, based on the top
+// 16 bits (two bytes) of item's partition keys' hashes.
+// It builds one Merkle tree for each of these 2**16 partitions.
+
+pub(crate) struct MerkleUpdater {
+ table_name: String,
+ background: Arc<BackgroundRunner>,
+
+ // Content of the todo tree: items where
+ // - key = the key of an item in the main table, ie hash(partition_key)+sort_key
+ // - value = the hash of the full serialized item, if present,
+ // or an empty vec if item is absent (deleted)
+ pub(crate) todo: sled::Tree,
+ pub(crate) todo_notify: Notify,
+
+ // Content of the merkle tree: items where
+ // - key = .bytes() for MerkleNodeKey
+ // - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
+ pub(crate) merkle_tree: sled::Tree,
+ empty_node_hash: Hash,
+}
+
+#[derive(Clone)]
+pub struct MerkleNodeKey {
+ // partition: first 16 bits (two bytes) of the partition_key's hash
+ pub partition: [u8; 2],
+
+ // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
+ pub prefix: Vec<u8>,
+}
+
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+pub enum MerkleNode {
+ // The empty Merkle node
+ Empty,
+
+ // An intermediate Merkle tree node for a prefix
+ // Contains the hashes of the 256 possible next prefixes
+ Intermediate(Vec<(u8, Hash)>),
+
+ // A final node for an item
+ // Contains the full key of the item and the hash of the value
+ Leaf(Vec<u8>, Hash),
+}
+
+impl MerkleUpdater {
+ pub(crate) fn new(
+ table_name: String,
+ background: Arc<BackgroundRunner>,
+ todo: sled::Tree,
+ merkle_tree: sled::Tree,
+ ) -> Arc<Self> {
+ let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+
+ Arc::new(Self {
+ table_name,
+ background,
+ todo,
+ todo_notify: Notify::new(),
+ merkle_tree,
+ empty_node_hash,
+ })
+ }
+
+ pub(crate) fn launch(self: &Arc<Self>) {
+ let self2 = self.clone();
+ self.background.spawn_worker(
+ format!("Merkle tree updater for {}", self.table_name),
+ |must_exit: watch::Receiver<bool>| self2.updater_loop(must_exit),
+ );
+ }
+
+ async fn updater_loop(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ while !*must_exit.borrow() {
+ if let Some(x) = self.todo.iter().next() {
+ match x {
+ Ok((key, valhash)) => {
+ if let Err(e) = self.update_item(&key[..], &valhash[..]) {
+ warn!("Error while updating Merkle tree item: {}", e);
+ }
+ }
+ Err(e) => {
+ warn!("Error while iterating on Merkle todo tree: {}", e);
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+ }
+ }
+ } else {
+ select! {
+ _ = self.todo_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
+ }
+ }
+ Ok(())
+ }
+
+ fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
+ let khash = blake2sum(k);
+
+ let new_vhash = if vhash_by.len() == 0 {
+ None
+ } else {
+ let vhash_by: [u8; 32] = vhash_by
+ .try_into()
+ .map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?;
+ Some(Hash::from(vhash_by))
+ };
+
+ let key = MerkleNodeKey {
+ partition: k[0..2].try_into().unwrap(),
+ prefix: vec![],
+ };
+ self.merkle_tree
+ .transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?;
+
+ let deleted = self
+ .todo
+ .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
+ .is_ok();
+
+ if !deleted {
+ info!(
+ "Item not deleted from Merkle todo because it changed: {:?}",
+ k
+ );
+ }
+ Ok(())
+ }
+
+ fn update_item_rec(
+ &self,
+ tx: &TransactionalTree,
+ k: &[u8],
+ khash: Hash,
+ key: &MerkleNodeKey,
+ new_vhash: Option<Hash>,
+ ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ let i = key.prefix.len();
+ let mutate = match self.read_node_txn(tx, &key)? {
+ MerkleNode::Empty => {
+ if let Some(vhv) = new_vhash {
+ Some(MerkleNode::Leaf(k.to_vec(), vhv))
+ } else {
+ None
+ }
+ }
+ MerkleNode::Intermediate(mut children) => {
+ let key2 = key.next_key(khash);
+ if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
+ if subhash == self.empty_node_hash {
+ intermediate_rm_child(&mut children, key2.prefix[i]);
+ } else {
+ intermediate_set_child(&mut children, key2.prefix[i], subhash);
+ }
+ if children.len() == 0 {
+ // should not happen
+ warn!("Replacing intermediate node with empty node, should not happen.");
+ Some(MerkleNode::Empty)
+ } else if children.len() == 1 {
+ // move node down to this level
+ let key_sub = key.add_byte(children[0].0);
+ let subnode = self.read_node_txn(tx, &key_sub)?;
+ tx.remove(key_sub.encode())?;
+ Some(subnode)
+ } else {
+ Some(MerkleNode::Intermediate(children))
+ }
+ } else {
+ None
+ }
+ }
+ MerkleNode::Leaf(exlf_key, exlf_hash) => {
+ if exlf_key == k {
+ match new_vhash {
+ Some(vhv) if vhv == exlf_hash => None,
+ Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)),
+ None => Some(MerkleNode::Empty),
+ }
+ } else {
+ if let Some(vhv) = new_vhash {
+ // Create two sub-nodes and replace by intermediary node
+ let (pos1, h1) = {
+ let key2 = key.next_key(blake2sum(&exlf_key[..]));
+ let subhash =
+ self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?;
+ (key2.prefix[i], subhash)
+ };
+ let (pos2, h2) = {
+ let key2 = key.next_key(khash);
+ let subhash =
+ self.put_node_txn(tx, &key2, &MerkleNode::Leaf(k.to_vec(), vhv))?;
+ (key2.prefix[i], subhash)
+ };
+ let mut int = vec![];
+ intermediate_set_child(&mut int, pos1, h1);
+ intermediate_set_child(&mut int, pos2, h2);
+ Some(MerkleNode::Intermediate(int))
+ } else {
+ None
+ }
+ }
+ }
+ };
+
+ if let Some(new_node) = mutate {
+ let hash = self.put_node_txn(tx, &key, &new_node)?;
+ Ok(Some(hash))
+ } else {
+ Ok(None)
+ }
+ }
+
+ // Merkle tree node manipulation
+
+ fn read_node_txn(
+ &self,
+ tx: &TransactionalTree,
+ k: &MerkleNodeKey,
+ ) -> ConflictableTransactionResult<MerkleNode, Error> {
+ let ent = tx.get(k.encode())?;
+ match ent {
+ None => Ok(MerkleNode::Empty),
+ Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])
+ .map_err(|e| ConflictableTransactionError::Abort(e.into()))?),
+ }
+ }
+
+ fn put_node_txn(
+ &self,
+ tx: &TransactionalTree,
+ k: &MerkleNodeKey,
+ v: &MerkleNode,
+ ) -> ConflictableTransactionResult<Hash, Error> {
+ if *v == MerkleNode::Empty {
+ tx.remove(k.encode())?;
+ Ok(self.empty_node_hash)
+ } else {
+ let vby = rmp_to_vec_all_named(v)
+ .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let rethash = blake2sum(&vby[..]);
+ tx.insert(k.encode(), vby)?;
+ Ok(rethash)
+ }
+ }
+
+ pub(crate) fn read_node(
+ &self,
+ k: &MerkleNodeKey,
+ ) -> Result<MerkleNode, Error> {
+ let ent = self.merkle_tree.get(k.encode())?;
+ match ent {
+ None => Ok(MerkleNode::Empty),
+ Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?)
+ }
+ }
+}
+
+impl MerkleNodeKey {
+ fn encode(&self) -> Vec<u8> {
+ let mut ret = Vec::with_capacity(2 + self.prefix.len());
+ ret.extend(&self.partition[..]);
+ ret.extend(&self.prefix[..]);
+ ret
+ }
+
+ pub fn next_key(&self, h: Hash) -> Self {
+ assert!(&h.as_slice()[0..self.prefix.len()] == &self.prefix[..]);
+ let mut s2 = self.clone();
+ s2.prefix.push(h.as_slice()[self.prefix.len()]);
+ s2
+ }
+
+ pub fn add_byte(&self, b: u8) -> Self {
+ let mut s2 = self.clone();
+ s2.prefix.push(b);
+ s2
+ }
+}
+
+fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) {
+ for i in 0..ch.len() {
+ if ch[i].0 == pos {
+ ch[i].1 = v;
+ return;
+ } else if ch[i].0 > pos {
+ ch.insert(i, (pos, v));
+ return;
+ }
+ }
+ ch.insert(ch.len(), (pos, v));
+}
+
+fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) {
+ for i in 0..ch.len() {
+ if ch[i].0 == pos {
+ ch.remove(i);
+ return;
+ }
+ }
+}
+
+#[test]
+fn test_intermediate_aux() {
+ let mut v = vec![];
+
+ intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
+ assert!(v == vec![(12u8, [12u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
+ assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
+ assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
+ assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
+ assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
+
+ intermediate_rm_child(&mut v, 42u8);
+ assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
+
+ intermediate_rm_child(&mut v, 11u8);
+ assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
+
+ intermediate_rm_child(&mut v, 6u8);
+ assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
+
+ intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
+ assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]);
+}
diff --git a/src/table/table.rs b/src/table/table.rs
index 366ce925..0e75754c 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
+use sled::Transactional;
use garage_util::data::*;
use garage_util::error::Error;
@@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
+use crate::merkle::*;
use crate::schema::*;
use crate::table_sync::*;
@@ -33,6 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>,
pub store: sled::Tree,
pub syncer: ArcSwapOption<TableSyncer<F, R>>,
+ merkle_updater: Arc<MerkleUpdater>,
}
#[derive(Serialize, Deserialize)]
@@ -77,7 +80,7 @@ where
{
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
- pub async fn new(
+ pub fn new(
instance: F,
replication: R,
system: Arc<System>,
@@ -85,11 +88,27 @@ where
name: String,
rpc_server: &mut RpcServer,
) -> Arc<Self> {
- let store = db.open_tree(&name).expect("Unable to open DB tree");
+ let store = db
+ .open_tree(&format!("{}:table", name))
+ .expect("Unable to open DB tree");
+
+ let merkle_todo_store = db
+ .open_tree(&format!("{}:merkle_todo", name))
+ .expect("Unable to open DB Merkle TODO tree");
+ let merkle_tree_store = db
+ .open_tree(&format!("{}:merkle_tree", name))
+ .expect("Unable to open DB Merkle tree tree");
let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+ let merkle_updater = MerkleUpdater::new(
+ name.clone(),
+ system.background.clone(),
+ merkle_todo_store,
+ merkle_tree_store,
+ );
+
let table = Arc::new(Self {
instance,
replication,
@@ -98,12 +117,15 @@ where
system,
store,
syncer: ArcSwapOption::from(None),
+ merkle_updater,
});
table.clone().register_handler(rpc_server, rpc_path);
- let syncer = TableSyncer::launch(table.clone()).await;
+ let syncer = TableSyncer::launch(table.clone());
table.syncer.swap(Some(syncer));
+ table.merkle_updater.launch();
+
table
}
@@ -322,7 +344,7 @@ where
Ok(TableRPC::Update(values))
}
TableRPC::Update(pairs) => {
- self.handle_update(pairs).await?;
+ self.handle_update(pairs)?;
Ok(TableRPC::Ok)
}
TableRPC::SyncRPC(rpc) => {
@@ -380,53 +402,64 @@ where
Ok(ret)
}
- pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
- let syncer = self.syncer.load_full().unwrap();
+ // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================
+ pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
for update_bytes in entries.iter() {
- let update = self.decode_entry(update_bytes.as_slice())?;
-
- let tree_key = self.tree_key(update.partition_key(), update.sort_key());
-
- let (old_entry, new_entry) = self.store.transaction(|db| {
- let (old_entry, new_entry) = match db.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = self
- .decode_entry(&prev_bytes)
- .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
- let mut new_entry = old_entry.clone();
- new_entry.merge(&update);
- (Some(old_entry), new_entry)
- }
- None => (None, update.clone()),
- };
+ self.update_entry(update_bytes.as_slice())?;
+ }
+ Ok(())
+ }
+ pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> {
+ let update = self.decode_entry(update_bytes)?;
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
+ let (old_entry, new_entry) = match db.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = self
+ .decode_entry(&prev_bytes)
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ let mut new_entry = old_entry.clone();
+ new_entry.merge(&update);
+ (Some(old_entry), new_entry)
+ }
+ None => (None, update.clone()),
+ };
+
+ if Some(&new_entry) != old_entry.as_ref() {
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
+ mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
db.insert(tree_key.clone(), new_bytes)?;
- Ok((old_entry, new_entry))
- })?;
-
- if old_entry.as_ref() != Some(&new_entry) {
- self.instance.updated(old_entry, Some(new_entry));
- syncer.invalidate(&tree_key[..]);
+ Ok(Some((old_entry, new_entry)))
+ } else {
+ Ok(None)
}
+ })?;
+
+ if let Some((old_entry, new_entry)) = changed {
+ self.instance.updated(old_entry, Some(new_entry));
+ self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
}
Ok(())
}
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
- let removed = self.store.transaction(|txn| {
+ let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
if let Some(cur_v) = txn.get(k)? {
if cur_v == v {
txn.remove(k)?;
+ mkl_todo.insert(k, vec![])?;
return Ok(true);
}
}
Ok(false)
})?;
+
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index c38b6bd5..51f8cd6f 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -106,7 +106,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
+ pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer {
table: table.clone(),
@@ -119,24 +119,16 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
- table
- .system
- .background
- .spawn_worker(
- format!("table sync watcher for {}", table.name),
- move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
- )
- .await;
+ table.system.background.spawn_worker(
+ format!("table sync watcher for {}", table.name),
+ move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
+ );
let s2 = syncer.clone();
- table
- .system
- .background
- .spawn_worker(
- format!("table syncer for {}", table.name),
- move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
- )
- .await;
+ table.system.background.spawn_worker(
+ format!("table syncer for {}", table.name),
+ move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
+ );
let s3 = syncer.clone();
tokio::spawn(async move {
@@ -630,7 +622,7 @@ where
}
}
if diff_items.len() > 0 {
- self.table.handle_update(&diff_items[..]).await?;
+ self.table.handle_update(&diff_items[..])?;
}
if items_to_send.len() > 0 {
self.send_items(who, items_to_send).await?;
diff --git a/src/util/background.rs b/src/util/background.rs
index 937062dd..8081f157 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -1,11 +1,11 @@
use core::future::Future;
use std::pin::Pin;
+use std::sync::Mutex;
use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use std::sync::Arc;
-use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch, Notify};
use crate::error::Error;
@@ -38,7 +38,7 @@ impl BackgroundRunner {
}
pub async fn run(self: Arc<Self>) {
- let mut workers = self.workers.lock().await;
+ let mut workers = self.workers.lock().unwrap();
for i in 0..self.n_runners {
workers.push(tokio::spawn(self.clone().runner(i)));
}
@@ -47,7 +47,7 @@ impl BackgroundRunner {
let mut stop_signal = self.stop_signal.clone();
while let Some(exit_now) = stop_signal.recv().await {
if exit_now {
- let mut workers = self.workers.lock().await;
+ let mut workers = self.workers.lock().unwrap();
let workers_vec = workers.drain(..).collect::<Vec<_>>();
join_all(workers_vec).await;
return;
@@ -73,12 +73,12 @@ impl BackgroundRunner {
self.job_notify.notify();
}
- pub async fn spawn_worker<F, T>(&self, name: String, worker: F)
+ pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = JobOutput> + Send + 'static,
{
- let mut workers = self.workers.lock().await;
+ let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone();
workers.push(tokio::spawn(async move {
if let Err(e) = worker(stop_signal).await {
@@ -93,7 +93,7 @@ impl BackgroundRunner {
let mut stop_signal = self.stop_signal.clone();
loop {
let must_exit: bool = *stop_signal.borrow();
- if let Some(job) = self.dequeue_job(must_exit).await {
+ if let Some(job) = self.dequeue_job(must_exit) {
if let Err(e) = job.await {
error!("Job failed: {}", e)
}
@@ -110,8 +110,8 @@ impl BackgroundRunner {
}
}
- async fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
- let mut queue = self.queue_out.lock().await;
+ fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
+ let mut queue = self.queue_out.lock().unwrap();
while let Ok((job, cancellable)) = queue.try_recv() {
if cancellable && must_exit {
continue;