diff options
author | Quentin Dufour <quentin@deuxfleurs.fr> | 2021-11-19 19:54:49 +0100 |
---|---|---|
committer | Quentin Dufour <quentin@deuxfleurs.fr> | 2021-11-19 19:54:49 +0100 |
commit | 0ee29e31ddcc81f541de7459b0a5e40dfa552672 (patch) | |
tree | 859ff133f8c78bd034b0c2184cdad0ce9f38b065 /s3/file.go | |
parent | 93631b4e3d5195d446504db1c4a2bc7468b3ef28 (diff) | |
download | bagage-0ee29e31ddcc81f541de7459b0a5e40dfa552672.tar.gz bagage-0ee29e31ddcc81f541de7459b0a5e40dfa552672.zip |
Working on SFTP
Diffstat (limited to 's3/file.go')
-rw-r--r-- | s3/file.go | 225 |
1 files changed, 225 insertions, 0 deletions
diff --git a/s3/file.go b/s3/file.go new file mode 100644 index 0000000..aa4f227 --- /dev/null +++ b/s3/file.go @@ -0,0 +1,225 @@ +package s3 + +import ( + "context" + "errors" + "fmt" + "io" + "io/fs" + "log" + "mime" + "path" + + "github.com/minio/minio-go/v7" +) + +type S3File struct { + fs *S3FS + obj *minio.Object + objw *io.PipeWriter + donew chan error + pos int64 + entries []fs.FileInfo + Path S3Path +} + +func NewS3File(s *S3FS, path string) (*S3File, error) { + f := new(S3File) + f.fs = s + f.pos = 0 + f.entries = nil + f.Path = NewS3Path(path) + return f, nil +} + +func (f *S3File) Close() error { + err := make([]error, 0) + + if f.obj != nil { + err = append(err, f.obj.Close()) + f.obj = nil + } + + if f.objw != nil { + // wait that minio completes its transfers in background + err = append(err, f.objw.Close()) + err = append(err, <-f.donew) + f.donew = nil + f.objw = nil + } + + count := 0 + for _, e := range err { + if e != nil { + count++ + log.Println(e) + } + } + if count > 0 { + return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count)) + } + return nil +} + +func (f *S3File) loadObject() error { + if f.obj == nil { + obj, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{}) + if err != nil { + return err + } + f.obj = obj + } + return nil +} + +func (f *S3File) Read(p []byte) (n int, err error) { + //if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */ + // return 0, os.ErrInvalid + //} + if err := f.loadObject(); err != nil { + return 0, err + } + + return f.obj.Read(p) +} + +func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) { + if err := f.loadObject(); err != nil { + return 0, err + } + + return f.obj.ReadAt(p, off) +} + +func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) { + return 0, errors.New("not implemented") + +} + +func (f *S3File) Write(p []byte) (n int, err error) { + /*if f.path.class != OBJECT { + return 0, os.ErrInvalid + }*/ + + if f.objw == nil { + if f.pos != 0 { + return 0, errors.New("writing with an offset is not implemented") + } + + r, w := io.Pipe() + f.donew = make(chan error, 1) + f.objw = w + + contentType := mime.TypeByExtension(path.Ext(f.Path.Key)) + go func() { + /* @FIXME + PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB. + Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations. + The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70 + We set this value to the minimum allowed one, 5MiB. + The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24 + Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough. + Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112 + */ + _, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024}) + f.donew <- err + }() + } + + return f.objw.Write(p) +} + +func (f *S3File) Seek(offset int64, whence int) (int64, error) { + if err := f.loadObject(); err != nil { + return 0, err + } + + pos, err := f.obj.Seek(offset, whence) + f.pos += pos + return pos, err +} + +/* +ReadDir reads the contents of the directory associated with the file f and returns a slice of DirEntry values in directory order. Subsequent calls on the same file will yield later DirEntry records in the directory. + +If n > 0, ReadDir returns at most n DirEntry records. In this case, if ReadDir returns an empty slice, it will return an error explaining why. At the end of a directory, the error is io.EOF. + +If n <= 0, ReadDir returns all the DirEntry records remaining in the directory. When it succeeds, it returns a nil error (not io.EOF). +*/ +func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) { + if f.Path.Class == ROOT { + return f.readDirRoot(count) + } else { + return f.readDirChild(count) + } +} + +func min(a, b int64) int64 { + if a < b { + return a + } + return b +} + +func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) { + var err error + if f.entries == nil { + buckets, err := f.fs.mc.ListBuckets(f.fs.ctx) + if err != nil { + return nil, err + } + + f.entries = make([]fs.FileInfo, 0, len(buckets)) + for _, bucket := range buckets { + //log.Println("Stat from GarageFile.readDirRoot()", "/"+bucket.Name) + nf, err := NewS3Stat(f.fs, "/"+bucket.Name) + if err != nil { + return nil, err + } + f.entries = append(f.entries, nf) + } + } + beg := f.pos + end := int64(len(f.entries)) + if count > 0 { + end = min(beg + int64(count), end) + } + f.pos = end + + if end - beg == 0 { + err = io.EOF + } + + return f.entries[beg:end], err +} + +func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) { + prefix := f.Path.Key + if len(prefix) > 0 && prefix[len(prefix)-1:] != "/" { + prefix = prefix + "/" + } + + objs_info := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{ + Prefix: prefix, + Recursive: false, + }) + + entries := make([]fs.FileInfo, 0) + for object := range objs_info { + if object.Err != nil { + return nil, object.Err + } + //log.Println("Stat from GarageFile.readDirChild()", path.Join("/", f.path.bucket, object.Key)) + nf, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object) + if err != nil { + return nil, err + } + entries = append(entries, nf) + } + + return entries, nil +} + +func (f *S3File) Stat() (fs.FileInfo, error) { + return NewS3Stat(f.fs, f.Path.Path) +} |