aboutsummaryrefslogtreecommitdiff
path: root/scenarios/fragments/minio.py
diff options
context:
space:
mode:
Diffstat (limited to 'scenarios/fragments/minio.py')
-rw-r--r--scenarios/fragments/minio.py136
1 files changed, 105 insertions, 31 deletions
diff --git a/scenarios/fragments/minio.py b/scenarios/fragments/minio.py
index 431b983..ab6f476 100644
--- a/scenarios/fragments/minio.py
+++ b/scenarios/fragments/minio.py
@@ -1,62 +1,136 @@
#!/usr/bin/env python3
-import json, os, sys, time, pathlib, socket, shutil
+import json, os, sys, time, pathlib, socket, shutil, urllib3
+import minio
+from os import environ as env
+from pathlib import Path
+from . import shared, flavor
-STORAGE_PATH = os.path.join(os.getcwd(), '.minio-testnet')
-HOSTS_PATH = os.path.join(STORAGE_PATH, 'hosts.txt')
-UNIX_SOCK = os.path.join(STORAGE_PATH, 'deploy.sock')
-DATA_PATH = lambda nid: os.path.join(STORAGE_PATH, 'data'+str(nid))
+storage_path = "./i/am/not/defined"
+version = flavor.minio["minio-20220917"]
+unix_sock = str(Path(shared.storage_path) / "minio.sock")
+access_key = "minioadmin"
+secret_key = "minioadmin"
+client = minio.Minio(
+ f"[{env['IP']}]:9000",
+ access_key="minioadmin",
+ secret_key="minioadmin",
+ secure=False,
+ http_client=urllib3.PoolManager(
+ timeout=5,
+ retries=1,
+ )
+)
-def main():
- if int(os.environ['ID']) == 1: leader()
- else: follower()
+if 'HOST' in env:
+ storage_path = str(Path(shared.storage_path) / "minio" / env['HOST'])
+ if 'ZONE' in env and env['ZONE'] != "":
+ storage_path = str(Path(shared.storage_path) / "minio" / env['ZONE'] / env['HOST'])
-def leader():
- shutil.rmtree(STORAGE_PATH, ignore_errors=True)
- os.makedirs(STORAGE_PATH)
- print(STORAGE_PATH)
+stdout = Path(storage_path) / "minio.stdout"
+stderr = Path(storage_path) / "minio.stderr"
+pid = Path(storage_path) / "daemon.pid"
+
+def destroy():
+ if os.path.exists(pid):
+ try:
+ shared.exec(f"kill -9 `cat {pid}`")
+ except:
+ pass
+
+ if len(str(storage_path)) > 8:
+ shutil.rmtree(storage_path, ignore_errors=True)
+
+def deploy_coord():
+ destroy()
+ if os.path.exists(unix_sock):
+ os.unlink(unix_sock)
+
+ os.makedirs(storage_path)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.bind(UNIX_SOCK)
+ sock.bind(unix_sock)
sock.listen()
- n_serv = int(os.environ['SERVER_COUNT'])
- fl = [ co for co, addr in [ sock.accept() for i in range(n_serv - 1) ]]
+ # Create sockets
+ fl = [ co for co, addr in [ sock.accept() for i in range(1, shared.count()) ]]
+
+ # Receive configurations, centralize them
+ me = [ { "ip": os.environ['IP'], "path": storage_path } ]
+ others = [ json.loads(co.makefile().readline()) for co in fl ]
+ identities = others + me
+ shared.log(f"ident: {identities}")
- identities = [ json.loads(co.makefile().readline()) for co in fl ] + [ { "ip": os.environ['IP'], "path": make_data() } ]
- print(f"ident: {identities}")
+ # Dispatch them
msg = f"{json.dumps(identities)}\n".encode()
[ co.send(msg) for co in fl ]
run_minio(identities)
-def follower():
+ while True:
+ try:
+ if client.bucket_exists("sync"): break
+ client.make_bucket("sync")
+ break
+ except Exception as e:
+ shared.log("waiting for bootstrap...", e)
+ time.sleep(1)
+ shared.log("ready")
+
+def deploy_follow():
+ destroy()
+ os.makedirs(storage_path)
+
co = None
while True:
- time.sleep(1)
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- sock.connect(UNIX_SOCK)
+ sock.connect(unix_sock)
co = sock.makefile()
break
except Exception as err:
- print('conn failed, wait,', err)
- my_identity = json.dumps({ "ip": os.environ['IP'], "path": make_data() })
+ shared.log('conn failed, wait 1 sec, err is', err)
+ time.sleep(1)
+
+ # send my identity
+ my_identity = json.dumps({ "ip": os.environ['IP'], "path": storage_path })
sock.send(f"{my_identity}\n".encode())
+
+ # get all
identities = json.loads(co.readline())
run_minio(identities)
+ sync_on_bucket_up()
+ shared.log("ready")
+
+def sync_on_bucket_up():
+ while True:
+ try:
+ if client.bucket_exists("sync"): break
+ except:
+ pass
+ shared.log("waiting for bucket 'sync'...")
+ time.sleep(1)
+
+def sync_on_bucket_down():
+ while True:
+ if not client.bucket_exists("sync"): break
+ time.sleep(1)
-def make_data():
- data_path = DATA_PATH(os.environ['ID'])
- os.makedirs(data_path)
- return data_path
+def delete_sync_bucket():
+ client.remove_bucket("sync")
def run_minio(identities):
- cmd = f"minio server --console-address ':9001' --address ':9000'"
+
+ # Required to prevent Minio error: "/tmp/mknet-store/minio/node1` is part of root drive, will not be used"
+ # https://github.com/minio/minio/issues/15720
+ env['CI'] = "true"
+ env['MINIO_CI_CD'] = "true"
+
+ cmd = f"{version['path']} server --console-address ':9001' --address ':9000'"
for ident in identities:
cmd += f" http://[{ident['ip']}]:9000{ident['path']}"
- cmd += f" > {os.path.join(STORAGE_PATH, 'minio'+os.environ['ID']+'.log')} 2>&1"
- print("launch: ", cmd)
- os.system(cmd)
+ cmd += f" > {stdout} 2> {stderr}"
+ cmd += f" & echo $! > {pid}"
-__name__ == '__main__' and main()
+ shared.log("launch: ", cmd)
+ os.system(cmd)