aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/crdt.rs73
-rw-r--r--src/table/lib.rs4
-rw-r--r--src/table/schema.rs4
3 files changed, 32 insertions, 49 deletions
diff --git a/src/table/crdt.rs b/src/table/crdt.rs
index 8f5e4d71..7c888e3a 100644
--- a/src/table/crdt.rs
+++ b/src/table/crdt.rs
@@ -7,7 +7,9 @@ pub trait CRDT {
}
impl<T> CRDT for T
-where T: Ord + Clone {
+where
+ T: Ord + Clone,
+{
fn merge(&mut self, other: &Self) {
if other > self {
*self = other.clone();
@@ -18,14 +20,14 @@ where T: Ord + Clone {
// ---- LWW Register ----
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWW<T>
-{
+pub struct LWW<T> {
ts: u64,
v: T,
}
impl<T> LWW<T>
-where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord
+where
+ T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT,
{
pub fn new(value: T) -> Self {
Self {
@@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
}
}
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
- Self {
- ts,
- v: value,
- }
+ Self { ts, v: value }
}
pub fn update(&mut self, new_value: T) {
self.ts = std::cmp::max(self.ts + 1, now_msec());
@@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
pub fn get(&self) -> &T {
&self.v
}
+ pub fn get_mut(&mut self) -> &mut T {
+ &mut self.v
+ }
}
impl<T> CRDT for LWW<T>
-where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT
+where
+ T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT,
{
fn merge(&mut self, other: &Self) {
if other.ts > self.ts {
@@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
}
}
-
// ---- Boolean (true as absorbing state) ----
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
@@ -85,61 +87,48 @@ impl CRDT for Bool {
}
}
-
// ---- LWW Map ----
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWWMap<K, V>
-{
+pub struct LWWMap<K, V> {
vals: Vec<(K, u64, V)>,
}
impl<K, V> LWWMap<K, V>
-where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
- V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
+where
+ K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
+ V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
{
pub fn new() -> Self {
- Self{
- vals: vec![],
- }
+ Self { vals: vec![] }
}
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
- Self{
+ Self {
vals: vec![(k, ts, v)],
}
}
pub fn take_and_clear(&mut self) -> Self {
let vals = std::mem::replace(&mut self.vals, vec![]);
- Self{vals}
+ Self { vals }
}
pub fn clear(&mut self) {
self.vals.clear();
}
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
- let new_vals = match self
- .vals
- .binary_search_by(|(k2, _, _)| k2.cmp(&k))
- {
+ let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
let (_, old_ts, _) = self.vals[i];
- let new_ts = std::cmp::max(old_ts+1, now_msec());
+ let new_ts = std::cmp::max(old_ts + 1, now_msec());
vec![(k, new_ts, new_v)]
}
- Err(_) => {
- vec![(k, now_msec(), new_v)]
- }
+ Err(_) => vec![(k, now_msec(), new_v)],
};
- Self{
- vals: new_vals,
- }
+ Self { vals: new_vals }
}
pub fn get(&self, k: &K) -> Option<&V> {
- match self
- .vals
- .binary_search_by(|(k2, _, _)| k2.cmp(&k))
- {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => Some(&self.vals[i].2),
- Err(_) => None
+ Err(_) => None,
}
}
pub fn items(&self) -> &[(K, u64, V)] {
@@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
}
impl<K, V> CRDT for LWWMap<K, V>
-where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
- V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
+where
+ K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
+ V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
- match self
- .vals
- .binary_search_by(|(k2, _, _)| k2.cmp(&k))
- {
+ match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
- let (_, ts1, v1) = &self.vals[i];
+ let (_, ts1, _v1) = &self.vals[i];
if ts2 > ts1 {
self.vals[i].1 = *ts2;
self.vals[i].2 = v2.clone();
diff --git a/src/table/lib.rs b/src/table/lib.rs
index e2bf1f46..704f8f1e 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -3,9 +3,9 @@
#[macro_use]
extern crate log;
+pub mod crdt;
pub mod schema;
pub mod util;
-pub mod crdt;
pub mod table;
pub mod table_fullcopy;
@@ -13,5 +13,5 @@ pub mod table_sharded;
pub mod table_sync;
pub use schema::*;
-pub use util::*;
pub use table::*;
+pub use util::*;
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 49cede0a..d2ec9450 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -20,7 +20,6 @@ impl PartitionKey for Hash {
}
}
-
pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
@@ -37,7 +36,6 @@ impl SortKey for Hash {
}
}
-
pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
@@ -47,7 +45,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
-
#[async_trait]
pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
@@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync {
true
}
}
-