From 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 12:55:36 +0100 Subject: wip: split out layout management from System into separate LayoutManager --- src/block/manager.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 2d1b5c67..72b4ea66 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -265,7 +265,7 @@ impl BlockManager { Fut: futures::Future>, { let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); + let who = self.system.rpc_helper().request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -305,7 +305,7 @@ impl BlockManager { // if the first one doesn't succeed rapidly // TODO: keep first request running when initiating a new one and take the // one that finishes earlier - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { + _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => { debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; @@ -363,7 +363,7 @@ impl BlockManager { Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -439,7 +439,7 @@ impl BlockManager { tokio::spawn(async move { if let Err(e) = this .resync - .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) + .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout()) { error!("Block {:?} could not be put in resync queue: {}.", hash, e); } @@ -533,7 +533,7 @@ impl BlockManager { None => { // Not found but maybe we should have had it ?? self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?; return Err(Error::Message(format!( "block {:?} not found on node", hash -- cgit v1.2.3 From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/block/manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 72b4ea66..2bb9c23d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -354,7 +354,8 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash); + // TODO: use quorums among latest write set + let who = self.replication.storage_nodes(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await -- cgit v1.2.3 From 90e1619b1e9f5d81e59da371f04717f0c4fe5afc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 15:40:46 +0100 Subject: table: take into account multiple write sets in inserts --- src/block/manager.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 2bb9c23d..0ca8bc31 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -354,8 +354,7 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { - // TODO: use quorums among latest write set - let who = self.replication.storage_nodes(&hash); + let who = self.replication.write_sets(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await @@ -365,9 +364,9 @@ impl BlockManager { self.system .rpc_helper() - .try_call_many( + .try_write_many_sets( &self.endpoint, - &who[..], + &who, put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()), -- cgit v1.2.3 From 33c8a489b0a9c0e869282bfc19c548f5a3e02e8c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 15:40:44 +0100 Subject: layou: implement ack locking --- src/block/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 0ca8bc31..be2e4951 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -366,7 +366,7 @@ impl BlockManager { .rpc_helper() .try_write_many_sets( &self.endpoint, - &who, + who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()), -- cgit v1.2.3 From d6d239fc7909cbd017da6ea35cceb3d561a87cca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 27 Nov 2023 11:52:57 +0100 Subject: block manager: read_block using old layout versions if necessary --- src/block/manager.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index be2e4951..47111160 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -264,8 +264,10 @@ impl BlockManager { F: Fn(DataBlockHeader, ByteStream) -> Fut, Fut: futures::Future>, { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc_helper().request_order(&who); + let who = self + .system + .cluster_layout() + .block_read_nodes_of(hash, self.system.rpc_helper()); for node in who.iter() { let node_id = NodeID::from(*node); -- cgit v1.2.3 From 5dd200c015aed786173f0e11541b0505f95dd6d1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Dec 2023 12:02:24 +0100 Subject: layout: move block_read_nodes_of to rpc_helper to avoid double-locking (in theory, this could have caused a deadlock) --- src/block/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 47111160..bfd390ee 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -266,7 +266,7 @@ impl BlockManager { { let who = self .system - .cluster_layout() + .rpc_helper() .block_read_nodes_of(hash, self.system.rpc_helper()); for node in who.iter() { -- cgit v1.2.3