aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/k2v/api_server.rs3
-rw-r--r--src/api/k2v/batch.rs85
-rw-r--r--src/api/k2v/index.rs9
-rw-r--r--src/api/k2v/item.rs16
-rw-r--r--src/api/k2v/router.rs8
-rw-r--r--src/garage/admin.rs4
-rw-r--r--src/garage/tests/k2v/poll.rs170
-rw-r--r--src/k2v-client/Cargo.toml3
-rw-r--r--src/k2v-client/bin/k2v-cli.rs203
-rw-r--r--src/k2v-client/lib.rs89
-rw-r--r--src/model/garage.rs8
-rw-r--r--src/model/k2v/causality.rs62
-rw-r--r--src/model/k2v/item_table.rs11
-rw-r--r--src/model/k2v/mod.rs4
-rw-r--r--src/model/k2v/poll.rs50
-rw-r--r--src/model/k2v/rpc.rs275
-rw-r--r--src/model/k2v/seen.rs105
-rw-r--r--src/model/k2v/sub.rs110
-rw-r--r--src/rpc/rpc_helper.rs5
-rw-r--r--src/table/data.rs24
20 files changed, 1051 insertions, 193 deletions
diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs
index 084867b5..bb85b2e7 100644
--- a/src/api/k2v/api_server.rs
+++ b/src/api/k2v/api_server.rs
@@ -164,6 +164,9 @@ impl ApiHandler for K2VApiServer {
Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await,
Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await,
Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await,
+ Endpoint::PollRange { partition_key } => {
+ handle_poll_range(garage, bucket_id, &partition_key, req).await
+ }
Endpoint::Options => unreachable!(),
};
diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs
index 82b4f7e3..26d678da 100644
--- a/src/api/k2v/batch.rs
+++ b/src/api/k2v/batch.rs
@@ -5,7 +5,6 @@ use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use garage_util::error::Error as GarageError;
use garage_table::{EnumerationOrder, TableSchema};
@@ -26,11 +25,7 @@ pub async fn handle_insert_batch(
let mut items2 = vec![];
for it in items {
- let ct = it
- .ct
- .map(|s| CausalContext::parse(&s))
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ let ct = it.ct.map(|s| CausalContext::parse_helper(&s)).transpose()?;
let v = match it.v {
Some(vs) => DvvsValue::Value(
BASE64_STANDARD
@@ -68,10 +63,7 @@ pub async fn handle_read_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_read_batch_query(
@@ -163,10 +155,7 @@ pub async fn handle_delete_batch(
resps.push(resp?);
}
- let resp_json = serde_json::to_string_pretty(&resps).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resps)?)
}
async fn handle_delete_batch_query(
@@ -260,6 +249,53 @@ async fn handle_delete_batch_query(
})
}
+pub(crate) async fn handle_poll_range(
+ garage: Arc<Garage>,
+ bucket_id: Uuid,
+ partition_key: &str,
+ req: Request<Body>,
+) -> Result<Response<Body>, Error> {
+ use garage_model::k2v::sub::PollRange;
+
+ let query = parse_json_body::<PollRangeQuery>(req).await?;
+
+ let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
+
+ let resp = garage
+ .k2v
+ .rpc
+ .poll_range(
+ PollRange {
+ partition: K2VItemPartition {
+ bucket_id,
+ partition_key: partition_key.to_string(),
+ },
+ start: query.start,
+ end: query.end,
+ prefix: query.prefix,
+ },
+ query.seen_marker,
+ timeout_msec,
+ )
+ .await?;
+
+ if let Some((items, seen_marker)) = resp {
+ let resp = PollRangeResponse {
+ items: items
+ .into_iter()
+ .map(|(_k, i)| ReadBatchResponseItem::from(i))
+ .collect::<Vec<_>>(),
+ seen_marker,
+ };
+
+ Ok(json_ok_response(&resp)?)
+ } else {
+ Ok(Response::builder()
+ .status(StatusCode::NOT_MODIFIED)
+ .body(Body::empty())?)
+ }
+}
+
#[derive(Deserialize)]
struct InsertBatchItem {
pk: String,
@@ -364,3 +400,24 @@ struct DeleteBatchResponse {
#[serde(rename = "deletedItems")]
deleted_items: usize,
}
+
+#[derive(Deserialize)]
+struct PollRangeQuery {
+ #[serde(default)]
+ prefix: Option<String>,
+ #[serde(default)]
+ start: Option<String>,
+ #[serde(default)]
+ end: Option<String>,
+ #[serde(default)]
+ timeout: Option<u64>,
+ #[serde(default, rename = "seenMarker")]
+ seen_marker: Option<String>,
+}
+
+#[derive(Serialize)]
+struct PollRangeResponse {
+ items: Vec<ReadBatchResponseItem>,
+ #[serde(rename = "seenMarker")]
+ seen_marker: String,
+}
diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index 210950bf..6c1d4a91 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -1,10 +1,9 @@
use std::sync::Arc;
-use hyper::{Body, Response, StatusCode};
+use hyper::{Body, Response};
use serde::Serialize;
use garage_util::data::*;
-use garage_util::error::Error as GarageError;
use garage_rpc::ring::Ring;
use garage_table::util::*;
@@ -12,6 +11,7 @@ use garage_table::util::*;
use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+use crate::helpers::*;
use crate::k2v::error::*;
use crate::k2v::range::read_range;
@@ -68,10 +68,7 @@ pub async fn handle_read_index(
next_start,
};
- let resp_json = serde_json::to_string_pretty(&resp).map_err(GarageError::from)?;
- Ok(Response::builder()
- .status(StatusCode::OK)
- .body(Body::from(resp_json))?)
+ Ok(json_ok_response(&resp)?)
}
#[derive(Serialize)]
diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs
index 041382c0..e13a0f30 100644
--- a/src/api/k2v/item.rs
+++ b/src/api/k2v/item.rs
@@ -134,9 +134,8 @@ pub async fn handle_insert_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
- .map(CausalContext::parse)
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ .map(CausalContext::parse_helper)
+ .transpose()?;
let body = hyper::body::to_bytes(req.into_body()).await?;
let value = DvvsValue::Value(body.to_vec());
@@ -170,9 +169,8 @@ pub async fn handle_delete_item(
.get(X_GARAGE_CAUSALITY_TOKEN)
.map(|s| s.to_str())
.transpose()?
- .map(CausalContext::parse)
- .transpose()
- .ok_or_bad_request("Invalid causality token")?;
+ .map(CausalContext::parse_helper)
+ .transpose()?;
let value = DvvsValue::Deleted;
@@ -209,15 +207,17 @@ pub async fn handle_poll_item(
let causal_context =
CausalContext::parse(&causality_token).ok_or_bad_request("Invalid causality token")?;
+ let timeout_msec = timeout_secs.unwrap_or(300).clamp(1, 600) * 1000;
+
let item = garage
.k2v
.rpc
- .poll(
+ .poll_item(
bucket_id,
partition_key,
sort_key,
causal_context,
- timeout_secs.unwrap_or(300) * 1000,
+ timeout_msec,
)
.await?;
diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs
index e7a3dd69..1cc58be5 100644
--- a/src/api/k2v/router.rs
+++ b/src/api/k2v/router.rs
@@ -32,6 +32,9 @@ pub enum Endpoint {
causality_token: String,
timeout: Option<u64>,
},
+ PollRange {
+ partition_key: String,
+ },
ReadBatch {
},
ReadIndex {
@@ -113,6 +116,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
+ POLL_RANGE => PollRange,
],
no_key: [
EMPTY => ReadBatch,
@@ -142,6 +146,7 @@ impl Endpoint {
@gen_parser
(query.keyword.take().unwrap_or_default(), partition_key, query, None),
key: [
+ POLL_RANGE => PollRange,
],
no_key: [
EMPTY => InsertBatch,
@@ -234,7 +239,8 @@ impl Endpoint {
generateQueryParameters! {
keywords: [
"delete" => DELETE,
- "search" => SEARCH
+ "search" => SEARCH,
+ "poll_range" => POLL_RANGE
],
fields: [
"prefix" => prefix,
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index 4eabebca..2ef3077c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -911,7 +911,9 @@ impl AdminRpcHandler {
let role = layout.roles.get(id).and_then(|x| x.0.as_ref());
let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
- let capacity = role.map(|x| x.capacity_string()).unwrap_or("?".into());
+ let capacity = role
+ .map(|x| x.capacity_string())
+ .unwrap_or_else(|| "?".into());
let avail_str = |x| match x {
Some((avail, total)) => {
let pct = (avail as f64) / (total as f64) * 100.;
diff --git a/src/garage/tests/k2v/poll.rs b/src/garage/tests/k2v/poll.rs
index e56705ae..f54cc5d4 100644
--- a/src/garage/tests/k2v/poll.rs
+++ b/src/garage/tests/k2v/poll.rs
@@ -1,12 +1,16 @@
use hyper::{Method, StatusCode};
use std::time::Duration;
+use assert_json_diff::assert_json_eq;
+use serde_json::json;
+
+use super::json_body;
use crate::common;
#[tokio::test]
-async fn test_poll() {
+async fn test_poll_item() {
let ctx = common::context();
- let bucket = ctx.create_bucket("test-k2v-poll");
+ let bucket = ctx.create_bucket("test-k2v-poll-item");
// Write initial value
let res = ctx
@@ -96,3 +100,165 @@ async fn test_poll() {
.to_vec();
assert_eq!(poll_res_body, b"New value");
}
+
+#[tokio::test]
+async fn test_poll_range() {
+ let ctx = common::context();
+ let bucket = ctx.create_bucket("test-k2v-poll-range");
+
+ // Write initial value
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .body(b"Initial value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Retrieve initial value to get its causality token
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("accept", "application/octet-stream")
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let ct = res2
+ .headers()
+ .get("x-garage-causality-token")
+ .unwrap()
+ .to_str()
+ .unwrap()
+ .to_string();
+
+ // Initial poll range, retrieve single item and first seen_marker
+ let res2 = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(b"{}".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res2.status(), StatusCode::OK);
+ let json_res = json_body(res2).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_json_eq!(
+ json_res,
+ json!(
+ {
+ "items": [
+ {"sk": "test1", "ct": ct, "v": [base64::encode(b"Initial value")]},
+ ],
+ "seenMarker": seen_marker,
+ }
+ )
+ );
+
+ // Second poll range, which will complete later
+ let poll = {
+ let bucket = bucket.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write new value that supersedes initial one
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test1"))
+ .signed_header("x-garage-causality-token", ct)
+ .body(b"New value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ let seen_marker = json_res["seenMarker"].as_str().unwrap().to_string();
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test1"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([base64::encode(b"New value")])
+ );
+
+ // Now we will add a value on a different key
+ // Start a new poll operation
+ let poll = {
+ let bucket = bucket.clone();
+ tokio::spawn(async move {
+ let ctx = common::context();
+ ctx.k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::POST)
+ .path("root")
+ .query_param("poll_range", None::<String>)
+ .body(format!(r#"{{"seenMarker": "{}"}}"#, seen_marker).into_bytes())
+ .send()
+ .await
+ })
+ };
+
+ // Write value on different key
+ let res = ctx
+ .k2v
+ .request
+ .builder(bucket.clone())
+ .method(Method::PUT)
+ .path("root")
+ .query_param("sort_key", Some("test2"))
+ .body(b"Other value".to_vec())
+ .send()
+ .await
+ .unwrap();
+ assert_eq!(res.status(), StatusCode::NO_CONTENT);
+
+ // Check poll finishes with correct value
+ let poll_res = tokio::select! {
+ _ = tokio::time::sleep(Duration::from_secs(10)) => panic!("poll did not terminate in time"),
+ res = poll => res.unwrap().unwrap(),
+ };
+
+ assert_eq!(poll_res.status(), StatusCode::OK);
+ let json_res = json_body(poll_res).await;
+ assert_eq!(json_res["items"].as_array().unwrap().len(), 1);
+ assert_json_eq!(&json_res["items"][0]["sk"], json!("test2"));
+ assert_json_eq!(
+ &json_res["items"][0]["v"],
+ json!([base64::encode(b"Other value")])
+ );
+}
diff --git a/src/k2v-client/Cargo.toml b/src/k2v-client/Cargo.toml
index 7de2a55d..88d52747 100644
--- a/src/k2v-client/Cargo.toml
+++ b/src/k2v-client/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "k2v-client"
-version = "0.0.1"
+version = "0.1.1"
authors = ["Trinity Pointard <trinity.pointard@gmail.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
@@ -15,6 +15,7 @@ log = "0.4"
rusoto_core = { version = "0.48.0", default-features = false, features = ["rustls"] }
rusoto_credential = "0.48.0"
rusoto_signature = "0.48.0"
+hyper-rustls = { version = "0.23", default-features = false, features = [ "http1", "http2", "tls12" ] }
serde = "1.0"
serde_json = "1.0"
thiserror = "1.0"
diff --git a/src/k2v-client/bin/k2v-cli.rs b/src/k2v-client/bin/k2v-cli.rs
index 925ebeb8..cdd63cce 100644
--- a/src/k2v-client/bin/k2v-cli.rs
+++ b/src/k2v-client/bin/k2v-cli.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+use std::process::exit;
use std::time::Duration;
use k2v_client::*;
@@ -57,22 +59,39 @@ enum Command {
#[clap(flatten)]
output_kind: ReadOutputKind,
},
- /// Watch changes on a single value
- Poll {
- /// Partition key to delete from
+ /// Watch changes on a single value
+ PollItem {
+ /// Partition key of item to watch
partition_key: String,
- /// Sort key to delete from
+ /// Sort key of item to watch
sort_key: String,
/// Causality information
#[clap(short, long)]
causality: String,
/// Timeout, in seconds
- #[clap(short, long)]
+ #[clap(short = 'T', long)]
timeout: Option<u64>,
/// Output formating
#[clap(flatten)]
output_kind: ReadOutputKind,
},
+ /// Watch changes on a range of values
+ PollRange {
+ /// Partition key to poll from
+ partition_key: String,
+ /// Output only sort keys matching this filter
+ #[clap(flatten)]
+ filter: Filter,
+ /// Marker of data that had previously been seen by a PollRange
+ #[clap(short = 'S', long)]
+ seen_marker: Option<String>,
+ /// Timeout, in seconds
+ #[clap(short = 'T', long)]
+ timeout: Option<u64>,
+ /// Output formating
+ #[clap(flatten)]
+ output_kind: BatchOutputKind,
+ },
/// Delete a single value
Delete {
/// Partition key to delete from
@@ -176,7 +195,6 @@ struct ReadOutputKind {
impl ReadOutputKind {
fn display_output(&self, val: CausalValue) -> ! {
use std::io::Write;
- use std::process::exit;
if self.json {
let stdout = std::io::stdout();
@@ -254,6 +272,83 @@ struct BatchOutputKind {
json: bool,
}
+impl BatchOutputKind {
+ fn display_human_output(&self, values: BTreeMap<String, CausalValue>) -> ! {
+ for (key, values) in values {
+ println!("key: {}", key);
+ let causality: String = values.causality.into();
+ println!("causality: {}", causality);
+ for value in values.value {
+ match value {
+ K2vValue::Value(v) => {
+ if let Ok(string) = std::str::from_utf8(&v) {
+ println!(" value(utf-8): {}", string);
+ } else {
+ println!(" value(base64): {}", base64::encode(&v));
+ }
+ }
+ K2vValue::Tombstone => {
+ println!(" tombstone");
+ }
+ }
+ }
+ }
+ exit(0);
+ }
+
+ fn values_json(&self, values: BTreeMap<String, CausalValue>) -> Vec<serde_json::Value> {
+ values
+ .into_iter()
+ .map(|(k, v)| {
+ let mut value = serde_json::to_value(v).unwrap();
+ value
+ .as_object_mut()
+ .unwrap()
+ .insert("sort_key".to_owned(), k.into());
+ value
+ })
+ .collect::<Vec<_>>()
+ }
+
+ fn display_poll_range_output(
+ &self,
+ seen_marker: String,
+ values: BTreeMap<String, CausalValue>,
+ ) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "values": self.values_json(values),
+ "seen_marker": seen_marker,
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ println!("seen marker: {}", seen_marker);
+ self.display_human_output(values)
+ }
+ }
+
+ fn display_read_range_output(&self, res: PaginatedRange<CausalValue>) -> ! {
+ if self.json {
+ let json = serde_json::json!({
+ "next_key": res.next_start,
+ "values": self.values_json(res.items),
+ });
+
+ let stdout = std::io::stdout();
+ serde_json::to_writer_pretty(stdout, &json).unwrap();
+ exit(0)
+ } else {
+ if let Some(next) = res.next_start {
+ println!("next key: {}", next);
+ }
+ self.display_human_output(res.items)
+ }
+ }
+}
+
/// Filter for batch operations
#[derive(Parser, Debug)]
#[clap(group = clap::ArgGroup::new("filter").multiple(true).required(true))]
@@ -342,7 +437,7 @@ async fn main() -> Result<(), Error> {
let res = client.read_item(&partition_key, &sort_key).await?;
output_kind.display_output(res);
}
- Command::Poll {
+ Command::PollItem {
partition_key,
sort_key,
causality,
@@ -356,7 +451,54 @@ async fn main() -> Result<(), Error> {
if let Some(res) = res_opt {
output_kind.display_output(res);
} else {
- println!("Delay expired and value didn't change.");
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
+ }
+ Command::PollRange {
+ partition_key,
+ filter,
+ seen_marker,
+ timeout,
+ output_kind,
+ } => {
+ if filter.conflicts_only
+ || filter.tombstones
+ || filter.reverse
+ || filter.limit.is_some()
+ {
+ return Err(Error::Message(
+ "limit, reverse, conlicts-only, tombstones are invalid for poll-range".into(),
+ ));
+ }
+
+ let timeout = timeout.map(Duration::from_secs);
+ let res = client
+ .poll_range(
+ &partition_key,
+ Some(PollRangeFilter {
+ start: filter.start.as_deref(),
+ end: filter.end.as_deref(),
+ prefix: filter.prefix.as_deref(),
+ }),
+ seen_marker.as_deref(),
+ timeout,
+ )
+ .await?;
+ match res {
+ Some((items, seen_marker)) => {
+ output_kind.display_poll_range_output(seen_marker, items);
+ }
+ None => {
+ if output_kind.json {
+ println!("null");
+ } else {
+ println!("Delay expired and value didn't change.");
+ }
+ }
}
}
Command::ReadIndex {
@@ -419,50 +561,7 @@ async fn main() -> Result<(), Error> {
};
let mut res = client.read_batch(&[op]).await?;
let res = res.pop().unwrap();
- if output_kind.json {
- let values = res
- .items
- .into_iter()
- .map(|(k, v)| {
- let mut value = serde_json::to_value(v).unwrap();
- value
- .as_object_mut()
- .unwrap()
- .insert("sort_key".to_owned(), k.into());
- value
- })
- .collect::<Vec<_>>();
- let json = serde_json::json!({
- "next_key": res.next_start,
- "values": values,
- });
-
- let stdout = std::io::stdout();
- serde_json::to_writer_pretty(stdout, &json).unwrap();
- } else {
- if let Some(next) = res.next_start {
- println!("next key: {}", next);
- }
- for (key, values) in res.items {
- println!("key: {}", key);
- let causality: String = values.causality.into();
- println!("causality: {}", causality);
- for value in values.value {
- match value {
- K2vValue::Value(v) => {
- if let Ok(string) = std::str::from_utf8(&v) {
- println!(" value(utf-8): {}", string);
- } else {
- println!(" value(base64): {}", base64::encode(&v));
- }
- }
- K2vValue::Tombstone => {
- println!(" tombstone");
- }
- }
- }
- }
- }
+ output_kind.display_read_range_output(res);
}
Command::DeleteRange {
partition_key,
diff --git a/src/k2v-client/lib.rs b/src/k2v-client/lib.rs
index c2606af4..ca52d0cf 100644
--- a/src/k2v-client/lib.rs
+++ b/src/k2v-client/lib.rs
@@ -40,7 +40,13 @@ impl K2vClient {
creds: AwsCredentials,
user_agent: Option<String>,
) -> Result<Self, Error> {
- let mut client = HttpClient::new()?;
+ let connector = hyper_rustls::HttpsConnectorBuilder::new()
+ .with_native_roots()
+ .https_or_http()
+ .enable_http1()
+ .enable_http2()
+ .build();
+ let mut client = HttpClient::from_connector(connector);
if let Some(ua) = user_agent {
client.local_agent_prepend(ua);
} else {
@@ -153,6 +159,58 @@ impl K2vClient {
}
}
+ /// Perform a PollRange request, waiting for any change in a given range of keys
+ /// to occur
+ pub async fn poll_range(
+ &self,
+ partition_key: &str,
+ filter: Option<PollRangeFilter<'_>>,
+ seen_marker: Option<&str>,
+ timeout: Option<Duration>,
+ ) -> Result<Option<(BTreeMap<String, CausalValue>, String)>, Error> {
+ let timeout = timeout.unwrap_or(DEFAULT_POLL_TIMEOUT);
+
+ let request = PollRangeRequest {
+ filter: filter.unwrap_or_default(),
+ seen_marker,
+ timeout: timeout.as_secs(),
+ };
+
+ let mut req = SignedRequest::new(
+ "POST",
+ SERVICE,
+ &self.region,
+ &format!("/{}/{}", self.bucket, partition_key),
+ );
+ req.add_param("poll_range", "");
+
+ let payload = serde_json::to_vec(&request)?;
+ req.set_payload(Some(payload));
+ let res = self.dispatch(req, Some(timeout + DEFAULT_TIMEOUT)).await?;
+
+ if res.status == StatusCode::NOT_MODIFIED {
+ return Ok(None);
+ }
+
+ let resp: PollRangeResponse = serde_json::from_slice(&res.body)?;
+
+ let items = resp
+ .items
+ .into_iter()
+ .map(|BatchReadItem { sk, ct, v }| {
+ (
+ sk,
+ CausalValue {
+ causality: ct,
+ value: v,
+ },
+ )
+ })
+ .collect::<BTreeMap<_, _>>();
+
+ Ok(Some((items, resp.seen_marker)))
+ }
+
/// Perform an InsertItem request, inserting a value for a single pk+sk.
pub async fn insert_item(
&self,
@@ -389,6 +447,12 @@ impl From<CausalityToken> for String {
}
}
+impl AsRef<str> for CausalityToken {
+ fn as_ref(&self) -> &str {
+ &self.0
+ }
+}
+
/// A value in K2V. can be either a binary value, or a tombstone.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum K2vValue {
@@ -466,6 +530,29 @@ pub struct Filter<'a> {
pub reverse: bool,
}
+#[derive(Debug, Default, Clone, Serialize)]
+pub struct PollRangeFilter<'a> {
+ pub start: Option<&'a str>,
+ pub end: Option<&'a str>,
+ pub prefix: Option<&'a str>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeRequest<'a> {
+ #[serde(flatten)]
+ filter: PollRangeFilter<'a>,
+ seen_marker: Option<&'a str>,
+ timeout: u64,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+struct PollRangeResponse {
+ items: Vec<BatchReadItem>,
+ seen_marker: String,
+}
+
impl<'a> Filter<'a> {
fn insert_params(&self, req: &mut SignedRequest) {
if let Some(start) = &self.start {
diff --git a/src/model/garage.rs b/src/model/garage.rs
index ffa54dc5..4716954a 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::index_counter::*;
use crate::key_table::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, rpc::*, sub::*};
/// An entire Garage full of data
pub struct Garage {
@@ -305,8 +305,10 @@ impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
+
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
+
info!("Initialize K2V item table...");
let item_table = Table::new(
K2VItemTable {
@@ -317,7 +319,9 @@ impl GarageK2V {
system.clone(),
db,
);
- let rpc = K2VRpcHandler::new(system, item_table.clone(), subscriptions);
+
+ info!("Initialize K2V RPC handler...");
+ let rpc = K2VRpcHandler::new(system, db, item_table.clone(), subscriptions);
Self {
item_table,
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 62488d53..c80ebd39 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -1,3 +1,12 @@
+//! Implements a CausalContext, which is a set of timestamps for each
+//! node -- a vector clock --, indicating that the versions with
+//! timestamps <= these numbers have been seen and can be
+//! overwritten by a subsequent write.
+//!
+//! The textual representation of a CausalContext, which we call a
+//! "causality token", is used in the API and must be sent along with
+//! each write or delete operation to indicate the previously seen
+//! versions that we want to overwrite or delete.
use base64::prelude::*;
use std::collections::BTreeMap;
@@ -7,28 +16,44 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+
/// Node IDs used in K2V are u64 integers that are the abbreviation
/// of full Garage node IDs which are 256-bit UUIDs.
pub type K2VNodeId = u64;
+pub type VectorClock = BTreeMap<K2VNodeId, u64>;
+
pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
let mut tmp = [0u8; 8];
tmp.copy_from_slice(&node_id.as_slice()[..8]);
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
+pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool {
+ a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0))
+}
+
+pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock {
+ let mut ret = a.clone();
+ for (n, ts) in b.iter() {
+ let ent = ret.entry(*n).or_insert(0);
+ *ent = std::cmp::max(*ts, *ent);
+ }
+ ret
+}
+
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)]
pub struct CausalContext {
- pub vector_clock: BTreeMap<K2VNodeId, u64>,
+ pub vector_clock: VectorClock,
}
impl CausalContext {
/// Empty causality context
- pub fn new_empty() -> Self {
- Self {
- vector_clock: BTreeMap::new(),
- }
+ pub fn new() -> Self {
+ Self::default()
}
+
/// Make binary representation and encode in base64
pub fn serialize(&self) -> String {
let mut ints = Vec::with_capacity(2 * self.vector_clock.len());
@@ -45,13 +70,13 @@ impl CausalContext {
BASE64_URL_SAFE_NO_PAD.encode(bytes)
}
- /// Parse from base64-encoded binary representation
- pub fn parse(s: &str) -> Result<Self, String> {
- let bytes = BASE64_URL_SAFE_NO_PAD
- .decode(s)
- .map_err(|e| format!("bad causality token base64: {}", e))?;
+
+ /// Parse from base64-encoded binary representation.
+ /// Returns None on error.
+ pub fn parse(s: &str) -> Option<Self> {
+ let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).ok()?;
if bytes.len() % 16 != 8 || bytes.len() < 8 {
- return Err("bad causality token length".into());
+ return None;
}
let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap());
@@ -68,16 +93,19 @@ impl CausalContext {
let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t);
if check != checksum {
- return Err("bad causality token checksum".into());
+ return None;
}
- Ok(ret)
+ Some(ret)
}
+
+ pub fn parse_helper(s: &str) -> Result<Self, HelperError> {
+ Self::parse(s).ok_or_bad_request("Invalid causality token")
+ }
+
/// Check if this causal context contains newer items than another one
pub fn is_newer_than(&self, other: &Self) -> bool {
- self.vector_clock
- .iter()
- .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0))
+ vclock_gt(&self.vector_clock, &other.vector_clock)
}
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 9955a9cd..28646f37 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -11,7 +11,7 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::poll::*;
+use crate::k2v::sub::*;
pub const ENTRIES: &str = "entries";
pub const CONFLICTS: &str = "conflicts";
@@ -73,7 +73,8 @@ impl K2VItem {
this_node: Uuid,
context: &Option<CausalContext>,
new_value: DvvsValue,
- ) {
+ node_ts: u64,
+ ) -> u64 {
if let Some(context) = context {
for (node, t_discard) in context.vector_clock.iter() {
if let Some(e) = self.items.get_mut(node) {
@@ -98,12 +99,14 @@ impl K2VItem {
values: vec![],
});
let t_prev = e.max_time();
- e.values.push((t_prev + 1, new_value));
+ let t_new = std::cmp::max(t_prev + 1, node_ts + 1);
+ e.values.push((t_new, new_value));
+ t_new
}
/// Extract the causality context of a K2V Item
pub fn causal_context(&self) -> CausalContext {
- let mut cc = CausalContext::new_empty();
+ let mut cc = CausalContext::new();
for (node, ent) in self.items.iter() {
cc.vector_clock.insert(*node, ent.max_time());
}
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index f6a96151..acc1fcdc 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,8 @@
pub mod causality;
+pub mod seen;
pub mod item_table;
-pub mod poll;
pub mod rpc;
+
+pub mod sub;
diff --git a/src/model/k2v/poll.rs b/src/model/k2v/poll.rs
deleted file mode 100644
index 93105207..00000000
--- a/src/model/k2v/poll.rs
+++ /dev/null
@@ -1,50 +0,0 @@
-use std::collections::HashMap;
-use std::sync::Mutex;
-
-use serde::{Deserialize, Serialize};
-use tokio::sync::broadcast;
-
-use crate::k2v::item_table::*;
-
-#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
-pub struct PollKey {
- pub partition: K2VItemPartition,
- pub sort_key: String,
-}
-
-#[derive(Default)]
-pub struct SubscriptionManager {
- subscriptions: Mutex<HashMap<PollKey, broadcast::Sender<K2VItem>>>,
-}
-
-impl SubscriptionManager {
- pub fn new() -> Self {
- Self::default()
- }
-
- pub fn subscribe(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(key) {
- s.subscribe()
- } else {
- let (tx, rx) = broadcast::channel(8);
- subs.insert(key.clone(), tx);
- rx
- }
- }
-
- pub fn notify(&self, item: &K2VItem) {
- let key = PollKey {
- partition: item.partition.clone(),
- sort_key: item.sort_key.clone(),
- };
- let mut subs = self.subscriptions.lock().unwrap();
- if let Some(s) = subs.get(&key) {
- if s.send(item.clone()).is_err() {
- // no more subscribers, remove channel from here
- // (we will re-create it later if we need to subscribe again)
- subs.remove(&key);
- }
- }
- }
-}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index f64a7984..117103b6 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -5,9 +5,10 @@
//! node does not process the entry directly, as this would
//! mean the vector clock gets much larger than needed).
-use std::collections::HashMap;
-use std::sync::Arc;
-use std::time::Duration;
+use std::collections::{BTreeMap, HashMap};
+use std::convert::TryInto;
+use std::sync::{Arc, Mutex, MutexGuard};
+use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
@@ -15,9 +16,12 @@ use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
+use garage_db as db;
+
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
+use garage_util::time::now_msec;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -25,9 +29,15 @@ use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::{PartitionKey, Table};
+use crate::helper::error::Error as HelperError;
use crate::k2v::causality::*;
use crate::k2v::item_table::*;
-use crate::k2v::poll::*;
+use crate::k2v::seen::*;
+use crate::k2v::sub::*;
+
+const POLL_RANGE_EXTRA_DELAY: Duration = Duration::from_millis(200);
+
+const TIMESTAMP_KEY: &'static [u8] = b"timestamp";
/// RPC messages for K2V
#[derive(Debug, Serialize, Deserialize)]
@@ -40,7 +50,13 @@ enum K2VRpc {
causal_context: CausalContext,
timeout_msec: u64,
},
+ PollRange {
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ },
PollItemResponse(Option<K2VItem>),
+ PollRangeResponse(Uuid, Vec<K2VItem>),
}
#[derive(Debug, Serialize, Deserialize)]
@@ -59,6 +75,12 @@ impl Rpc for K2VRpc {
pub struct K2VRpcHandler {
system: Arc<System>,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
+
+ // Using a mutex on the local_timestamp_tree is not strictly necessary,
+ // but it helps to not try to do several inserts at the same time,
+ // which would create transaction conflicts and force many useless retries.
+ local_timestamp_tree: Mutex<db::Tree>,
+
endpoint: Arc<Endpoint<K2VRpc, Self>>,
subscriptions: Arc<SubscriptionManager>,
}
@@ -66,14 +88,19 @@ pub struct K2VRpcHandler {
impl K2VRpcHandler {
pub fn new(
system: Arc<System>,
+ db: &db::Db,
item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
subscriptions: Arc<SubscriptionManager>,
) -> Arc<Self> {
+ let local_timestamp_tree = db
+ .open_tree("k2v_local_timestamp")
+ .expect("Unable to open DB tree for k2v local timestamp");
let endpoint = system.netapp.endpoint("garage_model/k2v/Rpc".to_string());
let rpc_handler = Arc::new(Self {
system,
item_table,
+ local_timestamp_tree: Mutex::new(local_timestamp_tree),
endpoint,
subscriptions,
});
@@ -181,7 +208,7 @@ impl K2VRpcHandler {
Ok(())
}
- pub async fn poll(
+ pub async fn poll_item(
&self,
bucket_id: Uuid,
partition_key: String,
@@ -230,9 +257,7 @@ impl K2VRpcHandler {
resp = Some(x);
}
}
- K2VRpc::PollItemResponse(None) => {
- return Ok(None);
- }
+ K2VRpc::PollItemResponse(None) => (),
v => return Err(Error::unexpected_rpc_message(v)),
}
}
@@ -240,10 +265,117 @@ impl K2VRpcHandler {
Ok(resp)
}
+ pub async fn poll_range(
+ &self,
+ range: PollRange,
+ seen_str: Option<String>,
+ timeout_msec: u64,
+ ) -> Result<Option<(BTreeMap<String, K2VItem>, String)>, HelperError> {
+ let has_seen_marker = seen_str.is_some();
+
+ // Parse seen marker, we will use it below. This is also the first check
+ // that it is valid, which returns a bad request error if not.
+ let mut seen = seen_str
+ .as_deref()
+ .map(RangeSeenMarker::decode_helper)
+ .transpose()?
+ .unwrap_or_default();
+ seen.restrict(&range);
+
+ // Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ let nodes = self
+ .item_table
+ .data
+ .replication
+ .write_nodes(&range.partition.hash());
+ let quorum = self.item_table.data.replication.read_quorum();
+ let msg = K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ };
+
+ // Send the request to all nodes, use FuturesUnordered to get the responses in any order
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
+ let mut requests = nodes
+ .iter()
+ .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .collect::<FuturesUnordered<_>>();
+
+ // Fetch responses. This procedure stops fetching responses when any of the following
+ // conditions arise:
+ // - we have a response to all requests
+ // - we have a response to a read quorum of requests (e.g. 2/3), and an extra delay
+ // has passed since the quorum was achieved
+ // - a global RPC timeout expired
+ // The extra delay after a quorum was received is usefull if the third response was to
+ // arrive during this short interval: this would allow us to consider all the data seen
+ // by that last node in the response we produce, and would likely help reduce the
+ // size of the seen marker that we will return (because we would have an info of the
+ // kind: all items produced by that node until time ts have been returned, so we can
+ // bump the entry in the global vector clock and possibly remove some item-specific
+ // vector clocks)
+ let mut deadline =
+ Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut resps = vec![];
+ let mut errors = vec![];
+ loop {
+ select! {
+ _ = tokio::time::sleep_until(deadline.into()) => {
+ break;
+ }
+ res = requests.next() => match res {
+ None => break,
+ Some(Err(e)) => errors.push(e),
+ Some(Ok(r)) => {
+ resps.push(r);
+ if resps.len() >= quorum {
+ deadline = std::cmp::min(deadline, Instant::now() + POLL_RANGE_EXTRA_DELAY);
+ }
+ }
+ }
+ }
+ }
+ if errors.len() > nodes.len() - quorum {
+ let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
+ return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ }
+
+ // Take all returned items into account to produce the response.
+ let mut new_items = BTreeMap::<String, K2VItem>::new();
+ for v in resps {
+ if let K2VRpc::PollRangeResponse(node, items) = v {
+ seen.mark_seen_node_items(node, items.iter());
+ for item in items.into_iter() {
+ match new_items.get_mut(&item.sort_key) {
+ Some(ent) => {
+ ent.merge(&item);
+ }
+ None => {
+ new_items.insert(item.sort_key.clone(), item);
+ }
+ }
+ }
+ } else {
+ return Err(Error::unexpected_rpc_message(v).into());
+ }
+ }
+
+ if new_items.is_empty() && has_seen_marker {
+ Ok(None)
+ } else {
+ Ok(Some((new_items, seen.encode()?)))
+ }
+ }
+
// ---- internal handlers ----
async fn handle_insert(&self, item: &InsertedItem) -> Result<K2VRpc, Error> {
- let new = self.local_insert(item)?;
+ let new = {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ self.local_insert(&local_timestamp_tree, item)?
+ };
// Propagate to rest of network
if let Some(updated) = new {
@@ -256,11 +388,14 @@ impl K2VRpcHandler {
async fn handle_insert_many(&self, items: &[InsertedItem]) -> Result<K2VRpc, Error> {
let mut updated_vec = vec![];
- for item in items {
- let new = self.local_insert(item)?;
+ {
+ let local_timestamp_tree = self.local_timestamp_tree.lock().unwrap();
+ for item in items {
+ let new = self.local_insert(&local_timestamp_tree, item)?;
- if let Some(updated) = new {
- updated_vec.push(updated);
+ if let Some(updated) = new {
+ updated_vec.push(updated);
+ }
}
}
@@ -272,10 +407,22 @@ impl K2VRpcHandler {
Ok(K2VRpc::Ok)
}
- fn local_insert(&self, item: &InsertedItem) -> Result<Option<K2VItem>, Error> {
+ fn local_insert(
+ &self,
+ local_timestamp_tree: &MutexGuard<'_, db::Tree>,
+ item: &InsertedItem,
+ ) -> Result<Option<K2VItem>, Error> {
+ let now = now_msec();
+
self.item_table
.data
- .update_entry_with(&item.partition, &item.sort_key, |ent| {
+ .update_entry_with(&item.partition, &item.sort_key, |tx, ent| {
+ let old_local_timestamp = tx
+ .get(&local_timestamp_tree, TIMESTAMP_KEY)?
+ .and_then(|x| x.try_into().ok())
+ .map(u64::from_be_bytes)
+ .unwrap_or_default();
+
let mut ent = ent.unwrap_or_else(|| {
K2VItem::new(
item.partition.bucket_id,
@@ -283,13 +430,25 @@ impl K2VRpcHandler {
item.sort_key.clone(),
)
});
- ent.update(self.system.id, &item.causal_context, item.value.clone());
- ent
+ let new_local_timestamp = ent.update(
+ self.system.id,
+ &item.causal_context,
+ item.value.clone(),
+ std::cmp::max(old_local_timestamp, now),
+ );
+
+ tx.insert(
+ &local_timestamp_tree,
+ TIMESTAMP_KEY,
+ u64::to_be_bytes(new_local_timestamp),
+ )?;
+
+ Ok(ent)
})
}
- async fn handle_poll(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
- let mut chan = self.subscriptions.subscribe(key);
+ async fn handle_poll_item(&self, key: &PollKey, ct: &CausalContext) -> Result<K2VItem, Error> {
+ let mut chan = self.subscriptions.subscribe_item(key);
let mut value = self
.item_table
@@ -311,6 +470,71 @@ impl K2VRpcHandler {
Ok(value)
}
+
+ async fn handle_poll_range(
+ &self,
+ range: &PollRange,
+ seen_str: &Option<String>,
+ ) -> Result<Vec<K2VItem>, Error> {
+ if let Some(seen_str) = seen_str {
+ let seen = RangeSeenMarker::decode(seen_str).ok_or_message("Invalid seenMarker")?;
+
+ // Subscribe now to all changes on that partition,
+ // so that new items that are inserted while we are reading the range
+ // will be seen in the loop below
+ let mut chan = self.subscriptions.subscribe_partition(&range.partition);
+
+ // Check for the presence of any new items already stored in the item table
+ let mut new_items = self.poll_range_read_range(range, &seen)?;
+
+ // If we found no new items, wait for a matching item to arrive
+ // on the channel
+ while new_items.is_empty() {
+ let item = chan.recv().await?;
+ if range.matches(&item) && seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ } else {
+ // If no seen marker was specified, we do not poll for anything.
+ // We return immediately with the set of known items (even if
+ // it is empty), which will give the client an inital view of
+ // the dataset and an initial seen marker for further
+ // PollRange calls.
+ self.poll_range_read_range(range, &RangeSeenMarker::default())
+ }
+ }
+
+ fn poll_range_read_range(
+ &self,
+ range: &PollRange,
+ seen: &RangeSeenMarker,
+ ) -> Result<Vec<K2VItem>, Error> {
+ let mut new_items = vec![];
+
+ let partition_hash = range.partition.hash();
+ let first_key = match &range.start {
+ None => partition_hash.to_vec(),
+ Some(sk) => self.item_table.data.tree_key(&range.partition, sk),
+ };
+ for item in self.item_table.data.store.range(first_key..)? {
+ let (key, value) = item?;
+ if &key[..32] != partition_hash.as_slice() {
+ break;
+ }
+ let item = self.item_table.data.decode_entry(&value)?;
+ if !range.matches(&item) {
+ break;
+ }
+ if seen.is_new_item(&item) {
+ new_items.push(item);
+ }
+ }
+
+ Ok(new_items)
+ }
}
#[async_trait]
@@ -326,10 +550,21 @@ impl EndpointHandler<K2VRpc> for K2VRpcHandler {
} => {
let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
select! {
- ret = self.handle_poll(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
+ ret = self.handle_poll_item(key, causal_context) => ret.map(Some).map(K2VRpc::PollItemResponse),
_ = delay => Ok(K2VRpc::PollItemResponse(None)),
}
}
+ K2VRpc::PollRange {
+ range,
+ seen_str,
+ timeout_msec,
+ } => {
+ let delay = tokio::time::sleep(Duration::from_millis(*timeout_msec));
+ select! {
+ ret = self.handle_poll_range(range, seen_str) => ret.map(|items| K2VRpc::PollRangeResponse(self.system.id, items)),
+ _ = delay => Ok(K2VRpc::PollRangeResponse(self.system.id, vec![])),
+ }
+ }
m => Err(Error::unexpected_rpc_message(m)),
}
}
diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs
new file mode 100644
index 00000000..51098710
--- /dev/null
+++ b/src/model/k2v/seen.rs
@@ -0,0 +1,105 @@
+//! Implements a RangeSeenMarker, a data type used in the PollRange API
+//! to indicate which items in the range have already been seen
+//! and which have not been seen yet.
+//!
+//! It consists of a vector clock that indicates that for each node,
+//! all items produced by that node with timestamps <= the value in the
+//! vector clock has been seen, as well as a set of causal contexts for
+//! individual items.
+
+use std::collections::BTreeMap;
+
+use base64::prelude::*;
+use serde::{Deserialize, Serialize};
+
+use garage_util::data::Uuid;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
+use garage_util::error::Error;
+
+use crate::helper::error::{Error as HelperError, OkOrBadRequest};
+use crate::k2v::causality::*;
+use crate::k2v::item_table::*;
+use crate::k2v::sub::*;
+
+#[derive(Debug, Serialize, Deserialize, Default)]
+pub struct RangeSeenMarker {
+ vector_clock: VectorClock,
+ items: BTreeMap<String, VectorClock>,
+}
+
+impl RangeSeenMarker {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn restrict(&mut self, range: &PollRange) {
+ if let Some(start) = &range.start {
+ self.items = self.items.split_off(start);
+ }
+ if let Some(end) = &range.end {
+ self.items.split_off(end);
+ }
+ if let Some(pfx) = &range.prefix {
+ self.items.retain(|k, _v| k.starts_with(pfx));
+ }
+ }
+
+ pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
+ &mut self,
+ node: Uuid,
+ items: I,
+ ) {
+ let node = make_node_id(node);
+ for item in items.into_iter() {
+ let cc = item.causal_context();
+
+ if let Some(ts) = cc.vector_clock.get(&node) {
+ let ent = self.vector_clock.entry(node).or_insert(0);
+ *ent = std::cmp::max(*ent, *ts);
+ }
+
+ if vclock_gt(&cc.vector_clock, &self.vector_clock) {
+ match self.items.get_mut(&item.sort_key) {
+ None => {
+ self.items.insert(item.sort_key.clone(), cc.vector_clock);
+ }
+ Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock),
+ }
+ }
+ }
+ }
+
+ pub fn canonicalize(&mut self) {
+ let self_vc = &self.vector_clock;
+ self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc))
+ }
+
+ pub fn encode(&mut self) -> Result<String, Error> {
+ self.canonicalize();
+
+ let bytes = nonversioned_encode(&self)?;
+ let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
+ Ok(BASE64_STANDARD.encode(&bytes))
+ }
+
+ /// Decode from msgpack+zstd+b64 representation, returns None on error.
+ pub fn decode(s: &str) -> Option<Self> {
+ let bytes = BASE64_STANDARD.decode(&s).ok()?;
+ let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
+ nonversioned_decode(&bytes).ok()
+ }
+
+ pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
+ Self::decode(s).ok_or_bad_request("Invalid causality token")
+ }
+
+ pub fn is_new_item(&self, item: &K2VItem) -> bool {
+ let cc = item.causal_context();
+ vclock_gt(&cc.vector_clock, &self.vector_clock)
+ && self
+ .items
+ .get(&item.sort_key)
+ .map(|vc| vclock_gt(&cc.vector_clock, &vc))
+ .unwrap_or(true)
+ }
+}
diff --git a/src/model/k2v/sub.rs b/src/model/k2v/sub.rs
new file mode 100644
index 00000000..b1daa271
--- /dev/null
+++ b/src/model/k2v/sub.rs
@@ -0,0 +1,110 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+
+use crate::k2v::item_table::*;
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollKey {
+ pub partition: K2VItemPartition,
+ pub sort_key: String,
+}
+
+#[derive(Debug, Hash, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct PollRange {
+ pub partition: K2VItemPartition,
+ pub prefix: Option<String>,
+ pub start: Option<String>,
+ pub end: Option<String>,
+}
+
+#[derive(Default)]
+pub struct SubscriptionManager(Mutex<SubscriptionManagerInner>);
+
+#[derive(Default)]
+pub struct SubscriptionManagerInner {
+ item_subscriptions: HashMap<PollKey, broadcast::Sender<K2VItem>>,
+ part_subscriptions: HashMap<K2VItemPartition, broadcast::Sender<K2VItem>>,
+}
+
+impl SubscriptionManager {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub(crate) fn subscribe_item(&self, key: &PollKey) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.item_subscriptions.get(key) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.item_subscriptions.insert(key.clone(), tx);
+ rx
+ }
+ }
+
+ pub(crate) fn subscribe_partition(
+ &self,
+ part: &K2VItemPartition,
+ ) -> broadcast::Receiver<K2VItem> {
+ let mut inner = self.0.lock().unwrap();
+ if let Some(s) = inner.part_subscriptions.get(part) {
+ s.subscribe()
+ } else {
+ let (tx, rx) = broadcast::channel(8);
+ inner.part_subscriptions.insert(part.clone(), tx);
+ rx
+ }
+ }
+
+ pub(crate) fn notify(&self, item: &K2VItem) {
+ let mut inner = self.0.lock().unwrap();
+
+ // 1. Notify single item subscribers,
+ // removing subscriptions with no more listeners if any
+ let key = PollKey {
+ partition: item.partition.clone(),
+ sort_key: item.sort_key.clone(),
+ };
+ if let Some(s) = inner.item_subscriptions.get(&key) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.item_subscriptions.remove(&key);
+ }
+ }
+
+ // 2. Notify partition subscribers,
+ // removing subscriptions with no more listeners if any
+ if let Some(s) = inner.part_subscriptions.get(&item.partition) {
+ if s.send(item.clone()).is_err() {
+ // no more subscribers, remove channel from here
+ // (we will re-create it later if we need to subscribe again)
+ inner.part_subscriptions.remove(&item.partition);
+ }
+ }
+ }
+}
+
+impl PollRange {
+ pub fn matches(&self, item: &K2VItem) -> bool {
+ item.partition == self.partition
+ && self
+ .prefix
+ .as_ref()
+ .map(|x| item.sort_key.starts_with(x))
+ .unwrap_or(true)
+ && self
+ .start
+ .as_ref()
+ .map(|x| item.sort_key >= *x)
+ .unwrap_or(true)
+ && self
+ .end
+ .as_ref()
+ .map(|x| item.sort_key < *x)
+ .unwrap_or(true)
+ }
+}
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 1ec250c3..e59c372a 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,9 @@ use opentelemetry::{
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-use netapp::message::IntoReq;
pub use netapp::message::{
- Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
- PRIO_SECONDARY,
+ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
+ PRIO_NORMAL, PRIO_SECONDARY,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::{self, NetApp, NodeID};
diff --git a/src/table/data.rs b/src/table/data.rs
index 5c792f1f..26cc3a5a 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -181,13 +181,17 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub(crate) fn update_entry(&self, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
- self.update_entry_with(update.partition_key(), update.sort_key(), |ent| match ent {
- Some(mut ent) => {
- ent.merge(&update);
- ent
- }
- None => update.clone(),
- })?;
+ self.update_entry_with(
+ update.partition_key(),
+ update.sort_key(),
+ |_tx, ent| match ent {
+ Some(mut ent) => {
+ ent.merge(&update);
+ Ok(ent)
+ }
+ None => Ok(update.clone()),
+ },
+ )?;
Ok(())
}
@@ -195,7 +199,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
&self,
partition_key: &F::P,
sort_key: &F::S,
- f: impl Fn(Option<F::E>) -> F::E,
+ update_fn: impl Fn(&mut db::Transaction, Option<F::E>) -> db::TxOpResult<F::E>,
) -> Result<Option<F::E>, Error> {
let tree_key = self.tree_key(partition_key, sort_key);
@@ -203,10 +207,10 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let (old_entry, old_bytes, new_entry) = match tx.get(&self.store, &tree_key)? {
Some(old_bytes) => {
let old_entry = self.decode_entry(&old_bytes).map_err(db::TxError::Abort)?;
- let new_entry = f(Some(old_entry.clone()));
+ let new_entry = update_fn(&mut tx, Some(old_entry.clone()))?;
(Some(old_entry), Some(old_bytes), new_entry)
}
- None => (None, None, f(None)),
+ None => (None, None, update_fn(&mut tx, None)?),
};
// Changed can be true in two scenarios