import glob, json, time from os.path import exists from os import environ as env from pathlib import Path from . import shared, flavor import garage_admin_sdk from garage_admin_sdk.api import nodes_api, layout_api, key_api from garage_admin_sdk.model.node_cluster_info import NodeClusterInfo from garage_admin_sdk.model.layout_version import LayoutVersion from garage_admin_sdk.model.add_key_request import AddKeyRequest from garage_admin_sdk.model.update_key_request import UpdateKeyRequest from garage_admin_sdk.model.update_key_request_allow import UpdateKeyRequestAllow storage_path = "./i/am/not/defined" rpc_secret = "3e9abff5f9e480afbadb46a77b7a26fe0e404258f0dc3fd5386b0ba8e0ad2fba" metrics = "cacce0b2de4bc2d9f5b5fdff551e01ac1496055aed248202d415398987e35f81" admin = "ae8cb40ea7368bbdbb6430af11cca7da833d3458a5f52086f4e805a570fb5c2a" key = None version = flavor.garage["garage-v0.7"] configuration = garage_admin_sdk.Configuration( host = "http://localhost:3903/v0", access_token = admin ) api = garage_admin_sdk.ApiClient(configuration) nodes = nodes_api.NodesApi(api) layout = layout_api.LayoutApi(api) keys = key_api.KeyApi(api) # Setup, launch on import if 'HOST' in env: storage_path = Path(shared.storage_path) / "garage" / env['HOST'] if 'ZONE' in env and env['ZONE'] != "": storage_path = Path(shared.storage_path) / "garage" / env['ZONE'] / env['HOST'] config = storage_path / "garage.toml" env['GARAGE_CONFIG_FILE'] = str(config) def deploy_coord(gw=None, uroot={}, us3_api={}, us3_web={}, uadmin={}): destroy() shared.log("start daemon") shared.exec(f"{version['path']} --version") daemon(uroot,us3_api,us3_web,uadmin) shared.log("discover nodes") connect() shared.log("build layout") create_layout(gw=gw) shared.log("create key") create_key() shared.log("ready") def deploy_follow(uroot={}, us3_api={}, us3_web={}, uadmin={}): destroy() shared.log("start daemon") daemon(uroot,us3_api,us3_web,uadmin) shared.log("wait for coord") sync_on_key_up() shared.log("ready") def to_toml(d): return "\n".join([ f"{k} = {v}" if type(v) is int else f"{k} = \"{v}\"" for k, v in d.items() ]) def daemon(uroot={}, us3_api={}, us3_web={}, uadmin={}): root = { "metadata_dir": f"{storage_path}/meta", "data_dir": f"{storage_path}/data", "replication_mode": "3", "rpc_bind_addr": "[::]:3901", "rpc_public_addr": f"[{env['IP']}]:3901", "rpc_secret": f"{rpc_secret}", } root.update(uroot) s3_api = { "s3_region": "garage", "api_bind_addr": "[::]:3900", "root_domain": ".s3.garage", } s3_api.update(us3_api) s3_web = { "bind_addr": "[::]:3902", "root_domain": ".web.garage", "index": "index.html", } s3_web.update(us3_web) sect_admin = { "api_bind_addr": "0.0.0.0:3903", "metrics_token": f"{metrics}", "admin_token": f"{admin}", } sect_admin.update(uadmin) shared.exec(f"mkdir -p {storage_path}") with open(config, 'w+') as f: f.write(f""" {to_toml(root)} bootstrap_peers = [] [s3_api] {to_toml(s3_api)} [s3_web] {to_toml(s3_web)} [admin] {to_toml(sect_admin)} """) if shared.id() == 1: shared.exec(f"{version['path']} --version") shared.exec(f"{version['path']} server 2>> {storage_path}/logs.stderr 1>> {storage_path}/logs.stdout & echo $! > {storage_path}/daemon.pid") time.sleep(1) node_info = storage_path / "node_info" node_id = shared.fn_retry(lambda: nodes.get_nodes().node) with open(node_info, 'w+') as f: f.write(json.dumps({ "node_addr": f"{node_id}@{env['IP']}:3901", "node_id": node_id, "zone": env['ZONE'] if 'ZONE' in env and env['ZONE'] != "" else env['HOST'], "host": env['HOST'], })) def destroy(): dpid = Path(storage_path) / "daemon.pid" if exists(dpid): try: shared.exec(f"kill -9 $(cat {dpid})") except: pass shared.exec(f"rm -f {dpid}") if len(str(storage_path)) < 8: # arbitrary, stupid safe guard print(storage_path) raise Exception("You tried to clean a storage path that might be the root of your FS, panicking...") shared.exec(f"rm -fr {storage_path}") # this function is ugly, sorry :s _cluster_info = None def cluster_info(): global _cluster_info if _cluster_info is not None: return _cluster_info shared.log("fetch cluster info") while True: node_files = glob.glob(f"{shared.storage_path}/**/node_info", recursive=True) if len(node_files) >= shared.count(): break shared.log(f"found {len(node_files)} over {shared.count()}, wait 1 sec.") time.sleep(1) _cluster_info = [ json.loads(Path(f).read_text()) for f in node_files ] return _cluster_info def connect(): cinf = cluster_info() shared.log("start connections...") while True: try: ret = nodes.add_node([n['node_addr'] for n in cinf], _request_timeout=3) except: shared.log("not ready, retry in 1sec") time.sleep(1) continue for st in ret: if not st.success: continue #raise Exception("Node connect failed", ret) break shared.log("all nodes connected") def create_layout(gw=None): if gw is None: gw = [] v = layout.get_layout().version cinf = cluster_info() nlay = dict() for n in cinf: capa = 1 if n['host'] in gw: capa = None nlay[n['node_id']] = NodeClusterInfo( zone = n['zone'], capacity = capa, tags = [ n['host'] ], ) layout.add_layout(nlay) layout.apply_layout(LayoutVersion(version=v+1)) shared.log(layout.get_layout()) def create_key(): global key kinfo = shared.fn_retry(lambda: keys.add_key(AddKeyRequest(name="mknet"))) allow_create = UpdateKeyRequestAllow(create_bucket=True) keys.update_key(kinfo.access_key_id, UpdateKeyRequest(allow=allow_create)) key = kinfo def delete_key(): global key keys.delete_key(key.access_key_id) key = None def sync_on_key_up(): global key while True: try: key = keys.search_key("mknet") return key except: pass time.sleep(1) def sync_on_key_down(): while True: try: keys.search_key("mknet") except: return time.sleep(1)