package s3 import ( "errors" "fmt" "io" "io/fs" "log" "mime" "os" "path" "github.com/minio/minio-go/v7" ) type S3File struct { fs *S3FS obj *minio.Object objw *io.PipeWriter cache *os.File donew chan error pos int64 entries []fs.FileInfo Path S3Path } func NewS3File(s *S3FS, path string) (*S3File, error) { f := new(S3File) f.fs = s f.pos = 0 f.entries = nil f.Path = NewS3Path(path) return f, nil } func (f *S3File) Close() error { err := make([]error, 0) if f.obj != nil { err = append(err, f.obj.Close()) f.obj = nil } if f.objw != nil { // wait that minio completes its transfers in background err = append(err, f.objw.Close()) err = append(err, <-f.donew) f.donew = nil f.objw = nil } if f.cache != nil { err = append(err, f.writeFlush()) f.cache = nil } count := 0 for _, e := range err { if e != nil { count++ log.Println(e) } } if count > 0 { return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count)) } return nil } func (f *S3File) loadObject() error { if f.obj == nil { obj, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{}) if err != nil { return err } f.obj = obj } return nil } func (f *S3File) Read(p []byte) (n int, err error) { //log.Printf("s3 Read\n") //if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */ // return 0, os.ErrInvalid //} if f.cache != nil { return f.cache.Read(p) } if err := f.loadObject(); err != nil { return 0, err } return f.obj.Read(p) } func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) { if f.cache != nil { return f.cache.ReadAt(p, off) } stat, err := f.Stat() if err != nil { return 0, err } else if off >= stat.Size() { return 0, io.EOF } //log.Printf("s3 ReadAt %v\n", off) if err := f.loadObject(); err != nil { return 0, err } return f.obj.ReadAt(p, off) } func (f *S3File) initCache() error { // We use a locally cached file instead of writing directly to S3 // When the user calls close, the file is flushed on S3. // Check writeFlush below. if f.cache == nil { // We create a temp file in the configured folder // We do not use the default tmp file as files can be very large // and could fillup the RAM (often /tmp is mounted in RAM) tmp, err := os.CreateTemp(f.fs.local, "bagage-cache") if err != nil { return err } f.cache = tmp // Problem: WriteAt override the existing file, if it exists // So if when we stat the file, its size is greater than zero, // we download it in our cache file, err := f.Stat() if err != nil { return err } else if file.Size() != 0 { // We get a Reader on our object object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{}) if err != nil { return err } // We inject it in our cache file if _, err = io.Copy(f.cache, object); err != nil { return err } } } return nil } func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) { f.initCache() // And now we simply apply the command on our cache return f.cache.WriteAt(p, off) } func (f *S3File) Write(p []byte) (n int, err error) { f.initCache() return f.cache.Write(p) } func (f *S3File) writeFlush() error { // Only needed if we used a write cache if f.cache == nil { return nil } // Rewind the file to copy from the start _, err := f.cache.Seek(0, 0) if err != nil { return err } // Get a FileInfo object as minio needs its size (ideally) stat, err := f.cache.Stat() if err != nil { return err } // Send the file to minio contentType := mime.TypeByExtension(path.Ext(f.Path.Key)) _, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), minio.PutObjectOptions{ ContentType: contentType, }) if err != nil { return err } // Close the cache file and remove it err = f.cache.Close() if err != nil { return err } err = os.Remove(f.cache.Name()) if err != nil { return err } return nil } func (f *S3File) Seek(offset int64, whence int) (int64, error) { if f.cache != nil { return f.cache.Seek(offset, whence) } if err := f.loadObject(); err != nil { return 0, err } pos, err := f.obj.Seek(offset, whence) f.pos += pos return pos, err } /* ReadDir reads the contents of the directory associated with the file f and returns a slice of DirEntry values in directory order. Subsequent calls on the same file will yield later DirEntry records in the directory. If n > 0, ReadDir returns at most n DirEntry records. In this case, if ReadDir returns an empty slice, it will return an error explaining why. At the end of a directory, the error is io.EOF. If n <= 0, ReadDir returns all the DirEntry records remaining in the directory. When it succeeds, it returns a nil error (not io.EOF). */ func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) { if f.Path.Class == ROOT { return f.readDirRoot(count) } else { return f.readDirChild(count) } } func min(a, b int64) int64 { if a < b { return a } return b } func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) { var err error if f.entries == nil { buckets, err := f.fs.mc.ListBuckets(f.fs.ctx) if err != nil { return nil, err } f.entries = make([]fs.FileInfo, 0, len(buckets)) for _, bucket := range buckets { //log.Println("Stat from GarageFile.readDirRoot()", "/"+bucket.Name) nf, err := NewS3Stat(f.fs, "/"+bucket.Name) if err != nil { return nil, err } f.entries = append(f.entries, nf) } } beg := f.pos end := int64(len(f.entries)) if count > 0 { end = min(beg+int64(count), end) } f.pos = end if end-beg == 0 { err = io.EOF } return f.entries[beg:end], err } func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) { var err error if f.entries == nil { prefix := f.Path.Key if len(prefix) > 0 && prefix[len(prefix)-1:] != "/" { prefix = prefix + "/" } objs_info := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{ Prefix: prefix, Recursive: false, }) f.entries = make([]fs.FileInfo, 0) for object := range objs_info { if object.Err != nil { return nil, object.Err } //log.Println("Stat from GarageFile.readDirChild()", path.Join("/", f.path.bucket, object.Key)) nf, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object) if err != nil { return nil, err } f.entries = append(f.entries, nf) } } beg := f.pos end := int64(len(f.entries)) if count > 0 { end = min(beg+int64(count), end) } f.pos = end if end-beg == 0 { err = io.EOF } return f.entries[beg:end], err } func (f *S3File) Stat() (fs.FileInfo, error) { return NewS3Stat(f.fs, f.Path.Path) }