os: support overlapped IO with NewFile

The runtime/poll package has just gained support for overlapped IO,
see CL 660595 and CL 661955. The only remaining piece was making it
visible to user code via os.NewFile.

Some of the poll.FD.Init responsibility has been moved to os.NewFile
to avoid unnecessary syscalls for the common case of using os.Open,
os.Create, os.OpenFile, and os.Pipe, where we know that the file
is not opened for overlapped IO.

Some internal/poll tests have been moved to the os package to exercise
public APIs rather than internal ones.

The os.NewFile function definition has been moved into an OS-agnostic
file to avoid having duplicated documentation and ensure that the
caller is aware of its behavior across all platforms.

Closes #19098.

Cq-Include-Trybots: luci.golang.try:gotip-windows-amd64-longtest,gotip-windows-amd64-race,gotip-windows-arm64
Change-Id: If043f8b34d588cd4b481777203107ed92d660fd9
Reviewed-on: https://go-review.googlesource.com/c/go/+/662236
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
Auto-Submit: Damien Neil <dneil@google.com>
This commit is contained in:
qmuntal 2025-04-02 12:47:32 +02:00 committed by Gopher Robot
parent 5fc596ebe7
commit 7e60bdd7aa
10 changed files with 449 additions and 427 deletions

View File

@ -1,6 +1,14 @@
On Windows, [NewFile] supports overlapped (a.k.a non-blocking) file handles even On Windows, [NewFile] now supports handles opened for asynchronous I/O (that is,
when the handle can't be added to the Go runtime I/O Completion Port (IOCP), normally [syscall.FILE_FLAG_OVERLAPPED] is specified in the [syscall.CreateFile] call).
because it is already attached to another IOCP. The I/O operations will be performed in These handles are associated with the Go runtime's I/O completion port,
a blocking manner instead of using the Go runtime IOCP. which provides the following benefits for the resulting [File]:
Particularly, this means that is now possible to reliably pass overlapped named pipes and
sockets to a Go process standard input, output, and error. - I/O methods ([File.Read], [File.Write], [File.ReadAt], and [File.WriteAt]) do not block an OS thread.
- Deadline methods ([File.SetDeadline], [File.SetReadDeadline], and [File.SetWriteDeadline]) are supported.
This enhancement is especially beneficial for applications that communicate via named pipes on Windows.
Note that a handle can only be associated with one completion port at a time.
If the handle provided to [NewFile] is already associated with a completion port,
the returned [File] is downgraded to synchronous I/O mode.
In this case, I/O methods will block an OS thread, and the deadline methods have no effect.

View File

@ -293,9 +293,8 @@ type FD struct {
isBlocking bool isBlocking bool
// Initialization parameters. // Initialization parameters.
initIOOnce sync.Once initIOOnce sync.Once
initIOErr error // only used in the net package initIOErr error // only used in the net package
initPollable bool // value passed to [FD.Init]
} }
// setOffset sets the offset fields of the overlapped object // setOffset sets the offset fields of the overlapped object
@ -333,36 +332,30 @@ const (
) )
func (fd *FD) initIO() error { func (fd *FD) initIO() error {
if fd.isBlocking {
return nil
}
fd.initIOOnce.Do(func() { fd.initIOOnce.Do(func() {
if fd.initPollable { // The runtime poller will ignore I/O completion
// The runtime poller will ignore I/O completion // notifications not initiated by this package,
// notifications not initiated by this package, // so it is safe to add handles owned by the caller.
// so it is safe to add handles owned by the caller. fd.initIOErr = fd.pd.init(fd)
// if fd.initIOErr != nil {
// If we could not add the handle to the runtime poller, // This can happen if the handle is already associated
// assume the handle hasn't been opened for overlapped I/O. // with another IOCP or if the isBlocking flag is incorrect.
fd.initIOErr = fd.pd.init(fd) // In both cases, fallback to synchronous IO.
if fd.initIOErr != nil { fd.isBlocking = true
fd.initPollable = false
}
}
if !fd.initPollable {
// Handle opened for overlapped I/O (aka non-blocking) that are not added
// to the runtime poller need special handling when reading and writing.
// If we fail to get the file mode information, assume the file is blocking.
overlapped, _ := windows.IsNonblock(fd.Sysfd)
fd.isBlocking = !overlapped
fd.skipSyncNotif = true fd.skipSyncNotif = true
} else { return
fd.rop.runtimeCtx = fd.pd.runtimeCtx }
fd.wop.runtimeCtx = fd.pd.runtimeCtx fd.rop.runtimeCtx = fd.pd.runtimeCtx
if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes { fd.wop.runtimeCtx = fd.pd.runtimeCtx
// Non-socket handles can use SetFileCompletionNotificationModes without problems. if fd.kind != kindNet || socketCanUseSetFileCompletionNotificationModes {
err := syscall.SetFileCompletionNotificationModes(fd.Sysfd, // Non-socket handles can use SetFileCompletionNotificationModes without problems.
syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS, err := syscall.SetFileCompletionNotificationModes(fd.Sysfd,
) syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS,
fd.skipSyncNotif = err == nil )
} fd.skipSyncNotif = err == nil
} }
}) })
return fd.initIOErr return fd.initIOErr
@ -373,6 +366,7 @@ func (fd *FD) initIO() error {
// The net argument is a network name from the net package (e.g., "tcp"), // The net argument is a network name from the net package (e.g., "tcp"),
// or "file" or "console" or "dir". // or "file" or "console" or "dir".
// Set pollable to true if fd should be managed by runtime netpoll. // Set pollable to true if fd should be managed by runtime netpoll.
// Pollable must be set to true for overlapped fds.
func (fd *FD) Init(net string, pollable bool) error { func (fd *FD) Init(net string, pollable bool) error {
if initErr != nil { if initErr != nil {
return initErr return initErr
@ -390,7 +384,8 @@ func (fd *FD) Init(net string, pollable bool) error {
fd.kind = kindNet fd.kind = kindNet
} }
fd.isFile = fd.kind != kindNet fd.isFile = fd.kind != kindNet
fd.initPollable = pollable fd.isBlocking = !pollable
fd.skipSyncNotif = fd.isBlocking
fd.rop.mode = 'r' fd.rop.mode = 'r'
fd.wop.mode = 'w' fd.wop.mode = 'w'
fd.rop.fd = fd fd.rop.fd = fd

View File

@ -5,19 +5,14 @@
package poll_test package poll_test
import ( import (
"bytes"
"errors" "errors"
"internal/poll" "internal/poll"
"internal/syscall/windows" "internal/syscall/windows"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"sync"
"sync/atomic"
"syscall" "syscall"
"testing" "testing"
"time"
"unsafe" "unsafe"
) )
@ -167,13 +162,13 @@ type _TCP_INFO_v0 struct {
SynRetrans uint8 SynRetrans uint8
} }
func newFD(t testing.TB, h syscall.Handle, kind string, overlapped, pollable bool) *poll.FD { func newFD(t testing.TB, h syscall.Handle, kind string, overlapped bool) *poll.FD {
fd := poll.FD{ fd := poll.FD{
Sysfd: h, Sysfd: h,
IsStream: true, IsStream: true,
ZeroReadIsEOF: true, ZeroReadIsEOF: true,
} }
err := fd.Init(kind, pollable) err := fd.Init(kind, overlapped)
if overlapped && err != nil { if overlapped && err != nil {
// Overlapped file handles should not error. // Overlapped file handles should not error.
fd.Close() fd.Close()
@ -185,7 +180,7 @@ func newFD(t testing.TB, h syscall.Handle, kind string, overlapped, pollable boo
return &fd return &fd
} }
func newFile(t testing.TB, name string, overlapped, pollable bool) *poll.FD { func newFile(t testing.TB, name string, overlapped bool) *poll.FD {
namep, err := syscall.UTF16PtrFromString(name) namep, err := syscall.UTF16PtrFromString(name)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -210,326 +205,7 @@ func newFile(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
if typ == syscall.FILE_TYPE_PIPE { if typ == syscall.FILE_TYPE_PIPE {
kind = "pipe" kind = "pipe"
} }
return newFD(t, h, kind, overlapped, pollable) return newFD(t, h, kind, overlapped)
}
var currentProces = sync.OnceValue(func() string {
// Convert the process ID to a string.
return strconv.FormatUint(uint64(os.Getpid()), 10)
})
var pipeCounter atomic.Uint64
func newBytePipe(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
return newPipe(t, name, false, overlapped, pollable)
}
func newMessagePipe(t testing.TB, name string, overlapped, pollable bool) *poll.FD {
return newPipe(t, name, true, overlapped, pollable)
}
func pipeName() string {
return `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
}
func newPipe(t testing.TB, name string, message, overlapped, pollable bool) *poll.FD {
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
}
// Create the read handle.
flags := windows.PIPE_ACCESS_DUPLEX
if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED
}
typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE
if message {
typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
}
h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
if err != nil {
t.Fatal(err)
}
return newFD(t, h, "pipe", overlapped, pollable)
}
func testReadWrite(t *testing.T, fdr, fdw *poll.FD) {
write := make(chan string, 1)
read := make(chan struct{}, 1)
go func() {
for s := range write {
n, err := fdw.Write([]byte(s))
read <- struct{}{}
if err != nil {
t.Error(err)
}
if n != len(s) {
t.Errorf("expected to write %d bytes, got %d", len(s), n)
}
}
}()
for i := range 10 {
s := strconv.Itoa(i)
write <- s
<-read
buf := make([]byte, len(s))
_, err := io.ReadFull(fdr, buf)
if err != nil {
t.Fatalf("read failed: %v", err)
}
if !bytes.Equal(buf, []byte(s)) {
t.Fatalf("expected %q, got %q", s, buf)
}
}
close(read)
close(write)
}
func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) {
type op struct {
s string
off int64
}
write := make(chan op, 1)
read := make(chan struct{}, 1)
go func() {
for o := range write {
n, err := fdw.Pwrite([]byte(o.s), o.off)
read <- struct{}{}
if err != nil {
t.Error(err)
}
if n != len(o.s) {
t.Errorf("expected to write %d bytes, got %d", len(o.s), n)
}
}
}()
for i := range 10 {
off := int64(i % 3) // exercise some back and forth
s := strconv.Itoa(i)
write <- op{s, off}
<-read
buf := make([]byte, len(s))
n, err := fdr.Pread(buf, off)
if err != nil {
t.Fatal(err)
}
if n != len(s) {
t.Fatalf("expected to read %d bytes, got %d", len(s), n)
}
if !bytes.Equal(buf, []byte(s)) {
t.Fatalf("expected %q, got %q", s, buf)
}
}
close(read)
close(write)
}
func testFileReadEOF(t *testing.T, f *poll.FD) {
end, err := f.Seek(0, io.SeekEnd)
if err != nil {
t.Fatal(err)
}
var buf [1]byte
n, err := f.Read(buf[:])
if err != nil && err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
n, err = f.Pread(buf[:], end)
if err != nil && err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestFile(t *testing.T) {
t.Parallel()
tests := []struct {
name string
overlappedRead bool
overlappedWrite bool
pollable bool
}{
{"overlapped", true, true, true},
{"overlapped-nonpollable", true, true, false},
{"overlapped-read", true, false, true},
{"overlapped-write", false, true, true},
{"sync", false, false, false},
{"sync-pollable", false, false, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
name := filepath.Join(t.TempDir(), "foo")
rh := newFile(t, name, tt.overlappedRead, tt.pollable)
wh := newFile(t, name, tt.overlappedWrite, tt.pollable)
testReadWrite(t, rh, wh)
testPreadPwrite(t, rh, wh)
testFileReadEOF(t, rh)
})
}
}
func TestPipe(t *testing.T) {
t.Parallel()
tests := []struct {
name string
overlappedRead bool
overlappedWrite bool
pollable bool
}{
{"overlapped", true, true, true},
{"overlapped-nonpollable", true, true, false},
{"overlapped-write", false, true, true},
{"overlapped-read", true, false, true},
{"sync", false, false, false},
{"sync-pollable", false, false, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newBytePipe(t, name, tt.overlappedWrite, tt.pollable)
file := newFile(t, name, tt.overlappedRead, tt.pollable)
testReadWrite(t, pipe, file)
})
}
t.Run("anonymous", func(t *testing.T) {
t.Parallel()
var r, w syscall.Handle
if err := syscall.CreatePipe(&r, &w, nil, 0); err != nil {
t.Fatal(err)
}
defer func() {
if err := syscall.CloseHandle(r); err != nil {
t.Fatal(err)
}
if err := syscall.CloseHandle(w); err != nil {
t.Fatal(err)
}
}()
// CreatePipe always returns sync handles.
fdr := newFD(t, r, "pipe", false, false)
fdw := newFD(t, w, "file", false, false)
testReadWrite(t, fdr, fdw)
})
}
func TestPipeMessageReadEOF(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, true, true)
file := newFile(t, name, true, true)
_, err := pipe.Write(nil)
if err != nil {
t.Error(err)
}
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestPipeClosedEOF(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newBytePipe(t, name, true, false)
file := newFile(t, name, true, true)
pipe.Close()
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestPipeReadTimeout(t *testing.T) {
t.Parallel()
name := pipeName()
_ = newBytePipe(t, name, true, true)
file := newFile(t, name, true, true)
err := file.SetReadDeadline(time.Now().Add(time.Millisecond))
if err != nil {
t.Fatal(err)
}
var buf [10]byte
_, err = file.Read(buf[:])
if err != poll.ErrDeadlineExceeded {
t.Errorf("expected deadline exceeded, got %v", err)
}
}
func TestPipeCanceled(t *testing.T) {
t.Parallel()
name := pipeName()
_ = newBytePipe(t, name, true, true)
file := newFile(t, name, true, true)
ch := make(chan struct{}, 1)
go func() {
for {
select {
case <-ch:
return
default:
syscall.CancelIo(syscall.Handle(file.Sysfd))
time.Sleep(100 * time.Millisecond)
}
}
}()
// Try to cancel for max 1 second.
// Canceling is normally really fast, but it can take an
// arbitrary amount of time on busy systems.
// If it takes too long, we skip the test.
file.SetReadDeadline(time.Now().Add(1 * time.Second))
var tmp [1]byte
// Read will block until the cancel is complete.
_, err := file.Read(tmp[:])
ch <- struct{}{}
if err == poll.ErrDeadlineExceeded {
t.Skip("took too long to cancel")
}
if err != syscall.ERROR_OPERATION_ABORTED {
t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
}
}
func TestPipeExternalIOCP(t *testing.T) {
// Test that a caller can associate an overlapped handle to an external IOCP
// even when the handle is also associated to a poll.FD. Also test that
// the FD can still perform I/O after the association.
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, true, true)
_ = newFile(t, name, true, true) // Just open a pipe client
_, err := windows.CreateIoCompletionPort(syscall.Handle(pipe.Sysfd), 0, 0, 1)
if err != nil {
t.Fatal(err)
}
_, err = pipe.Write([]byte("hello"))
if err != nil {
t.Fatal(err)
}
} }
func BenchmarkReadOverlapped(b *testing.B) { func BenchmarkReadOverlapped(b *testing.B) {
@ -547,7 +223,7 @@ func benchmarkRead(b *testing.B, overlapped bool) {
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
file := newFile(b, name, overlapped, true) file := newFile(b, name, overlapped)
var buf [len(content)]byte var buf [len(content)]byte
for b.Loop() { for b.Loop() {
_, err := io.ReadFull(file, buf[:]) _, err := io.ReadFull(file, buf[:])

View File

@ -115,6 +115,25 @@ func (e *LinkError) Unwrap() error {
return e.Err return e.Err
} }
// NewFile returns a new [File] with the given file descriptor and name.
// The returned value will be nil if fd is not a valid file descriptor.
//
// NewFile's behavior differs on some platforms:
//
// - On Unix, if fd is in non-blocking mode, NewFile will attempt to return a pollable file.
// - On Windows, if fd is opened for asynchronous I/O (that is, [syscall.FILE_FLAG_OVERLAPPED]
// has been specified in the [syscall.CreateFile] call), NewFile will attempt to return a pollable
// file by associating fd with the Go runtime I/O completion port.
// The I/O operations will be performed synchronously if the association fails.
//
// Only pollable files support [File.SetDeadline], [File.SetReadDeadline], and [File.SetWriteDeadline].
//
// After passing it to NewFile, fd may become invalid under the same conditions described
// in the comments of [File.Fd], and the same constraints apply.
func NewFile(fd uintptr, name string) *File {
return newFileFromNewFile(fd, name)
}
// Read reads up to len(b) bytes from the File and stores them in b. // Read reads up to len(b) bytes from the File and stores them in b.
// It returns the number of bytes read and any error encountered. // It returns the number of bytes read and any error encountered.
// At end of file, Read returns 0, io.EOF. // At end of file, Read returns 0, io.EOF.

View File

@ -42,10 +42,8 @@ func (f *File) fd() uintptr {
return uintptr(f.sysfd) return uintptr(f.sysfd)
} }
// NewFile returns a new File with the given file descriptor and // newFileFromNewFile is called by [NewFile].
// name. The returned value will be nil if fd is not a valid file func newFileFromNewFile(fd uintptr, name string) *File {
// descriptor.
func NewFile(fd uintptr, name string) *File {
fdi := int(fd) fdi := int(fd)
if fdi < 0 { if fdi < 0 {
return nil return nil

View File

@ -84,16 +84,8 @@ func (f *File) fd() uintptr {
return uintptr(f.pfd.Sysfd) return uintptr(f.pfd.Sysfd)
} }
// NewFile returns a new File with the given file descriptor and // newFileFromNewFile is called by [NewFile].
// name. The returned value will be nil if fd is not a valid file func newFileFromNewFile(fd uintptr, name string) *File {
// descriptor. On Unix systems, if the file descriptor is in
// non-blocking mode, NewFile will attempt to return a pollable File
// (one for which the SetDeadline methods work).
//
// After passing it to NewFile, fd may become invalid under the same
// conditions described in the comments of the Fd method, and the same
// constraints apply.
func NewFile(fd uintptr, name string) *File {
fdi := int(fd) fdi := int(fd)
if fdi < 0 { if fdi < 0 {
return nil return nil

View File

@ -42,7 +42,8 @@ func (file *File) fd() uintptr {
// newFile returns a new File with the given file handle and name. // newFile returns a new File with the given file handle and name.
// Unlike NewFile, it does not check that h is syscall.InvalidHandle. // Unlike NewFile, it does not check that h is syscall.InvalidHandle.
func newFile(h syscall.Handle, name string, kind string) *File { // If nonBlocking is true, it tries to add the file to the runtime poller.
func newFile(h syscall.Handle, name string, kind string, nonBlocking bool) *File {
if kind == "file" { if kind == "file" {
t, err := syscall.GetFileType(h) t, err := syscall.GetFileType(h)
if err != nil || t == syscall.FILE_TYPE_CHAR { if err != nil || t == syscall.FILE_TYPE_CHAR {
@ -67,25 +68,23 @@ func newFile(h syscall.Handle, name string, kind string) *File {
// Ignore initialization errors. // Ignore initialization errors.
// Assume any problems will show up in later I/O. // Assume any problems will show up in later I/O.
f.pfd.Init(kind, false) f.pfd.Init(kind, nonBlocking)
return f return f
} }
// newConsoleFile creates new File that will be used as console. // newConsoleFile creates new File that will be used as console.
func newConsoleFile(h syscall.Handle, name string) *File { func newConsoleFile(h syscall.Handle, name string) *File {
return newFile(h, name, "console") return newFile(h, name, "console", false)
} }
// NewFile returns a new File with the given file descriptor and // newFileFromNewFile is called by [NewFile].
// name. The returned value will be nil if fd is not a valid file func newFileFromNewFile(fd uintptr, name string) *File {
// descriptor.
func NewFile(fd uintptr, name string) *File {
h := syscall.Handle(fd) h := syscall.Handle(fd)
if h == syscall.InvalidHandle { if h == syscall.InvalidHandle {
return nil return nil
} }
return newFile(h, name, "file") nonBlocking, _ := windows.IsNonblock(syscall.Handle(fd))
return newFile(h, name, "file", nonBlocking)
} }
func epipecheck(file *File, e error) { func epipecheck(file *File, e error) {
@ -105,7 +104,8 @@ func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
if err != nil { if err != nil {
return nil, &PathError{Op: "open", Path: name, Err: err} return nil, &PathError{Op: "open", Path: name, Err: err}
} }
return newFile(r, name, "file"), nil // syscall.Open always returns a non-blocking handle.
return newFile(r, name, "file", false), nil
} }
func openDirNolog(name string) (*File, error) { func openDirNolog(name string) (*File, error) {
@ -219,7 +219,8 @@ func Pipe() (r *File, w *File, err error) {
if e != nil { if e != nil {
return nil, nil, NewSyscallError("pipe", e) return nil, nil, NewSyscallError("pipe", e)
} }
return newFile(p[0], "|0", "pipe"), newFile(p[1], "|1", "pipe"), nil // syscall.Pipe always returns a non-blocking handle.
return newFile(p[0], "|0", "pipe", false), newFile(p[1], "|1", "pipe", false), nil
} }
var useGetTempPath2 = sync.OnceValue(func() bool { var useGetTempPath2 = sync.OnceValue(func() bool {

View File

@ -26,6 +26,7 @@ import (
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"testing" "testing"
"time"
"unicode/utf16" "unicode/utf16"
"unsafe" "unsafe"
) )
@ -1606,38 +1607,6 @@ func TestReadWriteFileOverlapped(t *testing.T) {
} }
} }
var currentProces = sync.OnceValue(func() string {
// Convert the process ID to a string.
return strconv.FormatUint(uint64(os.Getpid()), 10)
})
var pipeCounter atomic.Uint64
func pipeName() string {
return `\\.\pipe\go-os-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
}
func createPipe(t *testing.T, name string, inherit bool) *os.File {
t.Helper()
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
}
flags := windows.PIPE_ACCESS_DUPLEX | syscall.FILE_FLAG_OVERLAPPED
typ := windows.PIPE_TYPE_BYTE
sa := &syscall.SecurityAttributes{
Length: uint32(unsafe.Sizeof(syscall.SecurityAttributes{})),
}
if inherit {
sa.InheritHandle = 1
}
rh, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, sa)
if err != nil {
t.Fatal(err)
}
return os.NewFile(uintptr(rh), name)
}
func TestStdinOverlappedPipe(t *testing.T) { func TestStdinOverlappedPipe(t *testing.T) {
// Test that we can read from a named pipe open with FILE_FLAG_OVERLAPPED. // Test that we can read from a named pipe open with FILE_FLAG_OVERLAPPED.
// See https://go.dev/issue/15388. // See https://go.dev/issue/15388.
@ -1656,7 +1625,7 @@ func TestStdinOverlappedPipe(t *testing.T) {
name := pipeName() name := pipeName()
// Create the read handle inherited by the child process. // Create the read handle inherited by the child process.
r := createPipe(t, name, true) r := newPipe(t, name, false, true)
defer r.Close() defer r.Close()
// Create a write handle. // Create a write handle.
@ -1687,3 +1656,366 @@ func TestStdinOverlappedPipe(t *testing.T) {
t.Fatalf("output %q does not contain %q", got, want) t.Fatalf("output %q does not contain %q", got, want)
} }
} }
func newFileOverlapped(t testing.TB, name string, overlapped bool) *os.File {
namep, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
}
flags := syscall.FILE_ATTRIBUTE_NORMAL
if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED
}
h, err := syscall.CreateFile(namep,
syscall.GENERIC_READ|syscall.GENERIC_WRITE,
syscall.FILE_SHARE_WRITE|syscall.FILE_SHARE_READ,
nil, syscall.OPEN_ALWAYS, uint32(flags), 0)
if err != nil {
t.Fatal(err)
}
f := os.NewFile(uintptr(h), name)
t.Cleanup(func() {
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
t.Fatal(err)
}
})
return f
}
var currentProcess = sync.OnceValue(func() string {
// Convert the process ID to a string.
return strconv.FormatUint(uint64(os.Getpid()), 10)
})
var pipeCounter atomic.Uint64
func newBytePipe(t testing.TB, name string, overlapped bool) *os.File {
return newPipe(t, name, false, overlapped)
}
func newMessagePipe(t testing.TB, name string, overlapped bool) *os.File {
return newPipe(t, name, true, overlapped)
}
func pipeName() string {
return `\\.\pipe\go-os-test-` + currentProcess() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
}
func newPipe(t testing.TB, name string, message, overlapped bool) *os.File {
wname, err := syscall.UTF16PtrFromString(name)
if err != nil {
t.Fatal(err)
}
// Create the read handle.
flags := windows.PIPE_ACCESS_DUPLEX
if overlapped {
flags |= syscall.FILE_FLAG_OVERLAPPED
}
typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE
if message {
typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE
}
h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
if err != nil {
t.Fatal(err)
}
f := os.NewFile(uintptr(h), name)
t.Cleanup(func() {
if err := f.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
t.Fatal(err)
}
})
return f
}
func testReadWrite(t *testing.T, fdr, fdw *os.File) {
write := make(chan string, 1)
read := make(chan struct{}, 1)
go func() {
for s := range write {
n, err := fdw.Write([]byte(s))
read <- struct{}{}
if err != nil {
t.Error(err)
}
if n != len(s) {
t.Errorf("expected to write %d bytes, got %d", len(s), n)
}
}
}()
for i := range 10 {
s := strconv.Itoa(i)
write <- s
<-read
buf := make([]byte, len(s))
_, err := io.ReadFull(fdr, buf)
if err != nil {
t.Fatalf("read failed: %v", err)
}
if !bytes.Equal(buf, []byte(s)) {
t.Fatalf("expected %q, got %q", s, buf)
}
}
close(read)
close(write)
}
func testPreadPwrite(t *testing.T, fdr, fdw *os.File) {
type op struct {
s string
off int64
}
write := make(chan op, 1)
read := make(chan struct{}, 1)
go func() {
for o := range write {
n, err := fdw.WriteAt([]byte(o.s), o.off)
read <- struct{}{}
if err != nil {
t.Error(err)
}
if n != len(o.s) {
t.Errorf("expected to write %d bytes, got %d", len(o.s), n)
}
}
}()
for i := range 10 {
off := int64(i % 3) // exercise some back and forth
s := strconv.Itoa(i)
write <- op{s, off}
<-read
buf := make([]byte, len(s))
n, err := fdr.ReadAt(buf, off)
if err != nil {
t.Fatal(err)
}
if n != len(s) {
t.Fatalf("expected to read %d bytes, got %d", len(s), n)
}
if !bytes.Equal(buf, []byte(s)) {
t.Fatalf("expected %q, got %q", s, buf)
}
}
close(read)
close(write)
}
func testFileReadEOF(t *testing.T, f *os.File) {
end, err := f.Seek(0, io.SeekEnd)
if err != nil {
t.Fatal(err)
}
var buf [1]byte
n, err := f.Read(buf[:])
if err != nil && err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
n, err = f.ReadAt(buf[:], end)
if err != nil && err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestFile(t *testing.T) {
t.Parallel()
tests := []struct {
name string
overlappedRead bool
overlappedWrite bool
}{
{"overlapped", true, true},
{"overlapped-read", true, false},
{"overlapped-write", false, true},
{"sync", false, false},
{"sync-pollable", false, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
name := filepath.Join(t.TempDir(), "foo")
rh := newFileOverlapped(t, name, tt.overlappedRead)
wh := newFileOverlapped(t, name, tt.overlappedWrite)
testReadWrite(t, rh, wh)
testPreadPwrite(t, rh, wh)
testFileReadEOF(t, rh)
})
}
}
func TestPipe(t *testing.T) {
t.Parallel()
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer func() {
if err := r.Close(); err != nil {
t.Fatal(err)
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
}()
testReadWrite(t, r, w)
}
func TestNamedPipe(t *testing.T) {
t.Parallel()
tests := []struct {
name string
overlappedRead bool
overlappedWrite bool
pollable bool
}{
{"overlapped", true, true, true},
{"overlapped-write", false, true, true},
{"overlapped-read", true, false, true},
{"sync", false, false, false},
{"sync-pollable", false, false, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newBytePipe(t, name, tt.overlappedWrite)
file := newFileOverlapped(t, name, tt.overlappedRead)
testReadWrite(t, pipe, file)
})
}
}
func TestPipeMessageReadEOF(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, true)
file := newFileOverlapped(t, name, true)
_, err := pipe.Write(nil)
if err != nil {
t.Error(err)
}
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestPipeClosedEOF(t *testing.T) {
t.Parallel()
name := pipeName()
pipe := newBytePipe(t, name, true)
file := newFileOverlapped(t, name, true)
pipe.Close()
var buf [10]byte
n, err := file.Read(buf[:])
if err != io.EOF {
t.Errorf("expected EOF, got %v", err)
}
if n != 0 {
t.Errorf("expected 0 bytes, got %d", n)
}
}
func TestPipeReadTimeout(t *testing.T) {
t.Parallel()
name := pipeName()
_ = newBytePipe(t, name, true)
file := newFileOverlapped(t, name, true)
err := file.SetReadDeadline(time.Now().Add(time.Millisecond))
if err != nil {
t.Fatal(err)
}
var buf [10]byte
_, err = file.Read(buf[:])
if !errors.Is(err, os.ErrDeadlineExceeded) {
t.Errorf("expected deadline exceeded, got %v", err)
}
}
func TestPipeCanceled(t *testing.T) {
t.Parallel()
name := pipeName()
_ = newBytePipe(t, name, true)
file := newFileOverlapped(t, name, true)
ch := make(chan struct{}, 1)
go func() {
for {
select {
case <-ch:
return
default:
sc, err := file.SyscallConn()
if err != nil {
t.Error(err)
return
}
if err := sc.Control(func(fd uintptr) {
syscall.CancelIo(syscall.Handle(fd))
}); err != nil {
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
}
}()
// Try to cancel for max 1 second.
// Canceling is normally really fast, but it can take an
// arbitrary amount of time on busy systems.
// If it takes too long, we skip the test.
file.SetReadDeadline(time.Now().Add(1 * time.Second))
var tmp [1]byte
// Read will block until the cancel is complete.
_, err := file.Read(tmp[:])
ch <- struct{}{}
if errors.Is(err, os.ErrDeadlineExceeded) {
t.Skip("took too long to cancel")
}
if !errors.Is(err, syscall.ERROR_OPERATION_ABORTED) {
t.Errorf("expected ERROR_OPERATION_ABORTED, got %v", err)
}
}
func TestPipeExternalIOCP(t *testing.T) {
// Test that a caller can associate an overlapped handle to an external IOCP
// even when the handle is also associated to a poll.FD. Also test that
// the FD can still perform I/O after the association.
t.Parallel()
name := pipeName()
pipe := newMessagePipe(t, name, true)
_ = newFileOverlapped(t, name, true) // Just open a pipe client
sc, err := pipe.SyscallConn()
if err != nil {
t.Error(err)
return
}
if err := sc.Control(func(fd uintptr) {
_, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1)
if err != nil {
t.Fatal(err)
}
}); err != nil {
t.Error(err)
}
_, err = pipe.Write([]byte("hello"))
if err != nil {
t.Fatal(err)
}
}

View File

@ -13,5 +13,5 @@ func isErrNoFollow(err error) bool {
} }
func newDirFile(fd syscall.Handle, name string) (*File, error) { func newDirFile(fd syscall.Handle, name string) (*File, error) {
return newFile(fd, name, "file"), nil return newFile(fd, name, "file", false), nil
} }

View File

@ -131,7 +131,8 @@ func rootOpenFileNolog(root *Root, name string, flag int, perm FileMode) (*File,
if err != nil { if err != nil {
return nil, &PathError{Op: "openat", Path: name, Err: err} return nil, &PathError{Op: "openat", Path: name, Err: err}
} }
return newFile(fd, joinPath(root.Name(), name), "file"), nil // openat always returns a non-blocking handle.
return newFile(fd, joinPath(root.Name(), name), "file", false), nil
} }
func openat(dirfd syscall.Handle, name string, flag int, perm FileMode) (syscall.Handle, error) { func openat(dirfd syscall.Handle, name string, flag int, perm FileMode) (syscall.Handle, error) {