aboutsummaryrefslogblamecommitdiff
path: root/s3/file.go
blob: 1d6fced816788a2c47f93a7962712a298256599d (plain) (tree)
1
2
3
4
5
6
7
8
9
          

        





                
            


                                      


                    





                              
                             
                      

 
                                                       


                        

                                


















                                                                        




                                                 







                                      
                                                                                                                              





                                     
                                                                                                            








                                                    
                                 


                                                                                                                     




                                      






                                              
                                                                 











                                             



                                              




































                                                                                                                       


                                                                  
                     
 

                                                           

 
                                                     
                     
 




























                                                                                                                     
 




                                             
 


                                       

         
                  


                                                                



                                                   
















                                                                                                                                                                                                                       
                                 





                                            
                            



                        
 
 



                                                                


                                       













                                                                                            
                                                
         
                   
 
                         



                                      


                                                                 





                                                                     
 



                                                                                                   
 










                                                                                                                       
                 
         


                                    
                                                

                   
 
                         



                                      


                                              
                                           
 
package s3

import (
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log"
	"mime"
	"os"
	"path"

	"github.com/minio/minio-go/v7"
)

type S3File struct {
	fs      *S3FS
	obj     *minio.Object
	objw    *io.PipeWriter
	cache   *os.File
	donew   chan error
	pos     int64
	entries []fs.FileInfo
	Path    S3Path
}

func NewS3File(s *S3FS, path string) (*S3File, error) {
	f := new(S3File)
	f.fs = s
	f.pos = 0
	f.entries = nil
	f.Path = NewS3Path(path)
	return f, nil
}

func (f *S3File) Close() error {
	err := make([]error, 0)

	if f.obj != nil {
		err = append(err, f.obj.Close())
		f.obj = nil
	}

	if f.objw != nil {
		// wait that minio completes its transfers in background
		err = append(err, f.objw.Close())
		err = append(err, <-f.donew)
		f.donew = nil
		f.objw = nil
	}

	if f.cache != nil {
		err = append(err, f.writeFlush())
		f.cache = nil
	}

	count := 0
	for _, e := range err {
		if e != nil {
			count++
			log.Println(e)
		}
	}
	if count > 0 {
		return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count))
	}
	return nil
}

func (f *S3File) loadObject() error {
	if f.obj == nil {
		obj, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
		if err != nil {
			return err
		}
		f.obj = obj
	}
	return nil
}

func (f *S3File) Read(p []byte) (n int, err error) {
	//log.Printf("s3 Read\n")
	//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
	//	return 0, os.ErrInvalid
	//}

	if f.cache != nil {
		return f.cache.Read(p)
	}

	if err := f.loadObject(); err != nil {
		return 0, err
	}

	return f.obj.Read(p)
}

func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
	if f.cache != nil {
		return f.cache.ReadAt(p, off)
	}

	stat, err := f.Stat()
	if err != nil {
		return 0, err
	} else if off >= stat.Size() {
		return 0, io.EOF
	}

	//log.Printf("s3 ReadAt %v\n", off)
	if err := f.loadObject(); err != nil {
		return 0, err
	}

	return f.obj.ReadAt(p, off)
}

func (f *S3File) initCache() error {
	// We use a locally cached file instead of writing directly to S3
	// When the user calls close, the file is flushed on S3.
	// Check writeFlush below.
	if f.cache == nil {
		// We create a temp file in the configured folder
		// We do not use the default tmp file as files can be very large
		// and could fillup the RAM (often /tmp is mounted in RAM)
		tmp, err := os.CreateTemp(f.fs.local, "bagage-cache")
		if err != nil {
			return err
		}
		f.cache = tmp

		// Problem: WriteAt override the existing file, if it exists
		// So if when we stat the file, its size is greater than zero,
		// we download it in our cache
		file, err := f.Stat()
		if err != nil {
			return err
		} else if file.Size() != 0 {
			// We get a Reader on our object
			object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
			if err != nil {
				return err
			}
			// We inject it in our cache file
			if _, err = io.Copy(f.cache, object); err != nil {
				return err
			}
		}
	}

	return nil
}

