aboutsummaryrefslogblamecommitdiff
path: root/scenarios/fragments/minio.py
blob: 8b0e97066da0477920d3f592dc0c65ade9a4b627 (plain) (tree)
1
2
3
4
5
6
7
                      




                                                            
 










                                                         

                      

     
 



                                                                                           
 



















                                                       

                                                            
                        

                 







                                                                                  
 
                   




                                                













                                                     

               

                                                                    
                                   


                                




                                                                              
                                          

             


                                          













                                                  





                                                                                                     
 

                                

                          






                                                                                                              

                                                             

                                     
 

                               
#!/usr/bin/env python3
import json, os, sys, time, pathlib, socket, shutil, urllib3
import minio
from os import environ as env
from pathlib import Path
from . import shared, flavor

storage_path = "./i/am/not/defined"
version = flavor.minio["minio-20220917"]
unix_sock = str(Path(shared.storage_path) / "minio.sock")
access_key = "minioadmin"
secret_key = "minioadmin"
client = minio.Minio(
    f"[{env['IP']}]:9000",
    access_key="minioadmin",
    secret_key="minioadmin",
    secure=False,
    http_client=urllib3.PoolManager(
        timeout=2,
        retries=False,
    )
)

if 'HOST' in env:
    storage_path = str(Path(shared.storage_path) / "minio" / env['HOST'])
    if 'ZONE' in env and env['ZONE'] != "":
        storage_path = str(Path(shared.storage_path) / "minio" / env['ZONE'] / env['HOST'])

stdout = Path(storage_path) / "minio.stdout"
stderr = Path(storage_path) / "minio.stderr"
pid = Path(storage_path) / "daemon.pid"

def destroy():
    if os.path.exists(pid):
        try:
            shared.exec(f"kill -9 `cat {pid}`")
        except:
            pass

    if len(str(storage_path)) > 8:
        shutil.rmtree(storage_path, ignore_errors=True)

def deploy_coord():
    destroy()
    if os.path.exists(unix_sock):
        os.unlink(unix_sock)

    os.makedirs(storage_path)

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.bind(unix_sock)
    sock.listen()

    # Create sockets
    fl = [ co for co, addr in [ sock.accept() for i in range(1, shared.count()) ]]

    # Receive configurations, centralize them
    me = [ { "ip": os.environ['IP'], "path": storage_path } ]
    others = [ json.loads(co.makefile().readline()) for co in fl ]
    identities = others + me
    shared.log(f"ident: {identities}")

    # Dispatch them
    msg = f"{json.dumps(identities)}\n".encode()
    [ co.send(msg) for co in fl ]

    run_minio(identities)

    while True:
        try:
            if client.bucket_exists("sync"): break
            client.make_bucket("sync")
            break
        except Exception as e: 
            shared.log("waiting for bootstrap...", e)
            time.sleep(1)
    shared.log("ready")

def deploy_follow():
    destroy()
    os.makedirs(storage_path)

    co = None
    while True:
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(unix_sock)
            co = sock.makefile()
            break
        except Exception as err:
            shared.log('conn failed, wait 1 sec, err is', err)
            time.sleep(1)
    
    # send my identity
    my_identity = json.dumps({ "ip": os.environ['IP'], "path": storage_path })
    sock.send(f"{my_identity}\n".encode())

    # get all
    identities = json.loads(co.readline())

    run_minio(identities)
    sync_on_bucket_up()
    shared.log("ready")

def sync_on_bucket_up():
    while True:
        try:
            if client.bucket_exists("sync"): break
        except:
            pass
        shared.log("waiting for bucket 'sync'...")
        time.sleep(1)

def sync_on_bucket_down():
    while True:
        try:
            if not client.bucket_exists("sync"): break
            time.sleep(1)
        except Exception as e:
            shared.log("the cluster is probably already half shutdown, so errors are expected ->", e)
            break

def delete_sync_bucket():
    client.remove_bucket("sync")

def run_minio(identities):

    # Required to prevent Minio error: "/tmp/mknet-store/minio/node1` is part of root drive, will not be used"
    # https://github.com/minio/minio/issues/15720
    env['CI'] = "true"
    env['MINIO_CI_CD'] = "true"

    cmd = f"{version['path']} server --console-address ':9001' --address ':9000'"
    for ident in identities:
        cmd += f" http://[{ident['ip']}]:9000{ident['path']}"
    cmd += f" > {stdout} 2> {stderr}"
    cmd += f" & echo $! > {pid}"

    shared.log("launch: ", cmd)
    os.system(cmd)