aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/error.rs21
-rw-r--r--src/api/s3_bucket.rs6
-rw-r--r--src/api/s3_delete.rs2
-rw-r--r--src/api/s3_put.rs6
-rw-r--r--src/garage/admin_rpc.rs16
-rw-r--r--src/garage/cli.rs6
-rw-r--r--src/model/block.rs2
-rw-r--r--src/model/block_ref_table.rs12
-rw-r--r--src/model/bucket_table.rs20
-rw-r--r--src/model/key_table.rs20
-rw-r--r--src/model/object_table.rs10
-rw-r--r--src/model/version_table.rs8
-rw-r--r--src/rpc/lib.rs1
-rw-r--r--src/rpc/membership.rs18
-rw-r--r--src/rpc/ring.rs10
-rw-r--r--src/rpc/rpc_client.rs38
-rw-r--r--src/rpc/rpc_server.rs2
-rw-r--r--src/table/crdt/bool.rs2
-rw-r--r--src/table/crdt/crdt.rs14
-rw-r--r--src/table/crdt/lww.rs10
-rw-r--r--src/table/crdt/lww_map.rs14
-rw-r--r--src/table/crdt/map.rs8
-rw-r--r--src/table/data.rs4
-rw-r--r--src/table/gc.rs30
-rw-r--r--src/table/lib.rs2
-rw-r--r--src/table/replication/fullcopy.rs4
-rw-r--r--src/table/replication/parameters.rs4
-rw-r--r--src/table/replication/sharded.rs4
-rw-r--r--src/table/schema.rs4
-rw-r--r--src/table/sync.rs46
-rw-r--r--src/table/table.rs46
-rw-r--r--src/util/data.rs4
-rw-r--r--src/util/error.rs28
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/web/error.rs5
35 files changed, 212 insertions, 216 deletions
diff --git a/src/api/error.rs b/src/api/error.rs
index eaa6c3a1..a49ba211 100644
--- a/src/api/error.rs
+++ b/src/api/error.rs
@@ -8,7 +8,6 @@ use garage_util::error::Error as GarageError;
use crate::encoding::*;
/// Errors of this crate
-#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Error)]
pub enum Error {
// Category: internal error
@@ -22,7 +21,7 @@ pub enum Error {
/// Error related to HTTP
#[error(display = "Internal error (HTTP error): {}", _0)]
- HTTP(#[error(source)] http::Error),
+ Http(#[error(source)] http::Error),
// Category: cannot process
/// No proper api key was used, or the signature was invalid
@@ -40,11 +39,11 @@ pub enum Error {
// Category: bad request
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
- InvalidUTF8Str(#[error(source)] std::str::Utf8Error),
+ InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
/// The request used an invalid path
#[error(display = "Invalid UTF-8: {}", _0)]
- InvalidUTF8String(#[error(source)] std::string::FromUtf8Error),
+ InvalidUtf8String(#[error(source)] std::string::FromUtf8Error),
/// Some base64 encoded data was badly encoded
#[error(display = "Invalid base64: {}", _0)]
@@ -52,7 +51,7 @@ pub enum Error {
/// The client sent invalid XML data
#[error(display = "Invalid XML: {}", _0)]
- InvalidXML(String),
+ InvalidXml(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
@@ -69,13 +68,13 @@ pub enum Error {
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
- Self::InvalidXML(format!("{}", err))
+ Self::InvalidXml(format!("{}", err))
}
}
impl From<quick_xml::de::DeError> for Error {
fn from(err: quick_xml::de::DeError) -> Self {
- Self::InvalidXML(format!("{}", err))
+ Self::InvalidXml(format!("{}", err))
}
}
@@ -85,8 +84,8 @@ impl Error {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
- Error::InternalError(GarageError::RPC(_)) => StatusCode::SERVICE_UNAVAILABLE,
- Error::InternalError(_) | Error::Hyper(_) | Error::HTTP(_) => {
+ Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE,
+ Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
_ => StatusCode::BAD_REQUEST,
@@ -98,8 +97,8 @@ impl Error {
Error::NotFound => "NoSuchKey",
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
- Error::InternalError(GarageError::RPC(_)) => "ServiceUnavailable",
- Error::InternalError(_) | Error::Hyper(_) | Error::HTTP(_) => "InternalError",
+ Error::InternalError(GarageError::Rpc(_)) => "ServiceUnavailable",
+ Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
_ => "InvalidRequest",
}
}
diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs
index d1a4425a..e7d89698 100644
--- a/src/api/s3_bucket.rs
+++ b/src/api/s3_bucket.rs
@@ -34,7 +34,7 @@ struct DisplayName {
pub body: String,
}
#[derive(Debug, Serialize, PartialEq)]
-struct ID {
+struct Id {
#[serde(rename = "$value")]
pub body: String,
}
@@ -43,7 +43,7 @@ struct Owner {
#[serde(rename = "DisplayName")]
display_name: DisplayName,
#[serde(rename = "ID")]
- id: ID,
+ id: Id,
}
#[derive(Debug, Serialize, PartialEq)]
struct BucketList {
@@ -80,7 +80,7 @@ pub fn handle_list_buckets(api_key: &Key) -> Result<Response<Body>, Error> {
display_name: DisplayName {
body: api_key.name.get().to_string(),
},
- id: ID {
+ id: Id {
body: api_key.key_id.to_string(),
},
},
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 85bb7692..9d2a67f5 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -17,7 +17,7 @@ async fn handle_delete_internal(
garage: &Garage,
bucket: &str,
key: &str,
-) -> Result<(UUID, UUID), Error> {
+) -> Result<(Uuid, Uuid), Error> {
let object = garage
.object_table
.get(&bucket.to_string(), &key.to_string())
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index c39189c5..bb6cf579 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -305,7 +305,7 @@ impl BodyChunker {
}
}
-pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
+pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
.header("ETag", format!("\"{}\"", md5sum_hex))
@@ -633,14 +633,14 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
})
}
-fn decode_upload_id(id: &str) -> Result<UUID, Error> {
+fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
if id_bin.len() != 32 {
return None.ok_or_bad_request("Invalid upload ID");
}
let mut uuid = [0u8; 32];
uuid.copy_from_slice(&id_bin[..]);
- Ok(UUID::from(uuid))
+ Ok(Uuid::from(uuid))
}
#[derive(Debug)]
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index dc27caae..f2d11bb3 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use garage_util::error::Error;
-use garage_table::crdt::CRDT;
+use garage_table::crdt::Crdt;
use garage_table::replication::*;
use garage_table::*;
@@ -61,7 +61,7 @@ impl AdminRpcHandler {
AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt).await,
AdminRpc::Stats(opt) => self2.handle_stats(opt).await,
- _ => Err(Error::BadRPC("Invalid RPC".to_string())),
+ _ => Err(Error::BadRpc("Invalid RPC".to_string())),
}
}
});
@@ -88,7 +88,7 @@ impl AdminRpcHandler {
let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
Some(mut bucket) => {
if !bucket.is_deleted() {
- return Err(Error::BadRPC(format!(
+ return Err(Error::BadRpc(format!(
"Bucket {} already exists",
query.name
)));
@@ -111,10 +111,10 @@ impl AdminRpcHandler {
.get_range(&query.name, None, Some(DeletedFilter::NotDeleted), 10)
.await?;
if !objects.is_empty() {
- return Err(Error::BadRPC(format!("Bucket {} is not empty", query.name)));
+ return Err(Error::BadRpc(format!("Bucket {} is not empty", query.name)));
}
if !query.yes {
- return Err(Error::BadRPC(
+ return Err(Error::BadRpc(
"Add --yes flag to really perform this operation".to_string(),
));
}
@@ -223,7 +223,7 @@ impl AdminRpcHandler {
KeyOperation::Delete(query) => {
let key = self.get_existing_key(&query.key_pattern).await?;
if !query.yes {
- return Err(Error::BadRPC(
+ return Err(Error::BadRpc(
"Add --yes flag to really perform this operation".to_string(),
));
}
@@ -265,7 +265,7 @@ impl AdminRpcHandler {
.await?
.filter(|b| !b.is_deleted())
.map(Ok)
- .unwrap_or_else(|| Err(Error::BadRPC(format!("Bucket {} does not exist", bucket))))
+ .unwrap_or_else(|| Err(Error::BadRpc(format!("Bucket {} does not exist", bucket))))
}
async fn get_existing_key(&self, pattern: &str) -> Result<Key, Error> {
@@ -342,7 +342,7 @@ impl AdminRpcHandler {
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
- return Err(Error::BadRPC(
+ return Err(Error::BadRpc(
"Please provide the --yes flag to initiate repair operations.".to_string(),
));
}
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index d281570b..bfe7e08e 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -5,7 +5,7 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
-use garage_util::data::UUID;
+use garage_util::data::Uuid;
use garage_util::error::Error;
use garage_util::time::*;
@@ -385,9 +385,9 @@ pub async fn cmd_status(
}
pub fn find_matching_node(
- cand: impl std::iter::Iterator<Item = UUID>,
+ cand: impl std::iter::Iterator<Item = Uuid>,
pattern: &str,
-) -> Result<UUID, Error> {
+) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(&c).starts_with(&pattern) {
diff --git a/src/model/block.rs b/src/model/block.rs
index 1c9aa08b..348f0711 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -137,7 +137,7 @@ impl BlockManager {
Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
Message::GetBlock(h) => self.read_block(h).await,
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
- _ => Err(Error::BadRPC("Unexpected RPC message".to_string())),
+ _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
}
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs
index c2d77fd3..f8f529c4 100644
--- a/src/model/block_ref_table.rs
+++ b/src/model/block_ref_table.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use garage_util::data::*;
-use garage_table::crdt::CRDT;
+use garage_table::crdt::Crdt;
use garage_table::*;
use crate::block::*;
@@ -14,18 +14,18 @@ pub struct BlockRef {
pub block: Hash,
/// Id of the Version for the object containing this block, used as sorting key
- pub version: UUID,
+ pub version: Uuid,
// Keep track of deleted status
/// Is the Version that contains this block deleted
pub deleted: crdt::Bool,
}
-impl Entry<Hash, UUID> for BlockRef {
+impl Entry<Hash, Uuid> for BlockRef {
fn partition_key(&self) -> &Hash {
&self.block
}
- fn sort_key(&self) -> &UUID {
+ fn sort_key(&self) -> &Uuid {
&self.version
}
fn is_tombstone(&self) -> bool {
@@ -33,7 +33,7 @@ impl Entry<Hash, UUID> for BlockRef {
}
}
-impl CRDT for BlockRef {
+impl Crdt for BlockRef {
fn merge(&mut self, other: &Self) {
self.deleted.merge(&other.deleted);
}
@@ -45,7 +45,7 @@ pub struct BlockRefTable {
impl TableSchema for BlockRefTable {
type P = Hash;
- type S = UUID;
+ type S = Uuid;
type E = BlockRef;
type Filter = DeletedFilter;
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index 9f89dccc..168ed713 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_table::crdt::CRDT;
+use garage_table::crdt::Crdt;
use garage_table::*;
use crate::key_table::PermissionSet;
@@ -15,7 +15,7 @@ pub struct Bucket {
/// Name of the bucket
pub name: String,
/// State, and configuration if not deleted, of the bucket
- pub state: crdt::LWW<BucketState>,
+ pub state: crdt::Lww<BucketState>,
}
/// State of a bucket
@@ -27,7 +27,7 @@ pub enum BucketState {
Present(BucketParams),
}
-impl CRDT for BucketState {
+impl Crdt for BucketState {
fn merge(&mut self, o: &Self) {
match o {
BucketState::Deleted => *self = BucketState::Deleted,
@@ -44,22 +44,22 @@ impl CRDT for BucketState {
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketParams {
/// Map of key with access to the bucket, and what kind of access they give
- pub authorized_keys: crdt::LWWMap<String, PermissionSet>,
+ pub authorized_keys: crdt::LwwMap<String, PermissionSet>,
/// Is the bucket served as http
- pub website: crdt::LWW<bool>,
+ pub website: crdt::Lww<bool>,
}
impl BucketParams {
/// Create an empty BucketParams with no authorized keys and no website accesss
pub fn new() -> Self {
BucketParams {
- authorized_keys: crdt::LWWMap::new(),
- website: crdt::LWW::new(false),
+ authorized_keys: crdt::LwwMap::new(),
+ website: crdt::Lww::new(false),
}
}
}
-impl CRDT for BucketParams {
+impl Crdt for BucketParams {
fn merge(&mut self, o: &Self) {
self.authorized_keys.merge(&o.authorized_keys);
self.website.merge(&o.website);
@@ -77,7 +77,7 @@ impl Bucket {
pub fn new(name: String) -> Self {
Bucket {
name,
- state: crdt::LWW::new(BucketState::Present(BucketParams::new())),
+ state: crdt::Lww::new(BucketState::Present(BucketParams::new())),
}
}
@@ -105,7 +105,7 @@ impl Entry<EmptyKey, String> for Bucket {
}
}
-impl CRDT for Bucket {
+impl Crdt for Bucket {
fn merge(&mut self, other: &Self) {
self.state.merge(&other.state);
}
diff --git a/src/model/key_table.rs b/src/model/key_table.rs
index ba1f6b81..a6186aa9 100644
--- a/src/model/key_table.rs
+++ b/src/model/key_table.rs
@@ -13,14 +13,14 @@ pub struct Key {
pub secret_key: String,
/// Name for the key
- pub name: crdt::LWW<String>,
+ pub name: crdt::Lww<String>,
/// Is the key deleted
pub deleted: crdt::Bool,
/// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
- pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
+ pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
}
impl Key {
@@ -31,9 +31,9 @@ impl Key {
Self {
key_id,
secret_key,
- name: crdt::LWW::new(name),
+ name: crdt::Lww::new(name),
deleted: crdt::Bool::new(false),
- authorized_buckets: crdt::LWWMap::new(),
+ authorized_buckets: crdt::LwwMap::new(),
}
}
@@ -42,9 +42,9 @@ impl Key {
Self {
key_id: key_id.to_string(),
secret_key: secret_key.to_string(),
- name: crdt::LWW::new(name.to_string()),
+ name: crdt::Lww::new(name.to_string()),
deleted: crdt::Bool::new(false),
- authorized_buckets: crdt::LWWMap::new(),
+ authorized_buckets: crdt::LwwMap::new(),
}
}
@@ -53,9 +53,9 @@ impl Key {
Self {
key_id,
secret_key: "".into(),
- name: crdt::LWW::new("".to_string()),
+ name: crdt::Lww::new("".to_string()),
deleted: crdt::Bool::new(true),
- authorized_buckets: crdt::LWWMap::new(),
+ authorized_buckets: crdt::LwwMap::new(),
}
}
@@ -85,7 +85,7 @@ pub struct PermissionSet {
pub allow_write: bool,
}
-impl AutoCRDT for PermissionSet {
+impl AutoCrdt for PermissionSet {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -98,7 +98,7 @@ impl Entry<EmptyKey, String> for Key {
}
}
-impl CRDT for Key {
+impl Crdt for Key {
fn merge(&mut self, other: &Self) {
self.name.merge(&other.name);
self.deleted.merge(&other.deleted);
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index f0473836..d743a2b6 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -64,7 +64,7 @@ impl Object {
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
- pub uuid: UUID,
+ pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
@@ -82,7 +82,7 @@ pub enum ObjectVersionState {
Aborted,
}
-impl CRDT for ObjectVersionState {
+impl Crdt for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
@@ -115,7 +115,7 @@ pub enum ObjectVersionData {
FirstBlock(ObjectVersionMeta, Hash),
}
-impl AutoCRDT for ObjectVersionData {
+impl AutoCrdt for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -140,7 +140,7 @@ pub struct ObjectVersionHeaders {
}
impl ObjectVersion {
- fn cmp_key(&self) -> (u64, UUID) {
+ fn cmp_key(&self) -> (u64, Uuid) {
(self.timestamp, self.uuid)
}
@@ -178,7 +178,7 @@ impl Entry<String, String> for Object {
}
}
-impl CRDT for Object {
+impl Crdt for Object {
fn merge(&mut self, other: &Self) {
// Merge versions from other into here
for other_v in other.versions.iter() {
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index dd088224..bff7d4bb 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -14,7 +14,7 @@ use crate::block_ref_table::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
- pub uuid: UUID,
+ pub uuid: Uuid,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
@@ -35,7 +35,7 @@ pub struct Version {
}
impl Version {
- pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self {
+ pub fn new(uuid: Uuid, bucket: String, key: String, deleted: bool) -> Self {
Self {
uuid,
deleted: deleted.into(),
@@ -78,7 +78,7 @@ pub struct VersionBlock {
pub size: u64,
}
-impl AutoCRDT for VersionBlock {
+impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
@@ -94,7 +94,7 @@ impl Entry<Hash, EmptyKey> for Version {
}
}
-impl CRDT for Version {
+impl Crdt for Version {
fn merge(&mut self, other: &Self) {
self.deleted.merge(&other.deleted);
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index e787833c..96561d0e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -1,4 +1,3 @@
-#![allow(clippy::upper_case_acronyms)]
//! Crate containing rpc related functions and types used in Garage
#[macro_use]
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index ce4029f1..da7dcf8f 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -56,7 +56,7 @@ impl RpcMessage for Message {}
/// A ping, containing informations about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
- id: UUID,
+ id: Uuid,
rpc_port: u16,
status_hash: Hash,
@@ -69,7 +69,7 @@ pub struct PingMessage {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
/// Id of the node this advertisement relates to
- pub id: UUID,
+ pub id: Uuid,
/// IP and port of the node
pub addr: SocketAddr,
@@ -84,7 +84,7 @@ pub struct AdvertisedNode {
/// This node's membership manager
pub struct System {
/// The id of this node
- pub id: UUID,
+ pub id: Uuid,
persist_config: Persister<NetworkConfig>,
persist_status: Persister<Vec<AdvertisedNode>>,
@@ -114,7 +114,7 @@ struct Updaters {
#[derive(Debug, Clone)]
pub struct Status {
/// Mapping of each node id to its known status
- pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+ pub nodes: HashMap<Uuid, Arc<StatusEntry>>,
/// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
@@ -198,7 +198,7 @@ impl Status {
}
}
-fn gen_node_id(metadata_dir: &Path) -> Result<UUID, Error> {
+fn gen_node_id(metadata_dir: &Path) -> Result<Uuid, Error> {
let mut id_file = metadata_dir.to_path_buf();
id_file.push("node_id");
if id_file.as_path().exists() {
@@ -301,7 +301,7 @@ impl System {
Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
- _ => Err(Error::BadRPC("Unexpected RPC message".to_string())),
+ _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
}
}
});
@@ -369,7 +369,7 @@ impl System {
});
}
- async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
+ async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<Uuid>)>) {
let ping_msg = self.make_ping();
let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
let sys = self.clone();
@@ -640,7 +640,7 @@ impl System {
#[allow(clippy::manual_async_fn)]
fn pull_status(
self: Arc<Self>,
- peer: UUID,
+ peer: Uuid,
) -> impl futures::future::Future<Output = ()> + Send + 'static {
async move {
let resp = self
@@ -653,7 +653,7 @@ impl System {
}
}
- async fn pull_config(self: Arc<Self>, peer: UUID) {
+ async fn pull_config(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc_client
.call(peer, Message::PullConfig, PING_TIMEOUT)
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index d371bb64..0f94d0f6 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -32,7 +32,7 @@ pub const MAX_REPLICATION: usize = 3;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
/// Map of each node's id to it's configuration
- pub members: HashMap<UUID, NetworkConfigEntry>,
+ pub members: HashMap<Uuid, NetworkConfigEntry>,
/// Version of this config
pub version: u64,
}
@@ -73,7 +73,7 @@ pub struct RingEntry {
/// The prefix of the Hash of object which should use this entry
pub location: Hash,
/// The nodes in which a matching object should get stored
- pub nodes: [UUID; MAX_REPLICATION],
+ pub nodes: [Uuid; MAX_REPLICATION],
}
impl Ring {
@@ -92,7 +92,7 @@ impl Ring {
let n_datacenters = datacenters.len();
// Prepare ring
- let mut partitions: Vec<Vec<(&UUID, &NetworkConfigEntry)>> = partitions_idx
+ let mut partitions: Vec<Vec<(&Uuid, &NetworkConfigEntry)>> = partitions_idx
.iter()
.map(|_i| Vec::new())
.collect::<Vec<_>>();
@@ -180,7 +180,7 @@ impl Ring {
let top = (i as u16) << (16 - PARTITION_BITS);
let mut hash = [0u8; 32];
hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
- let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<UUID>>();
+ let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<Uuid>>();
RingEntry {
location: hash.into(),
nodes: nodes.try_into().unwrap(),
@@ -213,7 +213,7 @@ impl Ring {
// TODO rename this function as it no longer walk the ring
/// Walk the ring to find the n servers in which data should be replicated
- pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
+ pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<Uuid> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
return vec![];
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index f68e4c03..5ed43d44 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -19,7 +19,7 @@ use tokio::sync::{watch, Semaphore};
use garage_util::background::BackgroundRunner;
use garage_util::config::TlsConfig;
use garage_util::data::*;
-use garage_util::error::{Error, RPCError};
+use garage_util::error::{Error, RpcError};
use crate::membership::Status;
use crate::rpc_server::RpcMessage;
@@ -70,7 +70,7 @@ pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
- local_handler: ArcSwapOption<(UUID, LocalHandlerFn<M>)>,
+ local_handler: ArcSwapOption<(Uuid, LocalHandlerFn<M>)>,
rpc_addr_client: RpcAddrClient<M>,
}
@@ -91,7 +91,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Set the local handler, to process RPC to this node without network usage
- pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
+ pub fn set_local_handler<F, Fut>(&self, my_id: Uuid, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<M, Error>> + Send + 'static,
@@ -110,12 +110,12 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Make a RPC call
- pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
+ pub async fn call(&self, to: Uuid, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
/// Make a RPC call from a message stored in an Arc
- pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
+ pub async fn call_arc(&self, to: Uuid, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
if to.borrow() == my_id {
@@ -128,7 +128,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
if node_status.is_up() {
node_status
} else {
- return Err(Error::from(RPCError::NodeDown(to)));
+ return Err(Error::from(RpcError::NodeDown(to)));
}
}
None => {
@@ -152,7 +152,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
/// Make a RPC call to multiple servers, returning a Vec containing each result
- pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
+ pub async fn call_many(&self, to: &[Uuid], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
.iter()
@@ -170,7 +170,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
/// strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
- to: &[UUID],
+ to: &[Uuid],
msg: M,
strategy: RequestStrategy,
) -> Result<Vec<M>, Error> {
@@ -222,7 +222,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
Ok(results)
} else {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- Err(Error::from(RPCError::TooManyErrors(errors)))
+ Err(Error::from(RpcError::TooManyErrors(errors)))
}
}
}
@@ -251,7 +251,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<Result<M, Error>, RPCError>
+ ) -> Result<Result<M, Error>, RpcError>
where
MB: Borrow<M>,
{
@@ -268,8 +268,8 @@ pub struct RpcHttpClient {
}
enum ClientMethod {
- HTTP(Client<HttpConnector, hyper::Body>),
- HTTPS(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
+ Http(Client<HttpConnector, hyper::Body>),
+ Https(Client<tls_util::HttpsConnectorFixedDnsname<HttpConnector>, hyper::Body>),
}
impl RpcHttpClient {
@@ -294,9 +294,9 @@ impl RpcHttpClient {
let connector =
tls_util::HttpsConnectorFixedDnsname::<HttpConnector>::new(config, "garage");
- ClientMethod::HTTPS(Client::builder().build(connector))
+ ClientMethod::Https(Client::builder().build(connector))
} else {
- ClientMethod::HTTP(Client::new())
+ ClientMethod::Http(Client::new())
};
Ok(RpcHttpClient {
method,
@@ -311,14 +311,14 @@ impl RpcHttpClient {
to_addr: &SocketAddr,
msg: MB,
timeout: Duration,
- ) -> Result<Result<M, Error>, RPCError>
+ ) -> Result<Result<M, Error>, RpcError>
where
MB: Borrow<M>,
M: RpcMessage,
{
let uri = match self.method {
- ClientMethod::HTTP(_) => format!("http://{}/{}", to_addr, path),
- ClientMethod::HTTPS(_) => format!("https://{}/{}", to_addr, path),
+ ClientMethod::Http(_) => format!("http://{}/{}", to_addr, path),
+ ClientMethod::Https(_) => format!("https://{}/{}", to_addr, path),
};
let req = Request::builder()
@@ -327,8 +327,8 @@ impl RpcHttpClient {
.body(Body::from(rmp_to_vec_all_named(msg.borrow())?))?;
let resp_fut = match &self.method {
- ClientMethod::HTTP(client) => client.request(req).fuse(),
- ClientMethod::HTTPS(client) => client.request(req).fuse(),
+ ClientMethod::Http(client) => client.request(req).fuse(),
+ ClientMethod::Https(client) => client.request(req).fuse(),
};
trace!("({}) Acquiring request_limiter slot...", path);
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 55d97170..81361ab9 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -77,7 +77,7 @@ where
let rep_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Err(err_str))?;
let mut err_response = Response::new(Body::from(rep_bytes));
*err_response.status_mut() = match e {
- Error::BadRPC(_) => StatusCode::BAD_REQUEST,
+ Error::BadRpc(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
warn!(
diff --git a/src/table/crdt/bool.rs b/src/table/crdt/bool.rs
index 1989c92e..53af8f82 100644
--- a/src/table/crdt/bool.rs
+++ b/src/table/crdt/bool.rs
@@ -27,7 +27,7 @@ impl From<bool> for Bool {
}
}
-impl CRDT for Bool {
+impl Crdt for Bool {
fn merge(&mut self, other: &Self) {
self.0 = self.0 || other.0;
}
diff --git a/src/table/crdt/crdt.rs b/src/table/crdt/crdt.rs
index 7abe8ba9..a8f1b9aa 100644
--- a/src/table/crdt/crdt.rs
+++ b/src/table/crdt/crdt.rs
@@ -18,7 +18,7 @@ use garage_util::data::*;
/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
/// as this would imply a cycle in the partial order.
-pub trait CRDT {
+pub trait Crdt {
/// Merge the two datastructures according to the CRDT rules.
/// `self` is modified to contain the merged CRDT value. `other` is not modified.
///
@@ -31,16 +31,16 @@ pub trait CRDT {
/// All types that implement `Ord` (a total order) can also implement a trivial CRDT
/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
/// to enable this behavior.
-pub trait AutoCRDT: Ord + Clone + std::fmt::Debug {
+pub trait AutoCrdt: Ord + Clone + std::fmt::Debug {
/// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
/// different values in your application should never happen. Set this to false
/// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
const WARN_IF_DIFFERENT: bool;
}
-impl<T> CRDT for T
+impl<T> Crdt for T
where
- T: AutoCRDT,
+ T: AutoCrdt,
{
fn merge(&mut self, other: &Self) {
if Self::WARN_IF_DIFFERENT && self != other {
@@ -58,14 +58,14 @@ where
}
}
-impl AutoCRDT for String {
+impl AutoCrdt for String {
const WARN_IF_DIFFERENT: bool = true;
}
-impl AutoCRDT for bool {
+impl AutoCrdt for bool {
const WARN_IF_DIFFERENT: bool = true;
}
-impl AutoCRDT for FixedBytes32 {
+impl AutoCrdt for FixedBytes32 {
const WARN_IF_DIFFERENT: bool = true;
}
diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs
index 3b1b2406..be197d88 100644
--- a/src/table/crdt/lww.rs
+++ b/src/table/crdt/lww.rs
@@ -36,14 +36,14 @@ use crate::crdt::crdt::*;
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in enterprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWW<T> {
+pub struct Lww<T> {
ts: u64,
v: T,
}
-impl<T> LWW<T>
+impl<T> Lww<T>
where
- T: CRDT,
+ T: Crdt,
{
/// Creates a new CRDT
///
@@ -99,9 +99,9 @@ where
}
}
-impl<T> CRDT for LWW<T>
+impl<T> Crdt for Lww<T>
where
- T: Clone + CRDT,
+ T: Clone + Crdt,
{
fn merge(&mut self, other: &Self) {
if other.ts > self.ts {
diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs
index 4ed26809..36bbf667 100644
--- a/src/table/crdt/lww_map.rs
+++ b/src/table/crdt/lww_map.rs
@@ -22,14 +22,14 @@ use crate::crdt::crdt::*;
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
-pub struct LWWMap<K, V> {
+pub struct LwwMap<K, V> {
vals: Vec<(K, u64, V)>,
}
-impl<K, V> LWWMap<K, V>
+impl<K, V> LwwMap<K, V>
where
K: Ord,
- V: CRDT,
+ V: Crdt,
{
/// Create a new empty map CRDT
pub fn new() -> Self {
@@ -125,10 +125,10 @@ where
}
}
-impl<K, V> CRDT for LWWMap<K, V>
+impl<K, V> Crdt for LwwMap<K, V>
where
K: Clone + Ord,
- V: Clone + CRDT,
+ V: Clone + Crdt,
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
@@ -150,10 +150,10 @@ where
}
}
-impl<K, V> Default for LWWMap<K, V>
+impl<K, V> Default for LwwMap<K, V>
where
K: Ord,
- V: CRDT,
+ V: Crdt,
{
fn default() -> Self {
Self::new()
diff --git a/src/table/crdt/map.rs b/src/table/crdt/map.rs
index c4dd1613..e2aee40a 100644
--- a/src/table/crdt/map.rs
+++ b/src/table/crdt/map.rs
@@ -22,7 +22,7 @@ pub struct Map<K, V> {
impl<K, V> Map<K, V>
where
K: Clone + Ord,
- V: Clone + CRDT,
+ V: Clone + Crdt,
{
/// Create a new empty map CRDT
pub fn new() -> Self {
@@ -69,10 +69,10 @@ where
}
}
-impl<K, V> CRDT for Map<K, V>
+impl<K, V> Crdt for Map<K, V>
where
K: Clone + Ord,
- V: Clone + CRDT,
+ V: Clone + Crdt,
{
fn merge(&mut self, other: &Self) {
for (k, v2) in other.vals.iter() {
@@ -91,7 +91,7 @@ where
impl<K, V> Default for Map<K, V>
where
K: Clone + Ord,
- V: Clone + CRDT,
+ V: Clone + Crdt,
{
fn default() -> Self {
Self::new()
diff --git a/src/table/data.rs b/src/table/data.rs
index 542a8481..e7e85e65 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -11,7 +11,7 @@ use garage_util::error::*;
use garage_rpc::membership::System;
-use crate::crdt::CRDT;
+use crate::crdt::Crdt;
use crate::replication::*;
use crate::schema::*;
@@ -151,7 +151,7 @@ where
if Some(&new_entry) != old_entry.as_ref() {
let new_bytes = rmp_to_vec_all_named(&new_entry)
- .map_err(Error::RMPEncode)
+ .map_err(Error::RmpEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?;
let new_bytes_hash = blake2sum(&new_bytes[..]);
mkl_todo.insert(tree_key.clone(), new_bytes_hash.as_slice())?;
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2dcbcaa0..73e08827 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -24,23 +24,23 @@ use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub struct TableGC<F: TableSchema, R: TableReplication> {
+pub struct TableGc<F: TableSchema, R: TableReplication> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
- rpc_client: Arc<RpcClient<GcRPC>>,
+ rpc_client: Arc<RpcClient<GcRpc>>,
}
#[derive(Serialize, Deserialize)]
-enum GcRPC {
+enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
Ok,
}
-impl RpcMessage for GcRPC {}
+impl RpcMessage for GcRpc {}
-impl<F, R> TableGC<F, R>
+impl<F, R> TableGc<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
@@ -51,7 +51,7 @@ where
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/gc", data.name);
- let rpc_client = system.rpc_client::<GcRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<GcRpc>(&rpc_path);
let gc = Arc::new(Self {
system: system.clone(),
@@ -168,7 +168,7 @@ where
async fn try_send_and_delete(
&self,
- nodes: Vec<UUID>,
+ nodes: Vec<Uuid>,
items: Vec<(ByteBuf, Hash, ByteBuf)>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -183,7 +183,7 @@ where
self.rpc_client
.try_call_many(
&nodes[..],
- GcRPC::Update(updates),
+ GcRpc::Update(updates),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -196,7 +196,7 @@ where
self.rpc_client
.try_call_many(
&nodes[..],
- GcRPC::DeleteIfEqualHash(deletes.clone()),
+ GcRpc::DeleteIfEqualHash(deletes.clone()),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_GC_RPC_TIMEOUT),
)
.await?;
@@ -221,7 +221,7 @@ where
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
- rpc_server.add_handler::<GcRPC, _, _>(path, move |msg, _addr| {
+ rpc_server.add_handler::<GcRpc, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
@@ -234,18 +234,18 @@ where
});
}
- async fn handle_rpc(self: &Arc<Self>, message: &GcRPC) -> Result<GcRPC, Error> {
+ async fn handle_rpc(self: &Arc<Self>, message: &GcRpc) -> Result<GcRpc, Error> {
match message {
- GcRPC::Update(items) => {
+ GcRpc::Update(items) => {
self.data.update_many(items)?;
- Ok(GcRPC::Ok)
+ Ok(GcRpc::Ok)
}
- GcRPC::DeleteIfEqualHash(items) => {
+ GcRpc::DeleteIfEqualHash(items) => {
for (key, vhash) in items.iter() {
self.data.delete_if_equal_hash(&key[..], *vhash)?;
self.todo_remove_if_equal(&key[..], *vhash)?;
}
- Ok(GcRPC::Ok)
+ Ok(GcRpc::Ok)
}
_ => Err(Error::Message("Unexpected GC RPC".to_string())),
}
diff --git a/src/table/lib.rs b/src/table/lib.rs
index 7b5d0512..53d2c93b 100644
--- a/src/table/lib.rs
+++ b/src/table/lib.rs
@@ -1,5 +1,5 @@
#![recursion_limit = "1024"]
-#![allow(clippy::comparison_chain, clippy::upper_case_acronyms)]
+#![allow(clippy::comparison_chain)]
#[macro_use]
extern crate log;
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index a6b4c98c..3ce7c0bf 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -19,14 +19,14 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
- fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+ fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
fn read_quorum(&self) -> usize {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+ fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.config.members.keys().cloned().collect::<Vec<_>>()
}
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index c2c78c8b..64996828 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -8,12 +8,12 @@ pub trait TableReplication: Send + Sync {
// To understand various replication methods
/// Which nodes to send read requests to
- fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a write succesfull
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index f2d89729..93b95a38 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -25,7 +25,7 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
- fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
}
@@ -33,7 +33,7 @@ impl TableReplication for TableShardedReplication {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
let ring = self.system.ring.borrow();
ring.walk_ring(&hash, self.replication_factor)
}
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 74611749..4d6050e8 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use crate::crdt::CRDT;
+use crate::crdt::Crdt;
/// Trait for field used to partition data
pub trait PartitionKey {
@@ -42,7 +42,7 @@ impl SortKey for Hash {
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
- CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
+ Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
/// Get the key used to partition
fn partition_key(&self) -> &P;
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 33b01455..a3afbbba 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -34,11 +34,11 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
- rpc_client: Arc<RpcClient<SyncRPC>>,
+ rpc_client: Arc<RpcClient<SyncRpc>>,
}
#[derive(Serialize, Deserialize)]
-pub(crate) enum SyncRPC {
+pub(crate) enum SyncRpc {
RootCkHash(Partition, Hash),
RootCkDifferent(bool),
GetNode(MerkleNodeKey),
@@ -47,7 +47,7 @@ pub(crate) enum SyncRPC {
Ok,
}
-impl RpcMessage for SyncRPC {}
+impl RpcMessage for SyncRpc {}
struct SyncTodo {
todo: Vec<TodoPartition>,
@@ -75,7 +75,7 @@ where
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}/sync", data.name);
- let rpc_client = system.rpc_client::<SyncRPC>(&rpc_path);
+ let rpc_client = system.rpc_client::<SyncRpc>(&rpc_path);
let todo = SyncTodo { todo: vec![] };
@@ -114,7 +114,7 @@ where
fn register_handler(self: &Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
- rpc_server.add_handler::<SyncRPC, _, _>(path, move |msg, _addr| {
+ rpc_server.add_handler::<SyncRpc, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle_rpc(&msg).await }
});
@@ -317,14 +317,14 @@ where
async fn offload_items(
self: &Arc<Self>,
items: &[(Vec<u8>, Arc<ByteBuf>)],
- nodes: &[UUID],
+ nodes: &[Uuid],
) -> Result<(), Error> {
let values = items.iter().map(|(_k, v)| v.clone()).collect::<Vec<_>>();
self.rpc_client
.try_call_many(
nodes,
- SyncRPC::Items(values),
+ SyncRpc::Items(values),
RequestStrategy::with_quorum(nodes.len()).with_timeout(TABLE_SYNC_RPC_TIMEOUT),
)
.await?;
@@ -362,7 +362,7 @@ where
async fn do_sync_with(
self: Arc<Self>,
partition: TodoPartition,
- who: UUID,
+ who: Uuid,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?;
@@ -381,20 +381,20 @@ where
.rpc_client
.call(
who,
- SyncRPC::RootCkHash(partition.partition, root_ck_hash),
+ SyncRpc::RootCkHash(partition.partition, root_ck_hash),
TABLE_SYNC_RPC_TIMEOUT,
)
.await?;
let mut todo = match root_resp {
- SyncRPC::RootCkDifferent(false) => {
+ SyncRpc::RootCkDifferent(false) => {
debug!(
"({}) Sync {:?} with {:?}: no difference",
self.data.name, partition, who
);
return Ok(());
}
- SyncRPC::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
+ SyncRpc::RootCkDifferent(true) => VecDeque::from(vec![root_ck_key]),
x => {
return Err(Error::Message(format!(
"Invalid respone to RootCkHash RPC: {}",
@@ -431,10 +431,10 @@ where
// and compare it with local node
let remote_node = match self
.rpc_client
- .call(who, SyncRPC::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
+ .call(who, SyncRpc::GetNode(key.clone()), TABLE_SYNC_RPC_TIMEOUT)
.await?
{
- SyncRPC::Node(_, node) => node,
+ SyncRpc::Node(_, node) => node,
x => {
return Err(Error::Message(format!(
"Invalid respone to GetNode RPC: {}",
@@ -478,7 +478,7 @@ where
Ok(())
}
- async fn send_items(&self, who: UUID, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
+ async fn send_items(&self, who: Uuid, item_value_list: Vec<Vec<u8>>) -> Result<(), Error> {
info!(
"({}) Sending {} items to {:?}",
self.data.name,
@@ -493,9 +493,9 @@ where
let rpc_resp = self
.rpc_client
- .call(who, SyncRPC::Items(values), TABLE_SYNC_RPC_TIMEOUT)
+ .call(who, SyncRpc::Items(values), TABLE_SYNC_RPC_TIMEOUT)
.await?;
- if let SyncRPC::Ok = rpc_resp {
+ if let SyncRpc::Ok = rpc_resp {
Ok(())
} else {
Err(Error::Message(format!(
@@ -507,20 +507,20 @@ where
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
- async fn handle_rpc(self: &Arc<Self>, message: &SyncRPC) -> Result<SyncRPC, Error> {
+ async fn handle_rpc(self: &Arc<Self>, message: &SyncRpc) -> Result<SyncRpc, Error> {
match message {
- SyncRPC::RootCkHash(range, h) => {
+ SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
let hash = hash_of::<MerkleNode>(&root_ck)?;
- Ok(SyncRPC::RootCkDifferent(hash != *h))
+ Ok(SyncRpc::RootCkDifferent(hash != *h))
}
- SyncRPC::GetNode(k) => {
+ SyncRpc::GetNode(k) => {
let node = self.merkle.read_node(&k)?;
- Ok(SyncRPC::Node(k.clone(), node))
+ Ok(SyncRpc::Node(k.clone(), node))
}
- SyncRPC::Items(items) => {
+ SyncRpc::Items(items) => {
self.data.update_many(items)?;
- Ok(SyncRPC::Ok)
+ Ok(SyncRpc::Ok)
}
_ => Err(Error::Message("Unexpected sync RPC".to_string())),
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 833d5771..eb9bd25c 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -13,7 +13,7 @@ use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*;
-use crate::crdt::CRDT;
+use crate::crdt::Crdt;
use crate::data::*;
use crate::gc::*;
use crate::merkle::*;
@@ -28,11 +28,11 @@ pub struct Table<F: TableSchema, R: TableReplication> {
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
- rpc_client: Arc<RpcClient<TableRPC<F>>>,
+ rpc_client: Arc<RpcClient<TableRpc<F>>>,
}
#[derive(Serialize, Deserialize)]
-pub(crate) enum TableRPC<F: TableSchema> {
+pub(crate) enum TableRpc<F: TableSchema> {
Ok,
ReadEntry(F::P, F::S),
@@ -44,7 +44,7 @@ pub(crate) enum TableRPC<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
-impl<F: TableSchema> RpcMessage for TableRPC<F> {}
+impl<F: TableSchema> RpcMessage for TableRpc<F> {}
impl<F, R> Table<F, R>
where
@@ -62,7 +62,7 @@ where
rpc_server: &mut RpcServer,
) -> Arc<Self> {
let rpc_path = format!("table_{}", name);
- let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
+ let rpc_client = system.rpc_client::<TableRpc<F>>(&rpc_path);
let data = TableData::new(system.clone(), name, instance, replication, db);
@@ -74,7 +74,7 @@ where
merkle_updater.clone(),
rpc_server,
);
- TableGC::launch(system.clone(), data.clone(), rpc_server);
+ TableGc::launch(system.clone(), data.clone(), rpc_server);
let table = Arc::new(Self {
system,
@@ -95,7 +95,7 @@ where
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
- let rpc = TableRPC::<F>::Update(vec![e_enc]);
+ let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.rpc_client
.try_call_many(
@@ -121,7 +121,7 @@ where
}
let call_futures = call_list.drain().map(|(node, entries)| async move {
- let rpc = TableRPC::<F>::Update(entries);
+ let rpc = TableRpc::<F>::Update(entries);
let resp = self.rpc_client.call(node, rpc, TABLE_RPC_TIMEOUT).await?;
Ok::<_, Error>((node, resp))
@@ -150,7 +150,7 @@ where
let who = self.data.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
- let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
+ let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.rpc_client
.try_call_many(
@@ -165,7 +165,7 @@ where
let mut ret = None;
let mut not_all_same = false;
for resp in resps {
- if let TableRPC::ReadEntryResponse(value) = resp {
+ if let TableRpc::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value {
let v = self.data.decode_entry(v_bytes.as_slice())?;
ret = match ret {
@@ -205,7 +205,7 @@ where
let hash = partition_key.hash();
let who = self.data.replication.read_nodes(&hash);
- let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
+ let rpc = TableRpc::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);
let resps = self
.rpc_client
@@ -221,7 +221,7 @@ where
let mut ret = BTreeMap::new();
let mut to_repair = BTreeMap::new();
for resp in resps {
- if let TableRPC::Update(entries) = resp {
+ if let TableRpc::Update(entries) = resp {
for entry_bytes in entries.iter() {
let entry = self.data.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.data.tree_key(entry.partition_key(), entry.sort_key());
@@ -261,12 +261,12 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
- async fn repair_on_read(&self, who: &[UUID], what: F::E) -> Result<(), Error> {
+ async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?));
self.rpc_client
.try_call_many(
who,
- TableRPC::<F>::Update(vec![what_enc]),
+ TableRpc::<F>::Update(vec![what_enc]),
RequestStrategy::with_quorum(who.len()).with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -277,7 +277,7 @@ where
fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
let self2 = self.clone();
- rpc_server.add_handler::<TableRPC<F>, _, _>(path, move |msg, _addr| {
+ rpc_server.add_handler::<TableRpc<F>, _, _>(path, move |msg, _addr| {
let self2 = self2.clone();
async move { self2.handle(&msg).await }
});
@@ -290,21 +290,21 @@ where
});
}
- async fn handle(self: &Arc<Self>, msg: &TableRPC<F>) -> Result<TableRPC<F>, Error> {
+ async fn handle(self: &Arc<Self>, msg: &TableRpc<F>) -> Result<TableRpc<F>, Error> {
match msg {
- TableRPC::ReadEntry(key, sort_key) => {
+ TableRpc::ReadEntry(key, sort_key) => {
let value = self.data.read_entry(key, sort_key)?;
- Ok(TableRPC::ReadEntryResponse(value))
+ Ok(TableRpc::ReadEntryResponse(value))
}
- TableRPC::ReadRange(key, begin_sort_key, filter, limit) => {
+ TableRpc::ReadRange(key, begin_sort_key, filter, limit) => {
let values = self.data.read_range(key, begin_sort_key, filter, *limit)?;
- Ok(TableRPC::Update(values))
+ Ok(TableRpc::Update(values))
}
- TableRPC::Update(pairs) => {
+ TableRpc::Update(pairs) => {
self.data.update_many(pairs)?;
- Ok(TableRPC::Ok)
+ Ok(TableRpc::Ok)
}
- _ => Err(Error::BadRPC("Unexpected table RPC".to_string())),
+ _ => Err(Error::BadRpc("Unexpected table RPC".to_string())),
}
}
}
diff --git a/src/util/data.rs b/src/util/data.rs
index 56c7ab56..6df51cd0 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -88,7 +88,7 @@ impl FixedBytes32 {
}
/// A 32 bytes UUID
-pub type UUID = FixedBytes32;
+pub type Uuid = FixedBytes32;
/// A 256 bit cryptographic hash, can be sha256 or blake2 depending on provenance
pub type Hash = FixedBytes32;
@@ -127,7 +127,7 @@ pub fn fasthash(data: &[u8]) -> FastHash {
}
/// Generate a random 32 bytes UUID
-pub fn gen_uuid() -> UUID {
+pub fn gen_uuid() -> Uuid {
rand::thread_rng().gen::<[u8; 32]>().into()
}
diff --git a/src/util/error.rs b/src/util/error.rs
index 2b862269..c3d84e63 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -7,24 +7,24 @@ use crate::data::*;
/// RPC related errors
#[derive(Debug, Error)]
-pub enum RPCError {
+pub enum RpcError {
#[error(display = "Node is down: {:?}.", _0)]
- NodeDown(UUID),
+ NodeDown(Uuid),
#[error(display = "Timeout: {}", _0)]
Timeout(#[error(source)] tokio::time::error::Elapsed),
#[error(display = "HTTP error: {}", _0)]
- HTTP(#[error(source)] http::Error),
+ Http(#[error(source)] http::Error),
#[error(display = "Hyper error: {}", _0)]
Hyper(#[error(source)] hyper::Error),
#[error(display = "Messagepack encode error: {}", _0)]
- RMPEncode(#[error(source)] rmp_serde::encode::Error),
+ RmpEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]
- RMPDecode(#[error(source)] rmp_serde::decode::Error),
+ RmpDecode(#[error(source)] rmp_serde::decode::Error),
#[error(display = "Too many errors: {:?}", _0)]
TooManyErrors(Vec<String>),
@@ -40,26 +40,26 @@ pub enum Error {
Hyper(#[error(source)] hyper::Error),
#[error(display = "HTTP error: {}", _0)]
- HTTP(#[error(source)] http::Error),
+ Http(#[error(source)] http::Error),
#[error(display = "Invalid HTTP header value: {}", _0)]
- HTTPHeader(#[error(source)] http::header::ToStrError),
+ HttpHeader(#[error(source)] http::header::ToStrError),
#[error(display = "TLS error: {}", _0)]
- TLS(#[error(source)] rustls::TLSError),
+ Tls(#[error(source)] rustls::TLSError),
#[error(display = "PKI error: {}", _0)]
- PKI(#[error(source)] webpki::Error),
+ Pki(#[error(source)] webpki::Error),
#[error(display = "Sled error: {}", _0)]
Sled(#[error(source)] sled::Error),
#[error(display = "Messagepack encode error: {}", _0)]
- RMPEncode(#[error(source)] rmp_serde::encode::Error),
+ RmpEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]
- RMPDecode(#[error(source)] rmp_serde::decode::Error),
+ RmpDecode(#[error(source)] rmp_serde::decode::Error),
#[error(display = "JSON error: {}", _0)]
- JSON(#[error(source)] serde_json::error::Error),
+ Json(#[error(source)] serde_json::error::Error),
#[error(display = "TOML decode error: {}", _0)]
TomlDecode(#[error(source)] toml::de::Error),
@@ -67,13 +67,13 @@ pub enum Error {
TokioJoin(#[error(source)] tokio::task::JoinError),
#[error(display = "RPC call error: {}", _0)]
- RPC(#[error(source)] RPCError),
+ Rpc(#[error(source)] RpcError),
#[error(display = "Remote error: {} (status code {})", _0, _1)]
RemoteError(String, StatusCode),
#[error(display = "Bad RPC: {}", _0)]
- BadRPC(String),
+ BadRpc(String),
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
CorruptData(Hash),
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 15d020cc..c080e3a3 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -1,4 +1,3 @@
-#![allow(clippy::upper_case_acronyms)]
//! Crate containing common functions and types used in Garage
#[macro_use]
diff --git a/src/web/error.rs b/src/web/error.rs
index dc934015..08717ce1 100644
--- a/src/web/error.rs
+++ b/src/web/error.rs
@@ -4,7 +4,6 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError;
/// Errors of this crate
-#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Error)]
pub enum Error {
/// An error received from the API crate
@@ -22,7 +21,7 @@ pub enum Error {
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
- InvalidUTF8(#[error(source)] std::str::Utf8Error),
+ InvalidUtf8(#[error(source)] std::str::Utf8Error),
/// The client send a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
@@ -39,7 +38,7 @@ impl Error {
match self {
Error::NotFound => StatusCode::NOT_FOUND,
Error::ApiError(e) => e.http_status_code(),
- Error::InternalError(GarageError::RPC(_)) => StatusCode::SERVICE_UNAVAILABLE,
+ Error::InternalError(GarageError::Rpc(_)) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::BAD_REQUEST,
}