func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
	f.initCache()

	// And now we simply apply the command on our cache
	return f.cache.WriteAt(p, off)
}

func (f *S3File) Write(p []byte) (n int, err error) {
	f.initCache()

	return f.cache.Write(p)
}

func (f *S3File) writeFlush() error {
	// Only needed if we used a write cache
	if f.cache == nil {
		return nil
	}

	// Rewind the file to copy from the start
	_, err := f.cache.Seek(0, 0)
	if err != nil {
		return err
	}

	// Get a FileInfo object as minio needs its size (ideally)
	stat, err := f.cache.Stat()
	if err != nil {
		return err
	}

	// Send the file to minio
	contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
	_, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), minio.PutObjectOptions{
		ContentType: contentType,
	})
	if err != nil {
		return err
	}

	// Close the cache file and remove it
	err = f.cache.Close()
	if err != nil {
		return err
	}

	err = os.Remove(f.cache.Name())
	if err != nil {
		return err
	}

	return nil
}

func (f *S3File) Seek(offset int64, whence int) (int64, error) {
	if f.cache != nil {
		return f.cache.Seek(offset, whence)
	}

	if err := f.loadObject(); err != nil {
		return 0, err
	}

	pos, err := f.obj.Seek(offset, whence)
	f.pos += pos
	return pos, err
}

/*
ReadDir reads the contents of the directory associated with the file f and returns a slice of DirEntry values in directory order. Subsequent calls on the same file will yield later DirEntry records in the directory.

If n > 0, ReadDir returns at most n DirEntry records. In this case, if ReadDir returns an empty slice, it will return an error explaining why. At the end of a directory, the error is io.EOF.

If n <= 0, ReadDir returns all the DirEntry records remaining in the directory. When it succeeds, it returns a nil error (not io.EOF).
*/
func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
	if f.Path.Class == ROOT {
		return f.readDirRoot(count)
	} else {
		return f.readDirChild(count)
	}
}

func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
	var err error
	if f.entries == nil {
		buckets, err := f.fs.mc.ListBuckets(f.fs.ctx)
		if err != nil {
			return nil, err
		}

		f.entries = make([]fs.FileInfo, 0, len(buckets))
		for _, bucket := range buckets {
			//log.Println("Stat from GarageFile.readDirRoot()", "/"+bucket.Name)
			nf, err := NewS3Stat(f.fs, "/"+bucket.Name)
			if err != nil {
				return nil, err
			}
			f.entries = append(f.entries, nf)
		}
	}
	beg := f.pos
	end := int64(len(f.entries))
	if count > 0 {
		end = min(beg+int64(count), end)
	}
	f.pos = end

	if end-beg == 0 {
		err = io.EOF
	}

	return f.entries[beg:end], err
}

func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
	var err error
	if f.entries == nil {
		prefix := f.Path.Key
		if len(prefix) > 0 && prefix[len(prefix)-1:] != "/" {
			prefix = prefix + "/"
		}

		objs_info := f.fs.mc.ListObjects(f.fs.ctx, f.Path.Bucket, minio.ListObjectsOptions{
			Prefix:    prefix,
			Recursive: false,
		})

		f.entries = make([]fs.FileInfo, 0)
		for object := range objs_info {
			if object.Err != nil {
				return nil, object.Err
			}
			//log.Println("Stat from GarageFile.readDirChild()", path.Join("/", f.path.bucket, object.Key))
			nf, err := NewS3StatFromObjectInfo(f.fs, f.Path.Bucket, object)
			if err != nil {
				return nil, err
			}
			f.entries = append(f.entries, nf)
		}
	}
	beg := f.pos
	end := int64(len(f.entries))
	if count > 0 {
		end = min(beg+int64(count), end)
	}
	f.pos = end

	if end-beg == 0 {
		err = io.EOF
	}

	return f.entries[beg:end], err
}

func (f *S3File) Stat() (fs.FileInfo, error) {
	return NewS3Stat(f.fs, f.Path.Path)
}