package sftp

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
	"math"
	"os"
	"path"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/kr/fs"
	"golang.org/x/crypto/ssh"
)

var (
	// ErrInternalInconsistency indicates the packets sent and the data queued to be
	// written to the file don't match up. It is an unusual error and usually is
	// caused by bad behavior server side or connection issues. The error is
	// limited in scope to the call where it happened, the client object is still
	// OK to use as long as the connection is still open.
	ErrInternalInconsistency = errors.New("internal inconsistency")
	// InternalInconsistency alias for ErrInternalInconsistency.
	//
	// Deprecated: please use ErrInternalInconsistency
	InternalInconsistency = ErrInternalInconsistency
)

// A ClientOption is a function which applies configuration to a Client.
// A non-nil error returned by an option is reported by the constructor
// that applies it.
type ClientOption func(*Client) error

// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
// This option only accepts sizes servers should support, i.e. <= 32768 bytes.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketChecked(size int) ClientOption {
	return func(c *Client) error {
		// Validation happens when the option is applied, not when it is built.
		if size < 1 {
			return errors.New("size must be greater or equal to 1")
		}
		// 32768 is the upper bound this checked variant accepts.
		if size > 32768 {
			return errors.New("sizes larger than 32KB might not work with all servers")
		}
		c.maxPacket = size
		return nil
	}
}

// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
// It accepts sizes larger than the 32768 bytes all servers should support.
// Only use a setting higher than 32768 if your application always connects to
// the same server or after sufficiently broad testing.
//
// If you get the error "failed to send packet header: EOF" when copying a
// large file, try lowering this number.
//
// The default packet size is 32768 bytes.
func MaxPacketUnchecked(size int) ClientOption { return func(c *Client) error { if size < 1 { return errors.New("size must be greater or equal to 1") } c.maxPacket = size return nil } } // MaxPacket sets the maximum size of the payload, measured in bytes. // This option only accepts sizes servers should support, ie. <= 32768 bytes. // This is a synonym for MaxPacketChecked that provides backward compatibility. // // If you get the error "failed to send packet header: EOF" when copying a // large file, try lowering this number. // // The default packet size is 32768 bytes. func MaxPacket(size int) ClientOption { return MaxPacketChecked(size) } // MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file. // // The default maximum concurrent requests is 64. func MaxConcurrentRequestsPerFile(n int) ClientOption { return func(c *Client) error { if n < 1 { return errors.New("n must be greater or equal to 1") } c.maxConcurrentRequests = n return nil } } // UseConcurrentWrites allows the Client to perform concurrent Writes. // // Using concurrency while doing writes, requires special consideration. // A write to a later offset in a file after an error, // could end up with a file length longer than what was successfully written. // // When using this option, if you receive an error during `io.Copy` or `io.WriteTo`, // you may need to `Truncate` the target Writer to avoid “holes” in the data written. func UseConcurrentWrites(value bool) ClientOption { return func(c *Client) error { c.useConcurrentWrites = value return nil } } // UseConcurrentReads allows the Client to perform concurrent Reads. // // Concurrent reads are generally safe to use and not using them will degrade // performance, so this option is enabled by default. // // When enabled, WriteTo will use Stat/Fstat to get the file size and determines // how many concurrent workers to use. 
// Some "read once" servers will delete the file if they receive a stat call on an // open file and then the download will fail. // Disabling concurrent reads you will be able to download files from these servers. // If concurrent reads are disabled, the UseFstat option is ignored. func UseConcurrentReads(value bool) ClientOption { return func(c *Client) error { c.disableConcurrentReads = !value return nil } } // UseFstat sets whether to use Fstat or Stat when File.WriteTo is called // (usually when copying files). // Some servers limit the amount of open files and calling Stat after opening // the file will throw an error From the server. Setting this flag will call // Fstat instead of Stat which is suppose to be called on an open file handle. // // It has been found that that with IBM Sterling SFTP servers which have // "extractability" level set to 1 which means only 1 file can be opened at // any given time. // // If the server you are working with still has an issue with both Stat and // Fstat calls you can always open a file and read it until the end. // // Another reason to read the file until its end and Fstat doesn't work is // that in some servers, reading a full file will automatically delete the // file as some of these mainframes map the file to a message in a queue. // Once the file has been read it will get deleted. func UseFstat(value bool) ClientOption { return func(c *Client) error { c.useFstat = value return nil } } // Client represents an SFTP session on a *ssh.ClientConn SSH connection. // Multiple Clients can be active on a single SSH connection, and a Client // may be called concurrently from multiple Goroutines. // // Client implements the github.com/kr/fs.FileSystem interface. type Client struct { clientConn ext map[string]string // Extensions (name -> data). maxPacket int // max packet size read or written. maxConcurrentRequests int nextid uint32 // write concurrency is… error prone. // Default behavior should be to not use it. 
useConcurrentWrites bool useFstat bool disableConcurrentReads bool } // NewClient creates a new SFTP client on conn, using zero or more option // functions. func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) { s, err := conn.NewSession() if err != nil { return nil, err } if err := s.RequestSubsystem("sftp"); err != nil { return nil, err } pw, err := s.StdinPipe() if err != nil { return nil, err } pr, err := s.StdoutPipe() if err != nil { return nil, err } return NewClientPipe(pr, pw, opts...) } // NewClientPipe creates a new SFTP client given a Reader and a WriteCloser. // This can be used for connecting to an SFTP server over TCP/TLS or by using // the system's ssh client program (e.g. via exec.Command). func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) { sftp := &Client{ clientConn: clientConn{ conn: conn{ Reader: rd, WriteCloser: wr, }, inflight: make(map[uint32]chan<- result), closed: make(chan struct{}), }, ext: make(map[string]string), maxPacket: 1 << 15, maxConcurrentRequests: 64, } for _, opt := range opts { if err := opt(sftp); err != nil { wr.Close() return nil, err } } if err := sftp.sendInit(); err != nil { wr.Close() return nil, err } if err := sftp.recvVersion(); err != nil { wr.Close() return nil, err } sftp.clientConn.wg.Add(1) go sftp.loop() return sftp, nil } // Create creates the named file mode 0666 (before umask), truncating it if it // already exists. If successful, methods on the returned File can be used for // I/O; the associated file descriptor has mode O_RDWR. If you need more // control over the flags/mode used to open the file see client.OpenFile. // // Note that some SFTP servers (eg. AWS Transfer) do not support opening files // read/write at the same time. For those services you will need to use // `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`. 
func (c *Client) Create(path string) (*File, error) { return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC)) } const sftpProtocolVersion = 3 // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02 func (c *Client) sendInit() error { return c.clientConn.conn.sendPacket(&sshFxInitPacket{ Version: sftpProtocolVersion, // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02 }) } // returns the next value of c.nextid func (c *Client) nextID() uint32 { return atomic.AddUint32(&c.nextid, 1) } func (c *Client) recvVersion() error { typ, data, err := c.recvPacket(0) if err != nil { return err } if typ != sshFxpVersion { return &unexpectedPacketErr{sshFxpVersion, typ} } version, data, err := unmarshalUint32Safe(data) if err != nil { return err } if version != sftpProtocolVersion { return &unexpectedVersionErr{sftpProtocolVersion, version} } for len(data) > 0 { var ext extensionPair ext, data, err = unmarshalExtensionPair(data) if err != nil { return err } c.ext[ext.Name] = ext.Data } return nil } // HasExtension checks whether the server supports a named extension. // // The first return value is the extension data reported by the server // (typically a version number). func (c *Client) HasExtension(name string) (string, bool) { data, ok := c.ext[name] return data, ok } // Walk returns a new Walker rooted at root. func (c *Client) Walk(root string) *fs.Walker { return fs.WalkFS(root, c) } // ReadDir reads the directory named by dirname and returns a list of // directory entries. 
func (c *Client) ReadDir(p string) ([]os.FileInfo, error) { handle, err := c.opendir(p) if err != nil { return nil, err } defer c.close(handle) // this has to defer earlier than the lock below var attrs []os.FileInfo var done = false for !done { id := c.nextID() typ, data, err1 := c.sendPacket(nil, &sshFxpReaddirPacket{ ID: id, Handle: handle, }) if err1 != nil { err = err1 done = true break } switch typ { case sshFxpName: sid, data := unmarshalUint32(data) if sid != id { return nil, &unexpectedIDErr{id, sid} } count, data := unmarshalUint32(data) for i := uint32(0); i < count; i++ { var filename string filename, data = unmarshalString(data) _, data = unmarshalString(data) // discard longname var attr *FileStat attr, data = unmarshalAttrs(data) if filename == "." || filename == ".." { continue } attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename))) } case sshFxpStatus: // TODO(dfc) scope warning! err = normaliseError(unmarshalStatus(id, data)) done = true default: return nil, unimplementedPacketErr(typ) } } if err == io.EOF { err = nil } return attrs, err } func (c *Client) opendir(path string) (string, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpOpendirPacket{ ID: id, Path: path, }) if err != nil { return "", err } switch typ { case sshFxpHandle: sid, data := unmarshalUint32(data) if sid != id { return "", &unexpectedIDErr{id, sid} } handle, _ := unmarshalString(data) return handle, nil case sshFxpStatus: return "", normaliseError(unmarshalStatus(id, data)) default: return "", unimplementedPacketErr(typ) } } // Stat returns a FileInfo structure describing the file specified by path 'p'. // If 'p' is a symbolic link, the returned FileInfo structure describes the referent file. func (c *Client) Stat(p string) (os.FileInfo, error) { fs, err := c.stat(p) if err != nil { return nil, err } return fileInfoFromStat(fs, path.Base(p)), nil } // Lstat returns a FileInfo structure describing the file specified by path 'p'. 
// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link. func (c *Client) Lstat(p string) (os.FileInfo, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpLstatPacket{ ID: id, Path: p, }) if err != nil { return nil, err } switch typ { case sshFxpAttrs: sid, data := unmarshalUint32(data) if sid != id { return nil, &unexpectedIDErr{id, sid} } attr, _ := unmarshalAttrs(data) return fileInfoFromStat(attr, path.Base(p)), nil case sshFxpStatus: return nil, normaliseError(unmarshalStatus(id, data)) default: return nil, unimplementedPacketErr(typ) } } // ReadLink reads the target of a symbolic link. func (c *Client) ReadLink(p string) (string, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpReadlinkPacket{ ID: id, Path: p, }) if err != nil { return "", err } switch typ { case sshFxpName: sid, data := unmarshalUint32(data) if sid != id { return "", &unexpectedIDErr{id, sid} } count, data := unmarshalUint32(data) if count != 1 { return "", unexpectedCount(1, count) } filename, _ := unmarshalString(data) // ignore dummy attributes return filename, nil case sshFxpStatus: return "", normaliseError(unmarshalStatus(id, data)) default: return "", unimplementedPacketErr(typ) } } // Link creates a hard link at 'newname', pointing at the same inode as 'oldname' func (c *Client) Link(oldname, newname string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpHardlinkPacket{ ID: id, Oldpath: oldname, Newpath: newname, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // Symlink creates a symbolic link at 'newname', pointing at target 'oldname' func (c *Client) Symlink(oldname, newname string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpSymlinkPacket{ ID: id, Linkpath: newname, Targetpath: oldname, }) if err != nil { return err } switch typ { case sshFxpStatus: 
return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } func (c *Client) setfstat(handle string, flags uint32, attrs interface{}) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpFsetstatPacket{ ID: id, Handle: handle, Flags: flags, Attrs: attrs, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // setstat is a convience wrapper to allow for changing of various parts of the file descriptor. func (c *Client) setstat(path string, flags uint32, attrs interface{}) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpSetstatPacket{ ID: id, Path: path, Flags: flags, Attrs: attrs, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // Chtimes changes the access and modification times of the named file. func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error { type times struct { Atime uint32 Mtime uint32 } attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())} return c.setstat(path, sshFileXferAttrACmodTime, attrs) } // Chown changes the user and group owners of the named file. func (c *Client) Chown(path string, uid, gid int) error { type owner struct { UID uint32 GID uint32 } attrs := owner{uint32(uid), uint32(gid)} return c.setstat(path, sshFileXferAttrUIDGID, attrs) } // Chmod changes the permissions of the named file. // // Chmod does not apply a umask, because even retrieving the umask is not // possible in a portable way without causing a race condition. Callers // should mask off umask bits, if desired. func (c *Client) Chmod(path string, mode os.FileMode) error { return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode)) } // Truncate sets the size of the named file. 
Although it may be safely assumed // that if the size is less than its current size it will be truncated to fit, // the SFTP protocol does not specify what behavior the server should do when setting // size greater than the current size. func (c *Client) Truncate(path string, size int64) error { return c.setstat(path, sshFileXferAttrSize, uint64(size)) } // Open opens the named file for reading. If successful, methods on the // returned file can be used for reading; the associated file descriptor // has mode O_RDONLY. func (c *Client) Open(path string) (*File, error) { return c.open(path, flags(os.O_RDONLY)) } // OpenFile is the generalized open call; most users will use Open or // Create instead. It opens the named file with specified flag (O_RDONLY // etc.). If successful, methods on the returned File can be used for I/O. func (c *Client) OpenFile(path string, f int) (*File, error) { return c.open(path, flags(f)) } func (c *Client) open(path string, pflags uint32) (*File, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpOpenPacket{ ID: id, Path: path, Pflags: pflags, }) if err != nil { return nil, err } switch typ { case sshFxpHandle: sid, data := unmarshalUint32(data) if sid != id { return nil, &unexpectedIDErr{id, sid} } handle, _ := unmarshalString(data) return &File{c: c, path: path, handle: handle}, nil case sshFxpStatus: return nil, normaliseError(unmarshalStatus(id, data)) default: return nil, unimplementedPacketErr(typ) } } // close closes a handle handle previously returned in the response // to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid // immediately after this request has been sent. 
func (c *Client) close(handle string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpClosePacket{ ID: id, Handle: handle, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } func (c *Client) stat(path string) (*FileStat, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpStatPacket{ ID: id, Path: path, }) if err != nil { return nil, err } switch typ { case sshFxpAttrs: sid, data := unmarshalUint32(data) if sid != id { return nil, &unexpectedIDErr{id, sid} } attr, _ := unmarshalAttrs(data) return attr, nil case sshFxpStatus: return nil, normaliseError(unmarshalStatus(id, data)) default: return nil, unimplementedPacketErr(typ) } } func (c *Client) fstat(handle string) (*FileStat, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpFstatPacket{ ID: id, Handle: handle, }) if err != nil { return nil, err } switch typ { case sshFxpAttrs: sid, data := unmarshalUint32(data) if sid != id { return nil, &unexpectedIDErr{id, sid} } attr, _ := unmarshalAttrs(data) return attr, nil case sshFxpStatus: return nil, normaliseError(unmarshalStatus(id, data)) default: return nil, unimplementedPacketErr(typ) } } // StatVFS retrieves VFS statistics from a remote host. // // It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature // from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt. 
func (c *Client) StatVFS(path string) (*StatVFS, error) { // send the StatVFS packet to the server id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpStatvfsPacket{ ID: id, Path: path, }) if err != nil { return nil, err } switch typ { // server responded with valid data case sshFxpExtendedReply: var response StatVFS err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response) if err != nil { return nil, errors.New("can not parse reply") } return &response, nil // the resquest failed case sshFxpStatus: return nil, normaliseError(unmarshalStatus(id, data)) default: return nil, unimplementedPacketErr(typ) } } // Join joins any number of path elements into a single path, adding a // separating slash if necessary. The result is Cleaned; in particular, all // empty strings are ignored. func (c *Client) Join(elem ...string) string { return path.Join(elem...) } // Remove removes the specified file or directory. An error will be returned if no // file or directory with the specified path exists, or if the specified directory // is not empty. func (c *Client) Remove(path string) error { err := c.removeFile(path) // some servers, *cough* osx *cough*, return EPERM, not ENODIR. // serv-u returns ssh_FX_FILE_IS_A_DIRECTORY // EPERM is converted to os.ErrPermission so it is not a StatusError if err, ok := err.(*StatusError); ok { switch err.Code { case sshFxFailure, sshFxFileIsADirectory: return c.RemoveDirectory(path) } } if os.IsPermission(err) { return c.RemoveDirectory(path) } return err } func (c *Client) removeFile(path string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpRemovePacket{ ID: id, Filename: path, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // RemoveDirectory removes a directory path. 
func (c *Client) RemoveDirectory(path string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpRmdirPacket{ ID: id, Path: path, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // Rename renames a file. func (c *Client) Rename(oldname, newname string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpRenamePacket{ ID: id, Oldpath: oldname, Newpath: newname, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // PosixRename renames a file using the posix-rename@openssh.com extension // which will replace newname if it already exists. func (c *Client) PosixRename(oldname, newname string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpPosixRenamePacket{ ID: id, Oldpath: oldname, Newpath: newname, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // RealPath can be used to have the server canonicalize any given path name to an absolute path. // // This is useful for converting path names containing ".." components, // or relative pathnames without a leading slash into absolute paths. 
func (c *Client) RealPath(path string) (string, error) { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpRealpathPacket{ ID: id, Path: path, }) if err != nil { return "", err } switch typ { case sshFxpName: sid, data := unmarshalUint32(data) if sid != id { return "", &unexpectedIDErr{id, sid} } count, data := unmarshalUint32(data) if count != 1 { return "", unexpectedCount(1, count) } filename, _ := unmarshalString(data) // ignore attributes return filename, nil case sshFxpStatus: return "", normaliseError(unmarshalStatus(id, data)) default: return "", unimplementedPacketErr(typ) } } // Getwd returns the current working directory of the server. Operations // involving relative paths will be based at this location. func (c *Client) Getwd() (string, error) { return c.RealPath(".") } // Mkdir creates the specified directory. An error will be returned if a file or // directory with the specified path already exists, or if the directory's // parent folder does not exist (the method cannot create complete paths). func (c *Client) Mkdir(path string) error { id := c.nextID() typ, data, err := c.sendPacket(nil, &sshFxpMkdirPacket{ ID: id, Path: path, }) if err != nil { return err } switch typ { case sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return unimplementedPacketErr(typ) } } // MkdirAll creates a directory named path, along with any necessary parents, // and returns nil, or else returns an error. // If path is already a directory, MkdirAll does nothing and returns nil. // If path contains a regular file, an error is returned func (c *Client) MkdirAll(path string) error { // Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13 // Fast path: if we can tell whether path is a directory or file, stop with success or error. 
dir, err := c.Stat(path) if err == nil { if dir.IsDir() { return nil } return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR} } // Slow path: make sure parent exists and then call Mkdir for path. i := len(path) for i > 0 && path[i-1] == '/' { // Skip trailing path separator. i-- } j := i for j > 0 && path[j-1] != '/' { // Scan backward over element. j-- } if j > 1 { // Create parent err = c.MkdirAll(path[0 : j-1]) if err != nil { return err } } // Parent now exists; invoke Mkdir and use its result. err = c.Mkdir(path) if err != nil { // Handle arguments like "foo/." by // double-checking that directory doesn't exist. dir, err1 := c.Lstat(path) if err1 == nil && dir.IsDir() { return nil } return err } return nil } // File represents a remote file. type File struct { c *Client path string handle string mu sync.Mutex offset int64 // current offset within remote file } // Close closes the File, rendering it unusable for I/O. It returns an // error, if any. func (f *File) Close() error { return f.c.close(f.handle) } // Name returns the name of the file as presented to Open or Create. func (f *File) Name() string { return f.path } // Read reads up to len(b) bytes from the File. It returns the number of bytes // read and an error, if any. Read follows io.Reader semantics, so when Read // encounters an error or EOF condition after successfully reading n > 0 bytes, // it returns the number of bytes read. // // To maximise throughput for transferring the entire file (especially // over high latency links) it is recommended to use WriteTo rather // than calling Read multiple times. io.Copy will do this // automatically. func (f *File) Read(b []byte) (int, error) { f.mu.Lock() defer f.mu.Unlock() n, err := f.ReadAt(b, f.offset) f.offset += int64(n) return n, err } // readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset. 
// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
//
// ch is handed to sendPacket for every request issued here; callers in this
// file pass either nil or a reusable buffered channel. The returned n is the
// number of bytes copied into b so far, also when an error is returned.
func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
	// NOTE: the err declared with := inside the loop shadows the named result,
	// so the named err is never assigned and every failure leaves through an
	// explicit return below.
	for err == nil && n < len(b) {
		id := f.c.nextID()
		// Ask only for the part of b that has not been filled yet.
		typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{
			ID:     id,
			Handle: f.handle,
			Offset: uint64(off) + uint64(n),
			Len:    uint32(len(b) - n),
		})
		if err != nil {
			return n, err
		}

		switch typ {
		case sshFxpStatus:
			return n, normaliseError(unmarshalStatus(id, data))

		case sshFxpData:
			sid, data := unmarshalUint32(data)
			if id != sid {
				return n, &unexpectedIDErr{id, sid}
			}

			// l is the length announced by the server; copy is additionally
			// bounded by the space left in b.
			l, data := unmarshalUint32(data)
			n += copy(b[n:], data[:l])

		default:
			return n, unimplementedPacketErr(typ)
		}
	}

	return
}

// readAtSequential fills b starting at file offset off using one
// readChunkAt call at a time, each covering at most f.c.maxPacket bytes.
// An io.EOF from readChunkAt ends the read and is reported as a nil error
// together with the number of bytes read so far.
func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
	for read < len(b) {
		rb := b[read:]
		if len(rb) > f.c.maxPacket {
			rb = rb[:f.c.maxPacket]
		}
		n, err := f.readChunkAt(nil, rb, off+int64(read))
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}
		if n > 0 {
			read += n
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				return read, nil // return nil explicitly.
			}
			return read, err
		}
	}
	return read, nil
}

// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
//
// Buffers no larger than f.c.maxPacket are served by a single readChunkAt
// call. Larger buffers are read sequentially when concurrent reads are
// disabled, and otherwise split into chunks that are requested concurrently.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with 1/2 requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows reads with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.

	// cancel is closed by the Reduce loop below once the first error arrives.
	cancel := make(chan struct{})

	// One worker per chunk, capped at maxConcurrentRequests.
	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	// work describes one dispatched read request: its id, the channel the
	// response arrives on, and the slice of b (with its file offset) to fill.
	type work struct {
		id  uint32
		res chan result

		b   []byte
		off int64
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		// Local copy of the slice header; the caller's b is not re-sliced.
		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			// The request is dispatched before the work item is offered to a
			// worker. Note that Len is always chunkSize, even when rb is
			// shorter (last chunk); the worker's copy is bounded by len(rb).
			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(chunkSize),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	// rErr pairs an error with the file offset reached when it occurred.
	type rErr struct {
		off int64
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				// Wait for the response, then hand the channel back to the pool.
				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(packet.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					// A worker stops after reporting its first error.
					errCh <- rErr{packet.off + int64(n), err}
					return
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		// Close cancel exactly once: the receive case is taken when it has
		// already been closed.
		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off > our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}

// writeToSequential implements WriteTo, but works sequentially with no parallelism.
//
// It repeatedly reads up to f.c.maxPacket bytes at f.offset, advances
// f.offset by the bytes read and writes them to w. io.EOF ends the copy and
// is reported as a nil error.
func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
	b := make([]byte, f.c.maxPacket)
	ch := make(chan result, 1) // reusable channel

	for {
		n, err := f.readChunkAt(ch, b, f.offset)
		if n < 0 {
			panic("sftp.File: returned negative count from readChunkAt")
		}

		if n > 0 {
			f.offset += int64(n)

			m, err2 := w.Write(b[:n])
			written += int64(m)

			// A read error takes precedence; the write error is only used
			// when the read itself succeeded.
			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if err == io.EOF {
				return written, nil // return nil explicitly.
			}

			return written, err
		}
	}
}

// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
//
// WriteTo holds f.mu for its whole duration. Unless concurrent reads are
// disabled, it first stats the file (Fstat on the handle when useFstat is
// set, Stat on the path otherwise); files no larger than one packet and
// non-regular files are copied sequentially.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don’t orphan any goroutines or leave any hanging references.
		wg.Wait()
	}()

	// writeWork is one chunk ready to be written: its data, file offset and
	// read error, plus the channel on which the following chunk will arrive.
	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	// readWork is one dispatched read request. cur is the channel its result
	// must be delivered on, next the channel reserved for the chunk after it.
	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		// There is no size-based exit: the loop only ends through cancel.
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			// Dispatch the request before offering the work item to a worker.
			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				// Wait for the response, then hand the channel back to the pool.
				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(readWork.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}

						} else {
							// Copy the payload into a pooled buffer and trim
							// the buffer to the number of bytes copied.
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				// Deliver on this chunk's own channel; the Reduce loop below
				// receives from these channels in request order.
				select {
				case readWork.cur <- writeWork:
				case <-cancel:
					return
				}

				// A worker stops after delivering an error.
				if err != nil {
					return
				}
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			// io.EOF is the normal end of the file, not a failure.
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		// Return the buffer to the pool and move on to the next chunk's channel.
		pool.Put(packet.b)
		cur = packet.next
	}
}

