aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/garage/Cargo.toml3
-rw-r--r--src/garage/server.rs23
-rw-r--r--src/model/Cargo.toml2
-rw-r--r--src/model/bucket_table.rs1
-rw-r--r--src/rpc/rpc_client.rs3
-rw-r--r--src/rpc/rpc_server.rs17
-rw-r--r--src/table/Cargo.toml2
-rw-r--r--src/table/table.rs10
-rw-r--r--src/table/table_sync.rs6
-rw-r--r--src/util/Cargo.toml2
-rw-r--r--src/util/error.rs8
11 files changed, 58 insertions, 19 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 254645c7..03bc472d 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -28,7 +28,8 @@ sha2 = "0.8"
log = "0.4"
pretty_env_logger = "0.4"
-sled = "0.31"
+sled = "0.34"
+old_sled = { package = "sled", version = "0.31" }
structopt = { version = "0.3", default-features = false }
toml = "0.5"
diff --git a/src/garage/server.rs b/src/garage/server.rs
index ec78c067..2e109f8b 100644
--- a/src/garage/server.rs
+++ b/src/garage/server.rs
@@ -40,7 +40,28 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Opening database...");
let mut db_path = config.metadata_dir.clone();
db_path.push("db");
- let db = sled::open(db_path).expect("Unable to open DB");
+ let db = match sled::open(&db_path) {
+ Ok(db) => db,
+ Err(e) => {
+ warn!("Old DB could not be openned ({}), attempting migration.", e);
+ let old = old_sled::open(&db_path).expect("Unable to open old DB for migration");
+ let mut new_path = config.metadata_dir.clone();
+ new_path.push("db2");
+ let new = sled::open(&new_path).expect("Unable to open new DB for migration");
+ new.import(old.export());
+ if old.checksum().expect("unable to compute old db checksum")
+ != new.checksum().expect("unable to compute new db checksum")
+ {
+ panic!("db checksums don't match after migration");
+ }
+ drop(new);
+ drop(old);
+ std::fs::remove_dir_all(&db_path).expect("Cannot remove old DB folder");
+ std::fs::rename(new_path, &db_path)
+ .expect("Cannot move new DB folder to correct place");
+ sled::open(db_path).expect("Unable to open new DB after migration")
+ }
+ };
info!("Initialize RPC server...");
let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone());
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index 2c779d0a..48b75d24 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -25,7 +25,7 @@ sha2 = "0.8"
arc-swap = "0.4"
log = "0.4"
-sled = "0.31"
+sled = "0.34"
rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs
index af42d551..2878aa38 100644
--- a/src/model/bucket_table.rs
+++ b/src/model/bucket_table.rs
@@ -3,7 +3,6 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT;
use garage_table::*;
-
use crate::key_table::PermissionSet;
// We import the same file but in its version 0.1.0.
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 5183bb4b..70384391 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -310,7 +310,9 @@ impl RpcHttpClient {
ClientMethod::HTTPS(client) => client.request(req).fuse(),
};
+ trace!("({}) Acquiring request_limiter slot...", path);
let slot = self.request_limiter.acquire().await;
+ trace!("({}) Got slot, doing request to {}...", path, to_addr);
let resp = tokio::time::timeout(timeout, resp_fut)
.await
.map_err(|e| {
@@ -330,6 +332,7 @@ impl RpcHttpClient {
})?;
let status = resp.status();
+ trace!("({}) Request returned, got status {}", path, status);
let body = hyper::body::to_bytes(resp.into_body()).await?;
drop(slot);
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 4113f15b..1c6bc8d2 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -48,6 +48,12 @@ where
let begin_time = Instant::now();
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?;
+
+ trace!(
+ "Request message: {}",
+ serde_json::to_string(&msg).unwrap_or("<json error>".into())
+ );
+
match handler(msg, sockaddr).await {
Ok(resp) => {
let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?;
@@ -112,7 +118,8 @@ impl RpcServer {
return Ok(bad_request);
}
- let path = &req.uri().path()[1..];
+ let path = &req.uri().path()[1..].to_string();
+
let handler = match self.handlers.get(path) {
Some(h) => h,
None => {
@@ -122,6 +129,8 @@ impl RpcServer {
}
};
+ trace!("({}) Handling request", path);
+
let resp_waiter = tokio::spawn(handler(req, addr));
match resp_waiter.await {
Err(err) => {
@@ -131,11 +140,15 @@ impl RpcServer {
Ok(ise)
}
Ok(Err(err)) => {
+ trace!("({}) Request handler failed: {}", path, err);
let mut bad_request = Response::new(Body::from(format!("{}", err)));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
Ok(bad_request)
}
- Ok(Ok(resp)) => Ok(resp),
+ Ok(Ok(resp)) => {
+ trace!("({}) Request handler succeeded", path);
+ Ok(resp)
+ }
}
}
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 483c386c..6485a542 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -23,7 +23,7 @@ arc-swap = "0.4"
log = "0.4"
hexdump = "0.1"
-sled = "0.31"
+sled = "0.34"
rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
diff --git a/src/table/table.rs b/src/table/table.rs
index 737ed589..8389c29f 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -394,7 +394,7 @@ where
Some(prev_bytes) => {
let old_entry = self
.decode_entry(&prev_bytes)
- .map_err(sled::ConflictableTransactionError::Abort)?;
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone();
new_entry.merge(&update);
(Some(old_entry), new_entry)
@@ -404,7 +404,7 @@ where
let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode)
- .map_err(sled::ConflictableTransactionError::Abort)?;
+ .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
db.insert(tree_key.clone(), new_bytes)?;
Ok((old_entry, new_entry))
})?;
@@ -429,11 +429,7 @@ where
Ok(())
}
- pub(crate) fn delete_if_equal(
- self: &Arc<Self>,
- k: &[u8],
- v: &[u8],
- ) -> Result<bool, Error> {
+ pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = self.store.transaction(|txn| {
if let Some(cur_v) = self.store.get(k)? {
if cur_v == v {
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index b81dad86..58391274 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -385,6 +385,7 @@ where
must_exit: &mut watch::Receiver<bool>,
) -> Result<RangeChecksum, Error> {
assert!(range.level != 0);
+ trace!("Call range_checksum {:?}", range);
if range.level == 1 {
let mut children = vec![];
@@ -400,6 +401,7 @@ where
.iter()
.all(|x| *x == 0u8)
{
+ trace!("range_checksum {:?} returning {} items", range, children.len());
return Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -414,6 +416,7 @@ where
};
children.push((item_range, blake2sum(&value[..])));
}
+ trace!("range_checksum {:?} returning {} items", range, children.len());
Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -439,6 +442,7 @@ where
}
if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
+ trace!("range_checksum {:?} returning {} items", range, children.len());
return Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -453,6 +457,7 @@ where
.iter()
.all(|x| *x == 0u8)
{
+ trace!("range_checksum {:?} returning {} items", range, children.len());
return Ok(RangeChecksum {
bounds: range.clone(),
children,
@@ -463,6 +468,7 @@ where
sub_range.begin = found_limit;
}
+ trace!("range_checksum {:?} exiting due to must_exit", range);
Err(Error::Message(format!("Exiting.")))
}
}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 93115843..35130c96 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -21,7 +21,7 @@ err-derive = "0.2.3"
log = "0.4"
fasthash = "0.4"
-sled = "0.31"
+sled = "0.34"
toml = "0.5"
rmp-serde = "0.14.3"
diff --git a/src/util/error.rs b/src/util/error.rs
index e5dcf654..dbf71ac1 100644
--- a/src/util/error.rs
+++ b/src/util/error.rs
@@ -73,11 +73,11 @@ pub enum Error {
Message(String),
}
-impl From<sled::TransactionError<Error>> for Error {
- fn from(e: sled::TransactionError<Error>) -> Error {
+impl From<sled::transaction::TransactionError<Error>> for Error {
+ fn from(e: sled::transaction::TransactionError<Error>) -> Error {
match e {
- sled::TransactionError::Abort(x) => x,
- sled::TransactionError::Storage(x) => Error::Sled(x),
+ sled::transaction::TransactionError::Abort(x) => x,
+ sled::transaction::TransactionError::Storage(x) => Error::Sled(x),
}
}
}