Skip to content

Commit 656b5b3

Browse files
committed
internal/poll: don't skip empty writes on Windows
Empty writes might be important for some protocols. Let Windows decide what do with them rather than skipping them on our side. This is inline with the behavior of other platforms. While here, refactor the Read/Write/Pwrite methods to reduce one indentation level and make the code easier to read. Fixes golang#73084. Change-Id: Ic5393358e237d53b8be6097cd7359ac0ff205309 Reviewed-on: https://go-review.googlesource.com/c/go/+/661435 Reviewed-by: Dmitri Shuralyov <[email protected]> Reviewed-by: Damien Neil <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]>
1 parent e6c2e12 commit 656b5b3

File tree

3 files changed

+99
-75
lines changed

3 files changed

+99
-75
lines changed

src/internal/poll/fd_windows.go

Lines changed: 65 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -434,43 +434,40 @@ func (fd *FD) Read(buf []byte) (int, error) {
434434
return 0, err
435435
}
436436
defer fd.readUnlock()
437+
if fd.isFile {
438+
fd.l.Lock()
439+
defer fd.l.Unlock()
440+
}
437441

438442
if len(buf) > maxRW {
439443
buf = buf[:maxRW]
440444
}
441445

442446
var n int
443447
var err error
444-
if fd.isFile {
445-
fd.l.Lock()
446-
defer fd.l.Unlock()
447-
switch fd.kind {
448-
case kindConsole:
449-
n, err = fd.readConsole(buf)
450-
default:
451-
o := &fd.rop
452-
o.InitBuf(buf)
453-
n, err = execIO(o, func(o *operation) error {
454-
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
455-
})
456-
fd.addOffset(n)
457-
if fd.kind == kindPipe && err != nil {
458-
switch err {
459-
case syscall.ERROR_BROKEN_PIPE:
460-
// Returned by pipes when the other end is closed.
461-
err = nil
462-
case syscall.ERROR_OPERATION_ABORTED:
463-
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
464-
// If the fd is a pipe and the Read was interrupted by CancelIoEx,
465-
// we assume it is interrupted by Close.
466-
err = ErrFileClosing
467-
}
448+
switch fd.kind {
449+
case kindConsole:
450+
n, err = fd.readConsole(buf)
451+
case kindFile, kindPipe:
452+
o := &fd.rop
453+
o.InitBuf(buf)
454+
n, err = execIO(o, func(o *operation) error {
455+
return syscall.ReadFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
456+
})
457+
fd.addOffset(n)
458+
if fd.kind == kindPipe && err != nil {
459+
switch err {
460+
case syscall.ERROR_BROKEN_PIPE:
461+
// Returned by pipes when the other end is closed.
462+
err = nil
463+
case syscall.ERROR_OPERATION_ABORTED:
464+
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
465+
// If the fd is a pipe and the Read was interrupted by CancelIoEx,
466+
// we assume it is interrupted by Close.
467+
err = ErrFileClosing
468468
}
469469
}
470-
if err != nil {
471-
n = 0
472-
}
473-
} else {
470+
case kindNet:
474471
o := &fd.rop
475472
o.InitBuf(buf)
476473
n, err = execIO(o, func(o *operation) error {
@@ -701,36 +698,32 @@ func (fd *FD) Write(buf []byte) (int, error) {
701698
defer fd.l.Unlock()
702699
}
703700

704-
ntotal := 0
705-
for len(buf) > 0 {
706-
b := buf
707-
if len(b) > maxRW {
708-
b = b[:maxRW]
701+
var ntotal int
702+
for {
703+
max := len(buf)
704+
if max-ntotal > maxRW {
705+
max = ntotal + maxRW
709706
}
707+
b := buf[ntotal:max]
710708
var n int
711709
var err error
712-
if fd.isFile {
713-
switch fd.kind {
714-
case kindConsole:
715-
n, err = fd.writeConsole(b)
716-
default:
717-
o := &fd.wop
718-
o.InitBuf(b)
719-
n, err = execIO(o, func(o *operation) error {
720-
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
721-
})
722-
fd.addOffset(n)
723-
if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
724-
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
725-
// If the fd is a pipe and the Write was interrupted by CancelIoEx,
726-
// we assume it is interrupted by Close.
727-
err = ErrFileClosing
728-
}
729-
}
730-
if err != nil {
731-
n = 0
710+
switch fd.kind {
711+
case kindConsole:
712+
n, err = fd.writeConsole(b)
713+
case kindPipe, kindFile:
714+
o := &fd.wop
715+
o.InitBuf(b)
716+
n, err = execIO(o, func(o *operation) error {
717+
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, o.overlapped())
718+
})
719+
fd.addOffset(n)
720+
if fd.kind == kindPipe && err == syscall.ERROR_OPERATION_ABORTED {
721+
// Close uses CancelIoEx to interrupt concurrent I/O for pipes.
722+
// If the fd is a pipe and the Write was interrupted by CancelIoEx,
723+
// we assume it is interrupted by Close.
724+
err = ErrFileClosing
732725
}
733-
} else {
726+
case kindNet:
734727
if race.Enabled {
735728
race.ReleaseMerge(unsafe.Pointer(&ioSync))
736729
}
@@ -741,12 +734,13 @@ func (fd *FD) Write(buf []byte) (int, error) {
741734
})
742735
}
743736
ntotal += n
744-
if err != nil {
737+
if ntotal == len(buf) || err != nil {
745738
return ntotal, err
746739
}
747-
buf = buf[n:]
740+
if n == 0 {
741+
return ntotal, io.ErrUnexpectedEOF
742+
}
748743
}
749-
return ntotal, nil
750744
}
751745

752746
// writeConsole writes len(b) bytes to the console File.
@@ -814,26 +808,29 @@ func (fd *FD) Pwrite(buf []byte, off int64) (int, error) {
814808
defer syscall.Seek(fd.Sysfd, curoffset, io.SeekStart)
815809
defer fd.setOffset(curoffset)
816810

817-
ntotal := 0
818-
for len(buf) > 0 {
819-
b := buf
820-
if len(b) > maxRW {
821-
b = b[:maxRW]
811+
var ntotal int
812+
for {
813+
max := len(buf)
814+
if max-ntotal > maxRW {
815+
max = ntotal + maxRW
822816
}
817+
b := buf[ntotal:max]
823818
o := &fd.wop
824819
o.InitBuf(b)
825-
fd.setOffset(off)
820+
fd.setOffset(off + int64(ntotal))
826821
n, err := execIO(o, func(o *operation) error {
827822
return syscall.WriteFile(o.fd.Sysfd, unsafe.Slice(o.buf.Buf, o.buf.Len), &o.qty, &o.o)
828823
})
829-
ntotal += int(n)
830-
if err != nil {
824+
if n > 0 {
825+
ntotal += n
826+
}
827+
if ntotal == len(buf) || err != nil {
831828
return ntotal, err
832829
}
833-
buf = buf[n:]
834-
off += int64(n)
830+
if n == 0 {
831+
return ntotal, io.ErrUnexpectedEOF
832+
}
835833
}
836-
return ntotal, nil
837834
}
838835

839836
// Writev emulates the Unix writev system call.

src/internal/poll/fd_windows_test.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ var currentProces = sync.OnceValue(func() string {
239239

240240
var pipeCounter atomic.Uint64
241241

242-
func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) {
242+
func newPipe(t testing.TB, overlapped, message bool) (string, *poll.FD) {
243243
name := `\\.\pipe\go-internal-poll-test-` + currentProces() + `-` + strconv.FormatUint(pipeCounter.Add(1), 10)
244244
wname, err := syscall.UTF16PtrFromString(name)
245245
if err != nil {
@@ -250,7 +250,11 @@ func newPipe(t testing.TB, overlapped bool) (string, *poll.FD) {
250250
if overlapped {
251251
flags |= syscall.FILE_FLAG_OVERLAPPED
252252
}
253-
h, err := windows.CreateNamedPipe(wname, uint32(flags), windows.PIPE_TYPE_BYTE, 1, 4096, 4096, 0, nil)
253+
typ := windows.PIPE_TYPE_BYTE
254+
if message {
255+
typ = windows.PIPE_TYPE_MESSAGE
256+
}
257+
h, err := windows.CreateNamedPipe(wname, uint32(flags), uint32(typ), 1, 4096, 4096, 0, nil)
254258
if err != nil {
255259
t.Fatal(err)
256260
}
@@ -358,22 +362,22 @@ func TestFile(t *testing.T) {
358362

359363
func TestPipe(t *testing.T) {
360364
t.Run("overlapped", func(t *testing.T) {
361-
name, pipe := newPipe(t, true)
365+
name, pipe := newPipe(t, true, false)
362366
file := newFile(t, name, true)
363367
testReadWrite(t, pipe, file)
364368
})
365369
t.Run("overlapped-write", func(t *testing.T) {
366-
name, pipe := newPipe(t, true)
370+
name, pipe := newPipe(t, true, false)
367371
file := newFile(t, name, false)
368372
testReadWrite(t, file, pipe)
369373
})
370374
t.Run("overlapped-read", func(t *testing.T) {
371-
name, pipe := newPipe(t, false)
375+
name, pipe := newPipe(t, false, false)
372376
file := newFile(t, name, true)
373377
testReadWrite(t, file, pipe)
374378
})
375379
t.Run("sync", func(t *testing.T) {
376-
name, pipe := newPipe(t, false)
380+
name, pipe := newPipe(t, false, false)
377381
file := newFile(t, name, false)
378382
testReadWrite(t, file, pipe)
379383
})
@@ -397,6 +401,28 @@ func TestPipe(t *testing.T) {
397401
})
398402
}
399403

404+
func TestPipeWriteEOF(t *testing.T) {
405+
name, pipe := newPipe(t, false, true)
406+
file := newFile(t, name, false)
407+
read := make(chan struct{}, 1)
408+
go func() {
409+
_, err := pipe.Write(nil)
410+
read <- struct{}{}
411+
if err != nil {
412+
t.Error(err)
413+
}
414+
}()
415+
<-read
416+
var buf [10]byte
417+
n, err := file.Read(buf[:])
418+
if err != io.EOF {
419+
t.Errorf("expected EOF, got %v", err)
420+
}
421+
if n != 0 {
422+
t.Errorf("expected 0 bytes, got %d", n)
423+
}
424+
}
425+
400426
func BenchmarkReadOverlapped(b *testing.B) {
401427
benchmarkRead(b, true)
402428
}

src/internal/syscall/windows/syscall_windows.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,8 @@ const (
509509
PIPE_ACCESS_OUTBOUND = 0x00000002
510510
PIPE_ACCESS_DUPLEX = 0x00000003
511511

512-
PIPE_TYPE_BYTE = 0x00000000
512+
PIPE_TYPE_BYTE = 0x00000000
513+
PIPE_TYPE_MESSAGE = 0x00000004
513514
)
514515

515516
//sys GetOverlappedResult(handle syscall.Handle, overlapped *syscall.Overlapped, done *uint32, wait bool) (err error)

0 commit comments

Comments
 (0)