aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs27
-rw-r--r--src/table/schema.rs8
-rw-r--r--src/table/util.rs2
3 files changed, 25 insertions, 12 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index 5c792f1f..d19586f3 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
- Some(mut ent) => {
- ent.merge(&update);
- ent
- }
- None => update.clone(),
- })?;
+ self.update_entry_with(
+ update.partition_key(),
+ update.sort_key(),
+ |_tx, ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&update);
+ Ok(ent)
+ }
+ None => Ok(update.clone()),
+ },
+ )?;
Ok(())
}
@@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
&self,
partition_key: &F::P,
sort_key: &F::S,
- f: impl Fn(Option<F::E>) -> F::E,
+ update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxResult<F::E, Error>,
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
@@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
- let new_entry = f(Some(old_entry.clone()));
+ let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, f(None)),
+ None => (None, None, update_fn(&mut tx, None)?),
};
// Changed can be true in two scenarios
@@ -335,6 +339,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?;
tx.insert(&self.insert_queue, &tree_key, new_entry)?;
+
self.insert_queue_notify.notify_one();
Ok(())
@@ -344,7 +349,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
let mut ret = p.hash().to_vec();
- ret.extend(s.sort_key());
+ ret.extend(s.sort_key().borrow());
ret
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 5cbf6c95..86ff816a 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -31,17 +31,23 @@ impl PartitionKey for FixedBytes32 {
/// Trait for field used to sort data
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
+ type B<'a>: std::borrow::Borrow<[u8]>;
+
/// Get the key used to sort
- fn sort_key(&self) -> &[u8];
+ fn sort_key(&self) -> Self::B<'_>;
}
impl SortKey for String {
+ type B<'a> = &'a [u8];
+
fn sort_key(&self) -> &[u8] {
self.as_bytes()
}
}
impl SortKey for FixedBytes32 {
+ type B<'a> = &'a [u8];
+
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
diff --git a/src/table/util.rs b/src/table/util.rs
index 0b10cf3f..a79e2cba 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -7,6 +7,8 @@ use crate::schema::*;
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
+ type B<'a> = &'a [u8];
+
fn sort_key(&self) -> &[u8] {
&[]
}