aboutsummaryrefslogtreecommitdiff
path: root/benchmarks/s3concurrent/main.go
blob: a67fc5b0d5183de9691cf5529418925cc94325bb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package main

import (
    "context"
    "crypto/tls"
    "io"
    "log"
    "math/rand"
    "net/http"
    "os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
    "github.com/google/uuid"
)

func buildMc() (*minio.Client, error) {
	_, isSSL := os.LookupEnv("SSL")
	opts := minio.Options{
		Creds:  credentials.NewStaticV4(os.Getenv("AWS_ACCESS_KEY_ID"), os.Getenv("AWS_SECRET_ACCESS_KEY"), ""),
		Secure: isSSL,
	}

	if region, ok := os.LookupEnv("REGION"); ok {
		opts.Region = region
	}

	if _, ok := os.LookupEnv("SSL_INSECURE"); ok {
		opts.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
	}

	mc, err := minio.New(os.Getenv("ENDPOINT"), &opts)
    return mc, err
}

type PRNG struct { 
    rem int64
}
func (r *PRNG)  Read(p []byte) (n int, err error) {
    //log.Printf("rem=%d, buf=%d\n", r.rem, len(p))
    if int64(len(p)) > r.rem {
        p = p[:r.rem]
    }

    if int64(len(p)) > r.rem {
        log.Fatal("LOGIC ERROR")
    }

    n, err = rand.Read(p)
    if err != nil {
        return
    }
    r.rem -= int64(n)
    if r.rem <= 0 {
        err = io.EOF
        //log.Printf("PRNG file has been fully read. rem=%d,n=%d,err=%s\n", r.rem, n, err)
    }
    return
}

func putObj(buck string, size int64) error {
    mc, err := buildMc()
    if err != nil {
        return err
    }

    prng := new(PRNG)
    prng.rem = size

    key := uuid.New().String()

    _, err = mc.PutObject(
        context.Background(), 
        buck, 
        key, 
        prng, 
        size, 
        minio.PutObjectOptions{ContentType:"application/octet-stream"},
    )

    return err
}

func main() {
    minio.MaxRetry = 1
    mc, err := buildMc()
	if err != nil {
		log.Fatal("failed connect", err)
		return
	}

    // Create Bucket
    buck := uuid.New().String()
    err = mc.MakeBucket(context.Background(), buck, minio.MakeBucketOptions{ })
    if err != nil {
        log.Fatal(err)
        return
    }
    log.Printf("created bucket %s\n", buck)

    // Send to bucket
    for i := 1; i <= 16; i++ {
        log.Printf("start concurrent loop with %d coroutines\n", i)
        syn := make(chan error)
        for j := 1; j <= i; j++ {
            go func() {
                syn <- putObj(buck, 1024 * 1024)
            }()
        }

        for j := 1; j <= i; j++ {
            cerr := <-syn
            if cerr != nil {
                log.Printf("%d/%d failed with %s\n", j, i, cerr)
            }
        }
        log.Printf("done, %d coroutines returned\n", i)
    }

    log.Println("done")
}