author    | Quentin <quentin@deuxfleurs.fr> | 2021-11-20 13:42:20 +0100
committer | Quentin <quentin@deuxfleurs.fr> | 2021-11-20 13:42:20 +0100
commit    | e10f04c5e36109c2e58d667c4b6ec054cbdd51be (patch)
tree      | 7288ab0c17c541c921b77d8ddb71add2a54620ac /s3
parent    | 87fff9843dd60d4ce05596dc55bff44a3724a6bf (diff)
download  | bagage-e10f04c5e36109c2e58d667c4b6ec054cbdd51be.tar.gz, bagage-e10f04c5e36109c2e58d667c4b6ec054cbdd51be.zip
It seems to work (sftp)
Diffstat (limited to 's3')
-rw-r--r-- | s3/file.go | 176
-rw-r--r-- | s3/fs.go   |   4
-rw-r--r-- | s3/stat.go |   2
3 files changed, 126 insertions, 56 deletions
diff --git a/s3/file.go b/s3/file.go
--- a/s3/file.go
+++ b/s3/file.go
@@ -1,27 +1,27 @@
 package s3
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"io"
 	"io/fs"
 	"log"
 	"mime"
+	"os"
 	"path"
 
 	"github.com/minio/minio-go/v7"
 )
 
 type S3File struct {
-	fs *S3FS
-	obj *minio.Object
-	objw *io.PipeWriter
-	donew chan error
-	pos int64
-	eof bool
+	fs      *S3FS
+	obj     *minio.Object
+	objw    *io.PipeWriter
+	cache   *os.File
+	donew   chan error
+	pos     int64
 	entries []fs.FileInfo
-	Path S3Path
+	Path    S3Path
 }
 
 func NewS3File(s *S3FS, path string) (*S3File, error) {
@@ -49,6 +49,11 @@ func (f *S3File) Close() error {
 		f.objw = nil
 	}
 
+	if f.cache != nil {
+		err = append(err, f.writeFlush())
+		f.cache = nil
+	}
+
 	count := 0
 	for _, e := range err {
 		if e != nil {
@@ -57,7 +62,7 @@
 		}
 	}
 	if count > 0 {
-		return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count))
+		return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count))
 	}
 	return nil
 }
@@ -74,10 +79,15 @@ func (f *S3File) loadObject() error {
 }
 
 func (f *S3File) Read(p []byte) (n int, err error) {
-	log.Printf("s3 Read\n")
+	//log.Printf("s3 Read\n")
 	//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
 	//	return 0, os.ErrInvalid
 	//}
+
+	if f.cache != nil {
+		return f.cache.Read(p)
+	}
+
 	if err := f.loadObject(); err != nil {
 		return 0, err
 	}
@@ -86,60 +96,120 @@
 }
 
 func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
-	stat, err := f.Stat()
-	if err != nil {
-		return 0, err
-	} else if off >= stat.Size() {
-		return 0, io.EOF
-	}
-
-	log.Printf("s3 ReadAt %v\n", off)
+	if f.cache != nil {
+		return f.cache.ReadAt(p, off)
+	}
+
+	stat, err := f.Stat()
+	if err != nil {
+		return 0, err
+	} else if off >= stat.Size() {
+		return 0, io.EOF
+	}
+
+	//log.Printf("s3 ReadAt %v\n", off)
 
 	if err := f.loadObject(); err != nil {
 		return 0, err
 	}
 
-	return f.obj.ReadAt(p, off)
+	return f.obj.ReadAt(p, off)
+}
+
+func (f *S3File) initCache() error {
+	// We use a locally cached file instead of writing directly to S3
+	// When the user calls close, the file is flushed on S3.
+	// Check writeFlush below.
+	if f.cache == nil {
+		// We create a temp file in the configured folder
+		// We do not use the default tmp file as files can be very large
+		// and could fillup the RAM (often /tmp is mounted in RAM)
+		tmp, err := os.CreateTemp(f.fs.local, "bagage-cache")
+		if err != nil {
+			return err
+		}
+		f.cache = tmp
+
+		// Problem: WriteAt override the existing file, if it exists
+		// So if when we stat the file, its size is greater than zero,
+		// we download it in our cache
+		file, err := f.Stat()
+		if err != nil {
+			return err
+		} else if file.Size() != 0 {
+			// We get a Reader on our object
+			object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
+			if err != nil {
+				return err
+			}
+			// We inject it in our cache file
+			if _, err = io.Copy(f.cache, object); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
 }
 
 func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
-	return 0, errors.New("not implemented")
+	f.initCache()
+	// And now we simply apply the command on our cache
+	return f.cache.WriteAt(p, off)
 }
 
 func (f *S3File) Write(p []byte) (n int, err error) {
-	/*if f.path.class != OBJECT {
-		return 0, os.ErrInvalid
-	}*/
+	f.initCache()
 
-	if f.objw == nil {
-		if f.pos != 0 {
-			return 0, errors.New("writing with an offset is not implemented")
-		}
+	return f.cache.Write(p)
+}
+
+func (f *S3File) writeFlush() error {
+	// Only needed if we used a write cache
+	if f.cache == nil {
+		return nil
+	}
+
+	// Rewind the file to copy from the start
+	_, err := f.cache.Seek(0, 0)
+	if err != nil {
+		return err
+	}
+
+	// Get a FileInfo object as minio needs its size (ideally)
+	stat, err := f.cache.Stat()
+	if err != nil {
+		return err
+	}
+
+	// Send the file to minio
+	contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
+	_, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), minio.PutObjectOptions{
+		ContentType: contentType,
+	})
+	if err != nil {
+		return err
+	}
 
-		r, w := io.Pipe()
-		f.donew = make(chan error, 1)
-		f.objw = w
+	// Close the cache file and remove it
+	err = f.cache.Close()
+	if err != nil {
+		return err
+	}
 
-		contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
-		go func() {
-			/* @FIXME
-			PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB.
-			Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations.
-			The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
-			We set this value to the minimum allowed one, 5MiB.
-			The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
-			Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough.
-			Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
-			*/
-			_, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024})
-			f.donew <- err
-		}()
+	err = os.Remove(f.cache.Name())
+	if err != nil {
+		return err
 	}
 
-	return f.objw.Write(p)
+	return nil
 }
 
 func (f *S3File) Seek(offset int64, whence int) (int64, error) {
+	if f.cache != nil {
+		return f.cache.Seek(offset, whence)
+	}
+
 	if err := f.loadObject(); err != nil {
 		return 0, err
 	}
@@ -165,10 +235,10 @@ func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
 }
 
 func min(a, b int64) int64 {
-	if a < b {
-		return a
-	}
-	return b
+	if a < b {
+		return a
+	}
+	return b
 }
 
 func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
@@ -192,11 +262,11 @@ func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
 	beg := f.pos
 	end := int64(len(f.entries))
 	if count > 0 {
-		end = min(beg + int64(count), end)
+		end = min(beg+int64(count), end)
 	}
 	f.pos = end
 
-	if end - beg == 0 {
+	if end-beg == 0 {
 		err = io.EOF
 	}
 
@@ -232,11 +302,11 @@ func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
 	beg := f.pos
 	end := int64(len(f.entries))
 	if count > 0 {
-		end = min(beg + int64(count), end)
+		end = min(beg+int64(count), end)
 	}
 	f.pos = end
 
-	if end - beg == 0 {
+	if end-beg == 0 {
 		err = io.EOF
 	}
 
diff --git a/s3/fs.go b/s3/fs.go
--- a/s3/fs.go
+++ b/s3/fs.go
@@ -22,13 +22,15 @@ import (
 type S3FS struct {
 	cache map[string]*S3Stat
 	mc    *minio.Client
+	local string
 	ctx   context.Context
 }
 
-func NewS3FS(mc *minio.Client) S3FS {
+func NewS3FS(mc *minio.Client, local string) S3FS {
 	return S3FS{
 		cache: make(map[string]*S3Stat),
 		mc:    mc,
+		local: local,
 	}
 }
 
diff --git a/s3/stat.go b/s3/stat.go
--- a/s3/stat.go
+++ b/s3/stat.go
@@ -1,7 +1,6 @@
 package s3
 
 import (
-	"log"
 	"errors"
 	"io/fs"
 	"path"
@@ -105,7 +104,6 @@ func (s *S3Stat) Name() string {
 }
 
 func (s *S3Stat) Size() int64 {
-	log.Println("stat size: ", s.obj.Size)
 	return s.obj.Size
 }
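
What the patch does, in short: instead of streaming writes to S3 through an io.Pipe and a background PutObject, S3File now buffers writes in a local temporary file (initCache) and uploads it in a single PutObject call with a known size when the file is closed (writeFlush). That is what makes WriteAt and Seek usable on files being written, at the cost of staging the whole object on local disk. The sketch below is not part of the commit and only illustrates the intended flow; the endpoint, credentials, bucket, object path, cache directory and the exampleCachedWrite helper name are placeholders, and the S3FS context (f.fs.ctx) is assumed to be initialised elsewhere, as in the rest of the repository.

package s3

import (
	"log"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

// exampleCachedWrite is a hypothetical helper (not part of this commit)
// showing the new cached-write path end to end.
func exampleCachedWrite() error {
	// Placeholder endpoint and credentials.
	mc, err := minio.New("s3.example.org", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
		Secure: true,
	})
	if err != nil {
		return err
	}

	// NewS3FS now takes the directory that will hold the "bagage-cache"
	// temp files (the second parameter added by this commit).
	fs := NewS3FS(mc, "/var/tmp")

	f, err := NewS3File(&fs, "/mybucket/notes/todo.txt")
	if err != nil {
		return err
	}

	// WriteAt used to return "not implemented"; it now goes through
	// initCache() into the local temp file.
	if _, err := f.WriteAt([]byte("hello"), 0); err != nil {
		f.Close()
		return err
	}

	// Close() calls writeFlush(): rewind the cache file, upload it with
	// PutObject (size known, Content-Type derived from the key's
	// extension), then close and delete the temp file.
	if err := f.Close(); err != nil {
		return err
	}
	log.Println("object flushed to S3")
	return nil
}

As a side note, 5 MiB parts × 10 000 parts ≈ 48.8 GiB, which is where the "limited to 50 GiB files" estimate in the removed @FIXME comment came from; since the new path knows the exact object size before calling PutObject, that PartSize workaround is no longer needed.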