Skip to content

Commit 7626907

Browse files
authored
ARC works for async on Windows (#13179)
1 parent 796aafe commit 7626907

File tree

5 files changed

+110
-40
lines changed

5 files changed

+110
-40
lines changed

compiler/types.nim

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,8 @@ proc canFormAcycleAux(marker: var IntSet, typ: PType, startId: int): bool =
366366
else: discard
367367

368368
proc isFinal*(t: PType): bool =
369-
var t = t.skipTypes(abstractInst)
370-
result = t.kind != tyObject or tfFinal in t.flags
369+
let t = t.skipTypes(abstractInst)
370+
result = t.kind != tyObject or tfFinal in t.flags or isPureObject(t)
371371

372372
proc canFormAcycle*(typ: PType): bool =
373373
var marker = initIntSet()

lib/pure/asyncdispatch.nim

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -247,18 +247,18 @@ when defined(windows) or defined(nimdoc):
247247
ioPort: Handle
248248
handles: HashSet[AsyncFD]
249249

250-
CustomOverlapped = object of OVERLAPPED
250+
CustomObj = object of OVERLAPPED
251251
data*: CompletionData
252252

253-
PCustomOverlapped* = ref CustomOverlapped
253+
CustomRef* = ref CustomObj
254254

255255
AsyncFD* = distinct int
256256

257257
PostCallbackData = object
258258
ioPort: Handle
259259
handleFd: AsyncFD
260260
waitFd: Handle
261-
ovl: owned PCustomOverlapped
261+
ovl: owned CustomRef
262262
PostCallbackDataPtr = ptr PostCallbackData
263263

264264
AsyncEventImpl = object
@@ -336,13 +336,15 @@ when defined(windows) or defined(nimdoc):
336336
337337
var lpNumberOfBytesTransferred: DWORD
338338
var lpCompletionKey: ULONG_PTR
339-
var customOverlapped: PCustomOverlapped
339+
var customOverlapped: CustomRef
340340
let res = getQueuedCompletionStatus(p.ioPort,
341341
addr lpNumberOfBytesTransferred, addr lpCompletionKey,
342342
cast[ptr POVERLAPPED](addr customOverlapped), llTimeout).bool
343343
result = true
344-
when defined(gcDestructors):
345-
GC_ref(customOverlapped)
344+
# For 'gcDestructors' the destructor of 'customOverlapped' will
345+
# be called at the end and we are the only owner here. This means
346+
# We do not have to 'GC_unref(customOverlapped)' because the destructor
347+
# does that for us.
346348
347349
# http://stackoverflow.com/a/12277264/492186
348350
# TODO: http://www.serverframework.com/handling-multiple-pending-socket-read-and-write-operations.html
@@ -359,7 +361,8 @@ when defined(windows) or defined(nimdoc):
359361
if customOverlapped.data.cell.data != nil:
360362
system.dispose(customOverlapped.data.cell)
361363
362-
GC_unref(customOverlapped)
364+
when not defined(gcDestructors):
365+
GC_unref(customOverlapped)
363366
else:
364367
let errCode = osLastError()
365368
if customOverlapped != nil:
@@ -368,7 +371,8 @@ when defined(windows) or defined(nimdoc):
368371
lpNumberOfBytesTransferred, errCode)
369372
if customOverlapped.data.cell.data != nil:
370373
system.dispose(customOverlapped.data.cell)
371-
GC_unref(customOverlapped)
374+
when not defined(gcDestructors):
375+
GC_unref(customOverlapped)
372376
else:
373377
if errCode.int32 == WAIT_TIMEOUT:
374378
# Timed out
@@ -409,6 +413,13 @@ when defined(windows) or defined(nimdoc):
409413
getAcceptExSockAddrs = cast[WSAPROC_GETACCEPTEXSOCKADDRS](fun)
410414
close(dummySock)
411415
416+
proc newCustom*(): CustomRef =
417+
result = CustomRef() # 0
418+
GC_ref(result) # 1 prevent destructor from doing a premature free.
419+
# destructor of newCustom's caller --> 0. This means
420+
# Windows holds a ref for us with RC == 0 (single owner).
421+
# This is passed back to us in the IO completion port.
422+
412423
proc recv*(socket: AsyncFD, size: int,
413424
flags = {SocketFlag.SafeDisconn}): owned(Future[string]) =
414425
## Reads **up to** ``size`` bytes from ``socket``. Returned future will
@@ -435,8 +446,7 @@ when defined(windows) or defined(nimdoc):
435446
436447
var bytesReceived: DWORD
437448
var flagsio = flags.toOSFlags().DWORD
438-
var ol = PCustomOverlapped()
439-
GC_ref(ol)
449+
var ol = newCustom()
440450
ol.data = CompletionData(fd: socket, cb:
441451
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
442452
if not retFuture.finished:
@@ -512,8 +522,7 @@ when defined(windows) or defined(nimdoc):
512522
513523
var bytesReceived: DWORD
514524
var flagsio = flags.toOSFlags().DWORD
515-
var ol = PCustomOverlapped()
516-
GC_ref(ol)
525+
var ol = newCustom()
517526
ol.data = CompletionData(fd: socket, cb:
518527
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
519528
if not retFuture.finished:
@@ -565,8 +574,7 @@ when defined(windows) or defined(nimdoc):
565574
dataBuf.len = size.ULONG
566575
567576
var bytesReceived, lowFlags: DWORD
568-
var ol = PCustomOverlapped()
569-
GC_ref(ol)
577+
var ol = newCustom()
570578
ol.data = CompletionData(fd: socket, cb:
571579
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
572580
if not retFuture.finished:
@@ -616,8 +624,7 @@ when defined(windows) or defined(nimdoc):
616624
zeroMem(addr(staddr[0]), 128)
617625
copyMem(addr(staddr[0]), saddr, saddrLen)
618626

619-
var ol = PCustomOverlapped()
620-
GC_ref(ol)
627+
var ol = newCustom()
621628
ol.data = CompletionData(fd: socket, cb:
622629
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
623630
if not retFuture.finished:
@@ -658,8 +665,7 @@ when defined(windows) or defined(nimdoc):
658665
var bytesReceived = 0.DWORD
659666
var lowFlags = 0.DWORD
660667
661-
var ol = PCustomOverlapped()
662-
GC_ref(ol)
668+
var ol = newCustom()
663669
ol.data = CompletionData(fd: socket, cb:
664670
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
665671
if not retFuture.finished:
@@ -754,8 +760,7 @@ when defined(windows) or defined(nimdoc):
754760
clientSock.close()
755761
retFuture.fail(getCurrentException())
756762

757-
var ol = PCustomOverlapped()
758-
GC_ref(ol)
763+
var ol = newCustom()
759764
ol.data = CompletionData(fd: socket, cb:
760765
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
761766
if not retFuture.finished:
@@ -799,7 +804,7 @@ when defined(windows) or defined(nimdoc):
799804
800805
{.push stackTrace: off.}
801806
proc waitableCallback(param: pointer,
802-
timerOrWaitFired: WINBOOL): void {.stdcall.} =
807+
timerOrWaitFired: WINBOOL) {.stdcall.} =
803808
var p = cast[PostCallbackDataPtr](param)
804809
discard postQueuedCompletionStatus(p.ioPort, timerOrWaitFired.DWORD,
805810
ULONG_PTR(p.handleFd),
@@ -815,8 +820,7 @@ when defined(windows) or defined(nimdoc):
815820
var pcd = cast[PostCallbackDataPtr](allocShared0(sizeof(PostCallbackData)))
816821
pcd.ioPort = p.ioPort
817822
pcd.handleFd = fd
818-
var ol = PCustomOverlapped()
819-
GC_ref(ol)
823+
var ol = newCustom()
820824
821825
ol.data = CompletionData(fd: fd, cb:
822826
proc(fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) {.gcsafe.} =
@@ -931,8 +935,7 @@ when defined(windows) or defined(nimdoc):
931935
let handleFD = AsyncFD(hEvent)
932936
pcd.ioPort = p.ioPort
933937
pcd.handleFd = handleFD
934-
var ol = PCustomOverlapped()
935-
GC_ref(ol)
938+
var ol = newCustom()
936939
ol.data.fd = handleFD
937940
ol.data.cb = handleCallback
938941
# We need to protect our callback environment value, so GC will not free it
@@ -1621,8 +1624,7 @@ when defined(windows) or defined(nimdoc):
16211624
let retFuture = newFuture[void]("doConnect")
16221625
result = retFuture
16231626
1624-
var ol = PCustomOverlapped()
1625-
GC_ref(ol)
1627+
var ol = newCustom()
16261628
ol.data = CompletionData(fd: socket, cb:
16271629
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
16281630
if not retFuture.finished:

lib/pure/asyncfile.nim

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,7 @@ proc readBuffer*(f: AsyncFile, buf: pointer, size: int): Future[int] =
132132
var retFuture = newFuture[int]("asyncfile.readBuffer")
133133

134134
when defined(windows) or defined(nimdoc):
135-
var ol = PCustomOverlapped()
136-
GC_ref(ol)
135+
var ol = newCustom()
137136
ol.data = CompletionData(fd: f.fd, cb:
138137
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
139138
if not retFuture.finished:
@@ -212,8 +211,7 @@ proc read*(f: AsyncFile, size: int): Future[string] =
212211
when defined(windows) or defined(nimdoc):
213212
var buffer = alloc0(size)
214213
215-
var ol = PCustomOverlapped()
216-
GC_ref(ol)
214+
var ol = newCustom()
217215
ol.data = CompletionData(fd: f.fd, cb:
218216
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
219217
if not retFuture.finished:
@@ -340,8 +338,7 @@ proc writeBuffer*(f: AsyncFile, buf: pointer, size: int): Future[void] =
340338
## specified file.
341339
var retFuture = newFuture[void]("asyncfile.writeBuffer")
342340
when defined(windows) or defined(nimdoc):
343-
var ol = PCustomOverlapped()
344-
GC_ref(ol)
341+
var ol = newCustom()
345342
ol.data = CompletionData(fd: f.fd, cb:
346343
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
347344
if not retFuture.finished:
@@ -414,8 +411,7 @@ proc write*(f: AsyncFile, data: string): Future[void] =
414411
var buffer = alloc0(data.len)
415412
copyMem(buffer, addr copy[0], data.len)
416413

417-
var ol = PCustomOverlapped()
418-
GC_ref(ol)
414+
var ol = newCustom()
419415
ol.data = CompletionData(fd: f.fd, cb:
420416
proc (fd: AsyncFD, bytesCount: DWORD, errcode: OSErrorCode) =
421417
if not retFuture.finished:

lib/system/refs_v2.nim

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ proc nimDecWeakRef(p: pointer) {.compilerRtl, inl.} =
8383
8484
proc nimIncRef(p: pointer) {.compilerRtl, inl.} =
8585
inc head(p).rc, rcIncrement
86-
#cprintf("[INCREF] %p\n", p)
86+
when traceCollector:
87+
cprintf("[INCREF] %p\n", head(p))
8788
8889
proc nimRawDispose(p: pointer) {.compilerRtl.} =
8990
when not defined(nimscript):
@@ -131,11 +132,13 @@ proc nimDecRefIsLast(p: pointer): bool {.compilerRtl, inl.} =
131132
var cell = head(p)
132133
if (cell.rc and not rcMask) == 0:
133134
result = true
134-
#cprintf("[DESTROY] %p\n", p)
135+
when traceCollector:
136+
cprintf("[ABOUT TO DESTROY] %p\n", cell)
135137
else:
136138
dec cell.rc, rcIncrement
137139
# According to Lins it's correct to do nothing else here.
138-
#cprintf("[DeCREF] %p\n", p)
140+
when traceCollector:
141+
cprintf("[DeCREF] %p\n", cell)
139142
140143
proc GC_unref*[T](x: ref T) =
141144
## New runtime only supports this operation for 'ref T'.

tests/arc/tasyncawait.nim

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
discard """
2+
output: "5000"
3+
cmd: "nim c --gc:arc $file"
4+
"""
5+
6+
import asyncdispatch, asyncnet, nativesockets, net, strutils, os
7+
8+
var msgCount = 0
9+
10+
const
11+
swarmSize = 50
12+
messagesToSend = 100
13+
14+
var clientCount = 0
15+
16+
proc sendMessages(client: AsyncFD) {.async.} =
17+
for i in 0 ..< messagesToSend:
18+
await send(client, "Message " & $i & "\c\L")
19+
20+
proc launchSwarm(port: Port) {.async.} =
21+
for i in 0 ..< swarmSize:
22+
var sock = createAsyncNativeSocket()
23+
24+
await connect(sock, "localhost", port)
25+
await sendMessages(sock)
26+
closeSocket(sock)
27+
28+
proc readMessages(client: AsyncFD) {.async.} =
29+
# wrapping the AsyncFd into a AsyncSocket object
30+
var sockObj = newAsyncSocket(client)
31+
var (ipaddr, port) = sockObj.getPeerAddr()
32+
doAssert ipaddr == "127.0.0.1"
33+
(ipaddr, port) = sockObj.getLocalAddr()
34+
doAssert ipaddr == "127.0.0.1"
35+
while true:
36+
var line = await recvLine(sockObj)
37+
if line == "":
38+
closeSocket(client)
39+
clientCount.inc
40+
break
41+
else:
42+
if line.startswith("Message "):
43+
msgCount.inc
44+
else:
45+
doAssert false
46+
47+
proc createServer(port: Port) {.async.} =
48+
var server = createAsyncNativeSocket()
49+
block:
50+
var name: Sockaddr_in
51+
name.sin_family = typeof(name.sin_family)(toInt(AF_INET))
52+
name.sin_port = htons(uint16(port))
53+
name.sin_addr.s_addr = htonl(INADDR_ANY)
54+
if bindAddr(server.SocketHandle, cast[ptr SockAddr](addr(name)),
55+
sizeof(name).Socklen) < 0'i32:
56+
raiseOSError(osLastError())
57+
58+
discard server.SocketHandle.listen()
59+
while true:
60+
asyncCheck readMessages(await accept(server))
61+
62+
asyncCheck createServer(Port(10335))
63+
asyncCheck launchSwarm(Port(10335))
64+
while true:
65+
poll()
66+
if clientCount == swarmSize: break
67+
68+
assert msgCount == swarmSize * messagesToSend
69+
echo msgCount

0 commit comments

Comments
 (0)