#!/usr/bin/env python3
import json, os, sys, time, pathlib, socket, shutil, urllib3
import minio
from os import environ as env
from pathlib import Path
from . import shared, flavor
# Placeholder value; replaced below once HOST (and optionally ZONE) is known.
storage_path = "./i/am/not/defined"
# Flavor entry of the MinIO build to run; run_minio() reads its 'path' key.
version = flavor.minio["minio-20220917"]
# Unix socket under the shared storage root: the coordinator binds it and the
# followers connect to it to exchange node identities at deploy time.
unix_sock = str(Path(shared.storage_path) / "minio.sock")
access_key = "minioadmin"
secret_key = "minioadmin"

# Client for the MinIO server listening on this node's own IP, port 9000.
# Plain HTTP, a 2-second timeout and no retries, so that every call made by
# the polling loops in this module returns or fails quickly.
client = minio.Minio(
    f"[{env['IP']}]:9000",
    # Use the constants above rather than repeating the literals, so the
    # credentials are defined in exactly one place.
    access_key=access_key,
    secret_key=secret_key,
    secure=False,
    http_client=urllib3.PoolManager(
        timeout=2,
        retries=False,
    ),
)

# Per-node data directory: <shared.storage_path>/minio[/<ZONE>]/<HOST>.
# The ZONE component is only inserted when ZONE is set and non-empty.
if 'HOST' in env:
    storage_path = str(Path(shared.storage_path) / "minio" / env['HOST'])
    if 'ZONE' in env and env['ZONE'] != "":
        storage_path = str(Path(shared.storage_path) / "minio" / env['ZONE'] / env['HOST'])

# Files written by run_minio(): the server's output streams and the PID of
# the background process (read back by destroy()).
stdout = Path(storage_path) / "minio.stdout"
stderr = Path(storage_path) / "minio.stderr"
pid = Path(storage_path) / "daemon.pid"
def destroy():
    """Kill the daemon recorded in the PID file and wipe this node's storage.

    Both steps are best-effort: a missing, stale or unreadable PID file never
    prevents the storage directory from being removed, and removal errors are
    ignored.
    """
    import signal  # local import: not part of this file's top-level imports

    if os.path.exists(pid):
        try:
            daemon_pid = int(Path(pid).read_text().strip())
            if daemon_pid <= 0:
                # 0 and negative values address whole process groups.
                raise ValueError(f"refusing to signal pid {daemon_pid}")
            os.kill(daemon_pid, signal.SIGKILL)
        except ProcessLookupError:
            # Stale PID file: the daemon is already gone, nothing to kill.
            pass
        except (OSError, ValueError) as err:
            shared.log(f"could not kill daemon recorded in {pid}:", err)

    # Safety net: never recursively delete a suspiciously short path.
    if len(str(storage_path)) > 8:
        shutil.rmtree(storage_path, ignore_errors=True)
def deploy_coord():
    """Deploy this node as the cluster coordinator.

    Wipes any previous state, collects the identity (``{"ip", "path"}``) of
    every other node over the shared Unix socket, sends the complete list
    back to each of them, starts the local MinIO server, then blocks until
    the "sync" bucket exists (creating it if necessary).
    """
    destroy()
    if os.path.exists(unix_sock):
        # A socket file left over from a previous run would make bind() fail.
        os.unlink(unix_sock)
    os.makedirs(storage_path)

    followers = []
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.bind(unix_sock)
        sock.listen()
        try:
            # Create sockets: one connection per other node
            # (shared.count() presumably includes this node -- TODO confirm).
            for _ in range(1, shared.count()):
                conn, _addr = sock.accept()
                followers.append(conn)

            # Receive configurations, centralize them
            others = []
            for conn in followers:
                with conn.makefile() as reader:
                    others.append(json.loads(reader.readline()))
            identities = others + [{"ip": os.environ['IP'], "path": storage_path}]
            shared.log(f"ident: {identities}")

            # Dispatch them; sendall() because send() may write only part
            # of the message.
            msg = f"{json.dumps(identities)}\n".encode()
            for conn in followers:
                conn.sendall(msg)
        finally:
            for conn in followers:
                conn.close()

    run_minio(identities)

    # The server needs some time to come up: retry until the bucket check
    # (and, if needed, the creation) goes through.
    while True:
        try:
            if not client.bucket_exists("sync"):
                client.make_bucket("sync")
            break
        except Exception as e:
            shared.log("waiting for bootstrap...", e)
            time.sleep(1)

    shared.log("ready")
