aboutsummaryrefslogtreecommitdiff
path: root/src/api_server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api_server.rs')
-rw-r--r--src/api_server.rs56
1 files changed, 5 insertions, 51 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index f3f7165b..4ae48720 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -9,12 +9,10 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
-use crate::data;
+use crate::block::*;
use crate::data::*;
use crate::error::Error;
use crate::http_util::*;
-use crate::proto::*;
-use crate::rpc_client::*;
use crate::server::Garage;
use crate::table::EmptySortKey;
@@ -155,7 +153,7 @@ async fn handle_put(
let mut next_offset = first_block.len();
let mut put_curr_version_block =
put_block_meta(garage.clone(), &version, 0, first_block_hash.clone());
- let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block);
+ let mut put_curr_block = rpc_put_block(&garage.system, first_block_hash, first_block);
loop {
let (_, _, next_block) =
@@ -169,7 +167,7 @@ async fn handle_put(
next_offset as u64,
block_hash.clone(),
);
- put_curr_block = put_block(garage.clone(), block_hash, block);
+ put_curr_block = rpc_put_block(&garage.system, block_hash, block);
next_offset += block_len;
} else {
break;
@@ -209,24 +207,6 @@ async fn put_block_meta(
Ok(())
}
-async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let who = garage
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, garage.system.config.data_replication_factor);
- rpc_try_call_many(
- garage.system.clone(),
- &who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- (garage.system.config.data_replication_factor + 1) / 2,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
- Ok(())
-}
-
struct BodyChunker {
body: Body,
read_all: bool,
@@ -322,7 +302,7 @@ async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(first_block_hash) => {
- let read_first_block = get_block(garage.clone(), &first_block_hash);
+ let read_first_block = rpc_get_block(&garage.system, &first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
@@ -345,7 +325,7 @@ async fn handle_get(
if let Some(data) = data_opt {
Ok(Bytes::from(data))
} else {
- get_block(garage.clone(), &hash).await.map(Bytes::from)
+ rpc_get_block(&garage.system, &hash).await.map(Bytes::from)
}
}
})
@@ -355,29 +335,3 @@ async fn handle_get(
}
}
}
-
-async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
- let who = garage
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, garage.system.config.data_replication_factor);
- let resps = rpc_try_call_many(
- garage.system.clone(),
- &who[..],
- Message::GetBlock(hash.clone()),
- 1,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
-
- for resp in resps {
- if let Message::PutBlock(pbm) = resp {
- if data::hash(&pbm.data) == *hash {
- return Ok(pbm.data);
- }
- }
- }
- Err(Error::Message(format!("No valid blocks returned")))
-}