aboutsummaryrefslogtreecommitdiff
path: root/scenarios/fragments
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2022-09-14 18:01:44 +0200
committerQuentin Dufour <quentin@deuxfleurs.fr>2022-09-14 18:01:44 +0200
commitc77d8dcfa7333b32e07df7326b8943838f34a59b (patch)
tree7ef87c7597e9f6d3da345d407cad2f1eb3908149 /scenarios/fragments
parente3409ce6b751fa92a98000ca4a2225b2ed10fdee (diff)
downloadmknet-c77d8dcfa7333b32e07df7326b8943838f34a59b.tar.gz
mknet-c77d8dcfa7333b32e07df7326b8943838f34a59b.zip
Working s3lat with Garage
Diffstat (limited to 'scenarios/fragments')
-rw-r--r--scenarios/fragments/.shared.py.swobin0 -> 12288 bytes
-rw-r--r--scenarios/fragments/garage.py201
-rw-r--r--scenarios/fragments/minio.py62
-rw-r--r--scenarios/fragments/s3lat.py16
-rw-r--r--scenarios/fragments/shared.py28
5 files changed, 307 insertions, 0 deletions
diff --git a/scenarios/fragments/.shared.py.swo b/scenarios/fragments/.shared.py.swo
new file mode 100644
index 0000000..02b9acb
--- /dev/null
+++ b/scenarios/fragments/.shared.py.swo
Binary files differ
diff --git a/scenarios/fragments/garage.py b/scenarios/fragments/garage.py
new file mode 100644
index 0000000..cd8b888
--- /dev/null
+++ b/scenarios/fragments/garage.py
@@ -0,0 +1,201 @@
+import glob, json, requests, time, garage_admin_sdk
+from os.path import exists
+from os import environ as env
+from pathlib import Path
+from fragments import shared
+from garage_admin_sdk.api import nodes_api, layout_api, key_api
+from garage_admin_sdk.model.node_cluster_info import NodeClusterInfo
+from garage_admin_sdk.model.layout_version import LayoutVersion
+from garage_admin_sdk.model.add_key_request import AddKeyRequest
+from garage_admin_sdk.model.update_key_request import UpdateKeyRequest
+from garage_admin_sdk.model.update_key_request_allow import UpdateKeyRequestAllow
+
+storage_path = "./i/am/not/defined"
+rpc_secret = "3e9abff5f9e480afbadb46a77b7a26fe0e404258f0dc3fd5386b0ba8e0ad2fba"
+metrics = "cacce0b2de4bc2d9f5b5fdff551e01ac1496055aed248202d415398987e35f81"
+admin = "ae8cb40ea7368bbdbb6430af11cca7da833d3458a5f52086f4e805a570fb5c2a"
+path = None
+key = None
+
+
+configuration = garage_admin_sdk.Configuration(
+ host = "http://localhost:3903/v0",
+ access_token = admin
+)
+api = garage_admin_sdk.ApiClient(configuration)
+nodes = nodes_api.NodesApi(api)
+layout = layout_api.LayoutApi(api)
+keys = key_api.KeyApi(api)
+
+
+# Setup, launch on import
+storage_path = Path(shared.storage_path) / "garage" / env['HOST']
+if 'ZONE' in env:
+ storage_path = Path(shared.storage_path) / "garage" / env['ZONE'] / env['HOST']
+config = storage_path / "garage.toml"
+env['GARAGE_CONFIG_FILE'] = str(config)
+
+def deploy_coord(version=None, target=None):
+ destroy()
+ from_ci(version, target)
+ shared.log("start daemon")
+ daemon()
+ shared.log("discover nodes")
+ connect()
+ shared.log("build layout")
+ create_layout()
+ shared.log("create key")
+ create_key()
+ shared.log("ready")
+
+def deploy_follow(version=None, target=None):
+ destroy()
+ from_ci(version, target)
+ shared.log("start daemon")
+ daemon()
+ shared.log("wait for coord")
+ sync_on_key_up()
+ shared.log("ready")
+
+def from_local(p):
+ global path
+ path = p
+ shared.exec(f"{p} --version")
+
+def from_ci(version=None, target=None):
+ global path
+ version = version or "v0.7.3"
+ target = target or "x86_64-unknown-linux-musl"
+
+ binary = f"garage-{target}-{version}"
+ path = Path(shared.binary_path) / binary
+ if shared.id() != 1: return
+
+ if not exists(path):
+ shared.exec(f"mkdir -p {shared.binary_path}")
+ shared.exec(f"wget https://garagehq.deuxfleurs.fr/_releases/{version}/{target}/garage -O {path}")
+ shared.exec(f"chmod +x {path}")
+ shared.exec(f"{path} --version")
+
+def daemon():
+ shared.exec(f"mkdir -p {storage_path}")
+ with open(config, 'w+') as f:
+ f.write(f"""
+metadata_dir = "{storage_path}/meta"
+data_dir = "{storage_path}/data"
+
+replication_mode = "3"
+
+rpc_bind_addr = "[::]:3901"
+rpc_public_addr = "[{env['IP']}]:3901"
+rpc_secret = "{rpc_secret}"
+
+bootstrap_peers=[]
+
+[s3_api]
+s3_region = "garage"
+api_bind_addr = "[::]:3900"
+root_domain = ".s3.garage"
+
+[s3_web]
+bind_addr = "[::]:3902"
+root_domain = ".web.garage"
+index = "index.html"
+
+[admin]
+api_bind_addr = "0.0.0.0:3903"
+metrics_token = "{metrics}"
+admin_token = "{admin}"
+ """)
+
+ shared.exec(f"{path} server 2>> {storage_path}/logs.stderr 1>> {storage_path}/logs.stdout & echo $! > {storage_path}/daemon.pid")
+ time.sleep(1)
+
+ node_info = storage_path / "node_info"
+ node_id = shared.fn_retry(lambda: nodes.get_nodes().node)
+ with open(node_info, 'w+') as f:
+ f.write(json.dumps({
+ "node_addr": f"{node_id}@{env['IP']}:3901",
+ "node_id": node_id,
+ "zone": env['ZONE'],
+ "host": env['HOST'],
+ }))
+
+def destroy():
+ dpid = Path(storage_path) / "daemon.pid"
+ if exists(dpid):
+ shared.exec(f"kill -9 $(cat {dpid})")
+ shared.exec(f"rm -f {dpid}")
+ if len(str(storage_path)) < 8: # arbitrary, stupid safe guard
+ print(storage_path)
+ raise Exception("You tried to clean a storage path that might be the root of your FS, panicking...")
+ shared.exec(f"rm -fr {storage_path}")
+
+# this function is ugly, sorry :s
+_cluster_info = None
+def cluster_info():
+ global _cluster_info
+ if _cluster_info is not None: return _cluster_info
+
+ while True:
+ time.sleep(1)
+ node_files = glob.glob(f"{shared.storage_path}/**/node_info", recursive=True)
+ if len(node_files) == shared.count(): break
+
+ _cluster_info = [ json.loads(Path(f).read_text()) for f in node_files ]
+ return _cluster_info
+
+
+def connect():
+ cinf = cluster_info()
+ ret = nodes.add_node([n['node_addr'] for n in cinf])
+ for st in ret:
+ if not st.success:
+ raise Exception("Node connect failed", ret)
+ shared.log("all nodes connected")
+
+def create_layout():
+ v = layout.get_layout().version
+
+ cinf = cluster_info()
+ nlay = dict()
+ for n in cinf:
+ nlay[n['node_id']] = NodeClusterInfo(
+ zone = n['zone'],
+ capacity = 1,
+ tags = [ n['host'] ],
+ )
+ layout.add_layout(nlay)
+ layout.apply_layout(LayoutVersion(version=v+1))
+
+def create_key():
+ global key
+ kinfo = shared.fn_retry(lambda: keys.add_key(AddKeyRequest(name="mknet")))
+ allow_create = UpdateKeyRequestAllow(create_bucket=True)
+ keys.update_key(kinfo.access_key_id, UpdateKeyRequest(allow=allow_create))
+ key = kinfo
+
+
+def delete_key():
+ global key
+ keys.delete_key(key.access_key_id)
+ key = None
+
+def sync_on_key_up():
+ global key
+ while True:
+ try:
+ key = keys.search_key("mknet")
+ return key
+ except:
+ pass
+ time.sleep(1)
+
+def sync_on_key_down():
+ while True:
+ try:
+ keys.search_key("mknet")
+ except:
+ return
+ time.sleep(1)
+
diff --git a/scenarios/fragments/minio.py b/scenarios/fragments/minio.py
new file mode 100644
index 0000000..431b983
--- /dev/null
+++ b/scenarios/fragments/minio.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+import json, os, sys, time, pathlib, socket, shutil
+
+STORAGE_PATH = os.path.join(os.getcwd(), '.minio-testnet')
+HOSTS_PATH = os.path.join(STORAGE_PATH, 'hosts.txt')
+UNIX_SOCK = os.path.join(STORAGE_PATH, 'deploy.sock')
+DATA_PATH = lambda nid: os.path.join(STORAGE_PATH, 'data'+str(nid))
+
+def main():
+ if int(os.environ['ID']) == 1: leader()
+ else: follower()
+
+def leader():
+ shutil.rmtree(STORAGE_PATH, ignore_errors=True)
+ os.makedirs(STORAGE_PATH)
+ print(STORAGE_PATH)
+
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(UNIX_SOCK)
+ sock.listen()
+
+ n_serv = int(os.environ['SERVER_COUNT'])
+ fl = [ co for co, addr in [ sock.accept() for i in range(n_serv - 1) ]]
+
+ identities = [ json.loads(co.makefile().readline()) for co in fl ] + [ { "ip": os.environ['IP'], "path": make_data() } ]
+ print(f"ident: {identities}")
+ msg = f"{json.dumps(identities)}\n".encode()
+ [ co.send(msg) for co in fl ]
+
+ run_minio(identities)
+
+def follower():
+ co = None
+ while True:
+ time.sleep(1)
+ try:
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.connect(UNIX_SOCK)
+ co = sock.makefile()
+ break
+ except Exception as err:
+ print('conn failed, wait,', err)
+ my_identity = json.dumps({ "ip": os.environ['IP'], "path": make_data() })
+ sock.send(f"{my_identity}\n".encode())
+ identities = json.loads(co.readline())
+
+ run_minio(identities)
+
+def make_data():
+ data_path = DATA_PATH(os.environ['ID'])
+ os.makedirs(data_path)
+ return data_path
+
+def run_minio(identities):
+ cmd = f"minio server --console-address ':9001' --address ':9000'"
+ for ident in identities:
+ cmd += f" http://[{ident['ip']}]:9000{ident['path']}"
+ cmd += f" > {os.path.join(STORAGE_PATH, 'minio'+os.environ['ID']+'.log')} 2>&1"
+ print("launch: ", cmd)
+ os.system(cmd)
+
+__name__ == '__main__' and main()
diff --git a/scenarios/fragments/s3lat.py b/scenarios/fragments/s3lat.py
new file mode 100644
index 0000000..7582350
--- /dev/null
+++ b/scenarios/fragments/s3lat.py
@@ -0,0 +1,16 @@
+import os
+from os.path import exists
+from pathlib import Path
+from fragments import shared, garage
+
+s3bin = Path(os.path.dirname(__file__)) / "../../benchmarks/s3lat/s3lat"
+
+def on_garage():
+ os.environ['AWS_ACCESS_KEY_ID'] = garage.key.access_key_id
+ os.environ['AWS_SECRET_ACCESS_KEY'] = garage.key.secret_access_key
+ os.environ['ENDPOINT'] = "localhost:3900"
+
+ out = Path(shared.storage_path) / "s3lat.csv"
+ shared.log(f"launching s3lat ({s3bin})")
+ shared.exec(f"{s3bin} > {out}")
+ shared.log(f"execution done, output written to {out}")
diff --git a/scenarios/fragments/shared.py b/scenarios/fragments/shared.py
new file mode 100644
index 0000000..e0cd449
--- /dev/null
+++ b/scenarios/fragments/shared.py
@@ -0,0 +1,28 @@
+import os, time
+
+binary_path = "/tmp/mknet-bin"
+storage_path = "/tmp/mknet-store"
+
+def exec(s):
+ if os.system(s) != 0:
+ raise Exception("Command terminated with an error")
+def exec_retry(s, cnt=16):
+ print(s)
+ for i in range(cnt):
+ time.sleep(i) # this is expected to sleep before running the command to reduce the noise
+ if os.system(s) == 0: return
+ raise Exception("Command terminated with an error too many times")
+def fn_retry(f, cnt=5):
+ for i in range(cnt):
+ try:
+ r = f()
+ return r
+ except Exception as e:
+ if i+1 == cnt: raise e
+ log(f"failed call, retry in {i} sec")
+ time.sleep(i)
+
+def id(): return int(os.environ['ID'])
+def count(): return int(os.environ['SERVER_COUNT'])
+def log(*args): print(f"[{id()}/{count()} - {os.environ['HOST']}]", *args)
+