diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/Cargo.toml | 8 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 5 | ||||
-rw-r--r-- | src/api/s3/put.rs | 87 | ||||
-rw-r--r-- | src/api/signature/mod.rs | 14 | ||||
-rw-r--r-- | src/block/block.rs | 17 | ||||
-rw-r--r-- | src/block/manager.rs | 34 | ||||
-rw-r--r-- | src/garage/Cargo.toml | 6 | ||||
-rw-r--r-- | src/garage/main.rs | 5 | ||||
-rw-r--r-- | src/garage/tests/lib.rs | 4 | ||||
-rw-r--r-- | src/model/garage.rs | 15 | ||||
-rw-r--r-- | src/rpc/Cargo.toml | 2 | ||||
-rw-r--r-- | src/util/Cargo.toml | 4 | ||||
-rw-r--r-- | src/util/async_hash.rs | 64 | ||||
-rw-r--r-- | src/util/lib.rs | 1 |
14 files changed, 203 insertions, 63 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index eb87a4fb..cdfabcb8 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -24,15 +24,15 @@ async-trait = "0.1.7" base64 = "0.13" bytes = "1.0" chrono = "0.4" -crypto-mac = "0.10" +crypto-common = "0.1" err-derive = "0.3" hex = "0.4" -hmac = "0.10" +hmac = "0.12" idna = "0.2" tracing = "0.1.30" -md-5 = "0.9" +md-5 = "0.10" nom = "7.1" -sha2 = "0.9" +sha2 = "0.10" futures = "0.3" futures-util = "0.3" diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 0fc16993..4415a037 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await + garage2 + .block_manager + .rpc_put_block(final_hash, data.into()) + .await } else { Ok(()) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 9ef37421..dc0530df 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -8,7 +8,13 @@ use hyper::{Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; + use garage_table::*; +use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -130,7 +136,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = blake2sum(&first_block[..]); + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( @@ -273,14 +280,23 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, - first_block: Vec<u8>, + first_block: Bytes, first_block_hash: Hash, chunker: &mut StreamChunker<S>, ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { - let mut md5hasher = Md5::new(); - let mut sha256hasher = Sha256::new(); - md5hasher.update(&first_block[..]); - sha256hasher.update(&first_block[..]); + let tracer = opentelemetry::global::tracer("garage"); + + let md5hasher = AsyncHasher::<Md5>::new(); + let sha256hasher = AsyncHasher::<Sha256>::new(); + + futures::future::join( + md5hasher.update(first_block.clone()), + sha256hasher.update(first_block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash first block (md5, sha256)"), + )) + .await; let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( @@ -302,9 +318,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - md5hasher.update(&block[..]); - sha256hasher.update(&block[..]); - let block_hash = blake2sum(&block[..]); + let block = Bytes::from(block); + let (_, _, block_hash) = futures::future::join3( + md5hasher.update(block.clone()), + sha256hasher.update(block.clone()), + async_blake2sum(block.clone()), + ) + .with_context(Context::current_with_span( + tracer.start("Hash block (md5, sha256, blake2)"), + )) + .await; let block_len = block.len(); put_curr_version_block = put_block_meta( garage, @@ -322,9 +345,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( } let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize(); + let data_md5sum = md5hasher.finalize().await; - let data_sha256sum = sha256hasher.finalize(); + let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); Ok((total_size, data_md5sum, data_sha256sum)) @@ -364,7 +387,8 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { stream: S, read_all: bool, block_size: usize, - buf: VecDeque<u8>, + buf: VecDeque<Bytes>, + buf_len: usize, } impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { @@ -373,29 +397,45 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { stream, read_all: false, block_size, - buf: VecDeque::with_capacity(2 * block_size), + buf: VecDeque::with_capacity(8), + buf_len: 0, } } async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> { - while !self.read_all && self.buf.len() < self.block_size { + while !self.read_all && self.buf_len < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(bytes); + self.buf_len += bytes.len(); + self.buf.push_back(bytes); } else { self.read_all = true; } } - if self.buf.is_empty() { + if self.buf_len == 0 { Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::<Vec<u8>>(); - Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); - Ok(Some(block)) + let mut slices = Vec::with_capacity(self.buf.len()); + let mut taken = 0; + while self.buf_len > 0 && taken < self.block_size { + let front = self.buf.pop_front().unwrap(); + if taken + front.len() <= self.block_size { + taken += front.len(); + self.buf_len -= front.len(); + slices.push(front); + } else { + let front_take = self.block_size - taken; + slices.push(front.slice(..front_take)); + self.buf.push_front(front.slice(front_take..)); + self.buf_len -= front_take; + break; + } + } + Ok(Some( + slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(), + )) } } } @@ -504,7 +544,10 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block_hash = blake2sum(&first_block[..]); + + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; + let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, &version, diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index dd5b590c..4b8b990f 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::{Hmac, Mac}; use sha2::Sha256; use garage_util::data::{sha256sum, Hash}; @@ -29,17 +29,17 @@ pub fn signing_hmac( secret_key: &str, region: &str, service: &str, -) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> { +) -> Result<HmacSha256, crypto_common::InvalidLength> { let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?; date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?; region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + let mut service_hmac = HmacSha256::new_from_slice(®ion_hmac.finalize().into_bytes())?; service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?; signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?; Ok(hmac) } diff --git a/src/block/block.rs b/src/block/block.rs index 4d3fbcb8..f17bd2c0 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -61,13 +62,17 @@ impl DataBlock { } } - pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock { + tokio::task::spawn_blocking(move || { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } } - } - DataBlock::Plain(data) + DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + }) + .await + .unwrap() } } diff --git a/src/block/manager.rs b/src/block/manager.rs index 9240db25..b5199b62 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -3,12 +3,18 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{mpsc, Mutex}; +use opentelemetry::{ + trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, + Context, +}; + use garage_db as db; use garage_util::data::*; @@ -68,7 +74,7 @@ pub struct BlockManager { compression_level: Option<i32>, - mutation_lock: Mutex<BlockManagerLocked>, + mutation_lock: [Mutex<BlockManagerLocked>; 256], pub(crate) rc: BlockRc, pub resync: BlockResyncManager, @@ -105,8 +111,6 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); - let manager_locked = BlockManagerLocked(); - let metrics = BlockManagerMetrics::new(resync.queue.clone(), resync.errors.clone()); let (scrub_tx, scrub_rx) = mpsc::channel(1); @@ -115,7 +119,7 @@ impl BlockManager { replication, data_dir, compression_level, - mutation_lock: Mutex::new(manager_locked), + mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())), rc, resync, system, @@ -174,14 +178,15 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level); + let data = DataBlock::from_buffer(data, self.compression_level).await; self.system .rpc .try_call_many( &self.endpoint, &who[..], + // TODO: remove to_vec() here BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) @@ -260,14 +265,21 @@ impl BlockManager { hash: &Hash, data: &DataBlock, ) -> Result<BlockRpc, Error> { + let tracer = opentelemetry::global::tracer("garage"); + let write_size = data.inner_buffer().len() as u64; - let res = self - .mutation_lock + let res = self.mutation_lock[hash.as_slice()[0] as usize] .lock() + .with_context(Context::current_with_span( + tracer.start("Acquire mutation_lock"), + )) .await .write_block(hash, data, self) .bound_record_duration(&self.metrics.block_write_duration) + .with_context(Context::current_with_span( + tracer.start("BlockManagerLocked::write_block"), + )) .await?; self.metrics.bytes_written.add(write_size); @@ -317,7 +329,7 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .move_block_to_corrupted(hash, self) @@ -331,7 +343,7 @@ impl BlockManager { /// Check if this node has a block and whether it needs it pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> { - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .check_block_status(hash, self) @@ -346,7 +358,7 @@ impl BlockManager { /// Delete block if it is not needed anymore pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> { - self.mutation_lock + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .await .delete_if_unneeded(hash, self) diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index b08e9439..f30b88ac 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -35,7 +35,7 @@ bytesize = "1.1" timeago = "0.3" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } -pretty_env_logger = "0.4" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } rand = "0.8" async-trait = "0.1.7" sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } @@ -63,9 +63,9 @@ prometheus = { version = "0.13", optional = true } aws-sdk-s3 = "0.8" chrono = "0.4" http = "0.2" -hmac = "0.10" +hmac = "0.12" hyper = { version = "0.14", features = ["client", "http1", "runtime"] } -sha2 = "0.9" +sha2 = "0.10" static_init = "1.0" assert-json-diff = "2.0" diff --git a/src/garage/main.rs b/src/garage/main.rs index 77d5db24..0eca24ae 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -68,7 +68,10 @@ async fn main() { if std::env::var("RUST_LOG").is_err() { std::env::set_var("RUST_LOG", "netapp=info,garage=info") } - pretty_env_logger::init(); + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env()) + .init(); sodiumoxide::init().expect("Unable to init sodiumoxide"); // Abort on panic (same behavior as in Go) diff --git a/src/garage/tests/lib.rs b/src/garage/tests/lib.rs index d15639b9..87be1327 100644 --- a/src/garage/tests/lib.rs +++ b/src/garage/tests/lib.rs @@ -3,6 +3,8 @@ mod common; mod admin; mod bucket; + +mod s3; + #[cfg(feature = "k2v")] mod k2v; -mod s3; diff --git a/src/model/garage.rs b/src/model/garage.rs index 0bca2071..66c359e7 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -118,11 +118,16 @@ impl Garage { std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); let map_size = garage_db::lmdb_adapter::recommended_map_size(); - let db = db::lmdb_adapter::heed::EnvOpenOptions::new() - .max_dbs(100) - .map_size(map_size) - .open(&db_path) - .expect("Unable to open LMDB DB"); + use db::lmdb_adapter::heed; + let mut env_builder = heed::EnvOpenOptions::new(); + env_builder.max_dbs(100); + env_builder.max_readers(500); + env_builder.map_size(map_size); + unsafe { + env_builder.flag(heed::flags::Flags::MdbNoSync); + env_builder.flag(heed::flags::Flags::MdbNoMetaSync); + } + let db = env_builder.open(&db_path).expect("Unable to open LMDB DB"); db::lmdb_adapter::LmdbDb::init(db) } #[cfg(not(feature = "lmdb"))] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 21841a02..7263d9cc 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -47,7 +47,7 @@ opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.4", features = ["telemetry"] } +netapp = { version = "0.4.5", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 163c1b77..d5c194e8 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -19,6 +19,8 @@ garage_db = { version = "0.8.0", path = "../db" } arc-swap = "1.0" async-trait = "0.1" blake2 = "0.9" +bytes = "1.0" +digest = "0.10" err-derive = "0.3" git-version = "0.3.4" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } @@ -26,7 +28,7 @@ hex = "0.4" lazy_static = "1.4" tracing = "0.1.30" rand = "0.8" -sha2 = "0.9" +sha2 = "0.10" chrono = "0.4" rmp-serde = "0.15" diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..fa8ee7ff --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,64 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher<D: Digest> { + sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, + task: JoinHandle<digest::Output<D>>, +} + +impl<D: Digest> AsyncHasher<D> { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some((blk, ch)) = recvblk.blocking_recv() { + digest.update(&blk[..]); + let _ = ch.send(()); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub async fn update(&self, b: Bytes) { + let (tx, rx) = oneshot::channel(); + self.sendblk.send((b, tx)).unwrap(); + let _ = rx.await; + } + + pub async fn finalize(self) -> digest::Output<D> { + drop(self.sendblk); + self.task.await.unwrap() + } +} + +impl<D: Digest> Default for AsyncHasher<D> { + fn default() -> Self { + Self::new() + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index 47c85c3a..264cc192 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate tracing; +pub mod async_hash; pub mod background; pub mod config; pub mod crdt; |