diff --git a/src/internal/poll/export_windows_test.go b/src/internal/poll/export_windows_test.go deleted file mode 100644 index 88ed71ad84..0000000000 --- a/src/internal/poll/export_windows_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Export guts for testing on windows. -// Since testing imports os and os imports internal/poll, -// the internal/poll tests can not be in package poll. - -package poll - -var ( - LogInitFD = &logInitFD -) - -func (fd *FD) IsPartOfNetpoll() bool { - return fd.pd.runtimeCtx != 0 -} diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index f94d6f49d3..c440377948 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -153,8 +153,46 @@ func (o *operation) InitMsg(p []byte, oob []byte) { } } +// waitIO waits for the IO operation o to complete. +func waitIO(o *operation) error { + fd := o.fd + if !fd.pd.pollable() { + // The overlapped handle is not added to the runtime poller, + // the only way to wait for the IO to complete is block. + _, err := syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE) + return err + } + // Wait for our request to complete. + err := fd.pd.wait(int(o.mode), fd.isFile) + switch err { + case nil, ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded: + // No other error is expected. + default: + panic("unexpected runtime.netpoll error: " + err.Error()) + } + return err +} + +// cancelIO cancels the IO operation o and waits for it to complete. +func cancelIO(o *operation) { + fd := o.fd + if !fd.pd.pollable() { + return + } + // Cancel our request. + err := syscall.CancelIoEx(fd.Sysfd, &o.o) + // Assuming ERROR_NOT_FOUND is returned, if IO is completed. + if err != nil && err != syscall.ERROR_NOT_FOUND { + // TODO(brainman): maybe do something else, but panic. + panic(err) + } + fd.pd.waitCanceled(int(o.mode)) +} + // execIO executes a single IO operation o. // It supports both synchronous and asynchronous IO. +// o.qty and o.flags are set to zero before calling submit +// to avoid reusing the values from a previous call. func execIO(o *operation, submit func(o *operation) error) (int, error) { fd := o.fd fd.initIO() @@ -163,89 +201,42 @@ func execIO(o *operation, submit func(o *operation) error) (int, error) { if err != nil { return 0, err } - getOverlappedResult := func() (int, error) { + // Start IO. + o.qty = 0 + o.flags = 0 + err = submit(o) + var waitErr error + if err == syscall.ERROR_IO_PENDING || (err == nil && !o.fd.skipSyncNotif) { + // IO started asynchronously or completed synchronously but + // a sync notification is required. Wait for it to complete. + waitErr = waitIO(o) + if waitErr != nil { + // IO interrupted by "close" or "timeout". + cancelIO(o) + // We issued a cancellation request, but the IO operation may still succeeded + // before the cancellation request runs. + } if fd.isFile { err = windows.GetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false) } else { err = windows.WSAGetOverlappedResult(fd.Sysfd, &o.o, &o.qty, false, &o.flags) } - switch err { - case nil: - return int(o.qty), nil - case syscall.ERROR_HANDLE_EOF: - // EOF reached. - return int(o.qty), io.EOF - case syscall.ERROR_MORE_DATA, windows.WSAEMSGSIZE: - // More data available. Return back the size of received data. - return int(o.qty), err - default: - return 0, err + } + // ERROR_OPERATION_ABORTED may have been caused by us. In that case, + // map it to our own error. Don't do more than that, each submitted + // function may have its own meaning for each error. + if err == syscall.ERROR_OPERATION_ABORTED { + if waitErr != nil { + // IO canceled by the poller while waiting for completion. + err = waitErr + } else if fd.kind == kindPipe && fd.closing() { + // Close uses CancelIoEx to interrupt concurrent I/O for pipes. + // If the fd is a pipe and the Write was interrupted by CancelIoEx, + // we assume it is interrupted by Close. + err = errClosing(fd.isFile) } } - // Start IO. - err = submit(o) - if !fd.pd.pollable() { - if err == syscall.ERROR_IO_PENDING { - // The overlapped handle is not added to the runtime poller, - // the only way to wait for the IO to complete is block. - _, err = syscall.WaitForSingleObject(fd.Sysfd, syscall.INFINITE) - if err == nil { - return getOverlappedResult() - } - } - if err != nil { - return 0, err - } - return int(o.qty), nil - } - switch err { - case nil: - // IO completed immediately - if o.fd.skipSyncNotif { - // No completion message will follow, so return immediately. - return int(o.qty), nil - } - // Need to get our completion message anyway. - case syscall.ERROR_IO_PENDING: - // IO started, and we have to wait for its completion. - err = nil - default: - return 0, err - } - // Wait for our request to complete. - err = fd.pd.wait(int(o.mode), fd.isFile) - if err == nil { - // All is good. Extract our IO results and return. - return getOverlappedResult() - } - // IO is interrupted by "close" or "timeout" - netpollErr := err - switch netpollErr { - case ErrNetClosing, ErrFileClosing, ErrDeadlineExceeded: - // will deal with those. - default: - panic("unexpected runtime.netpoll error: " + netpollErr.Error()) - } - // Cancel our request. - err = syscall.CancelIoEx(fd.Sysfd, &o.o) - // Assuming ERROR_NOT_FOUND is returned, if IO is completed. - if err != nil && err != syscall.ERROR_NOT_FOUND { - // TODO(brainman): maybe do something else, but panic. - panic(err) - } - // Wait for cancellation to complete. - fd.pd.waitCanceled(int(o.mode)) - n, err := getOverlappedResult() - if err != nil { - if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled - err = netpollErr - } - return n, err - } - // We issued a cancellation request. But, it seems, IO operation succeeded - // before the cancellation request run. We need to treat the IO operation as - // succeeded (the bytes are actually sent/recv from network). - return n, nil + return int(o.qty), err } // FD is a file descriptor. The net and os packages embed this type in @@ -341,9 +332,6 @@ const ( kindPipe ) -// logInitFD is set by tests to enable file descriptor initialization logging. -var logInitFD func(net int, fd *FD, err error) - func (fd *FD) initIO() error { fd.initIOOnce.Do(func() { if fd.initPollable { @@ -358,19 +346,13 @@ func (fd *FD) initIO() error { fd.initPollable = false } } - if logInitFD != nil { - logInitFD(int(fd.kind), fd, fd.initIOErr) - } if !fd.initPollable { // Handle opened for overlapped I/O (aka non-blocking) that are not added // to the runtime poller need special handling when reading and writing. - var info windows.FILE_MODE_INFORMATION - if err := windows.NtQueryInformationFile(fd.Sysfd, &windows.IO_STATUS_BLOCK{}, unsafe.Pointer(&info), uint32(unsafe.Sizeof(info)), windows.FileModeInformation); err == nil { - fd.isBlocking = info.Mode&(windows.FILE_SYNCHRONOUS_IO_ALERT|windows.FILE_SYNCHRONOUS_IO_NONALERT) != 0 - } else { - // If we fail to get the file mode information, assume the file is blocking. - fd.isBlocking = true - } + // If we fail to get the file mode information, assume the file is blocking. + overlapped, _ := windows.IsNonblock(fd.Sysfd) + fd.isBlocking = !overlapped + fd.skipSyncNotif = true } else { fd.rop.runtimeCtx = fd.pd.runtimeCtx fd.wop.runtimeCtx = fd.pd.runtimeCtx @@ -379,9 +361,7 @@ func (fd *FD) initIO() error { err := syscall.SetFileCompletionNotificationModes(fd.Sysfd, syscall.FILE_SKIP_SET_EVENT_ON_HANDLE|syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS, ) - if err == nil { - fd.skipSyncNotif = true - } + fd.skipSyncNotif = err == nil } } }) @@ -429,11 +409,6 @@ func (fd *FD) Init(net string, pollable bool) error { // handles and that cares about handle IOCP association errors. // We can should do the IOCP association here. return fd.initIO() - } else { - if logInitFD != nil { - // For testing. - logInitFD(int(fd.kind), fd, nil) - } } return nil } @@ -508,21 +483,13 @@ func (fd *FD) Read(buf []byte) (int, error) { return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) }) fd.addOffset(n) - if err == syscall.ERROR_HANDLE_EOF { + switch err { + case syscall.ERROR_HANDLE_EOF: err = io.EOF - } - if fd.kind == kindPipe && err != nil { - switch err { - case syscall.ERROR_BROKEN_PIPE: - // Returned by pipes when the other end is closed. - err = nil - case syscall.ERROR_OPERATION_ABORTED: - if fd.closing() { - // Close uses CancelIoEx to interrupt concurrent I/O for pipes. - // If the fd is a pipe and the Read was interrupted by CancelIoEx, - // we assume it is interrupted by Close. - err = ErrFileClosing - } + case syscall.ERROR_BROKEN_PIPE: + // ReadFile only documents ERROR_BROKEN_PIPE for pipes. + if fd.kind == kindPipe { + err = io.EOF } } case kindNet: @@ -646,10 +613,8 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) { n, err := execIO(o, func(o *operation) error { return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o) }) - if err != nil { - if err == syscall.ERROR_HANDLE_EOF { - err = io.EOF - } + if err == syscall.ERROR_HANDLE_EOF { + err = io.EOF } if len(b) != 0 { err = fd.eofError(n, err) @@ -774,12 +739,6 @@ func (fd *FD) Write(buf []byte) (int, error) { return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped()) }) fd.addOffset(n) - if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED && fd.closing() { - // Close uses CancelIoEx to interrupt concurrent I/O for pipes. - // If the fd is a pipe and the Write was interrupted by CancelIoEx, - // we assume it is interrupted by Close. - err = ErrFileClosing - } case kindNet: if race.Enabled { race.ReleaseMerge(unsafe.Pointer(&ioSync)) @@ -1185,10 +1144,10 @@ func (fd *FD) RawRead(f func(uintptr) bool) error { // socket is readable. h/t https://stackoverflow.com/a/42019668/332798 o := &fd.rop o.InitBuf(nil) - if !fd.IsStream { - o.flags |= windows.MSG_PEEK - } _, err := execIO(o, func(o *operation) error { + if !fd.IsStream { + o.flags |= windows.MSG_PEEK + } return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) }) if err == windows.WSAEMSGSIZE { diff --git a/src/internal/poll/fd_windows_test.go b/src/internal/poll/fd_windows_test.go index 3ba915ed41..1378dc9e15 100644 --- a/src/internal/poll/fd_windows_test.go +++ b/src/internal/poll/fd_windows_test.go @@ -7,7 +7,6 @@ package poll_test import ( "bytes" "errors" - "fmt" "internal/poll" "internal/syscall/windows" "io" @@ -22,57 +21,37 @@ import ( "unsafe" ) -type loggedFD struct { - Net int - FD *poll.FD - Err error -} - -var ( - logMu sync.Mutex - loggedFDs map[syscall.Handle]*loggedFD -) - -func logFD(net int, fd *poll.FD, err error) { - logMu.Lock() - defer logMu.Unlock() - - loggedFDs[fd.Sysfd] = &loggedFD{ - Net: net, - FD: fd, - Err: err, - } -} - func init() { - loggedFDs = make(map[syscall.Handle]*loggedFD) - *poll.LogInitFD = logFD - poll.InitWSA() } -func findLoggedFD(h syscall.Handle) (lfd *loggedFD, found bool) { - logMu.Lock() - defer logMu.Unlock() - - lfd, found = loggedFDs[h] - return lfd, found -} - // checkFileIsNotPartOfNetpoll verifies that f is not managed by netpoll. -// It returns error, if check fails. -func checkFileIsNotPartOfNetpoll(f *os.File) error { - lfd, found := findLoggedFD(syscall.Handle(f.Fd())) - if !found { - return fmt.Errorf("%v fd=%v: is not found in the log", f.Name(), f.Fd()) +func checkFileIsNotPartOfNetpoll(t *testing.T, f *os.File) { + t.Helper() + sc, err := f.SyscallConn() + if err != nil { + t.Fatal(err) } - if lfd.FD.IsPartOfNetpoll() { - return fmt.Errorf("%v fd=%v: is part of netpoll, but should not be (logged: net=%v err=%v)", f.Name(), f.Fd(), lfd.Net, lfd.Err) + if err := sc.Control(func(fd uintptr) { + // Only try to associate the file with an IOCP if the handle is opened for overlapped I/O, + // else the association will always fail. + overlapped, err := windows.IsNonblock(syscall.Handle(fd)) + if err != nil { + t.Fatalf("%v fd=%v: %v", f.Name(), fd, err) + } + if overlapped { + // If the file is part of netpoll, then associating it with another IOCP should fail. + if _, err := windows.CreateIoCompletionPort(syscall.Handle(fd), 0, 0, 1); err != nil { + t.Fatalf("%v fd=%v: is part of netpoll, but should not be: %v", f.Name(), fd, err) + } + } + }); err != nil { + t.Fatalf("%v fd=%v: is not initialized", f.Name(), f.Fd()) } - return nil } func TestFileFdsAreInitialised(t *testing.T) { + t.Parallel() exe, err := os.Executable() if err != nil { t.Fatal(err) @@ -83,15 +62,14 @@ func TestFileFdsAreInitialised(t *testing.T) { } defer f.Close() - err = checkFileIsNotPartOfNetpoll(f) - if err != nil { - t.Fatal(err) - } + checkFileIsNotPartOfNetpoll(t, f) } func TestSerialFdsAreInitialised(t *testing.T) { + t.Parallel() for _, name := range []string{"COM1", "COM2", "COM3", "COM4"} { t.Run(name, func(t *testing.T) { + t.Parallel() h, err := syscall.CreateFile(syscall.StringToUTF16Ptr(name), syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, @@ -113,15 +91,13 @@ func TestSerialFdsAreInitialised(t *testing.T) { f := os.NewFile(uintptr(h), name) defer f.Close() - err = checkFileIsNotPartOfNetpoll(f) - if err != nil { - t.Fatal(err) - } + checkFileIsNotPartOfNetpoll(t, f) }) } } func TestWSASocketConflict(t *testing.T) { + t.Parallel() s, err := windows.WSASocket(syscall.AF_INET, syscall.SOCK_STREAM, syscall.IPPROTO_TCP, nil, 0, windows.WSA_FLAG_OVERLAPPED) if err != nil { t.Fatal(err) @@ -200,8 +176,12 @@ func newFD(t testing.TB, h syscall.Handle, kind string, overlapped, pollable boo err := fd.Init(kind, pollable) if overlapped && err != nil { // Overlapped file handles should not error. + fd.Close() t.Fatal(err) } + t.Cleanup(func() { + fd.Close() + }) return &fd } @@ -221,13 +201,9 @@ func newFile(t testing.TB, name string, overlapped, pollable bool) *poll.FD { if err != nil { t.Fatal(err) } - t.Cleanup(func() { - if err := syscall.CloseHandle(h); err != nil { - t.Fatal(err) - } - }) typ, err := syscall.GetFileType(h) if err != nil { + syscall.CloseHandle(h) t.Fatal(err) } kind := "file" @@ -266,19 +242,14 @@ func newPipe(t testing.TB, name string, message, overlapped, pollable bool) *pol if overlapped { flags |= syscall.FILE_FLAG_OVERLAPPED } - typ := windows.PIPE_TYPE_BYTE + typ := windows.PIPE_TYPE_BYTE | windows.PIPE_READMODE_BYTE if message { - typ = windows.PIPE_TYPE_MESSAGE + typ = windows.PIPE_TYPE_MESSAGE | windows.PIPE_READMODE_MESSAGE } h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil) if err != nil { t.Fatal(err) } - t.Cleanup(func() { - if err := syscall.CloseHandle(h); err != nil { - t.Fatal(err) - } - }) return newFD(t, h, "pipe", overlapped, pollable) } @@ -354,6 +325,29 @@ func testPreadPwrite(t *testing.T, fdr, fdw *poll.FD) { close(write) } +func testFileReadEOF(t *testing.T, f *poll.FD) { + end, err := f.Seek(0, io.SeekEnd) + if err != nil { + t.Fatal(err) + } + var buf [1]byte + n, err := f.Read(buf[:]) + if err != nil && err != io.EOF { + t.Errorf("expected EOF, got %v", err) + } + if n != 0 { + t.Errorf("expected 0 bytes, got %d", n) + } + + n, err = f.Pread(buf[:], end) + if err != nil && err != io.EOF { + t.Errorf("expected EOF, got %v", err) + } + if n != 0 { + t.Errorf("expected 0 bytes, got %d", n) + } +} + func TestFile(t *testing.T) { t.Parallel() tests := []struct { @@ -377,6 +371,7 @@ func TestFile(t *testing.T) { wh := newFile(t, name, tt.overlappedWrite, tt.pollable) testReadWrite(t, rh, wh) testPreadPwrite(t, rh, wh) + testFileReadEOF(t, rh) }) } } @@ -426,20 +421,35 @@ func TestPipe(t *testing.T) { }) } -func TestPipeWriteEOF(t *testing.T) { +func TestPipeMessageReadEOF(t *testing.T) { t.Parallel() name := pipeName() - pipe := newMessagePipe(t, name, false, true) - file := newFile(t, name, false, true) - read := make(chan struct{}, 1) - go func() { - _, err := pipe.Write(nil) - read <- struct{}{} - if err != nil { - t.Error(err) - } - }() - <-read + pipe := newMessagePipe(t, name, true, true) + file := newFile(t, name, true, true) + + _, err := pipe.Write(nil) + if err != nil { + t.Error(err) + } + + var buf [10]byte + n, err := file.Read(buf[:]) + if err != io.EOF { + t.Errorf("expected EOF, got %v", err) + } + if n != 0 { + t.Errorf("expected 0 bytes, got %d", n) + } +} + +func TestPipeClosedEOF(t *testing.T) { + t.Parallel() + name := pipeName() + pipe := newBytePipe(t, name, true, false) + file := newFile(t, name, true, true) + + pipe.Close() + var buf [10]byte n, err := file.Read(buf[:]) if err != io.EOF { diff --git a/src/internal/poll/sendfile_windows.go b/src/internal/poll/sendfile_windows.go index 2ae8a8d1d7..a24c36c2d2 100644 --- a/src/internal/poll/sendfile_windows.go +++ b/src/internal/poll/sendfile_windows.go @@ -59,11 +59,11 @@ func SendFile(fd *FD, src syscall.Handle, n int64) (written int64, err error) { chunkSize = n } - o.qty = uint32(chunkSize) o.o.Offset = uint32(curpos) o.o.OffsetHigh = uint32(curpos >> 32) nw, err := execIO(o, func(o *operation) error { + o.qty = uint32(chunkSize) return syscall.TransmitFile(o.fd.Sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) }) if err != nil { diff --git a/src/internal/syscall/windows/nonblocking_windows.go b/src/internal/syscall/windows/nonblocking_windows.go new file mode 100644 index 0000000000..ec6f520a8e --- /dev/null +++ b/src/internal/syscall/windows/nonblocking_windows.go @@ -0,0 +1,21 @@ +// Copyright 2025 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package windows + +import ( + "syscall" + "unsafe" +) + +// IsNonblock returns whether the file descriptor fd is opened +// in non-blocking mode, that is, the [syscall.FILE_FLAG_OVERLAPPED] flag +// was set when the file was opened. +func IsNonblock(fd syscall.Handle) (nonblocking bool, err error) { + var info FILE_MODE_INFORMATION + if err := NtQueryInformationFile(syscall.Handle(fd), &IO_STATUS_BLOCK{}, unsafe.Pointer(&info), uint32(unsafe.Sizeof(info)), FileModeInformation); err != nil { + return false, err + } + return info.Mode&(FILE_SYNCHRONOUS_IO_ALERT|FILE_SYNCHRONOUS_IO_NONALERT) == 0, nil +} diff --git a/src/internal/syscall/windows/syscall_windows.go b/src/internal/syscall/windows/syscall_windows.go index 2f35d83c44..a34d85df0f 100644 --- a/src/internal/syscall/windows/syscall_windows.go +++ b/src/internal/syscall/windows/syscall_windows.go @@ -511,6 +511,9 @@ const ( PIPE_TYPE_BYTE = 0x00000000 PIPE_TYPE_MESSAGE = 0x00000004 + + PIPE_READMODE_BYTE = 0x00000000 + PIPE_READMODE_MESSAGE = 0x00000002 ) //sys CreateIoCompletionPort(filehandle syscall.Handle, cphandle syscall.Handle, key uintptr, threadcnt uint32) (handle syscall.Handle, err error)