aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-10 22:01:48 +0200
commit3477864142ed09c36abea1111937b829fb41c8a4 (patch)
treed95221e66b9c014af7f4dba61ae4ff113c0e409a /src/server.rs
parentd66c0d6833ddbeb61e34ee222dde92a5363bda1f (diff)
downloadgarage-3477864142ed09c36abea1111937b829fb41c8a4.tar.gz
garage-3477864142ed09c36abea1111937b829fb41c8a4.zip
Fix the Sync issue. Details:
So the HTTP client future of Hyper is not Sync, thus the stream that read blocks wasn't either. However Hyper's default Body type requires a stream to be Sync for wrap_stream. Solution: reimplement a custom HTTP body type.
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs59
1 files changed, 32 insertions, 27 deletions
diff --git a/src/server.rs b/src/server.rs
index d9f98164..3df6ca59 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,17 +1,17 @@
+use futures::channel::oneshot;
+use serde::Deserialize;
use std::collections::HashMap;
use std::io::{Read, Write};
-use std::sync::Arc;
use std::net::SocketAddr;
use std::path::PathBuf;
-use futures::channel::oneshot;
-use serde::Deserialize;
+use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
+use crate::api_server;
use crate::data::*;
-use crate::proto::*;
use crate::error::Error;
use crate::membership::System;
-use crate::api_server;
+use crate::proto::*;
use crate::rpc_server;
use crate::table::*;
@@ -30,27 +30,33 @@ impl Garage {
pub async fn new(config: Config, id: UUID, db: sled::Db) -> Arc<Self> {
let system = Arc::new(System::new(config, id));
- let meta_rep_param = TableReplicationParams{
+ let meta_rep_param = TableReplicationParams {
replication_factor: system.config.meta_replication_factor,
- write_quorum: (system.config.meta_replication_factor+1)/2,
- read_quorum: (system.config.meta_replication_factor+1)/2,
+ write_quorum: (system.config.meta_replication_factor + 1) / 2,
+ read_quorum: (system.config.meta_replication_factor + 1) / 2,
timeout: DEFAULT_TIMEOUT,
};
let object_table = Arc::new(Table::new(
- ObjectTable{garage: RwLock::new(None)},
+ ObjectTable {
+ garage: RwLock::new(None),
+ },
system.clone(),
&db,
"object".to_string(),
- meta_rep_param.clone()));
+ meta_rep_param.clone(),
+ ));
let version_table = Arc::new(Table::new(
- VersionTable{garage: RwLock::new(None)},
+ VersionTable {
+ garage: RwLock::new(None),
+ },
system.clone(),
&db,
"version".to_string(),
- meta_rep_param.clone()));
+ meta_rep_param.clone(),
+ ));
- let mut garage = Self{
+ let mut garage = Self {
db,
system: system.clone(),
fs_lock: Mutex::new(()),
@@ -61,10 +67,12 @@ impl Garage {
garage.table_rpc_handlers.insert(
garage.object_table.name.clone(),
- garage.object_table.clone().rpc_handler());
+ garage.object_table.clone().rpc_handler(),
+ );
garage.table_rpc_handlers.insert(
garage.version_table.name.clone(),
- garage.version_table.clone().rpc_handler());
+ garage.version_table.clone().rpc_handler(),
+ );
let garage = Arc::new(garage);
@@ -103,7 +111,7 @@ fn read_config(config_file: PathBuf) -> Result<Config, Error> {
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(config_file.as_path())?;
-
+
let mut config = String::new();
file.read_to_string(&mut config)?;
@@ -118,7 +126,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
let mut d = vec![];
f.read_to_end(&mut d)?;
if d.len() != 32 {
- return Err(Error::Message(format!("Corrupt node_id file")))
+ return Err(Error::Message(format!("Corrupt node_id file")));
}
let mut id = [0u8; 32];
@@ -134,10 +142,10 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) {
- // Wait for the CTRL+C signal
- tokio::signal::ctrl_c()
- .await
- .expect("failed to install CTRL+C signal handler");
+ // Wait for the CTRL+C signal
+ tokio::signal::ctrl_c()
+ .await
+ .expect("failed to install CTRL+C signal handler");
println!("Received CTRL+C, shutting down.");
for ch in chans {
ch.send(()).unwrap();
@@ -149,16 +157,13 @@ async fn wait_from(chan: oneshot::Receiver<()>) -> () {
}
pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
- let config = read_config(config_file)
- .expect("Unable to read config file");
+ let config = read_config(config_file).expect("Unable to read config file");
let mut db_path = config.metadata_dir.clone();
db_path.push("db");
- let db = sled::open(db_path)
- .expect("Unable to open DB");
+ let db = sled::open(db_path).expect("Unable to open DB");
- let id = gen_node_id(&config.metadata_dir)
- .expect("Unable to read or generate node ID");
+ let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID");
println!("Node ID: {}", hex::encode(&id));
let garage = Garage::new(config, id, db).await;