aboutsummaryrefslogblamecommitdiff
path: root/benchmarks/s3billion/main.go
blob: 7a2cc6c0ebf33c3e62b09e3684df6b7f1817e744 (plain) (tree)















































































































































































                                                                                                                        
package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net/http"
	"os"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

type PRNG struct {
	rem int64
}

func (r *PRNG) Read(p []byte) (n int, err error) {
	//log.Printf("rem=%d, buf=%d\n", r.rem, len(p))
	if int64(len(p)) > r.rem {
		p = p[:r.rem]
	}

	if int64(len(p)) > r.rem {
		log.Fatal("LOGIC ERROR")
	}

	n, err = rand.Read(p)
	if err != nil {
		return
	}
	r.rem -= int64(n)
	if r.rem <= 0 {
		err = io.EOF
		//log.Printf("PRNG file has been fully read. rem=%d,n=%d,err=%s\n", r.rem, n, err)
	}
	return
}

func putObj(mc *minio.Client, buck string, size int64) error {
	prng := new(PRNG)
	prng.rem = size

	key := uuid.New().String()

	_, err := mc.PutObject(
		context.Background(),
		buck,
		key,
		prng,
		size,
		minio.PutObjectOptions{ContentType: "application/octet-stream"},
	)

	return err
}

func main() {
	fmt.Printf("total_objects,batch_dur_nanoseconds\n")

	minio.MaxRetry = 1

	_, isSSL := os.LookupEnv("SSL")
	opts := minio.Options{
		Creds:  credentials.NewStaticV4(os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), ""),
		Secure: isSSL,
	}

	if region, ok := os.LookupEnv("AWS_REGION"); ok {
		opts.Region = region
	}

	if _, ok := os.LookupEnv("SSL_INSECURE"); ok {
		opts.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
	}

	mc, err := minio.New(os.Getenv("ENDPOINT"), &opts)
	if err != nil {
		log.Fatal("failed connect", err)
		return
	}

	thread := 32
	if env_thread, ok := os.LookupEnv("THREAD"); ok {
		tmp, err := strconv.Atoi(env_thread)
		if err != nil {
			log.Fatalf("invalid value for THREAD: %v\n", env_thread)
		}
		thread = tmp
	}

	batch_size := 256
	if env_batch_size, ok := os.LookupEnv("BATCH_SIZE"); ok {
		tmp, err := strconv.Atoi(env_batch_size)
		if err != nil {
			log.Fatalf("invalid value for BATCH_SIZE: %v\n", env_batch_size)
		}
		batch_size = tmp
	}

	batch_count := 128
	if env_batch_count, ok := os.LookupEnv("BATCH_COUNT"); ok {
		tmp, err := strconv.Atoi(env_batch_count)
		if err != nil {
			log.Fatalf("invalid value for BATCH_COUNT: %v\n", env_batch_count)
		}
		batch_count = tmp
	}

	obj_size := 16
	if env_obj_size, ok := os.LookupEnv("OBJ_SIZE"); ok {
		tmp, err := strconv.Atoi(env_obj_size)
		if err != nil {
			log.Fatalf("invalid value for OBJ_SIZE: %v\n", env_obj_size)
		}
		obj_size = tmp
	}

	total_obj := thread * batch_size * batch_count
	total_size := total_obj * obj_size
	log.Printf("if bench succeed, %v objects (%v bytes) will be created\n", total_obj, total_size)

	// Create Bucket
	buck := uuid.New().String()
	err = mc.MakeBucket(context.Background(), buck, minio.MakeBucketOptions{})
	if err != nil {
		log.Fatal(err)
		return
	}
	log.Printf("created bucket %s\n", buck)

	// Start sending...
	for bc := 0; bc < batch_count; bc++ {
		log.Printf("batch %d/%d - start\n", bc+1, batch_count)

		start := time.Now()
		syn := make(chan error)

		for tc := 0; tc < thread; tc++ {
			go func() {
				for bs := 0; bs < batch_size; bs++ {
					err := putObj(mc, buck, int64(obj_size))
					if err != nil {
						syn <- err
						return
					}
				}
				syn <- nil

			}()
		}
		log.Printf("batch %d/%d - all threads started\n", bc+1, batch_count)

		errCount := 0
		for tc := 0; tc < thread; tc++ {
			cerr := <-syn
			if cerr != nil {
				errCount += 1
				log.Printf("thread %d/%d failed with %s\n", tc, thread, cerr)
			}
		}
		if errCount > 0 {
			log.Fatal("Too many errors, exiting...")
			return
		}
		elapsed := time.Since(start)
		fmt.Printf("%d,%v\n", bc * thread * batch_size, elapsed.Nanoseconds())
		log.Printf("batch %d/%d - all threads returned\n", bc+1, batch_count)
	}
}