diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index b330ae21a9..4b2623ea8f 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -9,7 +9,6 @@ import ( "internal/race" "internal/syscall/windows" "io" - "runtime" "sync" "syscall" "unicode/utf16" @@ -22,18 +21,6 @@ var ( ioSync uint64 ) -// CancelIo Windows API cancels all outstanding IO for a particular -// socket on current thread. To overcome that limitation, we run -// special goroutine, locked to OS single thread, that both starts -// and cancels IO. It means, there are 2 unavoidable thread switches -// for every IO. -// Some newer versions of Windows has new CancelIoEx API, that does -// not have that limitation and can be used from any thread. This -// package uses CancelIoEx API, if present, otherwise it fallback -// to CancelIo. - -var canCancelIO bool // determines if CancelIoEx API is present - // This package uses the SetFileCompletionNotificationModes Windows // API to skip calling GetQueuedCompletionStatus if an IO operation // completes synchronously. There is a known bug where @@ -72,7 +59,6 @@ func init() { if e != nil { initErr = e } - canCancelIO = syscall.LoadCancelIoEx() == nil checkSetFileCompletionNotificationModes() } @@ -90,7 +76,6 @@ type operation struct { // fields used only by net package fd *FD - errc chan error buf syscall.WSABuf msg windows.WSAMsg sa syscall.Sockaddr @@ -155,46 +140,15 @@ func (o *operation) InitMsg(p []byte, oob []byte) { } } -// ioSrv executes net IO requests. -type ioSrv struct { - req chan ioSrvReq -} - -type ioSrvReq struct { - o *operation - submit func(o *operation) error // if nil, cancel the operation -} - -// ProcessRemoteIO will execute submit IO requests on behalf -// of other goroutines, all on a single os thread, so it can -// cancel them later. Results of all operations will be sent -// back to their requesters via channel supplied in request. -// It is used only when the CancelIoEx API is unavailable. -func (s *ioSrv) ProcessRemoteIO() { - runtime.LockOSThread() - defer runtime.UnlockOSThread() - for r := range s.req { - if r.submit != nil { - r.o.errc <- r.submit(r.o) - } else { - r.o.errc <- syscall.CancelIo(r.o.fd.Sysfd) - } - } -} - -// ExecIO executes a single IO operation o. It submits and cancels +// execIO executes a single IO operation o. It submits and cancels // IO in the current thread for systems where Windows CancelIoEx API // is available. Alternatively, it passes the request onto // runtime netpoll and waits for completion or cancels request. -func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, error) { +func execIO(o *operation, submit func(o *operation) error) (int, error) { if o.fd.pd.runtimeCtx == 0 { return 0, errors.New("internal error: polling on unsupported descriptor type") } - if !canCancelIO { - onceStartServer.Do(startServer) - } - fd := o.fd // Notify runtime netpoll about starting IO. err := fd.pd.prepare(int(o.mode), fd.isFile) @@ -202,14 +156,7 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro return 0, err } // Start IO. - if canCancelIO { - err = submit(o) - } else { - // Send request to a special dedicated thread, - // so it can stop the IO with CancelIO later. - s.req <- ioSrvReq{o, submit} - err = <-o.errc - } + err = submit(o) switch err { case nil: // IO completed immediately @@ -247,16 +194,11 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro panic("unexpected runtime.netpoll error: " + netpollErr.Error()) } // Cancel our request. - if canCancelIO { - err := syscall.CancelIoEx(fd.Sysfd, &o.o) - // Assuming ERROR_NOT_FOUND is returned, if IO is completed. - if err != nil && err != syscall.ERROR_NOT_FOUND { - // TODO(brainman): maybe do something else, but panic. - panic(err) - } - } else { - s.req <- ioSrvReq{o, nil} - <-o.errc + err = syscall.CancelIoEx(fd.Sysfd, &o.o) + // Assuming ERROR_NOT_FOUND is returned, if IO is completed. + if err != nil && err != syscall.ERROR_NOT_FOUND { + // TODO(brainman): maybe do something else, but panic. + panic(err) } // Wait for cancellation to complete. fd.pd.waitCanceled(int(o.mode)) @@ -273,21 +215,6 @@ func (s *ioSrv) ExecIO(o *operation, submit func(o *operation) error) (int, erro return int(o.qty), nil } -// Start helper goroutines. -var rsrv, wsrv ioSrv -var onceStartServer sync.Once - -func startServer() { - // This is called, once, when only the CancelIo API is available. - // Start two special goroutines, both locked to an OS thread, - // that start and cancel IO requests. - // One will process read requests, while the other will do writes. - rsrv.req = make(chan ioSrvReq) - go rsrv.ProcessRemoteIO() - wsrv.req = make(chan ioSrvReq) - go wsrv.ProcessRemoteIO() -} - // FD is a file descriptor. The net and os packages embed this type in // a larger type representing a network connection or OS file. type FD struct { @@ -385,9 +312,9 @@ func (fd *FD) Init(net string, pollable bool) (string, error) { // if the user is doing their own overlapped I/O. // See issue #21172. // - // In general the code below avoids calling the ExecIO - // method for non-network sockets. If some method does - // somehow call ExecIO, then ExecIO, and therefore the + // In general the code below avoids calling the execIO + // function for non-network sockets. If some method does + // somehow call execIO, then execIO, and therefore the // calling method, will return an error, because // fd.pd.runtimeCtx will be 0. err = fd.pd.init(fd) @@ -429,10 +356,6 @@ func (fd *FD) Init(net string, pollable bool) (string, error) { fd.wop.fd = fd fd.rop.runtimeCtx = fd.pd.runtimeCtx fd.wop.runtimeCtx = fd.pd.runtimeCtx - if !canCancelIO { - fd.rop.errc = make(chan error) - fd.wop.errc = make(chan error) - } return "", nil } @@ -515,7 +438,7 @@ func (fd *FD) Read(buf []byte) (int, error) { } else { o := &fd.rop o.InitBuf(buf) - n, err = rsrv.ExecIO(o, func(o *operation) error { + n, err = execIO(o, func(o *operation) error { return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) }) if race.Enabled { @@ -655,7 +578,7 @@ func (fd *FD) ReadFrom(buf []byte) (int, syscall.Sockaddr, error) { defer fd.readUnlock() o := &fd.rop o.InitBuf(buf) - n, err := rsrv.ExecIO(o, func(o *operation) error { + n, err := execIO(o, func(o *operation) error { if o.rsa == nil { o.rsa = new(syscall.RawSockaddrAny) } @@ -711,7 +634,7 @@ func (fd *FD) Write(buf []byte) (int, error) { } o := &fd.wop o.InitBuf(b) - n, err = wsrv.ExecIO(o, func(o *operation) error { + n, err = execIO(o, func(o *operation) error { return syscall.WSASend(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) }) } @@ -820,7 +743,7 @@ func (fd *FD) Writev(buf *[][]byte) (int64, error) { } o := &fd.wop o.InitBufs(buf) - n, err := wsrv.ExecIO(o, func(o *operation) error { + n, err := execIO(o, func(o *operation) error { return syscall.WSASend(o.fd.Sysfd, &o.bufs[0], uint32(len(o.bufs)), &o.qty, 0, &o.o, nil) }) o.ClearBufs() @@ -841,7 +764,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { o := &fd.wop o.InitBuf(buf) o.sa = sa - n, err := wsrv.ExecIO(o, func(o *operation) error { + n, err := execIO(o, func(o *operation) error { return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) }) return n, err @@ -856,7 +779,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { o := &fd.wop o.InitBuf(b) o.sa = sa - n, err := wsrv.ExecIO(o, func(o *operation) error { + n, err := execIO(o, func(o *operation) error { return syscall.WSASendto(o.fd.Sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) }) ntotal += int(n) @@ -874,7 +797,7 @@ func (fd *FD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) { func (fd *FD) ConnectEx(ra syscall.Sockaddr) error { o := &fd.wop o.sa = ra - _, err := wsrv.ExecIO(o, func(o *operation) error { + _, err := execIO(o, func(o *operation) error { return ConnectExFunc(o.fd.Sysfd, o.sa, nil, 0, nil, &o.o) }) return err @@ -884,7 +807,7 @@ func (fd *FD) acceptOne(s syscall.Handle, rawsa []syscall.RawSockaddrAny, o *ope // Submit accept request. o.handle = s o.rsan = int32(unsafe.Sizeof(rawsa[0])) - _, err := rsrv.ExecIO(o, func(o *operation) error { + _, err := execIO(o, func(o *operation) error { return AcceptFunc(o.fd.Sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) }) if err != nil { @@ -1008,7 +931,7 @@ func (fd *FD) RawRead(f func(uintptr) bool) error { if !fd.IsStream { o.flags |= windows.MSG_PEEK } - _, err := rsrv.ExecIO(o, func(o *operation) error { + _, err := execIO(o, func(o *operation) error { return syscall.WSARecv(o.fd.Sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) }) if err == windows.WSAEMSGSIZE { @@ -1078,7 +1001,7 @@ func (fd *FD) ReadMsg(p []byte, oob []byte) (int, int, int, syscall.Sockaddr, er o.rsa = new(syscall.RawSockaddrAny) o.msg.Name = o.rsa o.msg.Namelen = int32(unsafe.Sizeof(*o.rsa)) - n, err := rsrv.ExecIO(o, func(o *operation) error { + n, err := execIO(o, func(o *operation) error { return windows.WSARecvMsg(o.fd.Sysfd, &o.msg, &o.qty, &o.o, nil) }) err = fd.eofError(n, err) @@ -1110,7 +1033,7 @@ func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, err o.msg.Name = (*syscall.RawSockaddrAny)(rsa) o.msg.Namelen = len } - n, err := wsrv.ExecIO(o, func(o *operation) error { + n, err := execIO(o, func(o *operation) error { return windows.WSASendMsg(o.fd.Sysfd, &o.msg, 0, &o.qty, &o.o, nil) }) return n, int(o.msg.Control.Len), err diff --git a/src/internal/poll/sendfile_windows.go b/src/internal/poll/sendfile_windows.go index 5674af4189..50c3ee86c0 100644 --- a/src/internal/poll/sendfile_windows.go +++ b/src/internal/poll/sendfile_windows.go @@ -57,7 +57,7 @@ func SendFile(fd *FD, src syscall.Handle, n int64) (written int64, err error) { o.o.Offset = uint32(curpos) o.o.OffsetHigh = uint32(curpos >> 32) - nw, err := wsrv.ExecIO(o, func(o *operation) error { + nw, err := execIO(o, func(o *operation) error { return syscall.TransmitFile(o.fd.Sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND) }) if err != nil {