aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/block_ref_table.rs15
-rw-r--r--src/object_table.rs12
-rw-r--r--src/server.rs63
-rw-r--r--src/version_table.rs13
4 files changed, 53 insertions, 50 deletions
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index f3a14d81..3e5fb0a1 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -1,11 +1,11 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
-use tokio::sync::RwLock;
use crate::data::*;
-use crate::server::Garage;
use crate::table::*;
+use crate::background::*;
+use crate::block::*;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef {
@@ -35,7 +35,8 @@ impl Entry<Hash, UUID> for BlockRef {
}
pub struct BlockRefTable {
- pub garage: RwLock<Option<Arc<Garage>>>,
+ pub background: Arc<BackgroundRunner>,
+ pub block_manager: Arc<BlockManager>,
}
#[async_trait]
@@ -45,19 +46,17 @@ impl TableFormat for BlockRefTable {
type E = BlockRef;
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
- let garage = self.garage.read().await.as_ref().cloned().unwrap();
-
let was_before = old.map(|x| !x.deleted).unwrap_or(false);
let is_after = !new.deleted;
if is_after && !was_before {
- if let Err(e) = garage.block_manager.block_incref(&new.block) {
+ if let Err(e) = self.block_manager.block_incref(&new.block) {
eprintln!("Failed to incref block {:?}: {}", &new.block, e);
}
}
if was_before && !is_after {
- if let Err(e) = garage
+ if let Err(e) = self
.block_manager
- .block_decref(&new.block, &garage.background)
+ .block_decref(&new.block, &self.background)
{
eprintln!("Failed to decref block {:?}: {}", &new.block, e);
}
diff --git a/src/object_table.rs b/src/object_table.rs
index 63dabf20..8ce49565 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -1,11 +1,10 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
-use tokio::sync::RwLock;
use crate::data::*;
-use crate::server::Garage;
use crate::table::*;
+use crate::background::BackgroundRunner;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
@@ -88,7 +87,8 @@ impl Entry<String, String> for Object {
}
pub struct ObjectTable {
- pub garage: RwLock<Option<Arc<Garage>>>,
+ pub background: Arc<BackgroundRunner>,
+ pub version_table: Arc<Table<VersionTable>>,
}
#[async_trait]
@@ -98,8 +98,8 @@ impl TableFormat for ObjectTable {
type E = Object;
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
- let garage = self.garage.read().await.as_ref().cloned().unwrap();
- garage.clone().background.spawn(async move {
+ let version_table = self.version_table.clone();
+ self.background.spawn(async move {
// Propagate deletion of old versions
if let Some(old_v) = old {
for v in old_v.versions.iter() {
@@ -115,7 +115,7 @@ impl TableFormat for ObjectTable {
bucket: old_v.bucket.clone(),
key: old_v.key.clone(),
};
- garage.version_table.insert(&deleted_version).await?;
+ version_table.insert(&deleted_version).await?;
}
}
}
diff --git a/src/server.rs b/src/server.rs
index e38e8580..29a2dbcb 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -6,7 +6,6 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::watch;
-use tokio::sync::RwLock;
use crate::api_server;
use crate::background::*;
@@ -36,12 +35,21 @@ pub struct Config {
#[serde(default = "default_replication_factor")]
pub data_replication_factor: usize,
+
+ pub tls: TlsConfig,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct TlsConfig {
+ pub ca_cert: Option<String>,
+ pub node_cert: Option<String>,
+ pub node_key: Option<String>,
}
pub struct Garage {
pub db: sled::Db,
pub system: Arc<System>,
- pub block_manager: BlockManager,
+ pub block_manager: Arc<BlockManager>,
pub background: Arc<BackgroundRunner>,
pub table_rpc_handlers: HashMap<String, Box<dyn TableRpcHandler + Sync + Send>>,
@@ -58,10 +66,17 @@ impl Garage {
db: sled::Db,
background: Arc<BackgroundRunner>,
) -> Arc<Self> {
- let block_manager = BlockManager::new(&db, config.data_dir.clone());
+ let block_manager = Arc::new(BlockManager::new(&db, config.data_dir.clone()));
let system = Arc::new(System::new(config, id, background.clone()));
+ let data_rep_param = TableReplicationParams {
+ replication_factor: system.config.data_replication_factor,
+ write_quorum: (system.config.data_replication_factor + 1) / 2,
+ read_quorum: 1,
+ timeout: DEFAULT_TIMEOUT,
+ };
+
let meta_rep_param = TableReplicationParams {
replication_factor: system.config.meta_replication_factor,
write_quorum: (system.config.meta_replication_factor + 1) / 2,
@@ -69,42 +84,38 @@ impl Garage {
timeout: DEFAULT_TIMEOUT,
};
- let object_table = Arc::new(Table::new(
- ObjectTable {
- garage: RwLock::new(None),
+ let block_ref_table = Arc::new(Table::new(
+ BlockRefTable {
+ background: background.clone(),
+ block_manager: block_manager.clone(),
},
system.clone(),
&db,
- "object".to_string(),
- meta_rep_param.clone(),
+ "block_ref".to_string(),
+ data_rep_param.clone(),
));
let version_table = Arc::new(Table::new(
VersionTable {
- garage: RwLock::new(None),
+ background: background.clone(),
+ block_ref_table: block_ref_table.clone(),
},
system.clone(),
&db,
"version".to_string(),
meta_rep_param.clone(),
));
-
- let data_rep_param = TableReplicationParams {
- replication_factor: system.config.data_replication_factor,
- write_quorum: (system.config.data_replication_factor + 1) / 2,
- read_quorum: 1,
- timeout: DEFAULT_TIMEOUT,
- };
-
- let block_ref_table = Arc::new(Table::new(
- BlockRefTable {
- garage: RwLock::new(None),
+ let object_table = Arc::new(Table::new(
+ ObjectTable {
+ background: background.clone(),
+ version_table: version_table.clone(),
},
system.clone(),
&db,
- "block_ref".to_string(),
- data_rep_param.clone(),
+ "object".to_string(),
+ meta_rep_param.clone(),
));
+
let mut garage = Self {
db,
system: system.clone(),
@@ -129,13 +140,7 @@ impl Garage {
garage.block_ref_table.clone().rpc_handler(),
);
- let garage = Arc::new(garage);
-
- *garage.object_table.instance.garage.write().await = Some(garage.clone());
- *garage.version_table.instance.garage.write().await = Some(garage.clone());
- *garage.block_ref_table.instance.garage.write().await = Some(garage.clone());
-
- garage
+ Arc::new(garage)
}
}
diff --git a/src/version_table.rs b/src/version_table.rs
index 9ea0551e..cb70c645 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -1,11 +1,10 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
-use tokio::sync::RwLock;
use crate::data::*;
-use crate::server::Garage;
use crate::table::*;
+use crate::background::BackgroundRunner;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
@@ -54,7 +53,8 @@ impl Entry<Hash, EmptySortKey> for Version {
}
pub struct VersionTable {
- pub garage: RwLock<Option<Arc<Garage>>>,
+ pub background: Arc<BackgroundRunner>,
+ pub block_ref_table: Arc<Table<BlockRefTable>>,
}
#[async_trait]
@@ -64,8 +64,8 @@ impl TableFormat for VersionTable {
type E = Version;
async fn updated(&self, old: Option<Self::E>, new: Self::E) {
- let garage = self.garage.read().await.as_ref().cloned().unwrap();
- garage.clone().background.spawn(async move {
+ let block_ref_table = self.block_ref_table.clone();
+ self.background.spawn(async move {
// Propagate deletion of version blocks
if let Some(old_v) = old {
if new.deleted && !old_v.deleted {
@@ -78,8 +78,7 @@ impl TableFormat for VersionTable {
deleted: true,
})
.collect::<Vec<_>>();
- garage
- .block_ref_table
+ block_ref_table
.insert_many(&deleted_block_refs[..])
.await?;
}