From c77d8dcfa7333b32e07df7326b8943838f34a59b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 14 Sep 2022 18:01:44 +0200 Subject: Working s3lat with Garage --- scenarios/fragments/.shared.py.swo | Bin 0 -> 12288 bytes scenarios/fragments/garage.py | 201 +++++++++++++++++++++++++++++++++++++ scenarios/fragments/minio.py | 62 ++++++++++++ scenarios/fragments/s3lat.py | 16 +++ scenarios/fragments/shared.py | 28 ++++++ 5 files changed, 307 insertions(+) create mode 100644 scenarios/fragments/.shared.py.swo create mode 100644 scenarios/fragments/garage.py create mode 100644 scenarios/fragments/minio.py create mode 100644 scenarios/fragments/s3lat.py create mode 100644 scenarios/fragments/shared.py (limited to 'scenarios/fragments') diff --git a/scenarios/fragments/.shared.py.swo b/scenarios/fragments/.shared.py.swo new file mode 100644 index 0000000..02b9acb Binary files /dev/null and b/scenarios/fragments/.shared.py.swo differ diff --git a/scenarios/fragments/garage.py b/scenarios/fragments/garage.py new file mode 100644 index 0000000..cd8b888 --- /dev/null +++ b/scenarios/fragments/garage.py @@ -0,0 +1,201 @@ +import glob, json, requests, time, garage_admin_sdk +from os.path import exists +from os import environ as env +from pathlib import Path +from fragments import shared +from garage_admin_sdk.api import nodes_api, layout_api, key_api +from garage_admin_sdk.model.node_cluster_info import NodeClusterInfo +from garage_admin_sdk.model.layout_version import LayoutVersion +from garage_admin_sdk.model.add_key_request import AddKeyRequest +from garage_admin_sdk.model.update_key_request import UpdateKeyRequest +from garage_admin_sdk.model.update_key_request_allow import UpdateKeyRequestAllow + +storage_path = "./i/am/not/defined" +rpc_secret = "3e9abff5f9e480afbadb46a77b7a26fe0e404258f0dc3fd5386b0ba8e0ad2fba" +metrics = "cacce0b2de4bc2d9f5b5fdff551e01ac1496055aed248202d415398987e35f81" +admin = "ae8cb40ea7368bbdbb6430af11cca7da833d3458a5f52086f4e805a570fb5c2a" +path = None +key = None + + +configuration = garage_admin_sdk.Configuration( + host = "http://localhost:3903/v0", + access_token = admin +) +api = garage_admin_sdk.ApiClient(configuration) +nodes = nodes_api.NodesApi(api) +layout = layout_api.LayoutApi(api) +keys = key_api.KeyApi(api) + + +# Setup, launch on import +storage_path = Path(shared.storage_path) / "garage" / env['HOST'] +if 'ZONE' in env: + storage_path = Path(shared.storage_path) / "garage" / env['ZONE'] / env['HOST'] +config = storage_path / "garage.toml" +env['GARAGE_CONFIG_FILE'] = str(config) + +def deploy_coord(version=None, target=None): + destroy() + from_ci(version, target) + shared.log("start daemon") + daemon() + shared.log("discover nodes") + connect() + shared.log("build layout") + create_layout() + shared.log("create key") + create_key() + shared.log("ready") + +def deploy_follow(version=None, target=None): + destroy() + from_ci(version, target) + shared.log("start daemon") + daemon() + shared.log("wait for coord") + sync_on_key_up() + shared.log("ready") + +def from_local(p): + global path + path = p + shared.exec(f"{p} --version") + +def from_ci(version=None, target=None): + global path + version = version or "v0.7.3" + target = target or "x86_64-unknown-linux-musl" + + binary = f"garage-{target}-{version}" + path = Path(shared.binary_path) / binary + if shared.id() != 1: return + + if not exists(path): + shared.exec(f"mkdir -p {shared.binary_path}") + shared.exec(f"wget https://garagehq.deuxfleurs.fr/_releases/{version}/{target}/garage -O {path}") + shared.exec(f"chmod +x {path}") + shared.exec(f"{path} --version") + +def daemon(): + shared.exec(f"mkdir -p {storage_path}") + with open(config, 'w+') as f: + f.write(f""" +metadata_dir = "{storage_path}/meta" +data_dir = "{storage_path}/data" + +replication_mode = "3" + +rpc_bind_addr = "[::]:3901" +rpc_public_addr = "[{env['IP']}]:3901" +rpc_secret = "{rpc_secret}" + +bootstrap_peers=[] + +[s3_api] +s3_region = "garage" +api_bind_addr = "[::]:3900" +root_domain = ".s3.garage" + +[s3_web] +bind_addr = "[::]:3902" +root_domain = ".web.garage" +index = "index.html" + +[admin] +api_bind_addr = "0.0.0.0:3903" +metrics_token = "{metrics}" +admin_token = "{admin}" + """) + + shared.exec(f"{path} server 2>> {storage_path}/logs.stderr 1>> {storage_path}/logs.stdout & echo $! > {storage_path}/daemon.pid") + time.sleep(1) + + node_info = storage_path / "node_info" + node_id = shared.fn_retry(lambda: nodes.get_nodes().node) + with open(node_info, 'w+') as f: + f.write(json.dumps({ + "node_addr": f"{node_id}@{env['IP']}:3901", + "node_id": node_id, + "zone": env['ZONE'], + "host": env['HOST'], + })) + +def destroy(): + dpid = Path(storage_path) / "daemon.pid" + if exists(dpid): + shared.exec(f"kill -9 $(cat {dpid})") + shared.exec(f"rm -f {dpid}") + if len(str(storage_path)) < 8: # arbitrary, stupid safe guard + print(storage_path) + raise Exception("You tried to clean a storage path that might be the root of your FS, panicking...") + shared.exec(f"rm -fr {storage_path}") + +# this function is ugly, sorry :s +_cluster_info = None +def cluster_info(): + global _cluster_info + if _cluster_info is not None: return _cluster_info + + while True: + time.sleep(1) + node_files = glob.glob(f"{shared.storage_path}/**/node_info", recursive=True) + if len(node_files) == shared.count(): break + + _cluster_info = [ json.loads(Path(f).read_text()) for f in node_files ] + return _cluster_info + + +def connect(): + cinf = cluster_info() + ret = nodes.add_node([n['node_addr'] for n in cinf]) + for st in ret: + if not st.success: + raise Exception("Node connect failed", ret) + shared.log("all nodes connected") + +def create_layout(): + v = layout.get_layout().version + + cinf = cluster_info() + nlay = dict() + for n in cinf: + nlay[n['node_id']] = NodeClusterInfo( + zone = n['zone'], + capacity = 1, + tags = [ n['host'] ], + ) + layout.add_layout(nlay) + layout.apply_layout(LayoutVersion(version=v+1)) + +def create_key(): + global key + kinfo = shared.fn_retry(lambda: keys.add_key(AddKeyRequest(name="mknet"))) + allow_create = UpdateKeyRequestAllow(create_bucket=True) + keys.update_key(kinfo.access_key_id, UpdateKeyRequest(allow=allow_create)) + key = kinfo + + +def delete_key(): + global key + keys.delete_key(key.access_key_id) + key = None + +def sync_on_key_up(): + global key + while True: + try: + key = keys.search_key("mknet") + return key + except: + pass + time.sleep(1) + +def sync_on_key_down(): + while True: + try: + keys.search_key("mknet") + except: + return + time.sleep(1) + diff --git a/scenarios/fragments/minio.py b/scenarios/fragments/minio.py new file mode 100644 index 0000000..431b983 --- /dev/null +++ b/scenarios/fragments/minio.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +import json, os, sys, time, pathlib, socket, shutil + +STORAGE_PATH = os.path.join(os.getcwd(), '.minio-testnet') +HOSTS_PATH = os.path.join(STORAGE_PATH, 'hosts.txt') +UNIX_SOCK = os.path.join(STORAGE_PATH, 'deploy.sock') +DATA_PATH = lambda nid: os.path.join(STORAGE_PATH, 'data'+str(nid)) + +def main(): + if int(os.environ['ID']) == 1: leader() + else: follower() + +def leader(): + shutil.rmtree(STORAGE_PATH, ignore_errors=True) + os.makedirs(STORAGE_PATH) + print(STORAGE_PATH) + + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(UNIX_SOCK) + sock.listen() + + n_serv = int(os.environ['SERVER_COUNT']) + fl = [ co for co, addr in [ sock.accept() for i in range(n_serv - 1) ]] + + identities = [ json.loads(co.makefile().readline()) for co in fl ] + [ { "ip": os.environ['IP'], "path": make_data() } ] + print(f"ident: {identities}") + msg = f"{json.dumps(identities)}\n".encode() + [ co.send(msg) for co in fl ] + + run_minio(identities) + +def follower(): + co = None + while True: + time.sleep(1) + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(UNIX_SOCK) + co = sock.makefile() + break + except Exception as err: + print('conn failed, wait,', err) + my_identity = json.dumps({ "ip": os.environ['IP'], "path": make_data() }) + sock.send(f"{my_identity}\n".encode()) + identities = json.loads(co.readline()) + + run_minio(identities) + +def make_data(): + data_path = DATA_PATH(os.environ['ID']) + os.makedirs(data_path) + return data_path + +def run_minio(identities): + cmd = f"minio server --console-address ':9001' --address ':9000'" + for ident in identities: + cmd += f" http://[{ident['ip']}]:9000{ident['path']}" + cmd += f" > {os.path.join(STORAGE_PATH, 'minio'+os.environ['ID']+'.log')} 2>&1" + print("launch: ", cmd) + os.system(cmd) + +__name__ == '__main__' and main() diff --git a/scenarios/fragments/s3lat.py b/scenarios/fragments/s3lat.py new file mode 100644 index 0000000..7582350 --- /dev/null +++ b/scenarios/fragments/s3lat.py @@ -0,0 +1,16 @@ +import os +from os.path import exists +from pathlib import Path +from fragments import shared, garage + +s3bin = Path(os.path.dirname(__file__)) / "../../benchmarks/s3lat/s3lat" + +def on_garage(): + os.environ['AWS_ACCESS_KEY_ID'] = garage.key.access_key_id + os.environ['AWS_SECRET_ACCESS_KEY'] = garage.key.secret_access_key + os.environ['ENDPOINT'] = "localhost:3900" + + out = Path(shared.storage_path) / "s3lat.csv" + shared.log(f"launching s3lat ({s3bin})") + shared.exec(f"{s3bin} > {out}") + shared.log(f"execution done, output written to {out}") diff --git a/scenarios/fragments/shared.py b/scenarios/fragments/shared.py new file mode 100644 index 0000000..e0cd449 --- /dev/null +++ b/scenarios/fragments/shared.py @@ -0,0 +1,28 @@ +import os, time + +binary_path = "/tmp/mknet-bin" +storage_path = "/tmp/mknet-store" + +def exec(s): + if os.system(s) != 0: + raise Exception("Command terminated with an error") +def exec_retry(s, cnt=16): + print(s) + for i in range(cnt): + time.sleep(i) # this is expected to sleep before running the command to reduce the noise + if os.system(s) == 0: return + raise Exception("Command terminated with an error too many times") +def fn_retry(f, cnt=5): + for i in range(cnt): + try: + r = f() + return r + except Exception as e: + if i+1 == cnt: raise e + log(f"failed call, retry in {i} sec") + time.sleep(i) + +def id(): return int(os.environ['ID']) +def count(): return int(os.environ['SERVER_COUNT']) +def log(*args): print(f"[{id()}/{count()} - {os.environ['HOST']}]", *args) + -- cgit v1.2.3