aboutsummaryrefslogtreecommitdiff
path: root/src/api_server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-17 15:36:16 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-17 15:40:13 +0200
commite41ce4d81528388f043c1c5e6608df45347ea70d (patch)
treeee25f06b6f7da356c53d5f0a8fc8ec9e81d4bb23 /src/api_server.rs
parent867646093b24a9bb7e4b24a7f2248615c6e03fde (diff)
downloadgarage-e41ce4d81528388f043c1c5e6608df45347ea70d.tar.gz
garage-e41ce4d81528388f043c1c5e6608df45347ea70d.zip
Implement getting missing blocks when RC increases
Issue: RC increases also when the block ref entry is first put by the actual client. At that point the client is probably already sending us the block content, so we don't need to do a get... We should add a delay before the task is added or find something to do.
Diffstat (limited to 'src/api_server.rs')
-rw-r--r--src/api_server.rs56
1 files changed, 5 insertions, 51 deletions
diff --git a/src/api_server.rs b/src/api_server.rs
index f3f7165b..4ae48720 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -9,12 +9,10 @@ use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
-use crate::data;
+use crate::block::*;
use crate::data::*;
use crate::error::Error;
use crate::http_util::*;
-use crate::proto::*;
-use crate::rpc_client::*;
use crate::server::Garage;
use crate::table::EmptySortKey;
@@ -155,7 +153,7 @@ async fn handle_put(
let mut next_offset = first_block.len();
let mut put_curr_version_block =
put_block_meta(garage.clone(), &version, 0, first_block_hash.clone());
- let mut put_curr_block = put_block(garage.clone(), first_block_hash, first_block);
+ let mut put_curr_block = rpc_put_block(&garage.system, first_block_hash, first_block);
loop {
let (_, _, next_block) =
@@ -169,7 +167,7 @@ async fn handle_put(
next_offset as u64,
block_hash.clone(),
);
- put_curr_block = put_block(garage.clone(), block_hash, block);
+ put_curr_block = rpc_put_block(&garage.system, block_hash, block);
next_offset += block_len;
} else {
break;
@@ -209,24 +207,6 @@ async fn put_block_meta(
Ok(())
}
-async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
- let who = garage
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, garage.system.config.data_replication_factor);
- rpc_try_call_many(
- garage.system.clone(),
- &who[..],
- Message::PutBlock(PutBlockMessage { hash, data }),
- (garage.system.config.data_replication_factor + 1) / 2,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
- Ok(())
-}
-
struct BodyChunker {
body: Body,
read_all: bool,
@@ -322,7 +302,7 @@ async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(first_block_hash) => {
- let read_first_block = get_block(garage.clone(), &first_block_hash);
+ let read_first_block = rpc_get_block(&garage.system, &first_block_hash);
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptySortKey);
let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
@@ -345,7 +325,7 @@ async fn handle_get(
if let Some(data) = data_opt {
Ok(Bytes::from(data))
} else {
- get_block(garage.clone(), &hash).await.map(Bytes::from)
+ rpc_get_block(&garage.system, &hash).await.map(Bytes::from)
}
}
})
@@ -355,29 +335,3 @@ async fn handle_get(
}
}
}
-
-async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
- let who = garage
- .system
- .ring
- .borrow()
- .clone()
- .walk_ring(&hash, garage.system.config.data_replication_factor);
- let resps = rpc_try_call_many(
- garage.system.clone(),
- &who[..],
- Message::GetBlock(hash.clone()),
- 1,
- BLOCK_RW_TIMEOUT,
- )
- .await?;
-
- for resp in resps {
- if let Message::PutBlock(pbm) = resp {
- if data::hash(&pbm.data) == *hash {
- return Ok(pbm.data);
- }
- }
- }
- Err(Error::Message(format!("No valid blocks returned")))
-}