From 0ee29e31ddcc81f541de7459b0a5e40dfa552672 Mon Sep 17 00:00:00 2001
From: Quentin Dufour
Date: Fri, 19 Nov 2021 19:54:49 +0100
Subject: Working on SFTP

---
 s3/file.go | 225 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 s3/fs.go   | 152 +++++++++++++++++++++++++++++++++++++++++
 s3/path.go |  68 +++++++++++++++++++
 s3/stat.go | 128 +++++++++++++++++++++++++++++++++++
 4 files changed, 573 insertions(+)
 create mode 100644 s3/file.go
 create mode 100644 s3/fs.go
 create mode 100644 s3/path.go
 create mode 100644 s3/stat.go

(limited to 's3')

diff --git a/s3/file.go b/s3/file.go
new file mode 100644
index 0000000..aa4f227
--- /dev/null
+++ b/s3/file.go
@@ -0,0 +1,225 @@
+package s3
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"io/fs"
+	"log"
+	"mime"
+	"path"
+
+	"github.com/minio/minio-go/v7"
+)
+
+type S3File struct {
+	fs      *S3FS
+	obj     *minio.Object
+	objw    *io.PipeWriter
+	donew   chan error
+	pos     int64
+	entries []fs.FileInfo
+	Path    S3Path
+}
+
+func NewS3File(s *S3FS, path string) (*S3File, error) {
+	f := new(S3File)
+	f.fs = s
+	f.pos = 0
+	f.entries = nil
+	f.Path = NewS3Path(path)
+	return f, nil
+}
+
+func (f *S3File) Close() error {
+	errs := make([]error, 0)
+
+	if f.obj != nil {
+		errs = append(errs, f.obj.Close())
+		f.obj = nil
+	}
+
+	if f.objw != nil {
+		// wait for minio to complete its transfers in the background
+		errs = append(errs, f.objw.Close())
+		errs = append(errs, <-f.donew)
+		f.donew = nil
+		f.objw = nil
+	}
+
+	count := 0
+	for _, e := range errs {
+		if e != nil {
+			count++
+			log.Println(e)
+		}
+	}
+	if count > 0 {
+		return fmt.Errorf("%d errors when closing this WebDAV file, read previous logs to know more", count)
+	}
+	return nil
+}
+
+func (f *S3File) loadObject() error {
+	if f.obj == nil {
+		obj, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
+		if err != nil {
+			return err
+		}
+		f.obj = obj
+	}
+	return nil
+}
+
+func (f *S3File) Read(p []byte) (n int, err error) {
+	//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
+	//	return 0, os.ErrInvalid
+	//}
+	if err := f.loadObject(); err != nil {
+		return 0, err
+	}
+
+	return f.obj.Read(p)
+}
+
+func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
+	if err := f.loadObject(); err != nil {
+		return 0, err
+	}
+
+	return f.obj.ReadAt(p, off)
+}
+
+func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
+	return 0, errors.New("not implemented")
+}
+
+func (f *S3File) Write(p []byte) (n int, err error) {
+	/*if f.path.class != OBJECT {
+		return 0, os.ErrInvalid
+	}*/
+
+	if f.objw == nil {
+		if f.pos != 0 {
+			return 0, errors.New("writing with an offset is not implemented")
+		}
+
+		r, w := io.Pipe()
+		f.donew = make(chan error, 1)
+		f.objw = w
+
+		contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
+		go func() {
+			/* @FIXME
+			PutObject has a strange behaviour when used with an unknown size: it assumes the final size will be 5TiB.
+			From that guess it derives the part size of the multipart upload, roughly 5TiB / 10 000 parts ≈ 512MiB per part, which leads to big allocations.
+			The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
+			We set this value to the minimum allowed one, 5MiB.
+			The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
+			Because multipart uploads seem to be limited to 10 000 parts, we are probably limited to files of 10 000 x 5MiB ≈ 50GiB, which is still good enough.
+			Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
+			*/
+			_, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5 * 1024 * 1024})
+			f.donew <- err
+		}()
+	}
+
+	return f.objw.Write(p)
+}
+
+func (f *S3File) Seek(offset int64, whence int) (int64, error) {
+	if err := f.loadObject(); err != nil {
+		return 0, err
+	}
+
+	// minio's Seek returns the new absolute offset, so track it as-is
+	pos, err := f.obj.Seek(offset, whence)
+	f.pos = pos
+	return pos, err
+}
+
+/*
+ReadDir reads the contents of the directory associated with the file f and returns a slice of DirEntry values in directory order. Subsequent calls on the same file will yield later DirEntry records in the directory.
+
+If n > 0, ReadDir returns at most n DirEntry records. In this case, if ReadDir returns an empty slice, it will return an error explaining why. At the end of a directory, the error is io.EOF.
+
+If n <= 0, ReadDir returns all the DirEntry records remaining in the directory. When it succeeds, it returns a nil error (not io.EOF).
+*/
+func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
+	if f.Path.Class == ROOT {
+		return f.readDirRoot(count)
+	} else {
+		return f.readDirChild(count)
+	}
+}
+
+func min(a, b int64) int64 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
+	var err error
+	if f.entries == nil {
+		buckets, err := f.fs.mc.ListBuckets(f.fs.ctx)
+		if err != nil {
+			return nil, err
+		}
+
+		f.entries = make([]fs.FileInfo, 0, len(buckets))
+		for _, bucket := range buckets {
+			//log.Println("Stat from GarageFile.readDirRoot()", "/"+bucket.Name)
+			nf, err := NewS3Stat(f.fs, "/"+bucket.Name)
+			if err != nil {
+				return nil, err
+			}
+			f.entries = append(f.entries, nf)
+		}
+	}
+
+	beg := f.pos
+	end := int64(len(f.entries))
+	if count > 0 {
+		end = min(beg+int64(count), end)
+	}
+	f.pos = end
+
+	// io.EOF is only expected by callers that requested a bounded number of entries
+	if count > 0 && end-beg == 0 {
+		err = io.EOF
+	}
+
+	return f.entries[beg:end], err
+}
+
+func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
+	prefix := f.Path.Key
+	if len(prefix) > 0 && prefix[len(prefix)-1:] != "/" {
+		prefix = prefix + "/"
+	}
+
+	objs_info := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{
+		Prefix:    prefix,
+		Recursive: false,
+	})
+
+	entries := make([]fs.FileInfo, 0)
+	for object := range objs_info {
+		if object.Err != nil {
+			return nil, object.Err
+		}
+		//log.Println("Stat from GarageFile.readDirChild()", path.Join("/", f.path.bucket, object.Key))
+		nf, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object)
+		if err != nil {
+			return nil, err
+		}
+		entries = append(entries, nf)
+	}
+
+	return entries, nil
+}
+
+func (f *S3File) Stat() (fs.FileInfo, error) {
+	return NewS3Stat(f.fs, f.Path.Path)
+}
diff --git a/s3/fs.go b/s3/fs.go
new file mode 100644
index 0000000..c5ae6a0
--- /dev/null
+++ b/s3/fs.go
@@ -0,0 +1,152 @@
+package s3
+
+import (
+	"context"
+	"errors"
+	"io"
+	"log"
+	"os"
+	"path"
+	"strings"
+	"time"
+
+	"github.com/minio/minio-go/v7"
+	"golang.org/x/net/webdav"
+)
+
+/*
+ * An S3FS instance's lifetime is limited to a single request.
+ * Conversely, Go's webdav.FileSystem abstraction is designed to be shared between users.
+ * Sharing an instance between users would be very dangerous (as we would need many checks on shared values).
+ */
+type S3FS struct {
+	cache map[string]*S3Stat
+	mc    *minio.Client
+	ctx   context.Context
+}
+
+func NewS3FS(mc *minio.Client) S3FS {
+	return S3FS{
+		cache: make(map[string]*S3Stat),
+		mc:    mc,
+	}
+}
+
+func (s S3FS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
+	s.ctx = ctx
+
+	p := NewS3Path(name)
+
+	if p.Class == ROOT {
+		return errors.New("Unable to create another root folder")
+	} else if p.Class == BUCKET {
+		log.Println("Creating a bucket is not implemented yet")
+		return nil
+	}
+
+	// Directories do not exist as such in S3, so materialize the new one with a placeholder object
+	f, err := NewS3File(&s, path.Join(name, ".bagage"))
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	_, err = io.Copy(f, strings.NewReader("This is a placeholder"))
+	return err
+}
+
+func (s S3FS) OpenFile2(ctx context.Context, name string, flag int, perm os.FileMode) (*S3File, error) {
+	s.ctx = ctx
+
+	// If the file does not exist when opening it, we create a stub
+	if _, ok := s.cache[name]; !ok {
+		st := new(S3Stat)
+		st.fs = &s
+		st.path = NewS3Path(name)
+		st.path.Class = OBJECT
+		st.obj.Key = st.path.Key
+		st.obj.LastModified = time.Now()
+		s.cache[name] = st
+	}
+
+	return NewS3File(&s, name)
+}
+
+func (s S3FS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
+	return s.OpenFile2(ctx, name, flag, perm)
+}
+
+func (s S3FS) RemoveAll(ctx context.Context, name string) error {
+	// @FIXME nautilus deletes files one by one; at the end it does not find its folder, as it is "already deleted"
+	s.ctx = ctx
+
+	p := NewS3Path(name)
+	if p.Class == ROOT {
+		return errors.New("Unable to delete the root folder")
+	} else if p.Class == BUCKET {
+		log.Println("Deleting a bucket is not implemented yet")
+		return nil
+	}
+
+	objCh := s.mc.ListObjects(s.ctx, p.Bucket, minio.ListObjectsOptions{Prefix: p.Key, Recursive: true})
+	rmCh := s.mc.RemoveObjects(s.ctx, p.Bucket, objCh, minio.RemoveObjectsOptions{})
+
+	// Return the first removal error encountered, if any
+	for rErr := range rmCh {
+		return rErr.Err
+	}
+
+	return nil
+}
+
+func (s S3FS) Rename(ctx context.Context, oldName, newName string) error {
+	s.ctx = ctx
+
+	po := NewS3Path(oldName)
+	pn := NewS3Path(newName)
+	if po.Class == ROOT || pn.Class == ROOT {
+		return errors.New("Unable to rename the root folder")
+	} else if po.Class == BUCKET || pn.Class == BUCKET {
+		log.Println("Moving a bucket is not implemented yet")
+		return nil
+	}
+
+	// Check that newName is not inside oldName
+	if len(newName) > len(oldName) && newName[:len(oldName)] == oldName {
+		return errors.New("Cannot move an entity inside itself (e.g. moving /data to /data/test is impossible)")
+	}
+
+	// Gather all keys, copy each object, then delete the original
+	objCh := s.mc.ListObjects(s.ctx, po.Bucket, minio.ListObjectsOptions{Prefix: po.Key, Recursive: true})
+	for obj := range objCh {
+		src := minio.CopySrcOptions{
+			Bucket: po.Bucket,
+			Object: obj.Key,
+		}
+
+		dst := minio.CopyDestOptions{
+			Bucket: pn.Bucket,
+			Object: path.Join(pn.Key, obj.Key[len(po.Key):]),
+		}
+
+		_, err := s.mc.CopyObject(s.ctx, dst, src)
+		if err != nil {
+			return err
+		}
+
+		err = s.mc.RemoveObject(s.ctx, po.Bucket, obj.Key, minio.RemoveObjectOptions{})
+		var e minio.ErrorResponse
+		if errors.As(err, &e) && e.StatusCode == 200 {
+			/* @FIXME workaround for garage's bug #98: ignore errors reported with a 200 status code */
+		} else if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s S3FS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+	s.ctx = ctx
+	return NewS3Stat(&s, name)
+}
diff --git a/s3/path.go b/s3/path.go
new file mode 100644
index 0000000..b1a981b
--- /dev/null
+++ b/s3/path.go
@@ -0,0 +1,68 @@
+package s3
+
+import (
+	"path"
+	"strings"
+
+	"github.com/minio/minio-go/v7"
+)
+
+type S3Class int
+
+const (
+	ROOT S3Class = 1 << iota
+	BUCKET
+	COMMON_PREFIX
+	OBJECT
+	OPAQUE_KEY
+
+	KEY = COMMON_PREFIX | OBJECT | OPAQUE_KEY
+)
+
+type S3Path struct {
+	Path   string
+	Class  S3Class
+	Bucket string
+	Key    string
+}
+
+func NewS3Path(p string) S3Path {
+	// Remove the first dot, eg. the relative directory "." becomes "/"
+	if len(p) > 0 && p[0] == '.' {
+		p = p[1:]
+	}
+
+	// Add the first slash if missing
+	p = "/" + p
+
+	// Clean path using golang tools
+	p = path.Clean(p)
+
+	exploded_path := strings.SplitN(p, "/", 3)
+
+	// If there is no bucket name (eg. "/")
+	if len(exploded_path) < 2 || exploded_path[1] == "" {
+		return S3Path{p, ROOT, "", ""}
+	}
+
+	// If there is no key
+	if len(exploded_path) < 3 || exploded_path[2] == "" {
+		return S3Path{p, BUCKET, exploded_path[1], ""}
+	}
+
+	return S3Path{p, OPAQUE_KEY, exploded_path[1], exploded_path[2]}
+}
+
+func NewTrustedS3Path(bucket string, obj minio.ObjectInfo) S3Path {
+	cl := OBJECT
+	if obj.Key[len(obj.Key)-1:] == "/" {
+		cl = COMMON_PREFIX
+	}
+
+	return S3Path{
+		Path:   path.Join("/", bucket, obj.Key),
+		Bucket: bucket,
+		Key:    obj.Key,
+		Class:  cl,
+	}
+}
diff --git a/s3/stat.go b/s3/stat.go
new file mode 100644
index 0000000..96b0c24
--- /dev/null
+++ b/s3/stat.go
@@ -0,0 +1,128 @@
+package s3
+
+import (
+	"errors"
+	"io/fs"
+	"path"
+	"time"
+
+	"github.com/minio/minio-go/v7"
+)
+
+type S3Stat struct {
+	fs   *S3FS
+	obj  minio.ObjectInfo
+	path S3Path
+}
+
+/*
+ * Stat a path knowing its ObjectInfo
+ */
+func NewS3StatFromObjectInfo(fs *S3FS, bucket string, obj minio.ObjectInfo) (*S3Stat, error) {
+	s := new(S3Stat)
+	s.path = NewTrustedS3Path(bucket, obj)
+	s.obj = obj
+	s.fs = fs
+
+	fs.cache[s.path.Path] = s
+	return s, nil
+}
+
+/*
+ * Stat a path without additional information
+ */
+func NewS3Stat(fs *S3FS, path string) (*S3Stat, error) {
+	cache := fs.cache
+	if entry, ok := cache[path]; ok {
+		return entry, nil
+	}
+
+	s := new(S3Stat)
+	s.fs = fs
+	s.path = NewS3Path(path)
+	if err := s.Refresh(); err != nil {
+		return nil, err
+	}
+
+	if s.path.Class&OPAQUE_KEY != 0 {
+		return nil, errors.New("Failed to precisely determine the key type, this is a logic error")
+	}
+
+	cache[path] = s
+	cache[s.path.Path] = s
+	return s, nil
+}
+
+func (s *S3Stat) Refresh() error {
+	if s.path.Class == ROOT || s.path.Class == BUCKET {
+		return nil
+	}
+
+	mc := s.fs.mc
+
+	// Compute the prefix to have the desired behaviour for our stat logic
+	prefix := s.path.Key
+	if prefix[len(prefix)-1:] == "/" {
+		prefix = prefix[:len(prefix)-1]
+	}
+
+	// Get info and check if the key exists
+	objs_info := mc.ListObjects(s.fs.ctx, s.path.Bucket, minio.ListObjectsOptions{
+		Prefix:    prefix,
+		Recursive: false,
+	})
+
+	found := false
+	for object := range objs_info {
+		if object.Err != nil {
+			return object.Err
+		}
+
+		if object.Key == prefix || object.Key == prefix+"/" {
+			s.obj = object
+			s.path = NewTrustedS3Path(s.path.Bucket, object)
+			found = true
+			break
+		}
+	}
+
+	if !found {
+		return fs.ErrNotExist
+	}
+
+	return nil
+}
+
+func (s *S3Stat) Name() string {
+	if s.path.Class == ROOT {
+		return "/"
+	} else if s.path.Class == BUCKET {
+		return s.path.Bucket
+	} else {
+		return path.Base(s.path.Key)
+	}
+}
+
+func (s *S3Stat) Size() int64 {
+	return s.obj.Size
+}
+
+func (s *S3Stat) Mode() fs.FileMode {
+	if s.path.Class == OBJECT {
+		return fs.ModePerm
+	} else {
+		return fs.ModeDir | fs.ModePerm
+	}
+}
+
+func (s *S3Stat) ModTime() time.Time {
+	return s.obj.LastModified
+}
+
+func (s *S3Stat) IsDir() bool {
+	return s.path.Class != OBJECT
+}
+
+func (s *S3Stat) Sys() interface{} {
+	return nil
+}
-- 
cgit v1.2.3
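
Editor's note (not part of the patch): S3FS implements the webdav.FileSystem interface from golang.org/x/net/webdav (Mkdir, OpenFile, RemoveAll, Rename, Stat), so it can be served over HTTP with webdav.Handler. The following is a minimal, hypothetical wiring sketch, not code from this repository: the import path of the s3 package, the endpoint, the credentials and the listen address are placeholders, and a fresh S3FS is built per request to respect the single-request lifetime described in the comment at the top of s3/fs.go.

package main

import (
	"log"
	"net/http"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"golang.org/x/net/webdav"

	s3 "example.com/bagage/s3" // placeholder import path for the package added by this commit
)

func main() {
	// Placeholder endpoint and credentials: point this at your S3-compatible store (e.g. garage).
	mc, err := minio.New("garage.example.com:3900", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS_KEY", "SECRET_KEY", ""),
		Secure: false,
	})
	if err != nil {
		log.Fatal(err)
	}

	// WebDAV locks live in memory only; the S3 backend itself has no notion of locking.
	locks := webdav.NewMemLS()

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// One S3FS per request, as recommended by the comment in s3/fs.go.
		h := &webdav.Handler{
			FileSystem: s3.NewS3FS(mc),
			LockSystem: locks,
		}
		h.ServeHTTP(w, r)
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}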
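As a usage note for the path logic in s3/path.go (worked examples of the rules in NewS3Path, not output taken from this commit): NewS3Path("/") yields Class ROOT; NewS3Path("my-bucket") yields Class BUCKET with Bucket "my-bucket"; NewS3Path("/my-bucket/dir/file.txt") yields Class OPAQUE_KEY with Bucket "my-bucket" and Key "dir/file.txt". OPAQUE_KEY is later narrowed to OBJECT or COMMON_PREFIX by NewTrustedS3Path or S3Stat.Refresh, once a trailing slash can be checked on the real key returned by ListObjects.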