diff options
Diffstat (limited to 'src/api_server.rs')
-rw-r--r-- | src/api_server.rs | 56 |
1 files changed, 5 insertions, 51 deletions
diff --git a/src/api_server.rs b/src/api_server.rs index f3f7165b..4ae48720 100644 --- a/src/api_server.rs +++ b/src/api_server.rs @@ -9,12 +9,10 @@ use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; -use crate::data; +use crate::block::*; use crate::data::*; use crate::error::Error; use crate::http_util::*; -use crate::proto::*; -use crate::rpc_client::*; use crate::server::Garage; use crate::table::EmptySortKey; @@ -155,7 +153,7 @@ async fn handle_put( let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta(garage.clone(), &version, 0, first_block_hash.clone()); - let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block); + let mut put_curr_block = rpc_put_block(&garage.system, first_block_hash, first_block); loop { let (_, _, next_block) = @@ -169,7 +167,7 @@ async fn handle_put( next_offset as u64, block_hash.clone(), ); - put_curr_block = put_block(garage.clone(), block_hash, block); + put_curr_block = rpc_put_block(&garage.system, block_hash, block); next_offset += block_len; } else { break; @@ -209,24 +207,6 @@ async fn put_block_meta( Ok(()) } -async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> { - let who = garage - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, garage.system.config.data_replication_factor); - rpc_try_call_many( - garage.system.clone(), - &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), - (garage.system.config.data_replication_factor + 1) / 2, - BLOCK_RW_TIMEOUT, - ) - .await?; - Ok(()) -} - struct BodyChunker { body: Body, read_all: bool, @@ -322,7 +302,7 @@ async fn handle_get( Ok(resp_builder.body(body)?) } ObjectVersionData::FirstBlock(first_block_hash) => { - let read_first_block = get_block(garage.clone(), &first_block_hash); + let read_first_block = rpc_get_block(&garage.system, &first_block_hash); let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey); let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?; @@ -345,7 +325,7 @@ async fn handle_get( if let Some(data) = data_opt { Ok(Bytes::from(data)) } else { - get_block(garage.clone(), &hash).await.map(Bytes::from) + rpc_get_block(&garage.system, &hash).await.map(Bytes::from) } } }) @@ -355,29 +335,3 @@ async fn handle_get( } } } - -async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> { - let who = garage - .system - .ring - .borrow() - .clone() - .walk_ring(&hash, garage.system.config.data_replication_factor); - let resps = rpc_try_call_many( - garage.system.clone(), - &who[..], - Message::GetBlock(hash.clone()), - 1, - BLOCK_RW_TIMEOUT, - ) - .await?; - - for resp in resps { - if let Message::PutBlock(pbm) = resp { - if data::hash(&pbm.data) == *hash { - return Ok(pbm.data); - } - } - } - Err(Error::Message(format!("No valid blocks returned"))) -} |