aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorQuentin <quentin@deuxfleurs.fr>2021-11-20 13:42:20 +0100
committerQuentin <quentin@deuxfleurs.fr>2021-11-20 13:42:20 +0100
commite10f04c5e36109c2e58d667c4b6ec054cbdd51be (patch)
tree7288ab0c17c541c921b77d8ddb71add2a54620ac
parent87fff9843dd60d4ce05596dc55bff44a3724a6bf (diff)
downloadbagage-sftp.tar.gz
bagage-sftp.zip
It seems to work (branch: sftp)
-rw-r--r--config.go1
-rw-r--r--main.go9
-rw-r--r--s3/file.go176
-rw-r--r--s3/fs.go4
-rw-r--r--s3/stat.go2
-rw-r--r--sftp/allocator.go3
-rw-r--r--sftp/packet_manager.go3
-rw-r--r--sftp/server.go34
-rw-r--r--webdav.go2
9 files changed, 151 insertions, 83 deletions
diff --git a/config.go b/config.go
index b660e68..c5aff5b 100644
--- a/config.go
+++ b/config.go
@@ -14,6 +14,7 @@ type Config struct {
UserNameAttr string `env:"BAGAGE_LDAP_USERNAME_ATTR" default:"cn"`
Endpoint string `env:"BAGAGE_S3_ENDPOINT" default:"garage.deuxfleurs.fr"`
UseSSL bool `env:"BAGAGE_S3_SSL" default:"true"`
+ S3Cache string `env:"BAGAGE_S3_CACHE" default:"./s3_cache"`
SSHKey string `env:"BAGAGE_SSH_KEY" default:"id_rsa"`
}
diff --git a/main.go b/main.go
index bf7dc5e..c70dd4d 100644
--- a/main.go
+++ b/main.go
@@ -13,16 +13,19 @@ import (
"log"
"net"
"net/http"
+ "os"
)
func main() {
log.Println("=== Starting Bagage ===")
config := (&Config{}).LoadWithDefault().LoadWithEnv()
-
log.Println(config)
- done := make(chan error)
+ // Some init
+ os.MkdirAll(config.S3Cache, 0755)
+ // Launch our submodules
+ done := make(chan error)
go httpServer(config, done)
go sshServer(config, done)
@@ -148,7 +151,7 @@ func handleSSHConn(nConn net.Conn, dconfig *Config, config *ssh.ServerConfig) {
return
}
- fs := s3.NewS3FS(mc)
+ fs := s3.NewS3FS(mc, dconfig.S3Cache)
server, err := sftp.NewServer(ctx, channel, &fs)
if err != nil {
diff --git a/s3/file.go b/s3/file.go
index b20d247..1d6fced 100644
--- a/s3/file.go
+++ b/s3/file.go
@@ -1,27 +1,27 @@
package s3
import (
- "context"
"errors"
"fmt"
"io"
"io/fs"
"log"
"mime"
+ "os"
"path"
"github.com/minio/minio-go/v7"
)
type S3File struct {
- fs *S3FS
- obj *minio.Object
- objw *io.PipeWriter
- donew chan error
- pos int64
- eof bool
+ fs *S3FS
+ obj *minio.Object
+ objw *io.PipeWriter
+ cache *os.File
+ donew chan error
+ pos int64
entries []fs.FileInfo
- Path S3Path
+ Path S3Path
}
func NewS3File(s *S3FS, path string) (*S3File, error) {
@@ -49,6 +49,11 @@ func (f *S3File) Close() error {
f.objw = nil
}
+ if f.cache != nil {
+ err = append(err, f.writeFlush())
+ f.cache = nil
+ }
+
count := 0
for _, e := range err {
if e != nil {
@@ -57,7 +62,7 @@ func (f *S3File) Close() error {
}
}
if count > 0 {
- return errors.New(fmt.Sprintf("%d errors when closing this WebDAV File. Read previous logs to know more.", count))
+ return errors.New(fmt.Sprintf("%d errors when closing this S3 File. Read previous logs to know more.", count))
}
return nil
}
@@ -74,10 +79,15 @@ func (f *S3File) loadObject() error {
}
func (f *S3File) Read(p []byte) (n int, err error) {
- log.Printf("s3 Read\n")
+ //log.Printf("s3 Read\n")
//if f.Stat() & OBJECT == 0 { /* @FIXME Ideally we would check against OBJECT but we need a non OPAQUE_KEY */
// return 0, os.ErrInvalid
//}
+
+ if f.cache != nil {
+ return f.cache.Read(p)
+ }
+
if err := f.loadObject(); err != nil {
return 0, err
}
@@ -86,60 +96,120 @@ func (f *S3File) Read(p []byte) (n int, err error) {
}
func (f *S3File) ReadAt(p []byte, off int64) (n int, err error) {
- stat, err := f.Stat()
- if err != nil {
- return 0, err
- } else if off >= stat.Size() {
- return 0, io.EOF
- }
-
- log.Printf("s3 ReadAt %v\n", off)
+ if f.cache != nil {
+ return f.cache.ReadAt(p, off)
+ }
+
+ stat, err := f.Stat()
+ if err != nil {
+ return 0, err
+ } else if off >= stat.Size() {
+ return 0, io.EOF
+ }
+
+ //log.Printf("s3 ReadAt %v\n", off)
if err := f.loadObject(); err != nil {
return 0, err
}
- return f.obj.ReadAt(p, off)
+ return f.obj.ReadAt(p, off)
+}
+
+func (f *S3File) initCache() error {
+ // We use a locally cached file instead of writing directly to S3
+ // When the user calls close, the file is flushed on S3.
+ // Check writeFlush below.
+ if f.cache == nil {
+ // We create a temp file in the configured folder
+	// We do not use the default tmp file as files can be very large
+	// and could fill up the RAM (often /tmp is mounted in RAM)
+ tmp, err := os.CreateTemp(f.fs.local, "bagage-cache")
+ if err != nil {
+ return err
+ }
+ f.cache = tmp
+
+	// Problem: WriteAt overwrites the existing file, if it exists
+ // So if when we stat the file, its size is greater than zero,
+ // we download it in our cache
+ file, err := f.Stat()
+ if err != nil {
+ return err
+ } else if file.Size() != 0 {
+ // We get a Reader on our object
+ object, err := f.fs.mc.GetObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, minio.GetObjectOptions{})
+ if err != nil {
+ return err
+ }
+ // We inject it in our cache file
+ if _, err = io.Copy(f.cache, object); err != nil {
+ return err
+ }
+ }
+ }
+
+ return nil
}
func (f *S3File) WriteAt(p []byte, off int64) (n int, err error) {
- return 0, errors.New("not implemented")
+ f.initCache()
+ // And now we simply apply the command on our cache
+ return f.cache.WriteAt(p, off)
}
func (f *S3File) Write(p []byte) (n int, err error) {
- /*if f.path.class != OBJECT {
- return 0, os.ErrInvalid
- }*/
+ f.initCache()
- if f.objw == nil {
- if f.pos != 0 {
- return 0, errors.New("writing with an offset is not implemented")
- }
+ return f.cache.Write(p)
+}
+
+func (f *S3File) writeFlush() error {
+ // Only needed if we used a write cache
+ if f.cache == nil {
+ return nil
+ }
+
+ // Rewind the file to copy from the start
+ _, err := f.cache.Seek(0, 0)
+ if err != nil {
+ return err
+ }
+
+ // Get a FileInfo object as minio needs its size (ideally)
+ stat, err := f.cache.Stat()
+ if err != nil {
+ return err
+ }
+
+ // Send the file to minio
+ contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
+ _, err = f.fs.mc.PutObject(f.fs.ctx, f.Path.Bucket, f.Path.Key, f.cache, stat.Size(), minio.PutObjectOptions{
+ ContentType: contentType,
+ })
+ if err != nil {
+ return err
+ }
- r, w := io.Pipe()
- f.donew = make(chan error, 1)
- f.objw = w
+ // Close the cache file and remove it
+ err = f.cache.Close()
+ if err != nil {
+ return err
+ }
- contentType := mime.TypeByExtension(path.Ext(f.Path.Key))
- go func() {
- /* @FIXME
- PutObject has a strange behaviour when used with unknown size, it supposes the final size will be 5TiB.
- Then it computes that, following the final size of the file, each part of the multipart upload should be 512MiB, which leads to big allocations.
- The culprit is OptimalPartInfo: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L70
- We set this value to the minimum allowed one, 5MiB.
- The minimum value is set here: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/constants.go#L24
- Because Multipart uploads seems to be limited to 10 000 parts, it might be possible that we are limited to 50 GiB files, which is still good enough.
- Ref: https://github.com/minio/minio-go/blob/62beca8cd87e9960d88793320220ad2c159bb5e5/api-put-object-common.go#L110-L112
- */
- _, err := f.fs.mc.PutObject(context.Background(), f.Path.Bucket, f.Path.Key, r, -1, minio.PutObjectOptions{ContentType: contentType, PartSize: 5*1024*1024})
- f.donew <- err
- }()
+ err = os.Remove(f.cache.Name())
+ if err != nil {
+ return err
}
- return f.objw.Write(p)
+ return nil
}
func (f *S3File) Seek(offset int64, whence int) (int64, error) {
+ if f.cache != nil {
+ return f.cache.Seek(offset, whence)
+ }
+
if err := f.loadObject(); err != nil {
return 0, err
}
@@ -165,10 +235,10 @@ func (f *S3File) Readdir(count int) ([]fs.FileInfo, error) {
}
func min(a, b int64) int64 {
- if a < b {
- return a
- }
- return b
+ if a < b {
+ return a
+ }
+ return b
}
func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
@@ -192,11 +262,11 @@ func (f *S3File) readDirRoot(count int) ([]fs.FileInfo, error) {
beg := f.pos
end := int64(len(f.entries))
if count > 0 {
- end = min(beg + int64(count), end)
+ end = min(beg+int64(count), end)
}
f.pos = end
- if end - beg == 0 {
+ if end-beg == 0 {
err = io.EOF
}
@@ -232,11 +302,11 @@ func (f *S3File) readDirChild(count int) ([]fs.FileInfo, error) {
beg := f.pos
end := int64(len(f.entries))
if count > 0 {
- end = min(beg + int64(count), end)
+ end = min(beg+int64(count), end)
}
f.pos = end
- if end - beg == 0 {
+ if end-beg == 0 {
err = io.EOF
}
diff --git a/s3/fs.go b/s3/fs.go
index c5ae6a0..a8199d7 100644
--- a/s3/fs.go
+++ b/s3/fs.go
@@ -22,13 +22,15 @@ import (
type S3FS struct {
cache map[string]*S3Stat
mc *minio.Client
+ local string
ctx context.Context
}
-func NewS3FS(mc *minio.Client) S3FS {
+func NewS3FS(mc *minio.Client, local string) S3FS {
return S3FS{
cache: make(map[string]*S3Stat),
mc: mc,
+ local: local,
}
}
diff --git a/s3/stat.go b/s3/stat.go
index c91a757..96b0c24 100644
--- a/s3/stat.go
+++ b/s3/stat.go
@@ -1,7 +1,6 @@
package s3
import (
- "log"
"errors"
"io/fs"
"path"
@@ -105,7 +104,6 @@ func (s *S3Stat) Name() string {
}
func (s *S3Stat) Size() int64 {
- log.Println("stat size: ", s.obj.Size)
return s.obj.Size
}
diff --git a/sftp/allocator.go b/sftp/allocator.go
index fc1b6f0..5ae2145 100644
--- a/sftp/allocator.go
+++ b/sftp/allocator.go
@@ -2,7 +2,7 @@ package sftp
/*
Imported from: https://github.com/pkg/sftp
- */
+*/
import (
"sync"
@@ -98,4 +98,3 @@ func (a *allocator) isRequestOrderIDUsed(requestOrderID uint32) bool {
_, ok := a.used[requestOrderID]
return ok
}
-
diff --git a/sftp/packet_manager.go b/sftp/packet_manager.go
index 5aeb72b..59b1ed1 100644
--- a/sftp/packet_manager.go
+++ b/sftp/packet_manager.go
@@ -2,7 +2,7 @@ package sftp
/*
Imported from: https://github.com/pkg/sftp
- */
+*/
import (
"encoding"
@@ -218,4 +218,3 @@ func (s *packetManager) maybeSendPackets() {
// }
// return res
// }
-
diff --git a/sftp/server.go b/sftp/server.go
index 51db31a..be9f70a 100644
--- a/sftp/server.go
+++ b/sftp/server.go
@@ -4,12 +4,12 @@ package sftp
import (
"context"
- "log"
"encoding"
"errors"
"fmt"
"io"
"io/ioutil"
+ "log"
"os"
"strconv"
"sync"
@@ -21,7 +21,7 @@ import (
const (
// SftpServerWorkerCount defines the number of workers for the SFTP server
- SftpServerWorkerCount = 8
+ SftpServerWorkerCount = 1
)
// Server is an SSH File Transfer Protocol (sftp) server.
@@ -194,7 +194,7 @@ func handlePacket(s *Server, p orderedRequest) error {
case *sshFxpLstatPacket:
log.Println("pkt: lstat: ", p.Path)
// stat the requested file
- info, err := os.Lstat(toLocalPath(p.Path))
+ info, err := s.fs.Stat(s.ctx, p.Path)
rpkt = &sshFxpStatResponse{
ID: p.ID,
info: info,
@@ -219,43 +219,39 @@ func handlePacket(s *Server, p orderedRequest) error {
}
case *sshFxpMkdirPacket:
log.Println("pkt: mkdir: ", p.Path)
- err := os.Mkdir(toLocalPath(p.Path), 0755)
+ err := s.fs.Mkdir(s.ctx, p.Path, 0755)
rpkt = statusFromError(p.ID, err)
case *sshFxpRmdirPacket:
log.Println("pkt: rmdir: ", p.Path)
- err := os.Remove(toLocalPath(p.Path))
+ err := s.fs.RemoveAll(s.ctx, p.Path)
rpkt = statusFromError(p.ID, err)
case *sshFxpRemovePacket:
log.Println("pkt: rm: ", p.Filename)
- err := os.Remove(toLocalPath(p.Filename))
+ err := s.fs.RemoveAll(s.ctx, p.Filename)
rpkt = statusFromError(p.ID, err)
case *sshFxpRenamePacket:
log.Println("pkt: rename: ", p.Oldpath, ", ", p.Newpath)
- err := os.Rename(toLocalPath(p.Oldpath), toLocalPath(p.Newpath))
+ err := s.fs.Rename(s.ctx, p.Oldpath, p.Newpath)
rpkt = statusFromError(p.ID, err)
case *sshFxpSymlinkPacket:
log.Println("pkt: ln -s: ", p.Targetpath, ", ", p.Linkpath)
- err := os.Symlink(toLocalPath(p.Targetpath), toLocalPath(p.Linkpath))
+ err := s.fs.Rename(s.ctx, p.Targetpath, p.Linkpath)
rpkt = statusFromError(p.ID, err)
case *sshFxpClosePacket:
log.Println("pkt: close handle: ", p.Handle)
rpkt = statusFromError(p.ID, s.closeHandle(p.Handle))
case *sshFxpReadlinkPacket:
- log.Println("pkt: read: ", p.Path)
- f, err := os.Readlink(toLocalPath(p.Path))
+ log.Println("pkt: readlink: ", p.Path)
rpkt = &sshFxpNamePacket{
ID: p.ID,
NameAttrs: []*sshFxpNameAttr{
{
- Name: f,
- LongName: f,
+ Name: p.Path,
+ LongName: p.Path,
Attrs: emptyFileStat,
},
},
}
- if err != nil {
- rpkt = statusFromError(p.ID, err)
- }
case *sshFxpRealpathPacket:
log.Println("pkt: absolute path: ", p.Path)
f := s3.NewS3Path(p.Path).Path
@@ -288,7 +284,7 @@ func handlePacket(s *Server, p orderedRequest) error {
case *sshFxpReadPacket:
var err error = EBADF
f, ok := s.getHandle(p.Handle)
- log.Println("pkt: read handle: ", p.Handle, f.Path.Path)
+ //log.Println("pkt: read handle: ", p.Handle, f.Path.Path)
if ok {
err = nil
data := p.getDataSlice(s.pktMgr.alloc, orderID)
@@ -309,7 +305,7 @@ func handlePacket(s *Server, p orderedRequest) error {
}
case *sshFxpWritePacket:
- log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset)
+ //log.Println("pkt: write handle: ", p.Handle, ", Offset: ", p.Offset)
f, ok := s.getHandle(p.Handle)
var err error = EBADF
if ok {
@@ -324,7 +320,7 @@ func handlePacket(s *Server, p orderedRequest) error {
rpkt = p.respond(s)
}
case serverRespondablePacket:
- log.Println("pkt: respondable")
+ //log.Println("pkt: respondable")
rpkt = p.respond(s)
default:
return fmt.Errorf("unexpected packet type %T", p)
@@ -477,7 +473,7 @@ func (p *sshFxpOpenPacket) respond(svr *Server) responsePacket {
}
func (p *sshFxpReaddirPacket) respond(svr *Server) responsePacket {
- log.Println("pkt: readdir: ", p.Handle)
+ //log.Println("pkt: readdir: ", p.Handle)
f, ok := svr.getHandle(p.Handle)
if !ok {
return statusFromError(p.ID, EBADF)
diff --git a/webdav.go b/webdav.go
index 8901166..8e2ce95 100644
--- a/webdav.go
+++ b/webdav.go
@@ -16,7 +16,7 @@ func (wd WebDav) WithMC(mc *minio.Client) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
(&webdav.Handler{
Prefix: wd.WithConfig.DavPath,
- FileSystem: s3.NewS3FS(mc),
+ FileSystem: s3.NewS3FS(mc, wd.WithConfig.S3Cache),
LockSystem: webdav.NewMemLS(),
Logger: func(r *http.Request, err error) {
log.Printf("INFO: %s %s %s\n", r.RemoteAddr, r.Method, r.URL)