diff --git a/std/net/curl.d b/std/net/curl.d index 1706a2c65ba..3c91a5bcd18 100644 --- a/std/net/curl.d +++ b/std/net/curl.d @@ -159,7 +159,6 @@ module std.net.curl; public import etc.c.curl : CurlOption; import core.time : dur; import etc.c.curl : CURLcode; -import std.concurrency : Tid; import std.range.primitives; import std.encoding : EncodingScheme; import std.traits : isSomeChar; @@ -171,6 +170,8 @@ version(StdUnittest) private struct TestServer { + import std.concurrency : Tid; + import std.socket : Socket, TcpSocket; string addr() { return _addr; } @@ -1566,35 +1567,6 @@ private mixin template WorkerThreadProtocol(Unit, alias units) } } -// @@@@BUG 15831@@@@ -// this should be inside byLineAsync -// Range that reads one line at a time asynchronously. -private static struct AsyncLineInputRange(Char) -{ - private Char[] line; - mixin WorkerThreadProtocol!(Char, line); - - private Tid workerTid; - private State running; - - private this(Tid tid, size_t transmitBuffers, size_t bufferSize) - { - import std.concurrency : send; - - workerTid = tid; - state = State.needUnits; - - // Send buffers to other thread for it to use. Since no mechanism is in - // place for moving ownership a cast to shared is done here and casted - // back to non-shared in the receiving end. - foreach (i ; 0 .. transmitBuffers) - { - auto arr = new Char[](bufferSize); - workerTid.send(cast(immutable(Char[]))arr); - } - } -} - /** HTTP/FTP fetch content as a range of lines asynchronously. * * A range of lines is returned immediately and the request that fetches the @@ -1612,7 +1584,7 @@ private static struct AsyncLineInputRange(Char) * * If no data is available and the main thread accesses the range it will block * until data becomes available. An exception to this is the $(D wait(Duration)) method on - * the $(LREF AsyncLineInputRange). This method will wait at maximum for the + * the $(LREF LineInputRange). This method will wait at maximum for the * specified duration and return true if data is available. * * Example: @@ -1674,15 +1646,15 @@ if (isCurlConn!Conn && isSomeChar!Char && isSomeChar!Terminator) import std.concurrency; // 50 is just an arbitrary number for now setMaxMailboxSize(thisTid, 50, OnCrowding.block); - auto tid = spawn(&_spawnAsync!(Conn, Char, Terminator)); + auto tid = spawn(&_async!().spawn!(Conn, Char, Terminator)); tid.send(thisTid); tid.send(terminator); tid.send(keepTerminator == Yes.keepTerminator); - _asyncDuplicateConnection(url, conn, postData, tid); + _async!().duplicateConnection(url, conn, postData, tid); - return AsyncLineInputRange!Char(tid, transmitBuffers, - Conn.defaultAsyncStringBufferSize); + return _async!().LineInputRange!Char(tid, transmitBuffers, + Conn.defaultAsyncStringBufferSize); } } @@ -1722,35 +1694,6 @@ auto byLineAsync(Conn = AutoProtocol, Terminator = char, Char = char) } } -// @@@@BUG 15831@@@@ -// this should be inside byLineAsync -// Range that reads one chunk at a time asynchronously. -private static struct AsyncChunkInputRange -{ - import std.concurrency : Tid, send; - - private ubyte[] chunk; - mixin WorkerThreadProtocol!(ubyte, chunk); - - private Tid workerTid; - private State running; - - private this(Tid tid, size_t transmitBuffers, size_t chunkSize) - { - workerTid = tid; - state = State.needUnits; - - // Send buffers to other thread for it to use. Since no mechanism is in - // place for moving ownership a cast to shared is done here and a cast - // back to non-shared in the receiving end. - foreach (i ; 0 .. transmitBuffers) - { - ubyte[] arr = new ubyte[](chunkSize); - workerTid.send(cast(immutable(ubyte[]))arr); - } - } -} - /** HTTP/FTP fetch content as a range of chunks asynchronously. * * A range of chunks is returned immediately and the request that fetches the @@ -1768,7 +1711,7 @@ private static struct AsyncChunkInputRange * * If no data is available and the main thread access the range it will block * until data becomes available. An exception to this is the $(D wait(Duration)) - * method on the $(LREF AsyncChunkInputRange). This method will wait at maximum for the specified + * method on the $(LREF ChunkInputRange). This method will wait at maximum for the specified * duration and return true if data is available. * * Example: @@ -1827,12 +1770,12 @@ if (isCurlConn!(Conn)) import std.concurrency; // 50 is just an arbitrary number for now setMaxMailboxSize(thisTid, 50, OnCrowding.block); - auto tid = spawn(&_spawnAsync!(Conn, ubyte)); + auto tid = spawn(&_async!().spawn!(Conn, ubyte)); tid.send(thisTid); - _asyncDuplicateConnection(url, conn, postData, tid); + _async!().duplicateConnection(url, conn, postData, tid); - return AsyncChunkInputRange(tid, transmitBuffers, chunkSize); + return _async!().ChunkInputRange(tid, transmitBuffers, chunkSize); } } @@ -1874,54 +1817,6 @@ if (isCurlConn!(Conn)) } -/* Used by byLineAsync/byChunkAsync to duplicate an existing connection - * that can be used exclusively in a spawned thread. - */ -private void _asyncDuplicateConnection(Conn, PostData) - (const(char)[] url, Conn conn, PostData postData, Tid tid) -{ - import std.concurrency : send; - import std.exception : enforce; - - // no move semantic available in std.concurrency ie. must use casting. - auto connDup = conn.dup(); - connDup.url = url; - - static if ( is(Conn : HTTP) ) - { - connDup.p.headersOut = null; - connDup.method = conn.method == HTTP.Method.undefined ? - HTTP.Method.get : conn.method; - if (postData !is null) - { - if (connDup.method == HTTP.Method.put) - { - connDup.handle.set(CurlOption.infilesize_large, - postData.length); - } - else - { - // post - connDup.method = HTTP.Method.post; - connDup.handle.set(CurlOption.postfieldsize_large, - postData.length); - } - connDup.handle.set(CurlOption.copypostfields, - cast(void*) postData.ptr); - } - tid.send(cast(ulong) connDup.handle.handle); - tid.send(connDup.method); - } - else - { - enforce!CurlException(postData is null, - "Cannot put ftp data using byLineAsync()"); - tid.send(cast(ulong) connDup.handle.handle); - tid.send(HTTP.Method.undefined); - } - connDup.p.curl.handle = null; // make sure handle is not freed -} - /* Mixin template for all supported curl protocols. This is the commom functionallity such as timeouts and network interface settings. This should @@ -4883,285 +4778,397 @@ private struct Pool(Data) } } -// Shared function for reading incoming chunks of data and -// sending the to a parent thread -private static size_t _receiveAsyncChunks(ubyte[] data, ref ubyte[] outdata, - Pool!(ubyte[]) freeBuffers, - ref ubyte[] buffer, Tid fromTid, - ref bool aborted) +// Lazily-instantiated namespace to avoid importing std.concurrency until needed. +private struct _async() { - import std.concurrency : receive, send, thisTid; +static: + // @@@@BUG 15831@@@@ + // this should be inside byLineAsync + // Range that reads one chunk at a time asynchronously. + private struct ChunkInputRange + { + import std.concurrency : Tid, send; - immutable datalen = data.length; + private ubyte[] chunk; + mixin WorkerThreadProtocol!(ubyte, chunk); - // Copy data to fill active buffer - while (!data.empty) - { + private Tid workerTid; + private State running; - // Make sure a buffer is present - while ( outdata.empty && freeBuffers.empty) - { - // Active buffer is invalid and there are no - // available buffers in the pool. Wait for buffers - // to return from main thread in order to reuse - // them. - receive((immutable(ubyte)[] buf) - { - buffer = cast(ubyte[]) buf; - outdata = buffer[]; - }, - (bool flag) { aborted = true; } - ); - if (aborted) return cast(size_t) 0; - } - if (outdata.empty) + private this(Tid tid, size_t transmitBuffers, size_t chunkSize) { - buffer = freeBuffers.pop(); - outdata = buffer[]; + workerTid = tid; + state = State.needUnits; + + // Send buffers to other thread for it to use. Since no mechanism is in + // place for moving ownership a cast to shared is done here and a cast + // back to non-shared in the receiving end. + foreach (i ; 0 .. transmitBuffers) + { + ubyte[] arr = new ubyte[](chunkSize); + workerTid.send(cast(immutable(ubyte[]))arr); + } } + } - // Copy data - auto copyBytes = outdata.length < data.length ? - outdata.length : data.length; + // @@@@BUG 15831@@@@ + // this should be inside byLineAsync + // Range that reads one line at a time asynchronously. + private static struct LineInputRange(Char) + { + private Char[] line; + mixin WorkerThreadProtocol!(Char, line); - outdata[0 .. copyBytes] = data[0 .. copyBytes]; - outdata = outdata[copyBytes..$]; - data = data[copyBytes..$]; + private Tid workerTid; + private State running; - if (outdata.empty) - fromTid.send(thisTid, curlMessage(cast(immutable(ubyte)[])buffer)); - } + private this(Tid tid, size_t transmitBuffers, size_t bufferSize) + { + import std.concurrency : send; - return datalen; -} + workerTid = tid; + state = State.needUnits; -// ditto -private static void _finalizeAsyncChunks(ubyte[] outdata, ref ubyte[] buffer, - Tid fromTid) -{ - import std.concurrency : send, thisTid; - if (!outdata.empty) - { - // Resize the last buffer - buffer.length = buffer.length - outdata.length; - fromTid.send(thisTid, curlMessage(cast(immutable(ubyte)[])buffer)); + // Send buffers to other thread for it to use. Since no mechanism is in + // place for moving ownership a cast to shared is done here and casted + // back to non-shared in the receiving end. + foreach (i ; 0 .. transmitBuffers) + { + auto arr = new Char[](bufferSize); + workerTid.send(cast(immutable(Char[]))arr); + } + } } -} + import std.concurrency : Tid; -// Shared function for reading incoming lines of data and sending the to a -// parent thread -private static size_t _receiveAsyncLines(Terminator, Unit) - (const(ubyte)[] data, ref EncodingScheme encodingScheme, - bool keepTerminator, Terminator terminator, - ref const(ubyte)[] leftOverBytes, ref bool bufferValid, - ref Pool!(Unit[]) freeBuffers, ref Unit[] buffer, - Tid fromTid, ref bool aborted) -{ - import std.concurrency : prioritySend, receive, send, thisTid; - import std.exception : enforce; - import std.format : format; - import std.traits : isArray; + // Shared function for reading incoming chunks of data and + // sending the to a parent thread + private size_t receiveChunks(ubyte[] data, ref ubyte[] outdata, + Pool!(ubyte[]) freeBuffers, + ref ubyte[] buffer, Tid fromTid, + ref bool aborted) + { + import std.concurrency : receive, send, thisTid; - immutable datalen = data.length; + immutable datalen = data.length; - // Terminator is specified and buffers should be resized as determined by - // the terminator + // Copy data to fill active buffer + while (!data.empty) + { - // Copy data to active buffer until terminator is found. + // Make sure a buffer is present + while ( outdata.empty && freeBuffers.empty) + { + // Active buffer is invalid and there are no + // available buffers in the pool. Wait for buffers + // to return from main thread in order to reuse + // them. + receive((immutable(ubyte)[] buf) + { + buffer = cast(ubyte[]) buf; + outdata = buffer[]; + }, + (bool flag) { aborted = true; } + ); + if (aborted) return cast(size_t) 0; + } + if (outdata.empty) + { + buffer = freeBuffers.pop(); + outdata = buffer[]; + } - // Decode as many lines as possible - while (true) - { + // Copy data + auto copyBytes = outdata.length < data.length ? + outdata.length : data.length; - // Make sure a buffer is present - while (!bufferValid && freeBuffers.empty) - { - // Active buffer is invalid and there are no available buffers in - // the pool. Wait for buffers to return from main thread in order to - // reuse them. - receive((immutable(Unit)[] buf) - { - buffer = cast(Unit[]) buf; - buffer.length = 0; - buffer.assumeSafeAppend(); - bufferValid = true; - }, - (bool flag) { aborted = true; } - ); - if (aborted) return cast(size_t) 0; + outdata[0 .. copyBytes] = data[0 .. copyBytes]; + outdata = outdata[copyBytes..$]; + data = data[copyBytes..$]; + + if (outdata.empty) + fromTid.send(thisTid, curlMessage(cast(immutable(ubyte)[])buffer)); } - if (!bufferValid) + + return datalen; + } + + // ditto + private void finalizeChunks(ubyte[] outdata, ref ubyte[] buffer, + Tid fromTid) + { + import std.concurrency : send, thisTid; + if (!outdata.empty) { - buffer = freeBuffers.pop(); - bufferValid = true; + // Resize the last buffer + buffer.length = buffer.length - outdata.length; + fromTid.send(thisTid, curlMessage(cast(immutable(ubyte)[])buffer)); } + } - // Try to read a line from left over bytes from last onReceive plus the - // newly received bytes. - try + + // Shared function for reading incoming lines of data and sending the to a + // parent thread + private static size_t receiveLines(Terminator, Unit) + (const(ubyte)[] data, ref EncodingScheme encodingScheme, + bool keepTerminator, Terminator terminator, + ref const(ubyte)[] leftOverBytes, ref bool bufferValid, + ref Pool!(Unit[]) freeBuffers, ref Unit[] buffer, + Tid fromTid, ref bool aborted) + { + import std.concurrency : prioritySend, receive, send, thisTid; + import std.exception : enforce; + import std.format : format; + import std.traits : isArray; + + immutable datalen = data.length; + + // Terminator is specified and buffers should be resized as determined by + // the terminator + + // Copy data to active buffer until terminator is found. + + // Decode as many lines as possible + while (true) { - if (decodeLineInto(leftOverBytes, data, buffer, - encodingScheme, terminator)) + + // Make sure a buffer is present + while (!bufferValid && freeBuffers.empty) { - if (keepTerminator) + // Active buffer is invalid and there are no available buffers in + // the pool. Wait for buffers to return from main thread in order to + // reuse them. + receive((immutable(Unit)[] buf) + { + buffer = cast(Unit[]) buf; + buffer.length = 0; + buffer.assumeSafeAppend(); + bufferValid = true; + }, + (bool flag) { aborted = true; } + ); + if (aborted) return cast(size_t) 0; + } + if (!bufferValid) + { + buffer = freeBuffers.pop(); + bufferValid = true; + } + + // Try to read a line from left over bytes from last onReceive plus the + // newly received bytes. + try + { + if (decodeLineInto(leftOverBytes, data, buffer, + encodingScheme, terminator)) { - fromTid.send(thisTid, - curlMessage(cast(immutable(Unit)[])buffer)); + if (keepTerminator) + { + fromTid.send(thisTid, + curlMessage(cast(immutable(Unit)[])buffer)); + } + else + { + static if (isArray!Terminator) + fromTid.send(thisTid, + curlMessage(cast(immutable(Unit)[]) + buffer[0..$-terminator.length])); + else + fromTid.send(thisTid, + curlMessage(cast(immutable(Unit)[]) + buffer[0..$-1])); + } + bufferValid = false; } else { - static if (isArray!Terminator) - fromTid.send(thisTid, - curlMessage(cast(immutable(Unit)[]) - buffer[0..$-terminator.length])); - else - fromTid.send(thisTid, - curlMessage(cast(immutable(Unit)[]) - buffer[0..$-1])); + // Could not decode an entire line. Save + // bytes left in data for next call to + // onReceive. Can be up to a max of 4 bytes. + enforce!CurlException(data.length <= 4, + format( + "Too many bytes left not decoded %s"~ + " > 4. Maybe the charset specified in"~ + " headers does not match "~ + "the actual content downloaded?", + data.length)); + leftOverBytes ~= data; + break; } - bufferValid = false; } - else + catch (CurlException ex) { - // Could not decode an entire line. Save - // bytes left in data for next call to - // onReceive. Can be up to a max of 4 bytes. - enforce!CurlException(data.length <= 4, - format( - "Too many bytes left not decoded %s"~ - " > 4. Maybe the charset specified in"~ - " headers does not match "~ - "the actual content downloaded?", - data.length)); - leftOverBytes ~= data; - break; + prioritySend(fromTid, cast(immutable(CurlException))ex); + return cast(size_t) 0; } } - catch (CurlException ex) - { - prioritySend(fromTid, cast(immutable(CurlException))ex); - return cast(size_t) 0; - } + return datalen; } - return datalen; -} - -// ditto -private static -void _finalizeAsyncLines(Unit)(bool bufferValid, Unit[] buffer, Tid fromTid) -{ - import std.concurrency : send, thisTid; - if (bufferValid && buffer.length != 0) - fromTid.send(thisTid, curlMessage(cast(immutable(Unit)[])buffer[0..$])); -} + // ditto + private static + void finalizeLines(Unit)(bool bufferValid, Unit[] buffer, Tid fromTid) + { + import std.concurrency : send, thisTid; + if (bufferValid && buffer.length != 0) + fromTid.send(thisTid, curlMessage(cast(immutable(Unit)[])buffer[0..$])); + } -// Spawn a thread for handling the reading of incoming data in the -// background while the delegate is executing. This will optimize -// throughput by allowing simultaneous input (this struct) and -// output (e.g. AsyncHTTPLineOutputRange). -private static void _spawnAsync(Conn, Unit, Terminator = void)() -{ - import std.concurrency : prioritySend, receiveOnly, send, thisTid; - import etc.c.curl : CURL, CurlError; - Tid fromTid = receiveOnly!Tid(); + /* Used by byLineAsync/byChunkAsync to duplicate an existing connection + * that can be used exclusively in a spawned thread. + */ + private void duplicateConnection(Conn, PostData) + (const(char)[] url, Conn conn, PostData postData, Tid tid) + { + import std.concurrency : send; + import std.exception : enforce; - // Get buffer to read into - Pool!(Unit[]) freeBuffers; // Free list of buffer objects + // no move semantic available in std.concurrency ie. must use casting. + auto connDup = conn.dup(); + connDup.url = url; - // Number of bytes filled into active buffer - Unit[] buffer; - bool aborted = false; + static if ( is(Conn : HTTP) ) + { + connDup.p.headersOut = null; + connDup.method = conn.method == HTTP.Method.undefined ? + HTTP.Method.get : conn.method; + if (postData !is null) + { + if (connDup.method == HTTP.Method.put) + { + connDup.handle.set(CurlOption.infilesize_large, + postData.length); + } + else + { + // post + connDup.method = HTTP.Method.post; + connDup.handle.set(CurlOption.postfieldsize_large, + postData.length); + } + connDup.handle.set(CurlOption.copypostfields, + cast(void*) postData.ptr); + } + tid.send(cast(ulong) connDup.handle.handle); + tid.send(connDup.method); + } + else + { + enforce!CurlException(postData is null, + "Cannot put ftp data using byLineAsync()"); + tid.send(cast(ulong) connDup.handle.handle); + tid.send(HTTP.Method.undefined); + } + connDup.p.curl.handle = null; // make sure handle is not freed + } - EncodingScheme encodingScheme; - static if ( !is(Terminator == void)) + // Spawn a thread for handling the reading of incoming data in the + // background while the delegate is executing. This will optimize + // throughput by allowing simultaneous input (this struct) and + // output (e.g. AsyncHTTPLineOutputRange). + private static void spawn(Conn, Unit, Terminator = void)() { - // Only lines reading will receive a terminator - const terminator = receiveOnly!Terminator(); - const keepTerminator = receiveOnly!bool(); + import std.concurrency : Tid, prioritySend, receiveOnly, send, thisTid; + import etc.c.curl : CURL, CurlError; + Tid fromTid = receiveOnly!Tid(); - // max number of bytes to carry over from an onReceive - // callback. This is 4 because it is the max code units to - // decode a code point in the supported encodings. - auto leftOverBytes = new const(ubyte)[4]; - leftOverBytes.length = 0; - auto bufferValid = false; - } - else - { - Unit[] outdata; - } + // Get buffer to read into + Pool!(Unit[]) freeBuffers; // Free list of buffer objects - // no move semantic available in std.concurrency ie. must use casting. - auto connDup = cast(CURL*) receiveOnly!ulong(); - auto client = Conn(); - client.p.curl.handle = connDup; + // Number of bytes filled into active buffer + Unit[] buffer; + bool aborted = false; - // receive a method for both ftp and http but just use it for http - auto method = receiveOnly!(HTTP.Method)(); - - client.onReceive = (ubyte[] data) - { - // If no terminator is specified the chunk size is fixed. - static if ( is(Terminator == void) ) - return _receiveAsyncChunks(data, outdata, freeBuffers, buffer, - fromTid, aborted); + EncodingScheme encodingScheme; + static if ( !is(Terminator == void)) + { + // Only lines reading will receive a terminator + const terminator = receiveOnly!Terminator(); + const keepTerminator = receiveOnly!bool(); + + // max number of bytes to carry over from an onReceive + // callback. This is 4 because it is the max code units to + // decode a code point in the supported encodings. + auto leftOverBytes = new const(ubyte)[4]; + leftOverBytes.length = 0; + auto bufferValid = false; + } else - return _receiveAsyncLines(data, encodingScheme, - keepTerminator, terminator, leftOverBytes, - bufferValid, freeBuffers, buffer, - fromTid, aborted); - }; + { + Unit[] outdata; + } - static if ( is(Conn == HTTP) ) - { - client.method = method; - // register dummy header handler - client.onReceiveHeader = (in char[] key, in char[] value) + // no move semantic available in std.concurrency ie. must use casting. + auto connDup = cast(CURL*) receiveOnly!ulong(); + auto client = Conn(); + client.p.curl.handle = connDup; + + // receive a method for both ftp and http but just use it for http + auto method = receiveOnly!(HTTP.Method)(); + + client.onReceive = (ubyte[] data) { - if (key == "content-type") - encodingScheme = EncodingScheme.create(client.p.charset); + // If no terminator is specified the chunk size is fixed. + static if ( is(Terminator == void) ) + return receiveChunks(data, outdata, freeBuffers, buffer, + fromTid, aborted); + else + return receiveLines(data, encodingScheme, + keepTerminator, terminator, leftOverBytes, + bufferValid, freeBuffers, buffer, + fromTid, aborted); }; - } - else - { - encodingScheme = EncodingScheme.create(client.encoding); - } - // Start the request - CurlCode code; - try - { - code = client.perform(No.throwOnError); - } - catch (Exception ex) - { - prioritySend(fromTid, cast(immutable(Exception)) ex); - fromTid.send(thisTid, curlMessage(true)); // signal done - return; - } + static if ( is(Conn == HTTP) ) + { + client.method = method; + // register dummy header handler + client.onReceiveHeader = (in char[] key, in char[] value) + { + if (key == "content-type") + encodingScheme = EncodingScheme.create(client.p.charset); + }; + } + else + { + encodingScheme = EncodingScheme.create(client.encoding); + } - if (code != CurlError.ok) - { - if (aborted && (code == CurlError.aborted_by_callback || - code == CurlError.write_error)) + // Start the request + CurlCode code; + try + { + code = client.perform(No.throwOnError); + } + catch (Exception ex) { + prioritySend(fromTid, cast(immutable(Exception)) ex); fromTid.send(thisTid, curlMessage(true)); // signal done return; } - prioritySend(fromTid, cast(immutable(CurlException)) - new CurlException(client.p.curl.errorString(code))); - fromTid.send(thisTid, curlMessage(true)); // signal done - return; - } + if (code != CurlError.ok) + { + if (aborted && (code == CurlError.aborted_by_callback || + code == CurlError.write_error)) + { + fromTid.send(thisTid, curlMessage(true)); // signal done + return; + } + prioritySend(fromTid, cast(immutable(CurlException)) + new CurlException(client.p.curl.errorString(code))); - // Send remaining data that is not a full chunk size - static if ( is(Terminator == void) ) - _finalizeAsyncChunks(outdata, buffer, fromTid); - else - _finalizeAsyncLines(bufferValid, buffer, fromTid); + fromTid.send(thisTid, curlMessage(true)); // signal done + return; + } + + // Send remaining data that is not a full chunk size + static if ( is(Terminator == void) ) + finalizeChunks(outdata, buffer, fromTid); + else + finalizeLines(bufferValid, buffer, fromTid); - fromTid.send(thisTid, curlMessage(true)); // signal done + fromTid.send(thisTid, curlMessage(true)); // signal done + } }