// Stat returns the FileInfo structure describing file. If there is an
// error.
func (f *File) Stat() (os.FileInfo, error) { fs, err := f.c.fstat(f.handle) if err != nil { return nil, err } return fileInfoFromStat(fs, path.Base(f.path)), nil } // Write writes len(b) bytes to the File. It returns the number of bytes // written and an error, if any. Write returns a non-nil error when n != // len(b). // // To maximise throughput for transferring the entire file (especially // over high latency links) it is recommended to use ReadFrom rather // than calling Write multiple times. io.Copy will do this // automatically. func (f *File) Write(b []byte) (int, error) { f.mu.Lock() defer f.mu.Unlock() n, err := f.WriteAt(b, f.offset) f.offset += int64(n) return n, err } func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) { typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{ ID: f.c.nextID(), Handle: f.handle, Offset: uint64(off), Length: uint32(len(b)), Data: b, }) if err != nil { return 0, err } switch typ { case sshFxpStatus: id, _ := unmarshalUint32(data) err := normaliseError(unmarshalStatus(id, data)) if err != nil { return 0, err } default: return 0, unimplementedPacketErr(typ) } return len(b), nil } // writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially. func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) { // Split the write into multiple maxPacket sized concurrent writes // bounded by maxConcurrentRequests. This allows writes with a suitably // large buffer to transfer data at a much faster rate due to // overlapping round trip times. cancel := make(chan struct{}) type work struct { b []byte off int64 } workCh := make(chan work) // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. 
go func() { defer close(workCh) var read int chunkSize := f.c.maxPacket for read < len(b) { wb := b[read:] if len(wb) > chunkSize { wb = wb[:chunkSize] } select { case workCh <- work{wb, off + int64(read)}: case <-cancel: return } read += len(wb) } }() type wErr struct { off int64 err error } errCh := make(chan wErr) concurrency := len(b)/f.c.maxPacket + 1 if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { concurrency = f.c.maxConcurrentRequests } var wg sync.WaitGroup wg.Add(concurrency) for i := 0; i < concurrency; i++ { // Map_i: each worker gets work, and does the Write from each buffer to its respective offset. go func() { defer wg.Done() ch := make(chan result, 1) // reusable channel per mapper. for packet := range workCh { n, err := f.writeChunkAt(ch, packet.b, packet.off) if err != nil { // return the offset as the start + how much we wrote before the error. errCh <- wErr{packet.off + int64(n), err} } } }() } // Wait for long tail, before closing results. go func() { wg.Wait() close(errCh) }() // Reduce: collect all the results into a relevant return: the earliest offset to return an error. firstErr := wErr{math.MaxInt64, nil} for wErr := range errCh { if wErr.off <= firstErr.off { firstErr = wErr } select { case <-cancel: default: // stop any more work from being distributed. (Just in case.) close(cancel) } } if firstErr.err != nil { // firstErr.err != nil if and only if firstErr.off >= our starting offset. return int(firstErr.off - off), firstErr.err } return len(b), nil } // WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns // the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics, // so the file offset is not altered during the write. func (f *File) WriteAt(b []byte, off int64) (written int, err error) { if len(b) <= f.c.maxPacket { // We can do this in one write. 
return f.writeChunkAt(nil, b, off) } if f.c.useConcurrentWrites { return f.writeAtConcurrent(b, off) } ch := make(chan result, 1) // reusable channel chunkSize := f.c.maxPacket for written < len(b) { wb := b[written:] if len(wb) > chunkSize { wb = wb[:chunkSize] } n, err := f.writeChunkAt(ch, wb, off+int64(written)) if n > 0 { written += n } if err != nil { return written, err } } return len(b), nil } // ReadFromWithConcurrency implements ReaderFrom, // but uses the given concurrency to issue multiple requests at the same time. // // Giving a concurrency of less than one will default to the Client’s max concurrency. // // Otherwise, the given concurrency will be capped by the Client's max concurrency. func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) { // Split the write into multiple maxPacket sized concurrent writes. // This allows writes with a suitably large reader // to transfer data at a much faster rate due to overlapping round trip times. cancel := make(chan struct{}) type work struct { b []byte n int off int64 } workCh := make(chan work) type rwErr struct { off int64 err error } errCh := make(chan rwErr) if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { concurrency = f.c.maxConcurrentRequests } pool := newBufPool(concurrency, f.c.maxPacket) // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets. go func() { defer close(workCh) off := f.offset for { b := pool.Get() n, err := r.Read(b) if n > 0 { read += int64(n) select { case workCh <- work{b, n, off}: // We need the pool.Put(b) to put the whole slice, not just trunced. case <-cancel: return } off += int64(n) } if err != nil { if err != io.EOF { errCh <- rwErr{off, err} } return } } }() var wg sync.WaitGroup wg.Add(concurrency) for i := 0; i < concurrency; i++ { // Map_i: each worker gets work, and does the Write from each buffer to its respective offset. 
go func() { defer wg.Done() ch := make(chan result, 1) // reusable channel per mapper. for packet := range workCh { n, err := f.writeChunkAt(ch, packet.b[:packet.n], packet.off) if err != nil { // return the offset as the start + how much we wrote before the error. errCh <- rwErr{packet.off + int64(n), err} } pool.Put(packet.b) } }() } // Wait for long tail, before closing results. go func() { wg.Wait() close(errCh) }() // Reduce: Collect all the results into a relevant return: the earliest offset to return an error. firstErr := rwErr{math.MaxInt64, nil} for rwErr := range errCh { if rwErr.off <= firstErr.off { firstErr = rwErr } select { case <-cancel: default: // stop any more work from being distributed. close(cancel) } } if firstErr.err != nil { // firstErr.err != nil if and only if firstErr.off is a valid offset. // // firstErr.off will then be the lesser of: // * the offset of the first error from writing, // * the last successfully read offset. // // This could be less than the last successfully written offset, // which is the whole reason for the UseConcurrentWrites() ClientOption. // // Callers are responsible for truncating any SFTP files to a safe length. f.offset = firstErr.off // ReadFrom is defined to return the read bytes, regardless of any writer errors. return read, firstErr.err } f.offset += read return read, nil } // ReadFrom reads data from r until EOF and writes it to the file. The return // value is the number of bytes read. Any error except io.EOF encountered // during the read is also returned. // // This method is preferred over calling Write multiple times // to maximise throughput for transferring the entire file, // especially over high-latency links. 
func (f *File) ReadFrom(r io.Reader) (int64, error) { f.mu.Lock() defer f.mu.Unlock() if f.c.useConcurrentWrites { var remain int64 switch r := r.(type) { case interface{ Len() int }: remain = int64(r.Len()) case interface{ Size() int64 }: remain = r.Size() case *io.LimitedReader: remain = r.N case interface{ Stat() (os.FileInfo, error) }: info, err := r.Stat() if err == nil { remain = info.Size() } } if remain < 0 { // We can strongly assert that we want default max concurrency here. return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests) } if remain > int64(f.c.maxPacket) { // Otherwise, only use concurrency, if it would be at least two packets. // This is the best reasonable guess we can make. concurrency64 := remain/int64(f.c.maxPacket) + 1 // We need to cap this value to an `int` size value to avoid overflow on 32-bit machines. // So, we may as well pre-cap it to `f.c.maxConcurrentRequests`. if concurrency64 > int64(f.c.maxConcurrentRequests) { concurrency64 = int64(f.c.maxConcurrentRequests) } return f.ReadFromWithConcurrency(r, int(concurrency64)) } } ch := make(chan result, 1) // reusable channel b := make([]byte, f.c.maxPacket) var read int64 for { n, err := r.Read(b) if n < 0 { panic("sftp.File: reader returned negative count from Read") } if n > 0 { read += int64(n) m, err2 := f.writeChunkAt(ch, b[:n], f.offset) f.offset += int64(m) if err == nil { err = err2 } } if err != nil { if err == io.EOF { return read, nil // return nil explicitly. } return read, err } } } // Seek implements io.Seeker by setting the client offset for the next Read or // Write. It returns the next offset read. Seeking before or after the end of // the file is undefined. Seeking relative to the end calls Stat. 
func (f *File) Seek(offset int64, whence int) (int64, error) { f.mu.Lock() defer f.mu.Unlock() switch whence { case io.SeekStart: case io.SeekCurrent: offset += f.offset case io.SeekEnd: fi, err := f.Stat() if err != nil { return f.offset, err } offset += fi.Size() default: return f.offset, unimplementedSeekWhence(whence) } if offset < 0 { return f.offset, os.ErrInvalid } f.offset = offset return f.offset, nil } // Chown changes the uid/gid of the current file. func (f *File) Chown(uid, gid int) error { return f.c.Chown(f.path, uid, gid) } // Chmod changes the permissions of the current file. // // See Client.Chmod for details. func (f *File) Chmod(mode os.FileMode) error { return f.c.setfstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode)) } // Sync requests a flush of the contents of a File to stable storage. // // Sync requires the server to support the fsync@openssh.com extension. func (f *File) Sync() error { id := f.c.nextID() typ, data, err := f.c.sendPacket(nil, &sshFxpFsyncPacket{ ID: id, Handle: f.handle, }) switch { case err != nil: return err case typ == sshFxpStatus: return normaliseError(unmarshalStatus(id, data)) default: return &unexpectedPacketErr{want: sshFxpStatus, got: typ} } } // Truncate sets the size of the current file. Although it may be safely assumed // that if the size is less than its current size it will be truncated to fit, // the SFTP protocol does not specify what behavior the server should do when setting // size greater than the current size. // We send a SSH_FXP_FSETSTAT here since we have a file handle func (f *File) Truncate(size int64) error { return f.c.setfstat(f.handle, sshFileXferAttrSize, uint64(size)) } // normaliseError normalises an error into a more standard form that can be // checked against stdlib errors like io.EOF or os.ErrNotExist. 
func normaliseError(err error) error { switch err := err.(type) { case *StatusError: switch err.Code { case sshFxEOF: return io.EOF case sshFxNoSuchFile: return os.ErrNotExist case sshFxPermissionDenied: return os.ErrPermission case sshFxOk: return nil default: return err } default: return err } } // flags converts the flags passed to OpenFile into ssh flags. // Unsupported flags are ignored. func flags(f int) uint32 { var out uint32 switch f & os.O_WRONLY { case os.O_WRONLY: out |= sshFxfWrite case os.O_RDONLY: out |= sshFxfRead } if f&os.O_RDWR == os.O_RDWR { out |= sshFxfRead | sshFxfWrite } if f&os.O_APPEND == os.O_APPEND { out |= sshFxfAppend } if f&os.O_CREATE == os.O_CREATE { out |= sshFxfCreat } if f&os.O_TRUNC == os.O_TRUNC { out |= sshFxfTrunc } if f&os.O_EXCL == os.O_EXCL { out |= sshFxfExcl } return out } // toChmodPerm converts Go permission bits to POSIX permission bits. // // This differs from fromFileMode in that we preserve the POSIX versions of // setuid, setgid and sticky in m, because we've historically supported those // bits, and we mask off any non-permission bits. func toChmodPerm(m os.FileMode) (perm uint32) { const mask = os.ModePerm | s_ISUID | s_ISGID | s_ISVTX perm = uint32(m & mask) if m&os.ModeSetuid != 0 { perm |= s_ISUID } if m&os.ModeSetgid != 0 { perm |= s_ISGID } if m&os.ModeSticky != 0 { perm |= s_ISVTX } return perm }