diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/garage/Cargo.toml | 3 | ||||
-rw-r--r-- | src/garage/server.rs | 23 | ||||
-rw-r--r-- | src/model/Cargo.toml | 2 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 1 | ||||
-rw-r--r-- | src/rpc/rpc_client.rs | 3 | ||||
-rw-r--r-- | src/rpc/rpc_server.rs | 17 | ||||
-rw-r--r-- | src/table/Cargo.toml | 2 | ||||
-rw-r--r-- | src/table/table.rs | 10 | ||||
-rw-r--r-- | src/table/table_sync.rs | 6 | ||||
-rw-r--r-- | src/util/Cargo.toml | 2 | ||||
-rw-r--r-- | src/util/error.rs | 8 |
11 files changed, 58 insertions, 19 deletions
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 254645c7..03bc472d 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -28,7 +28,8 @@ sha2 = "0.8" log = "0.4" pretty_env_logger = "0.4" -sled = "0.31" +sled = "0.34" +old_sled = { package = "sled", version = "0.31" } structopt = { version = "0.3", default-features = false } toml = "0.5" diff --git a/src/garage/server.rs b/src/garage/server.rs index ec78c067..2e109f8b 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -40,7 +40,28 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Opening database..."); let mut db_path = config.metadata_dir.clone(); db_path.push("db"); - let db = sled::open(db_path).expect("Unable to open DB"); + let db = match sled::open(&db_path) { + Ok(db) => db, + Err(e) => { + warn!("Old DB could not be openned ({}), attempting migration.", e); + let old = old_sled::open(&db_path).expect("Unable to open old DB for migration"); + let mut new_path = config.metadata_dir.clone(); + new_path.push("db2"); + let new = sled::open(&new_path).expect("Unable to open new DB for migration"); + new.import(old.export()); + if old.checksum().expect("unable to compute old db checksum") + != new.checksum().expect("unable to compute new db checksum") + { + panic!("db checksums don't match after migration"); + } + drop(new); + drop(old); + std::fs::remove_dir_all(&db_path).expect("Cannot remove old DB folder"); + std::fs::rename(new_path, &db_path) + .expect("Cannot move new DB folder to correct place"); + sled::open(db_path).expect("Unable to open new DB after migration") + } + }; info!("Initialize RPC server..."); let mut rpc_server = RpcServer::new(config.rpc_bind_addr.clone(), config.rpc_tls.clone()); diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 2c779d0a..48b75d24 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -25,7 +25,7 @@ sha2 = "0.8" arc-swap = "0.4" log = "0.4" -sled = "0.31" +sled = "0.34" rmp-serde = "0.14.3" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index af42d551..2878aa38 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -3,7 +3,6 @@ use serde::{Deserialize, Serialize}; use garage_table::crdt::CRDT; use garage_table::*; - use crate::key_table::PermissionSet; // We import the same file but in its version 0.1.0. diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 5183bb4b..70384391 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -310,7 +310,9 @@ impl RpcHttpClient { ClientMethod::HTTPS(client) => client.request(req).fuse(), }; + trace!("({}) Acquiring request_limiter slot...", path); let slot = self.request_limiter.acquire().await; + trace!("({}) Got slot, doing request to {}...", path, to_addr); let resp = tokio::time::timeout(timeout, resp_fut) .await .map_err(|e| { @@ -330,6 +332,7 @@ impl RpcHttpClient { })?; let status = resp.status(); + trace!("({}) Request returned, got status {}", path, status); let body = hyper::body::to_bytes(resp.into_body()).await?; drop(slot); diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 4113f15b..1c6bc8d2 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -48,6 +48,12 @@ where let begin_time = Instant::now(); let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; + + trace!( + "Request message: {}", + serde_json::to_string(&msg).unwrap_or("<json error>".into()) + ); + match handler(msg, sockaddr).await { Ok(resp) => { let resp_bytes = rmp_to_vec_all_named::<Result<M, String>>(&Ok(resp))?; @@ -112,7 +118,8 @@ impl RpcServer { return Ok(bad_request); } - let path = &req.uri().path()[1..]; + let path = &req.uri().path()[1..].to_string(); + let handler = match self.handlers.get(path) { Some(h) => h, None => { @@ -122,6 +129,8 @@ impl RpcServer { } }; + trace!("({}) Handling request", path); + let resp_waiter = tokio::spawn(handler(req, addr)); match resp_waiter.await { Err(err) => { @@ -131,11 +140,15 @@ impl RpcServer { Ok(ise) } Ok(Err(err)) => { + trace!("({}) Request handler failed: {}", path, err); let mut bad_request = Response::new(Body::from(format!("{}", err))); *bad_request.status_mut() = StatusCode::BAD_REQUEST; Ok(bad_request) } - Ok(Ok(resp)) => Ok(resp), + Ok(Ok(resp)) => { + trace!("({}) Request handler succeeded", path); + Ok(resp) + } } } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 483c386c..6485a542 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -23,7 +23,7 @@ arc-swap = "0.4" log = "0.4" hexdump = "0.1" -sled = "0.31" +sled = "0.34" rmp-serde = "0.14.3" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/table/table.rs b/src/table/table.rs index 737ed589..8389c29f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -394,7 +394,7 @@ where Some(prev_bytes) => { let old_entry = self .decode_entry(&prev_bytes) - .map_err(sled::ConflictableTransactionError::Abort)?; + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); (Some(old_entry), new_entry) @@ -404,7 +404,7 @@ where let new_bytes = rmp_to_vec_all_named(&new_entry) .map_err(Error::RMPEncode) - .map_err(sled::ConflictableTransactionError::Abort)?; + .map_err(sled::transaction::ConflictableTransactionError::Abort)?; db.insert(tree_key.clone(), new_bytes)?; Ok((old_entry, new_entry)) })?; @@ -429,11 +429,7 @@ where Ok(()) } - pub(crate) fn delete_if_equal( - self: &Arc<Self>, - k: &[u8], - v: &[u8], - ) -> Result<bool, Error> { + pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { let removed = self.store.transaction(|txn| { if let Some(cur_v) = self.store.get(k)? { if cur_v == v { diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index b81dad86..58391274 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -385,6 +385,7 @@ where must_exit: &mut watch::Receiver<bool>, ) -> Result<RangeChecksum, Error> { assert!(range.level != 0); + trace!("Call range_checksum {:?}", range); if range.level == 1 { let mut children = vec![]; @@ -400,6 +401,7 @@ where .iter() .all(|x| *x == 0u8) { + trace!("range_checksum {:?} returning {} items", range, children.len()); return Ok(RangeChecksum { bounds: range.clone(), children, @@ -414,6 +416,7 @@ where }; children.push((item_range, blake2sum(&value[..]))); } + trace!("range_checksum {:?} returning {} items", range, children.len()); Ok(RangeChecksum { bounds: range.clone(), children, @@ -439,6 +442,7 @@ where } if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { + trace!("range_checksum {:?} returning {} items", range, children.len()); return Ok(RangeChecksum { bounds: range.clone(), children, @@ -453,6 +457,7 @@ where .iter() .all(|x| *x == 0u8) { + trace!("range_checksum {:?} returning {} items", range, children.len()); return Ok(RangeChecksum { bounds: range.clone(), children, @@ -463,6 +468,7 @@ where sub_range.begin = found_limit; } + trace!("range_checksum {:?} exiting due to must_exit", range); Err(Error::Message(format!("Exiting."))) } } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 93115843..35130c96 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -21,7 +21,7 @@ err-derive = "0.2.3" log = "0.4" fasthash = "0.4" -sled = "0.31" +sled = "0.34" toml = "0.5" rmp-serde = "0.14.3" diff --git a/src/util/error.rs b/src/util/error.rs index e5dcf654..dbf71ac1 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -73,11 +73,11 @@ pub enum Error { Message(String), } -impl From<sled::TransactionError<Error>> for Error { - fn from(e: sled::TransactionError<Error>) -> Error { +impl From<sled::transaction::TransactionError<Error>> for Error { + fn from(e: sled::transaction::TransactionError<Error>) -> Error { match e { - sled::TransactionError::Abort(x) => x, - sled::TransactionError::Storage(x) => Error::Sled(x), + sled::transaction::TransactionError::Abort(x) => x, + sled::transaction::TransactionError::Storage(x) => Error::Sled(x), } } } |