aboutsummaryrefslogtreecommitdiff
path: root/sftp/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'sftp/client.go')
-rw-r--r--sftp/client.go1936
1 files changed, 1936 insertions, 0 deletions
diff --git a/sftp/client.go b/sftp/client.go
new file mode 100644
index 0000000..ce62286
--- /dev/null
+++ b/sftp/client.go
@@ -0,0 +1,1936 @@
+package sftp
+
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "io"
+ "math"
+ "os"
+ "path"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ "github.com/kr/fs"
+ "golang.org/x/crypto/ssh"
+)
+
+var (
+ // ErrInternalInconsistency indicates the packets sent and the data queued to be
+ // written to the file don't match up. It is an unusual error and usually is
+ // caused by bad behavior server side or connection issues. The error is
+ // limited in scope to the call where it happened, the client object is still
+ // OK to use as long as the connection is still open.
+ ErrInternalInconsistency = errors.New("internal inconsistency")
+ // InternalInconsistency alias for ErrInternalInconsistency.
+ //
+ // Deprecated: please use ErrInternalInconsistency
+ InternalInconsistency = ErrInternalInconsistency
+)
+
+// A ClientOption is a function which applies configuration to a Client.
+type ClientOption func(*Client) error
+
+// MaxPacketChecked sets the maximum size of the payload, measured in bytes.
+// This option only accepts sizes servers should support, ie. <= 32768 bytes.
+//
+// If you get the error "failed to send packet header: EOF" when copying a
+// large file, try lowering this number.
+//
+// The default packet size is 32768 bytes.
+func MaxPacketChecked(size int) ClientOption {
+ return func(c *Client) error {
+ if size < 1 {
+ return errors.New("size must be greater or equal to 1")
+ }
+ if size > 32768 {
+ return errors.New("sizes larger than 32KB might not work with all servers")
+ }
+ c.maxPacket = size
+ return nil
+ }
+}
+
+// MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
+// It accepts sizes larger than the 32768 bytes all servers should support.
+// Only use a setting higher than 32768 if your application always connects to
+// the same server or after sufficiently broad testing.
+//
+// If you get the error "failed to send packet header: EOF" when copying a
+// large file, try lowering this number.
+//
+// The default packet size is 32768 bytes.
+func MaxPacketUnchecked(size int) ClientOption {
+ return func(c *Client) error {
+ if size < 1 {
+ return errors.New("size must be greater or equal to 1")
+ }
+ c.maxPacket = size
+ return nil
+ }
+}
+
+// MaxPacket sets the maximum size of the payload, measured in bytes.
+// This option only accepts sizes servers should support, ie. <= 32768 bytes.
+// This is a synonym for MaxPacketChecked that provides backward compatibility.
+//
+// If you get the error "failed to send packet header: EOF" when copying a
+// large file, try lowering this number.
+//
+// The default packet size is 32768 bytes.
+func MaxPacket(size int) ClientOption {
+ return MaxPacketChecked(size)
+}
+
+// MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
+//
+// The default maximum concurrent requests is 64.
+func MaxConcurrentRequestsPerFile(n int) ClientOption {
+ return func(c *Client) error {
+ if n < 1 {
+ return errors.New("n must be greater or equal to 1")
+ }
+ c.maxConcurrentRequests = n
+ return nil
+ }
+}
+
+// UseConcurrentWrites allows the Client to perform concurrent Writes.
+//
+// Using concurrency while doing writes, requires special consideration.
+// A write to a later offset in a file after an error,
+// could end up with a file length longer than what was successfully written.
+//
+// When using this option, if you receive an error during `io.Copy` or `io.WriteTo`,
+// you may need to `Truncate` the target Writer to avoid “holes” in the data written.
+func UseConcurrentWrites(value bool) ClientOption {
+ return func(c *Client) error {
+ c.useConcurrentWrites = value
+ return nil
+ }
+}
+
+// UseConcurrentReads allows the Client to perform concurrent Reads.
+//
+// Concurrent reads are generally safe to use and not using them will degrade
+// performance, so this option is enabled by default.
+//
+// When enabled, WriteTo will use Stat/Fstat to get the file size and determines
+// how many concurrent workers to use.
+// Some "read once" servers will delete the file if they receive a stat call on an
+// open file and then the download will fail.
+// Disabling concurrent reads you will be able to download files from these servers.
+// If concurrent reads are disabled, the UseFstat option is ignored.
+func UseConcurrentReads(value bool) ClientOption {
+ return func(c *Client) error {
+ c.disableConcurrentReads = !value
+ return nil
+ }
+}
+
+// UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
+// (usually when copying files).
+// Some servers limit the amount of open files and calling Stat after opening
+// the file will throw an error From the server. Setting this flag will call
+// Fstat instead of Stat which is suppose to be called on an open file handle.
+//
+// It has been found that that with IBM Sterling SFTP servers which have
+// "extractability" level set to 1 which means only 1 file can be opened at
+// any given time.
+//
+// If the server you are working with still has an issue with both Stat and
+// Fstat calls you can always open a file and read it until the end.
+//
+// Another reason to read the file until its end and Fstat doesn't work is
+// that in some servers, reading a full file will automatically delete the
+// file as some of these mainframes map the file to a message in a queue.
+// Once the file has been read it will get deleted.
+func UseFstat(value bool) ClientOption {
+ return func(c *Client) error {
+ c.useFstat = value
+ return nil
+ }
+}
+
+// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
+// Multiple Clients can be active on a single SSH connection, and a Client
+// may be called concurrently from multiple Goroutines.
+//
+// Client implements the github.com/kr/fs.FileSystem interface.
+type Client struct {
+ clientConn
+
+ ext map[string]string // Extensions (name -> data).
+
+ maxPacket int // max packet size read or written.
+ maxConcurrentRequests int
+ nextid uint32
+
+ // write concurrency is… error prone.
+ // Default behavior should be to not use it.
+ useConcurrentWrites bool
+ useFstat bool
+ disableConcurrentReads bool
+}
+
+// NewClient creates a new SFTP client on conn, using zero or more option
+// functions.
+func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
+ s, err := conn.NewSession()
+ if err != nil {
+ return nil, err
+ }
+ if err := s.RequestSubsystem("sftp"); err != nil {
+ return nil, err
+ }
+ pw, err := s.StdinPipe()
+ if err != nil {
+ return nil, err
+ }
+ pr, err := s.StdoutPipe()
+ if err != nil {
+ return nil, err
+ }
+
+ return NewClientPipe(pr, pw, opts...)
+}
+
+// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
+// This can be used for connecting to an SFTP server over TCP/TLS or by using
+// the system's ssh client program (e.g. via exec.Command).
+func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) {
+ sftp := &Client{
+ clientConn: clientConn{
+ conn: conn{
+ Reader: rd,
+ WriteCloser: wr,
+ },
+ inflight: make(map[uint32]chan<- result),
+ closed: make(chan struct{}),
+ },
+
+ ext: make(map[string]string),
+
+ maxPacket: 1 << 15,
+ maxConcurrentRequests: 64,
+ }
+
+ for _, opt := range opts {
+ if err := opt(sftp); err != nil {
+ wr.Close()
+ return nil, err
+ }
+ }
+
+ if err := sftp.sendInit(); err != nil {
+ wr.Close()
+ return nil, err
+ }
+ if err := sftp.recvVersion(); err != nil {
+ wr.Close()
+ return nil, err
+ }
+
+ sftp.clientConn.wg.Add(1)
+ go sftp.loop()
+
+ return sftp, nil
+}
+
+// Create creates the named file mode 0666 (before umask), truncating it if it
+// already exists. If successful, methods on the returned File can be used for
+// I/O; the associated file descriptor has mode O_RDWR. If you need more
+// control over the flags/mode used to open the file see client.OpenFile.
+//
+// Note that some SFTP servers (eg. AWS Transfer) do not support opening files
+// read/write at the same time. For those services you will need to use
+// `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`.
+func (c *Client) Create(path string) (*File, error) {
+ return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
+}
+
+const sftpProtocolVersion = 3 // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
+
+func (c *Client) sendInit() error {
+ return c.clientConn.conn.sendPacket(&sshFxInitPacket{
+ Version: sftpProtocolVersion, // http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02
+ })
+}
+
+// returns the next value of c.nextid
+func (c *Client) nextID() uint32 {
+ return atomic.AddUint32(&c.nextid, 1)
+}
+
+func (c *Client) recvVersion() error {
+ typ, data, err := c.recvPacket(0)
+ if err != nil {
+ return err
+ }
+ if typ != sshFxpVersion {
+ return &unexpectedPacketErr{sshFxpVersion, typ}
+ }
+
+ version, data, err := unmarshalUint32Safe(data)
+ if err != nil {
+ return err
+ }
+ if version != sftpProtocolVersion {
+ return &unexpectedVersionErr{sftpProtocolVersion, version}
+ }
+
+ for len(data) > 0 {
+ var ext extensionPair
+ ext, data, err = unmarshalExtensionPair(data)
+ if err != nil {
+ return err
+ }
+ c.ext[ext.Name] = ext.Data
+ }
+
+ return nil
+}
+
+// HasExtension checks whether the server supports a named extension.
+//
+// The first return value is the extension data reported by the server
+// (typically a version number).
+func (c *Client) HasExtension(name string) (string, bool) {
+ data, ok := c.ext[name]
+ return data, ok
+}
+
+// Walk returns a new Walker rooted at root.
+func (c *Client) Walk(root string) *fs.Walker {
+ return fs.WalkFS(root, c)
+}
+
+// ReadDir reads the directory named by dirname and returns a list of
+// directory entries.
+func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
+ handle, err := c.opendir(p)
+ if err != nil {
+ return nil, err
+ }
+ defer c.close(handle) // this has to defer earlier than the lock below
+ var attrs []os.FileInfo
+ var done = false
+ for !done {
+ id := c.nextID()
+ typ, data, err1 := c.sendPacket(nil, &sshFxpReaddirPacket{
+ ID: id,
+ Handle: handle,
+ })
+ if err1 != nil {
+ err = err1
+ done = true
+ break
+ }
+ switch typ {
+ case sshFxpName:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return nil, &unexpectedIDErr{id, sid}
+ }
+ count, data := unmarshalUint32(data)
+ for i := uint32(0); i < count; i++ {
+ var filename string
+ filename, data = unmarshalString(data)
+ _, data = unmarshalString(data) // discard longname
+ var attr *FileStat
+ attr, data = unmarshalAttrs(data)
+ if filename == "." || filename == ".." {
+ continue
+ }
+ attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename)))
+ }
+ case sshFxpStatus:
+ // TODO(dfc) scope warning!
+ err = normaliseError(unmarshalStatus(id, data))
+ done = true
+ default:
+ return nil, unimplementedPacketErr(typ)
+ }
+ }
+ if err == io.EOF {
+ err = nil
+ }
+ return attrs, err
+}
+
+func (c *Client) opendir(path string) (string, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpOpendirPacket{
+ ID: id,
+ Path: path,
+ })
+ if err != nil {
+ return "", err
+ }
+ switch typ {
+ case sshFxpHandle:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return "", &unexpectedIDErr{id, sid}
+ }
+ handle, _ := unmarshalString(data)
+ return handle, nil
+ case sshFxpStatus:
+ return "", normaliseError(unmarshalStatus(id, data))
+ default:
+ return "", unimplementedPacketErr(typ)
+ }
+}
+
+// Stat returns a FileInfo structure describing the file specified by path 'p'.
+// If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
+func (c *Client) Stat(p string) (os.FileInfo, error) {
+ fs, err := c.stat(p)
+ if err != nil {
+ return nil, err
+ }
+ return fileInfoFromStat(fs, path.Base(p)), nil
+}
+
+// Lstat returns a FileInfo structure describing the file specified by path 'p'.
+// If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
+func (c *Client) Lstat(p string) (os.FileInfo, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpLstatPacket{
+ ID: id,
+ Path: p,
+ })
+ if err != nil {
+ return nil, err
+ }
+ switch typ {
+ case sshFxpAttrs:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return nil, &unexpectedIDErr{id, sid}
+ }
+ attr, _ := unmarshalAttrs(data)
+ return fileInfoFromStat(attr, path.Base(p)), nil
+ case sshFxpStatus:
+ return nil, normaliseError(unmarshalStatus(id, data))
+ default:
+ return nil, unimplementedPacketErr(typ)
+ }
+}
+
+// ReadLink reads the target of a symbolic link.
+func (c *Client) ReadLink(p string) (string, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpReadlinkPacket{
+ ID: id,
+ Path: p,
+ })
+ if err != nil {
+ return "", err
+ }
+ switch typ {
+ case sshFxpName:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return "", &unexpectedIDErr{id, sid}
+ }
+ count, data := unmarshalUint32(data)
+ if count != 1 {
+ return "", unexpectedCount(1, count)
+ }
+ filename, _ := unmarshalString(data) // ignore dummy attributes
+ return filename, nil
+ case sshFxpStatus:
+ return "", normaliseError(unmarshalStatus(id, data))
+ default:
+ return "", unimplementedPacketErr(typ)
+ }
+}
+
+// Link creates a hard link at 'newname', pointing at the same inode as 'oldname'
+func (c *Client) Link(oldname, newname string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpHardlinkPacket{
+ ID: id,
+ Oldpath: oldname,
+ Newpath: newname,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// Symlink creates a symbolic link at 'newname', pointing at target 'oldname'
+func (c *Client) Symlink(oldname, newname string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpSymlinkPacket{
+ ID: id,
+ Linkpath: newname,
+ Targetpath: oldname,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+func (c *Client) setfstat(handle string, flags uint32, attrs interface{}) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpFsetstatPacket{
+ ID: id,
+ Handle: handle,
+ Flags: flags,
+ Attrs: attrs,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// setstat is a convience wrapper to allow for changing of various parts of the file descriptor.
+func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpSetstatPacket{
+ ID: id,
+ Path: path,
+ Flags: flags,
+ Attrs: attrs,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// Chtimes changes the access and modification times of the named file.
+func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
+ type times struct {
+ Atime uint32
+ Mtime uint32
+ }
+ attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
+ return c.setstat(path, sshFileXferAttrACmodTime, attrs)
+}
+
+// Chown changes the user and group owners of the named file.
+func (c *Client) Chown(path string, uid, gid int) error {
+ type owner struct {
+ UID uint32
+ GID uint32
+ }
+ attrs := owner{uint32(uid), uint32(gid)}
+ return c.setstat(path, sshFileXferAttrUIDGID, attrs)
+}
+
+// Chmod changes the permissions of the named file.
+//
+// Chmod does not apply a umask, because even retrieving the umask is not
+// possible in a portable way without causing a race condition. Callers
+// should mask off umask bits, if desired.
+func (c *Client) Chmod(path string, mode os.FileMode) error {
+ return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode))
+}
+
+// Truncate sets the size of the named file. Although it may be safely assumed
+// that if the size is less than its current size it will be truncated to fit,
+// the SFTP protocol does not specify what behavior the server should do when setting
+// size greater than the current size.
+func (c *Client) Truncate(path string, size int64) error {
+ return c.setstat(path, sshFileXferAttrSize, uint64(size))
+}
+
+// Open opens the named file for reading. If successful, methods on the
+// returned file can be used for reading; the associated file descriptor
+// has mode O_RDONLY.
+func (c *Client) Open(path string) (*File, error) {
+ return c.open(path, flags(os.O_RDONLY))
+}
+
+// OpenFile is the generalized open call; most users will use Open or
+// Create instead. It opens the named file with specified flag (O_RDONLY
+// etc.). If successful, methods on the returned File can be used for I/O.
+func (c *Client) OpenFile(path string, f int) (*File, error) {
+ return c.open(path, flags(f))
+}
+
+func (c *Client) open(path string, pflags uint32) (*File, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpOpenPacket{
+ ID: id,
+ Path: path,
+ Pflags: pflags,
+ })
+ if err != nil {
+ return nil, err
+ }
+ switch typ {
+ case sshFxpHandle:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return nil, &unexpectedIDErr{id, sid}
+ }
+ handle, _ := unmarshalString(data)
+ return &File{c: c, path: path, handle: handle}, nil
+ case sshFxpStatus:
+ return nil, normaliseError(unmarshalStatus(id, data))
+ default:
+ return nil, unimplementedPacketErr(typ)
+ }
+}
+
+// close closes a handle handle previously returned in the response
+// to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
+// immediately after this request has been sent.
+func (c *Client) close(handle string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpClosePacket{
+ ID: id,
+ Handle: handle,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+func (c *Client) stat(path string) (*FileStat, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpStatPacket{
+ ID: id,
+ Path: path,
+ })
+ if err != nil {
+ return nil, err
+ }
+ switch typ {
+ case sshFxpAttrs:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return nil, &unexpectedIDErr{id, sid}
+ }
+ attr, _ := unmarshalAttrs(data)
+ return attr, nil
+ case sshFxpStatus:
+ return nil, normaliseError(unmarshalStatus(id, data))
+ default:
+ return nil, unimplementedPacketErr(typ)
+ }
+}
+
+func (c *Client) fstat(handle string) (*FileStat, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpFstatPacket{
+ ID: id,
+ Handle: handle,
+ })
+ if err != nil {
+ return nil, err
+ }
+ switch typ {
+ case sshFxpAttrs:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return nil, &unexpectedIDErr{id, sid}
+ }
+ attr, _ := unmarshalAttrs(data)
+ return attr, nil
+ case sshFxpStatus:
+ return nil, normaliseError(unmarshalStatus(id, data))
+ default:
+ return nil, unimplementedPacketErr(typ)
+ }
+}
+
+// StatVFS retrieves VFS statistics from a remote host.
+//
+// It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
+// from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
+func (c *Client) StatVFS(path string) (*StatVFS, error) {
+ // send the StatVFS packet to the server
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpStatvfsPacket{
+ ID: id,
+ Path: path,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ switch typ {
+ // server responded with valid data
+ case sshFxpExtendedReply:
+ var response StatVFS
+ err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
+ if err != nil {
+ return nil, errors.New("can not parse reply")
+ }
+
+ return &response, nil
+
+ // the resquest failed
+ case sshFxpStatus:
+ return nil, normaliseError(unmarshalStatus(id, data))
+
+ default:
+ return nil, unimplementedPacketErr(typ)
+ }
+}
+
+// Join joins any number of path elements into a single path, adding a
+// separating slash if necessary. The result is Cleaned; in particular, all
+// empty strings are ignored.
+func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
+
+// Remove removes the specified file or directory. An error will be returned if no
+// file or directory with the specified path exists, or if the specified directory
+// is not empty.
+func (c *Client) Remove(path string) error {
+ err := c.removeFile(path)
+ // some servers, *cough* osx *cough*, return EPERM, not ENODIR.
+ // serv-u returns ssh_FX_FILE_IS_A_DIRECTORY
+ // EPERM is converted to os.ErrPermission so it is not a StatusError
+ if err, ok := err.(*StatusError); ok {
+ switch err.Code {
+ case sshFxFailure, sshFxFileIsADirectory:
+ return c.RemoveDirectory(path)
+ }
+ }
+ if os.IsPermission(err) {
+ return c.RemoveDirectory(path)
+ }
+ return err
+}
+
+func (c *Client) removeFile(path string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpRemovePacket{
+ ID: id,
+ Filename: path,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// RemoveDirectory removes a directory path.
+func (c *Client) RemoveDirectory(path string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpRmdirPacket{
+ ID: id,
+ Path: path,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// Rename renames a file.
+func (c *Client) Rename(oldname, newname string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpRenamePacket{
+ ID: id,
+ Oldpath: oldname,
+ Newpath: newname,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// PosixRename renames a file using the posix-rename@openssh.com extension
+// which will replace newname if it already exists.
+func (c *Client) PosixRename(oldname, newname string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpPosixRenamePacket{
+ ID: id,
+ Oldpath: oldname,
+ Newpath: newname,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// RealPath can be used to have the server canonicalize any given path name to an absolute path.
+//
+// This is useful for converting path names containing ".." components,
+// or relative pathnames without a leading slash into absolute paths.
+func (c *Client) RealPath(path string) (string, error) {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpRealpathPacket{
+ ID: id,
+ Path: path,
+ })
+ if err != nil {
+ return "", err
+ }
+ switch typ {
+ case sshFxpName:
+ sid, data := unmarshalUint32(data)
+ if sid != id {
+ return "", &unexpectedIDErr{id, sid}
+ }
+ count, data := unmarshalUint32(data)
+ if count != 1 {
+ return "", unexpectedCount(1, count)
+ }
+ filename, _ := unmarshalString(data) // ignore attributes
+ return filename, nil
+ case sshFxpStatus:
+ return "", normaliseError(unmarshalStatus(id, data))
+ default:
+ return "", unimplementedPacketErr(typ)
+ }
+}
+
+// Getwd returns the current working directory of the server. Operations
+// involving relative paths will be based at this location.
+func (c *Client) Getwd() (string, error) {
+ return c.RealPath(".")
+}
+
+// Mkdir creates the specified directory. An error will be returned if a file or
+// directory with the specified path already exists, or if the directory's
+// parent folder does not exist (the method cannot create complete paths).
+func (c *Client) Mkdir(path string) error {
+ id := c.nextID()
+ typ, data, err := c.sendPacket(nil, &sshFxpMkdirPacket{
+ ID: id,
+ Path: path,
+ })
+ if err != nil {
+ return err
+ }
+ switch typ {
+ case sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return unimplementedPacketErr(typ)
+ }
+}
+
+// MkdirAll creates a directory named path, along with any necessary parents,
+// and returns nil, or else returns an error.
+// If path is already a directory, MkdirAll does nothing and returns nil.
+// If path contains a regular file, an error is returned
+func (c *Client) MkdirAll(path string) error {
+ // Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13
+ // Fast path: if we can tell whether path is a directory or file, stop with success or error.
+ dir, err := c.Stat(path)
+ if err == nil {
+ if dir.IsDir() {
+ return nil
+ }
+ return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
+ }
+
+ // Slow path: make sure parent exists and then call Mkdir for path.
+ i := len(path)
+ for i > 0 && path[i-1] == '/' { // Skip trailing path separator.
+ i--
+ }
+
+ j := i
+ for j > 0 && path[j-1] != '/' { // Scan backward over element.
+ j--
+ }
+
+ if j > 1 {
+ // Create parent
+ err = c.MkdirAll(path[0 : j-1])
+ if err != nil {
+ return err
+ }
+ }
+
+ // Parent now exists; invoke Mkdir and use its result.
+ err = c.Mkdir(path)
+ if err != nil {
+ // Handle arguments like "foo/." by
+ // double-checking that directory doesn't exist.
+ dir, err1 := c.Lstat(path)
+ if err1 == nil && dir.IsDir() {
+ return nil
+ }
+ return err
+ }
+ return nil
+}
+
+// File represents a remote file.
+type File struct {
+ c *Client
+ path string
+ handle string
+
+ mu sync.Mutex
+ offset int64 // current offset within remote file
+}
+
+// Close closes the File, rendering it unusable for I/O. It returns an
+// error, if any.
+func (f *File) Close() error {
+ return f.c.close(f.handle)
+}
+
+// Name returns the name of the file as presented to Open or Create.
+func (f *File) Name() string {
+ return f.path
+}
+
+// Read reads up to len(b) bytes from the File. It returns the number of bytes
+// read and an error, if any. Read follows io.Reader semantics, so when Read
+// encounters an error or EOF condition after successfully reading n > 0 bytes,
+// it returns the number of bytes read.
+//
+// To maximise throughput for transferring the entire file (especially
+// over high latency links) it is recommended to use WriteTo rather
+// than calling Read multiple times. io.Copy will do this
+// automatically.
+func (f *File) Read(b []byte) (int, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ n, err := f.ReadAt(b, f.offset)
+ f.offset += int64(n)
+ return n, err
+}
+
+// readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset.
+// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
+func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
+ for err == nil && n < len(b) {
+ id := f.c.nextID()
+ typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{
+ ID: id,
+ Handle: f.handle,
+ Offset: uint64(off) + uint64(n),
+ Len: uint32(len(b) - n),
+ })
+ if err != nil {
+ return n, err
+ }
+
+ switch typ {
+ case sshFxpStatus:
+ return n, normaliseError(unmarshalStatus(id, data))
+
+ case sshFxpData:
+ sid, data := unmarshalUint32(data)
+ if id != sid {
+ return n, &unexpectedIDErr{id, sid}
+ }
+
+ l, data := unmarshalUint32(data)
+ n += copy(b[n:], data[:l])
+
+ default:
+ return n, unimplementedPacketErr(typ)
+ }
+ }
+
+ return
+}
+
+func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
+ for read < len(b) {
+ rb := b[read:]
+ if len(rb) > f.c.maxPacket {
+ rb = rb[:f.c.maxPacket]
+ }
+ n, err := f.readChunkAt(nil, rb, off+int64(read))
+ if n < 0 {
+ panic("sftp.File: returned negative count from readChunkAt")
+ }
+ if n > 0 {
+ read += n
+ }
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ return read, nil // return nil explicitly.
+ }
+ return read, err
+ }
+ }
+ return read, nil
+}
+
+// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns
+// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
+// so the file offset is not altered during the read.
+func (f *File) ReadAt(b []byte, off int64) (int, error) {
+ if len(b) <= f.c.maxPacket {
+ // This should be able to be serviced with 1/2 requests.
+ // So, just do it directly.
+ return f.readChunkAt(nil, b, off)
+ }
+
+ if f.c.disableConcurrentReads {
+ return f.readAtSequential(b, off)
+ }
+
+ // Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
+ // This allows writes with a suitably large buffer to transfer data at a much faster rate
+ // by overlapping round trip times.
+
+ cancel := make(chan struct{})
+
+ concurrency := len(b)/f.c.maxPacket + 1
+ if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
+ concurrency = f.c.maxConcurrentRequests
+ }
+
+ resPool := newResChanPool(concurrency)
+
+ type work struct {
+ id uint32
+ res chan result
+
+ b []byte
+ off int64
+ }
+ workCh := make(chan work)
+
+ // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
+ go func() {
+ defer close(workCh)
+
+ b := b
+ offset := off
+ chunkSize := f.c.maxPacket
+
+ for len(b) > 0 {
+ rb := b
+ if len(rb) > chunkSize {
+ rb = rb[:chunkSize]
+ }
+
+ id := f.c.nextID()
+ res := resPool.Get()
+
+ f.c.dispatchRequest(res, &sshFxpReadPacket{
+ ID: id,
+ Handle: f.handle,
+ Offset: uint64(offset),
+ Len: uint32(chunkSize),
+ })
+
+ select {
+ case workCh <- work{id, res, rb, offset}:
+ case <-cancel:
+ return
+ }
+
+ offset += int64(len(rb))
+ b = b[len(rb):]
+ }
+ }()
+
+ type rErr struct {
+ off int64
+ err error
+ }
+ errCh := make(chan rErr)
+
+ var wg sync.WaitGroup
+ wg.Add(concurrency)
+ for i := 0; i < concurrency; i++ {
+ // Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
+ go func() {
+ defer wg.Done()
+
+ for packet := range workCh {
+ var n int
+
+ s := <-packet.res
+ resPool.Put(packet.res)
+
+ err := s.err
+ if err == nil {
+ switch s.typ {
+ case sshFxpStatus:
+ err = normaliseError(unmarshalStatus(packet.id, s.data))
+
+ case sshFxpData:
+ sid, data := unmarshalUint32(s.data)
+ if packet.id != sid {
+ err = &unexpectedIDErr{packet.id, sid}
+
+ } else {
+ l, data := unmarshalUint32(data)
+ n = copy(packet.b, data[:l])
+
+ // For normal disk files, it is guaranteed that this will read
+ // the specified number of bytes, or up to end of file.
+ // This implies, if we have a short read, that means EOF.
+ if n < len(packet.b) {
+ err = io.EOF
+ }
+ }
+
+ default:
+ err = unimplementedPacketErr(s.typ)
+ }
+ }
+
+ if err != nil {
+ // return the offset as the start + how much we read before the error.
+ errCh <- rErr{packet.off + int64(n), err}
+ return
+ }
+ }
+ }()
+ }
+
+ // Wait for long tail, before closing results.
+ go func() {
+ wg.Wait()
+ close(errCh)
+ }()
+
+ // Reduce: collect all the results into a relevant return: the earliest offset to return an error.
+ firstErr := rErr{math.MaxInt64, nil}
+ for rErr := range errCh {
+ if rErr.off <= firstErr.off {
+ firstErr = rErr
+ }
+
+ select {
+ case <-cancel:
+ default:
+ // stop any more work from being distributed. (Just in case.)
+ close(cancel)
+ }
+ }
+
+ if firstErr.err != nil {
+ // firstErr.err != nil if and only if firstErr.off > our starting offset.
+ return int(firstErr.off - off), firstErr.err
+ }
+
+ // As per spec for io.ReaderAt, we return nil error if and only if we read everything.
+ return len(b), nil
+}
+
+// writeToSequential implements WriteTo, but works sequentially with no parallelism.
+func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
+ b := make([]byte, f.c.maxPacket)
+ ch := make(chan result, 1) // reusable channel
+
+ for {
+ n, err := f.readChunkAt(ch, b, f.offset)
+ if n < 0 {
+ panic("sftp.File: returned negative count from readChunkAt")
+ }
+
+ if n > 0 {
+ f.offset += int64(n)
+
+ m, err2 := w.Write(b[:n])
+ written += int64(m)
+
+ if err == nil {
+ err = err2
+ }
+ }
+
+ if err != nil {
+ if err == io.EOF {
+ return written, nil // return nil explicitly.
+ }
+
+ return written, err
+ }
+ }
+}
+
+// WriteTo writes the file to the given Writer.
+// The return value is the number of bytes written.
+// Any error encountered during the write is also returned.
+//
+// This method is preferred over calling Read multiple times
+// to maximise throughput for transferring the entire file,
+// especially over high latency links.
+func (f *File) WriteTo(w io.Writer) (written int64, err error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ if f.c.disableConcurrentReads {
+ return f.writeToSequential(w)
+ }
+
+ // For concurrency, we want to guess how many concurrent workers we should use.
+ var fileStat *FileStat
+ if f.c.useFstat {
+ fileStat, err = f.c.fstat(f.handle)
+ } else {
+ fileStat, err = f.c.stat(f.path)
+ }
+ if err != nil {
+ return 0, err
+ }
+
+ fileSize := fileStat.Size
+ if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
+ // only regular files are guaranteed to return (full read) xor (partial read, next error)
+ return f.writeToSequential(w)
+ }
+
+ concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
+ if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
+ concurrency64 = uint64(f.c.maxConcurrentRequests)
+ }
+ // Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
+ concurrency := int(concurrency64)
+
+ chunkSize := f.c.maxPacket
+ pool := newBufPool(concurrency, chunkSize)
+ resPool := newResChanPool(concurrency)
+
+ cancel := make(chan struct{})
+ var wg sync.WaitGroup
+ defer func() {
+ // Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
+ close(cancel)
+
+ // We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
+ // Just to be sure we don’t orphan any goroutines any hanging references.
+ wg.Wait()
+ }()
+
+ type writeWork struct {
+ b []byte
+ off int64
+ err error
+
+ next chan writeWork
+ }
+ writeCh := make(chan writeWork)
+
+ type readWork struct {
+ id uint32
+ res chan result
+ off int64
+
+ cur, next chan writeWork
+ }
+ readCh := make(chan readWork)
+
+ // Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
+ go func() {
+ defer close(readCh)
+
+ off := f.offset
+
+ cur := writeCh
+ for {
+ id := f.c.nextID()
+ res := resPool.Get()
+
+ next := make(chan writeWork)
+ readWork := readWork{
+ id: id,
+ res: res,
+ off: off,
+
+ cur: cur,
+ next: next,
+ }
+
+ f.c.dispatchRequest(res, &sshFxpReadPacket{
+ ID: id,
+ Handle: f.handle,
+ Offset: uint64(off),
+ Len: uint32(chunkSize),
+ })
+
+ select {
+ case readCh <- readWork:
+ case <-cancel:
+ return
+ }
+
+ off += int64(chunkSize)
+ cur = next
+ }
+ }()
+
+ wg.Add(concurrency)
+ for i := 0; i < concurrency; i++ {
+ // Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
+ go func() {
+ defer wg.Done()
+
+ for readWork := range readCh {
+ var b []byte
+ var n int
+
+ s := <-readWork.res
+ resPool.Put(readWork.res)
+
+ err := s.err
+ if err == nil {
+ switch s.typ {
+ case sshFxpStatus:
+ err = normaliseError(unmarshalStatus(readWork.id, s.data))
+
+ case sshFxpData:
+ sid, data := unmarshalUint32(s.data)
+ if readWork.id != sid {
+ err = &unexpectedIDErr{readWork.id, sid}
+
+ } else {
+ l, data := unmarshalUint32(data)
+ b = pool.Get()[:l]
+ n = copy(b, data[:l])
+ b = b[:n]
+ }
+
+ default:
+ err = unimplementedPacketErr(s.typ)
+ }
+ }
+
+ writeWork := writeWork{
+ b: b,
+ off: readWork.off,
+ err: err,
+
+ next: readWork.next,
+ }
+
+ select {
+ case readWork.cur <- writeWork:
+ case <-cancel:
+ return
+ }
+
+ if err != nil {
+ return
+ }
+ }
+ }()
+ }
+
+ // Reduce: serialize the results from the reads into sequential writes.
+ cur := writeCh
+ for {
+ packet, ok := <-cur
+ if !ok {
+ return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
+ }
+
+ // Because writes are serialized, this will always be the last successfully read byte.
+ f.offset = packet.off + int64(len(packet.b))
+
+ if len(packet.b) > 0 {
+ n, err := w.Write(packet.b)
+ written += int64(n)
+ if err != nil {
+ return written, err
+ }
+ }
+
+ if packet.err != nil {
+ if packet.err == io.EOF {
+ return written, nil
+ }
+
+ return written, packet.err
+ }
+
+ pool.Put(packet.b)
+ cur = packet.next
+ }
+}
+
+// Stat returns the FileInfo structure describing file. If there is an
+// error.
+func (f *File) Stat() (os.FileInfo, error) {
+ fs, err := f.c.fstat(f.handle)
+ if err != nil {
+ return nil, err
+ }
+ return fileInfoFromStat(fs, path.Base(f.path)), nil
+}
+
+// Write writes len(b) bytes to the File. It returns the number of bytes
+// written and an error, if any. Write returns a non-nil error when n !=
+// len(b).
+//
+// To maximise throughput for transferring the entire file (especially
+// over high latency links) it is recommended to use ReadFrom rather
+// than calling Write multiple times. io.Copy will do this
+// automatically.
+func (f *File) Write(b []byte) (int, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ n, err := f.WriteAt(b, f.offset)
+ f.offset += int64(n)
+ return n, err
+}
+
+func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
+ typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{
+ ID: f.c.nextID(),
+ Handle: f.handle,
+ Offset: uint64(off),
+ Length: uint32(len(b)),
+ Data: b,
+ })
+ if err != nil {
+ return 0, err
+ }
+
+ switch typ {
+ case sshFxpStatus:
+ id, _ := unmarshalUint32(data)
+ err := normaliseError(unmarshalStatus(id, data))
+ if err != nil {
+ return 0, err
+ }
+
+ default:
+ return 0, unimplementedPacketErr(typ)
+ }
+
+ return len(b), nil
+}
+
+// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially.
+func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
+ // Split the write into multiple maxPacket sized concurrent writes
+ // bounded by maxConcurrentRequests. This allows writes with a suitably
+ // large buffer to transfer data at a much faster rate due to
+ // overlapping round trip times.
+
+ cancel := make(chan struct{})
+
+ type work struct {
+ b []byte
+ off int64
+ }
+ workCh := make(chan work)
+
+ // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
+ go func() {
+ defer close(workCh)
+
+ var read int
+ chunkSize := f.c.maxPacket
+
+ for read < len(b) {
+ wb := b[read:]
+ if len(wb) > chunkSize {
+ wb = wb[:chunkSize]
+ }
+
+ select {
+ case workCh <- work{wb, off + int64(read)}:
+ case <-cancel:
+ return
+ }
+
+ read += len(wb)
+ }
+ }()
+
+ type wErr struct {
+ off int64
+ err error
+ }
+ errCh := make(chan wErr)
+
+ concurrency := len(b)/f.c.maxPacket + 1
+ if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
+ concurrency = f.c.maxConcurrentRequests
+ }
+
+ var wg sync.WaitGroup
+ wg.Add(concurrency)
+ for i := 0; i < concurrency; i++ {
+ // Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
+ go func() {
+ defer wg.Done()
+
+ ch := make(chan result, 1) // reusable channel per mapper.
+
+ for packet := range workCh {
+ n, err := f.writeChunkAt(ch, packet.b, packet.off)
+ if err != nil {
+ // return the offset as the start + how much we wrote before the error.
+ errCh <- wErr{packet.off + int64(n), err}
+ }
+ }
+ }()
+ }
+
+ // Wait for long tail, before closing results.
+ go func() {
+ wg.Wait()
+ close(errCh)
+ }()
+
+ // Reduce: collect all the results into a relevant return: the earliest offset to return an error.
+ firstErr := wErr{math.MaxInt64, nil}
+ for wErr := range errCh {
+ if wErr.off <= firstErr.off {
+ firstErr = wErr
+ }
+
+ select {
+ case <-cancel:
+ default:
+ // stop any more work from being distributed. (Just in case.)
+ close(cancel)
+ }
+ }
+
+ if firstErr.err != nil {
+ // firstErr.err != nil if and only if firstErr.off >= our starting offset.
+ return int(firstErr.off - off), firstErr.err
+ }
+
+ return len(b), nil
+}
+
+// WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns
+// the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics,
+// so the file offset is not altered during the write.
+func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
+ if len(b) <= f.c.maxPacket {
+ // We can do this in one write.
+ return f.writeChunkAt(nil, b, off)
+ }
+
+ if f.c.useConcurrentWrites {
+ return f.writeAtConcurrent(b, off)
+ }
+
+ ch := make(chan result, 1) // reusable channel
+
+ chunkSize := f.c.maxPacket
+
+ for written < len(b) {
+ wb := b[written:]
+ if len(wb) > chunkSize {
+ wb = wb[:chunkSize]
+ }
+
+ n, err := f.writeChunkAt(ch, wb, off+int64(written))
+ if n > 0 {
+ written += n
+ }
+
+ if err != nil {
+ return written, err
+ }
+ }
+
+ return len(b), nil
+}
+
+// ReadFromWithConcurrency implements ReaderFrom,
+// but uses the given concurrency to issue multiple requests at the same time.
+//
+// Giving a concurrency of less than one will default to the Client’s max concurrency.
+//
+// Otherwise, the given concurrency will be capped by the Client's max concurrency.
+func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
+ // Split the write into multiple maxPacket sized concurrent writes.
+ // This allows writes with a suitably large reader
+ // to transfer data at a much faster rate due to overlapping round trip times.
+
+ cancel := make(chan struct{})
+
+ type work struct {
+ b []byte
+ n int
+ off int64
+ }
+ workCh := make(chan work)
+
+ type rwErr struct {
+ off int64
+ err error
+ }
+ errCh := make(chan rwErr)
+
+ if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
+ concurrency = f.c.maxConcurrentRequests
+ }
+
+ pool := newBufPool(concurrency, f.c.maxPacket)
+
+ // Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
+ go func() {
+ defer close(workCh)
+
+ off := f.offset
+
+ for {
+ b := pool.Get()
+
+ n, err := r.Read(b)
+ if n > 0 {
+ read += int64(n)
+
+ select {
+ case workCh <- work{b, n, off}:
+ // We need the pool.Put(b) to put the whole slice, not just trunced.
+ case <-cancel:
+ return
+ }
+
+ off += int64(n)
+ }
+
+ if err != nil {
+ if err != io.EOF {
+ errCh <- rwErr{off, err}
+ }
+ return
+ }
+ }
+ }()
+
+ var wg sync.WaitGroup
+ wg.Add(concurrency)
+ for i := 0; i < concurrency; i++ {
+ // Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
+ go func() {
+ defer wg.Done()
+
+ ch := make(chan result, 1) // reusable channel per mapper.
+
+ for packet := range workCh {
+ n, err := f.writeChunkAt(ch, packet.b[:packet.n], packet.off)
+ if err != nil {
+ // return the offset as the start + how much we wrote before the error.
+ errCh <- rwErr{packet.off + int64(n), err}
+ }
+ pool.Put(packet.b)
+ }
+ }()
+ }
+
+ // Wait for long tail, before closing results.
+ go func() {
+ wg.Wait()
+ close(errCh)
+ }()
+
+ // Reduce: Collect all the results into a relevant return: the earliest offset to return an error.
+ firstErr := rwErr{math.MaxInt64, nil}
+ for rwErr := range errCh {
+ if rwErr.off <= firstErr.off {
+ firstErr = rwErr
+ }
+
+ select {
+ case <-cancel:
+ default:
+ // stop any more work from being distributed.
+ close(cancel)
+ }
+ }
+
+ if firstErr.err != nil {
+ // firstErr.err != nil if and only if firstErr.off is a valid offset.
+ //
+ // firstErr.off will then be the lesser of:
+ // * the offset of the first error from writing,
+ // * the last successfully read offset.
+ //
+ // This could be less than the last successfully written offset,
+ // which is the whole reason for the UseConcurrentWrites() ClientOption.
+ //
+ // Callers are responsible for truncating any SFTP files to a safe length.
+ f.offset = firstErr.off
+
+ // ReadFrom is defined to return the read bytes, regardless of any writer errors.
+ return read, firstErr.err
+ }
+
+ f.offset += read
+ return read, nil
+}
+
+// ReadFrom reads data from r until EOF and writes it to the file. The return
+// value is the number of bytes read. Any error except io.EOF encountered
+// during the read is also returned.
+//
+// This method is preferred over calling Write multiple times
+// to maximise throughput for transferring the entire file,
+// especially over high-latency links.
+func (f *File) ReadFrom(r io.Reader) (int64, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ if f.c.useConcurrentWrites {
+ var remain int64
+ switch r := r.(type) {
+ case interface{ Len() int }:
+ remain = int64(r.Len())
+
+ case interface{ Size() int64 }:
+ remain = r.Size()
+
+ case *io.LimitedReader:
+ remain = r.N
+
+ case interface{ Stat() (os.FileInfo, error) }:
+ info, err := r.Stat()
+ if err == nil {
+ remain = info.Size()
+ }
+ }
+
+ if remain < 0 {
+ // We can strongly assert that we want default max concurrency here.
+ return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests)
+ }
+
+ if remain > int64(f.c.maxPacket) {
+ // Otherwise, only use concurrency, if it would be at least two packets.
+
+ // This is the best reasonable guess we can make.
+ concurrency64 := remain/int64(f.c.maxPacket) + 1
+
+ // We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
+ // So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
+ if concurrency64 > int64(f.c.maxConcurrentRequests) {
+ concurrency64 = int64(f.c.maxConcurrentRequests)
+ }
+
+ return f.ReadFromWithConcurrency(r, int(concurrency64))
+ }
+ }
+
+ ch := make(chan result, 1) // reusable channel
+
+ b := make([]byte, f.c.maxPacket)
+
+ var read int64
+ for {
+ n, err := r.Read(b)
+ if n < 0 {
+ panic("sftp.File: reader returned negative count from Read")
+ }
+
+ if n > 0 {
+ read += int64(n)
+
+ m, err2 := f.writeChunkAt(ch, b[:n], f.offset)
+ f.offset += int64(m)
+
+ if err == nil {
+ err = err2
+ }
+ }
+
+ if err != nil {
+ if err == io.EOF {
+ return read, nil // return nil explicitly.
+ }
+
+ return read, err
+ }
+ }
+}
+
+// Seek implements io.Seeker by setting the client offset for the next Read or
+// Write. It returns the next offset read. Seeking before or after the end of
+// the file is undefined. Seeking relative to the end calls Stat.
+func (f *File) Seek(offset int64, whence int) (int64, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ switch whence {
+ case io.SeekStart:
+ case io.SeekCurrent:
+ offset += f.offset
+ case io.SeekEnd:
+ fi, err := f.Stat()
+ if err != nil {
+ return f.offset, err
+ }
+ offset += fi.Size()
+ default:
+ return f.offset, unimplementedSeekWhence(whence)
+ }
+
+ if offset < 0 {
+ return f.offset, os.ErrInvalid
+ }
+
+ f.offset = offset
+ return f.offset, nil
+}
+
+// Chown changes the uid/gid of the current file.
+func (f *File) Chown(uid, gid int) error {
+ return f.c.Chown(f.path, uid, gid)
+}
+
+// Chmod changes the permissions of the current file.
+//
+// See Client.Chmod for details.
+func (f *File) Chmod(mode os.FileMode) error {
+ return f.c.setfstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode))
+}
+
+// Sync requests a flush of the contents of a File to stable storage.
+//
+// Sync requires the server to support the fsync@openssh.com extension.
+func (f *File) Sync() error {
+ id := f.c.nextID()
+ typ, data, err := f.c.sendPacket(nil, &sshFxpFsyncPacket{
+ ID: id,
+ Handle: f.handle,
+ })
+
+ switch {
+ case err != nil:
+ return err
+ case typ == sshFxpStatus:
+ return normaliseError(unmarshalStatus(id, data))
+ default:
+ return &unexpectedPacketErr{want: sshFxpStatus, got: typ}
+ }
+}
+
+// Truncate sets the size of the current file. Although it may be safely assumed
+// that if the size is less than its current size it will be truncated to fit,
+// the SFTP protocol does not specify what behavior the server should do when setting
+// size greater than the current size.
+// We send a SSH_FXP_FSETSTAT here since we have a file handle
+func (f *File) Truncate(size int64) error {
+ return f.c.setfstat(f.handle, sshFileXferAttrSize, uint64(size))
+}
+
+// normaliseError normalises an error into a more standard form that can be
+// checked against stdlib errors like io.EOF or os.ErrNotExist.
+func normaliseError(err error) error {
+ switch err := err.(type) {
+ case *StatusError:
+ switch err.Code {
+ case sshFxEOF:
+ return io.EOF
+ case sshFxNoSuchFile:
+ return os.ErrNotExist
+ case sshFxPermissionDenied:
+ return os.ErrPermission
+ case sshFxOk:
+ return nil
+ default:
+ return err
+ }
+ default:
+ return err
+ }
+}
+
+// flags converts the flags passed to OpenFile into ssh flags.
+// Unsupported flags are ignored.
+func flags(f int) uint32 {
+ var out uint32
+ switch f & os.O_WRONLY {
+ case os.O_WRONLY:
+ out |= sshFxfWrite
+ case os.O_RDONLY:
+ out |= sshFxfRead
+ }
+ if f&os.O_RDWR == os.O_RDWR {
+ out |= sshFxfRead | sshFxfWrite
+ }
+ if f&os.O_APPEND == os.O_APPEND {
+ out |= sshFxfAppend
+ }
+ if f&os.O_CREATE == os.O_CREATE {
+ out |= sshFxfCreat
+ }
+ if f&os.O_TRUNC == os.O_TRUNC {
+ out |= sshFxfTrunc
+ }
+ if f&os.O_EXCL == os.O_EXCL {
+ out |= sshFxfExcl
+ }
+ return out
+}
+
+// toChmodPerm converts Go permission bits to POSIX permission bits.
+//
+// This differs from fromFileMode in that we preserve the POSIX versions of
+// setuid, setgid and sticky in m, because we've historically supported those
+// bits, and we mask off any non-permission bits.
+func toChmodPerm(m os.FileMode) (perm uint32) {
+ const mask = os.ModePerm | s_ISUID | s_ISGID | s_ISVTX
+ perm = uint32(m & mask)
+
+ if m&os.ModeSetuid != 0 {
+ perm |= s_ISUID
+ }
+ if m&os.ModeSetgid != 0 {
+ perm |= s_ISGID
+ }
+ if m&os.ModeSticky != 0 {
+ perm |= s_ISVTX
+ }
+
+ return perm
+}