def deploy_follow():
    """Deploy this node as a follower of the coordinator.

    Wipes any previous state, connects to the coordinator's Unix socket
    (retrying every second until it is available), sends this node's identity
    (``{"ip", "path"}``) as one JSON line, reads back the full identity list,
    starts the local MinIO server and waits for the "sync" bucket to appear.
    """
    destroy()
    os.makedirs(storage_path)

    # The coordinator may not have bound the socket yet: retry until it has.
    while True:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            sock.connect(unix_sock)
            break
        except OSError as err:
            sock.close()  # do not leak one descriptor per failed attempt
            shared.log('conn failed, wait 1 sec, err is', err)
            time.sleep(1)

    with sock, sock.makefile() as co:
        # send my identity; sendall() because send() may write only part of it
        my_identity = json.dumps({"ip": os.environ['IP'], "path": storage_path})
        sock.sendall(f"{my_identity}\n".encode())

        # get all
        identities = json.loads(co.readline())

    run_minio(identities)
    sync_on_bucket_up()
    shared.log("ready")
def sync_on_bucket_up():
    """Block until the "sync" bucket exists, polling once per second.

    Errors raised by the client are treated as "not ready yet" and appended
    to the log line. Only ``Exception`` is caught, so KeyboardInterrupt and
    SystemExit can still stop the wait.
    """
    while True:
        try:
            if client.bucket_exists("sync"):
                return
            failure = ()
        except Exception as e:
            failure = (e,)
        shared.log("waiting for bucket 'sync'...", *failure)
        time.sleep(1)
def sync_on_bucket_down():
    """Block until the "sync" bucket no longer exists, polling once per second.

    Any error raised while polling is logged and ends the wait.
    """
    try:
        while client.bucket_exists("sync"):
            time.sleep(1)
    except Exception as e:
        shared.log("the cluster is probably already half shutdown, so errors are expected ->", e)
def delete_sync_bucket():
    """Remove the "sync" bucket through the module-level client."""
    client.remove_bucket("sync")
def run_minio(identities):
    """Start the MinIO server in the background as one node of the cluster.

    ``identities`` is an iterable of ``{"ip": ..., "path": ...}`` dicts, one
    per node; each becomes an ``http://[ip]:9000<path>`` endpoint on the
    server's command line. The server's output is redirected to the
    module-level ``stdout``/``stderr`` files and the PID of the background
    process is written to ``pid``.
    """
    from shlex import quote  # local import: not part of this file's top-level imports

    # Required to prevent Minio error: "/tmp/mknet-store/minio/node1` is part of root drive, will not be used"
    # https://github.com/minio/minio/issues/15720
    env['CI'] = "true"
    env['MINIO_CI_CD'] = "true"

    # Endpoints are quoted because "[...]" is a glob character class for the
    # shell, and the storage paths may contain characters special to it.
    endpoints = [
        quote(f"http://[{ident['ip']}]:9000{ident['path']}")
        for ident in identities
    ]
    cmd = " ".join([
        f"{version['path']} server --console-address ':9001' --address ':9000'",
        *endpoints,
        f"> {quote(str(stdout))} 2> {quote(str(stderr))}",
        f"& echo $! > {quote(str(pid))}",
    ])
    shared.log("launch: ", cmd)
    status = os.system(cmd)
    if status != 0:
        # The server runs in the background, so this is the status of the
        # trailing "echo $! > pid" part of the command.
        shared.log("launch command returned non-zero status:", status)