path: root/s3/file.go
author    Quentin Dufour <quentin@deuxfleurs.fr>  2021-11-19 19:54:49 +0100
committer Quentin Dufour <quentin@deuxfleurs.fr>  2021-11-19 19:54:49 +0100
commit    0ee29e31ddcc81f541de7459b0a5e40dfa552672 (patch)
tree      859ff133f8c78bd034b0c2184cdad0ce9f38b065 /s3/file.go
parent    93631b4e3d5195d446504db1c4a2bc7468b3ef28 (diff)
Working on SFTP
Diffstat (limited to 's3/file.go')
-rw-r--r--  s3/file.go  225
1 file changed, 225 insertions(+), 0 deletions(-)
diff --git a/s3/file.go b/s3/file.go
new file mode 100644
index 0000000..aa4f227
--- /dev/null
+++ b/s3/file.go
@@ -0,0 +1,225 @@
+package s3
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "io/fs"
+ "log"
+ "mime"
+ "path"
+
+ "github.com/minio/minio-go/v7"
+)
+
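+// S3File is a file handle on an S3 object. Reads go through a lazily opened
+// minio.Object; writes are streamed to a background PutObject through a pipe.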
+type S3File struct {
+ fs *S3FS
+ obj *minio.Object
+ objw *io.PipeWriter
+ donew chan error
+ pos int64
+ entries []fs.FileInfo
+ Path S3Path
+}
+
+func NewS3File(s *S3FS, path string) (*S3File, error) {
+ f := new(S3File)
+ f.fs = s
+ f.pos = 0
+ f.entries = nil
+ f.Path = NewS3Path(path)
+ return f, nil
+}
+
+func (f *S3File) Close() error {
+ errs := make([]error, 0)
+
+ if f.obj != nil {
+ errs = append(errs, f.obj.Close())
+ f.obj = nil
+ }
+
+ if f.objw != nil {
+ // wait for minio to finish its background transfer before reporting success
+ errs = append(errs, f.objw.Close())
+ errs = append(errs, <-f.donew)
+ f.donew = nil
+ f.objw = nil
+ }
+
+ count := 0
+ for _, e := range errs {
+ if e != nil {
+ count++
+ log.Println(e)
+ }
+ }
+ if count > 0 {
+ return fmt.Errorf("%d errors when closing this WebDAV file; see the logs above for details", count)
+ }
+ return nil
+}
+
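+// loadObject lazily opens the underlying object for reading on first use.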
+func (f *S3File) loadObject() error {
+ if f.obj == nil {
+ obj, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
+ if err != nil {
+ return err
+ }
+ f.obj = obj
+ }
+ return nil
+}
+
+func (f *S3File) Read(p []byte) (n int, err error) {
+ //if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
+ // return 0, os.ErrInvalid
+ //}
+ if err := f.loadObject(); err != nil {
+ return 0, err
+ }
+
+ return f.obj.Read(p)
+}
+
+func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
+ if err := f.loadObject(); err != nil {
+ return 0, err
+ }
+
+ return f.obj.ReadAt(p, off)
+}
+
+func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
+ return 0, errors.New("not implemented")
+}
+
+func (f *S3File) Write(p []byte) (n int, err error) {
+ /*if f.Path.Class != OBJECT {
+ return 0, os.ErrInvalid
+ }*/
+
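+ // On the first write, lazily create a pipe whose read end feeds a single
+ // background PutObject; later writes stream into the same upload.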
+ if f.objw == nil {
+ if f.pos != 0 {
+ return 0, errors.New("writing with an offset is not implemented")
+ }
+
+ r, w := io.Pipe()
+ f.donew = make(chan error, 1)
+ f.objw = w
+
+ contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
+ go func() {
+ /* @FIXME
+ PutObject has a strange behaviour when used with an unknown size: it assumes the final size will be 5TiB.
+ From that assumed size it derives a part size of 512MiB for the multipart upload, which leads to big allocations.
+ The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
+ We set this value to the minimum allowed one, 5MiB.
+ The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
+ Because multipart uploads seem to be limited to 10,000 parts, this probably caps files at about 50 GiB, which is still good enough.
+ Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
+ */
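+ // With the 5 MiB PartSize and the 10,000-part cap, a single upload tops out
+ // around 5 MiB * 10,000 ≈ 48.8 GiB.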
+ _, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5 * 1024 * 1024})
+ f.donew <- err
+ }()
+ }
+
+ return f.objw.Write(p)
+}
+
+func (f *S3File) Seek(offset int64, whence int) (int64, error) {
+ if err := f.loadObject(); err != nil {
+ return 0, err
+ }
+
+ pos, err := f.obj.Seek(offset, whence)
+ // Seek returns the new absolute offset, so record it directly instead of accumulating it.
+ f.pos = pos
+ return pos, err
+}
+
+/*
+Readdir reads the contents of the directory associated with the file f and returns a slice of FileInfo values, in directory order. Subsequent calls on the same file will yield later entries in the directory.
+
+If count > 0, Readdir returns at most count records. In this case, if Readdir returns an empty slice, it will return an error explaining why. At the end of a directory, the error is io.EOF.
+
+If count <= 0, Readdir returns all the remaining records in the directory. When it succeeds, it returns a nil error (not io.EOF).
+*/
+func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
+ if f.Path.Class == ROOT {
+ return f.readDirRoot(count)
+ } else {
+ return f.readDirChild(count)
+ }
+}
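+
+// Hypothetical usage: a count <= 0 requests the whole listing in one call.
+//
+// infos, err := f.Readdir(-1)
+// if err != nil { /* handle */ }
+// for _, fi := range infos { log.Println(fi.Name()) }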
+
+func min(a, b int64) int64 {
+ if a < b {
+ return a
+ }
+ return b
+}
+
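+// readDirRoot lists buckets as top-level directories. The listing is cached
+// in f.entries so that successive calls page through it using f.pos.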
+func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
+ var err error
+ if f.entries == nil {
+ buckets, err := f.fs.mc.ListBuckets(f.fs.ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ f.entries = make([]fs.FileInfo, 0, len(buckets))
+ for _, bucket := range buckets {
+ //log.Println("Stat from GarageFile.readDirRoot()", "/"+bucket.Name)
+ nf, err := NewS3Stat(f.fs, "/"+bucket.Name)
+ if err != nil {
+ return nil, err
+ }
+ f.entries = append(f.entries, nf)
+ }
+ }
+ beg := f.pos
+ end := int64(len(f.entries))
+ if count > 0 {
+ end = min(beg+int64(count), end)
+ }
+ f.pos = end
+
+ // Per the Readdir contract, io.EOF is only reported for a positive count.
+ if count > 0 && end-beg == 0 {
+ err = io.EOF
+ }
+
+ return f.entries[beg:end], err
+}
+
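+// readDirChild lists the objects directly under the file's key prefix
+// (non-recursively). Note that it currently ignores the count argument and
+// re-lists the whole level on every call.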
+func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
+ prefix := f.Path.Key
+ if len(prefix) > 0 && prefix[len(prefix)-1:] != "/" {
+ prefix = prefix + "/"
+ }
+
+ objsInfo := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{
+ Prefix: prefix,
+ Recursive: false,
+ })
+
+ entries := make([]fs.FileInfo, 0)
+ for object := range objsInfo {
+ if object.Err != nil {
+ return nil, object.Err
+ }
+ //log.Println("Stat from GarageFile.readDirChild()", path.Join("/", f.path.bucket, object.Key))
+ nf, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object)
+ if err != nil {
+ return nil, err
+ }
+ entries = append(entries, nf)
+ }
+
+ return entries, nil
+}
+
+func (f *S3File) Stat() (fs.FileInfo, error) {
+ return NewS3Stat(f.fs, f.Path.Path)
+}
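
To illustrate the write path, a minimal sketch of a hypothetical caller (the NewS3FS constructor and the bucket/key names are assumptions, not part of this diff): bytes written to the handle stream through the pipe into one background PutObject, and upload errors only fully surface on Close.

fs, _ := NewS3FS(...) // hypothetical constructor wrapping a *minio.Client
f, _ := NewS3File(fs, "/some-bucket/hello.txt")
if _, err := f.Write([]byte("hello world")); err != nil {
	log.Println(err)
}
// Close flushes the pipe and waits for the background upload to complete.
if err := f.Close(); err != nil {
	log.Println(err)
}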