about | summary | refs | log | tree | commit | diff
path: root/s3
diff options
context:
space:
mode:
author: Quentin <quentin@deuxfleurs.fr> 2021-11-20 13:42:20 +0100
committer: Quentin <quentin@deuxfleurs.fr> 2021-11-20 13:42:20 +0100
commit: e10f04c5e36109c2e58d667c4b6ec054cbdd51be (patch)
tree: 7288ab0c17c541c921b77d8ddb71add2a54620ac /s3
parent: 87fff9843dd60d4ce05596dc55bff44a3724a6bf (diff)
download: bagage-e10f04c5e36109c2e58d667c4b6ec054cbdd51be.tar.gz
bagage-e10f04c5e36109c2e58d667c4b6ec054cbdd51be.zip
It seems to work (sftp)
Diffstat (limited to 's3')
-rw-r--r-- s3/file.go 176
-rw-r--r-- s3/fs.go 4
-rw-r--r-- s3/stat.go 2
3 files changed, 126 insertions, 56 deletions
diff --git a/s3/file.go b/s3/file.go
index b20d247..1d6fced 100644
--- a/s3/file.go
+++ b/s3/file.go
@@ -1,27 +1,27 @@
package s3
import (
- "context"
"errors"
"fmt"
"io"
"io/fs"
"log"
"mime"
+ "os"
"path"
"github.com/minio/minio-go/v7"
)
type S3File struct {
- fs *S3FS
- obj *minio.Object
- objw *io.PipeWriter
- donew chan error
- pos int64
- eof bool
+ fs *S3FS
+ obj *minio.Object
+ objw *io.PipeWriter
+ cache *os.File
+ donew chan error
+ pos int64
entries []fs.FileInfo
- Path S3Path
+ Path S3Path
}
func NewS3File(s *S3FS, path string) (*S3File, error) {
@@ -49,6 +49,11 @@ func (f *S3File) Close() error {
f.objw = nil
}
+ if f.cache != nil {
+ err = append(err, f.writeFlush())
+ f.cache = nil
+ }
+
count := 0
for _, e := range err {
if e != nil {
@@ -57,7 +62,7 @@ func (f *S3File) Close() error {
}
}
if count > 0 {
- return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count))
+ return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count))
}
return nil
}
@@ -74,10 +79,15 @@ func (f *S3File) loadObject() error {
}
func (f *S3File) Read(p []byte) (n int, err error) {
- log.Printf("s3 Read\n")
+ //log.Printf("s3 Read\n")
//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
// return 0, os.ErrInvalid
//}
+
+ if f.cache != nil {
+ return f.cache.Read(p)
+ }
+
if err := f.loadObject(); err != nil {
return 0, err
}
@@ -86,60 +96,120 @@ func (f *S3File) Read(p []byte) (n int, err error) {
}
func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
- stat, err := f.Stat()
- if err != nil {
- return 0, err
- } else if off >= stat.Size() {
- return 0, io.EOF
- }
-
- log.Printf("s3 ReadAt %v\n", off)
+ if f.cache != nil {
+ return f.cache.ReadAt(p, off)
+ }
+
+ stat, err := f.Stat()
+ if err != nil {
+ return 0, err
+ } else if off >= stat.Size() {
+ return 0, io.EOF
+ }
+
+ //log.Printf("s3 ReadAt %v\n", off)
if err := f.loadObject(); err != nil {
return 0, err
}
- return f.obj.ReadAt(p, off)
+ return f.obj.ReadAt(p, off)
+}
+
+// initCache lazily creates the local write cache of this file.
+//
+// Writes are not sent to S3 directly: they are staged in a temporary file
+// and uploaded by writeFlush when the file is closed. Calling initCache
+// again once a cache exists is a no-op.
+//
+// If the download of the existing object fails, the cache is discarded so
+// that a truncated copy can never be flushed over the complete object.
+func (f *S3File) initCache() error {
+	if f.cache != nil {
+		return nil
+	}
+
+	// The temp file goes in the configured folder, not in the default temp
+	// directory: files can be very large and /tmp is often mounted in RAM.
+	tmp, err := os.CreateTemp(f.fs.local, "bagage-cache")
+	if err != nil {
+		return err
+	}
+	f.cache = tmp
+
+	// WriteAt patches a file in place, so if the object already has content
+	// it must be present in the cache before the first write lands.
+	file, err := f.Stat()
+	if err != nil {
+		// NOTE(review): the (empty) cache is intentionally kept here, exactly
+		// as before. Presumably Stat fails for objects that do not exist yet
+		// and callers then write into the empty cache; confirm before
+		// tightening this path.
+		return err
+	}
+	if file.Size() == 0 {
+		return nil
+	}
+
+	// We get a Reader on our object
+	object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
+	if err != nil {
+		f.discardCache()
+		return err
+	}
+	defer object.Close()
+
+	// We inject it in our cache file.
+	// NOTE(review): after the copy the cache offset sits at the end of the
+	// downloaded data, so a following Write appends; confirm this is wanted.
+	if _, err := io.Copy(f.cache, object); err != nil {
+		f.discardCache()
+		return err
+	}
+
+	return nil
+}
+
+// discardCache closes and deletes the temporary cache file and forgets it.
+// Cleanup is best effort: the caller is already reporting a more useful
+// error, so failures of Close and Remove are deliberately ignored.
+func (f *S3File) discardCache() {
+	_ = f.cache.Close()
+	_ = os.Remove(f.cache.Name())
+	f.cache = nil
+}
+// WriteAt writes p at offset off in the local write cache, creating the
+// cache on first use. Nothing reaches S3 until the file is closed.
func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
- return 0, errors.New("not implemented")
+	// initCache may report an error while still leaving a usable cache in
+	// place, so the write is only refused when there is no cache at all.
+	// This surfaces the real cause instead of the opaque os.ErrInvalid that
+	// a nil *os.File would return.
+	if err := f.initCache(); err != nil && f.cache == nil {
+		return 0, err
+	}
+	// And now we simply apply the command on our cache
+	return f.cache.WriteAt(p, off)
}
+// Write writes p at the current offset of the local write cache, creating
+// the cache on first use. Nothing reaches S3 until the file is closed.
func (f *S3File) Write(p []byte) (n int, err error) {
- /*if f.path.class != OBJECT {
- return 0, os.ErrInvalid
- }*/
+	// initCache may report an error while still leaving a usable cache in
+	// place, so the write is only refused when there is no cache at all.
+	// This surfaces the real cause instead of the opaque os.ErrInvalid that
+	// a nil *os.File would return.
+	if err := f.initCache(); err != nil && f.cache == nil {
+		return 0, err
+	}
- if f.objw == nil {
- if f.pos != 0 {
- return 0, errors.New("writing with an offset is not implemented")
- }
+	return f.cache.Write(p)
+}
+
+// writeFlush uploads the local write cache to S3, then closes and deletes
+// the temporary file. It is a no-op when no write cache was created.
+//
+// The temp file is released even when the upload fails: Close drops its
+// reference to the cache right after calling writeFlush, so nothing else
+// would ever clean it up. The first error encountered (upload, then close,
+// then remove) is returned.
+func (f *S3File) writeFlush() error {
+	// Only needed if we used a write cache
+	if f.cache == nil {
+		return nil
+	}
+
+	uploadErr := f.uploadCache()
+	closeErr := f.cache.Close()
+	removeErr := os.Remove(f.cache.Name())
+
+	switch {
+	case uploadErr != nil:
+		return uploadErr
+	case closeErr != nil:
+		return closeErr
+	}
+	return removeErr
+}
+
+// uploadCache sends the whole content of the cache file to S3 under the
+// bucket and key of this file. The cache must exist.
+func (f *S3File) uploadCache() error {
+	// Rewind the file to copy from the start
+	if _, err := f.cache.Seek(0, io.SeekStart); err != nil {
+		return err
+	}
+
+	// Get a FileInfo object as minio needs its size (ideally)
+	stat, err := f.cache.Stat()
+	if err != nil {
+		return err
+	}
+
- r, w := io.Pipe()
- f.donew = make(chan error, 1)
- f.objw = w
+	// The content type is guessed from the extension of the object key
+	contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
+	opts := minio.PutObjectOptions{ContentType: contentType}
+
- contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
- go func() {
- /* @FIXME
- PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB.
- Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations.
- The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
- We set this value to the minimum allowed one, 5MiB.
- The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
- Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough.
- Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
- */
- _, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024})
- f.donew <- err
- }()
+	// Send the file to minio
+	_, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), opts)
+	if err != nil {
+		return err
}
- return f.objw.Write(p)
+	return nil
}
func (f *S3File) Seek(offset int64, whence int) (int64, error) {
+ if f.cache != nil {
+ return f.cache.Seek(offset, whence)
+ }
+
if err := f.loadObject(); err != nil {
return 0, err
}
@@ -165,10 +235,10 @@ func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
}
func min(a, b int64) int64 {
- if a < b {
- return a
- }
- return b
+ if a < b {
+ return a
+ }
+ return b
}
func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
@@ -192,11 +262,11 @@ func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
beg := f.pos
end := int64(len(f.entries))
if count > 0 {
- end = min(beg + int64(count), end)
+ end = min(beg+int64(count), end)
}
f.pos = end
- if end - beg == 0 {
+ if end-beg == 0 {
err = io.EOF
}
@@ -232,11 +302,11 @@ func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
beg := f.pos
end := int64(len(f.entries))
if count > 0 {
- end = min(beg + int64(count), end)
+ end = min(beg+int64(count), end)
}
f.pos = end
- if end - beg == 0 {
+ if end-beg == 0 {
err = io.EOF
}
diff --git a/s3/fs.go b/s3/fs.go
index c5ae6a0..a8199d7 100644
--- a/s3/fs.go
+++ b/s3/fs.go
@@ -22,13 +22,15 @@ import (
type S3FS struct {
cache map[string]*S3Stat
mc *minio.Client
+ local string
ctx context.Context
}
-func NewS3FS(mc *minio.Client) S3FS {
+func NewS3FS(mc *minio.Client, local string) S3FS {
return S3FS{
cache: make(map[string]*S3Stat),
mc: mc,
+ local: local,
}
}
diff --git a/s3/stat.go b/s3/stat.go
index c91a757..96b0c24 100644
--- a/s3/stat.go
+++ b/s3/stat.go
@@ -1,7 +1,6 @@
package s3
import (
- "log"
"errors"
"io/fs"
"path"
@@ -105,7 +104,6 @@ func (s *S3Stat) Name() string {
}
func (s *S3Stat) Size() int64 {
- log.Println("stat size: ", s.obj.Size)
return s.obj.Size
}