@@ -319,6 +319,13 @@ OutgoingMessage.prototype.assignSocket = function(socket) {
319
319
} ;
320
320
321
321
322
+ OutgoingMessage . prototype . detachSocket = function ( socket ) {
323
+ assert ( socket . _httpMessage == this ) ;
324
+ socket . _httpMessage = null ;
325
+ this . socket = this . connection = null ;
326
+ } ;
327
+
328
+
322
329
OutgoingMessage . prototype . destroy = function ( error ) {
323
330
this . socket . destroy ( error ) ;
324
331
} ;
@@ -343,7 +350,9 @@ OutgoingMessage.prototype._send = function(data, encoding) {
343
350
344
351
345
352
OutgoingMessage . prototype . _writeRaw = function ( data , encoding ) {
346
- if ( this . connection . _httpMessage === this && this . connection . writable ) {
353
+ if ( this . connection &&
354
+ this . connection . _httpMessage === this &&
355
+ this . connection . writable ) {
347
356
// There might be pending data in the this.output buffer.
348
357
while ( this . output . length ) {
349
358
if ( ! this . connection . writable ) {
@@ -602,8 +611,6 @@ OutgoingMessage.prototype.end = function(data, encoding) {
602
611
603
612
604
613
OutgoingMessage . prototype . _finish = function ( ) {
605
- this . socket . _httpMessage = null ;
606
- this . socket = this . connection = null ;
607
614
this . emit ( 'finish' ) ;
608
615
} ;
609
616
@@ -868,6 +875,8 @@ function connectionListener(socket) {
868
875
// When we're finished writing the response, check if this is the last
869
876
// respose, if so destroy the socket.
870
877
res . on ( 'finish' , function ( ) {
878
+ res . detachSocket ( socket ) ;
879
+
871
880
if ( res . _last ) {
872
881
socket . destroySoon ( ) ;
873
882
} else {
@@ -915,37 +924,117 @@ Agent.prototype.appendMessage = function(options) {
915
924
var req = new ClientRequest ( options ) ;
916
925
this . queue . push ( req ) ;
917
926
927
+ /*
918
928
req.on('finish', function () {
919
929
self._cycle();
920
930
});
931
+ */
921
932
922
933
this . _cycle ( ) ;
934
+
935
+ return req ;
923
936
} ;
924
937
925
938
926
- Agent . prototype . _establishNewConnection = function ( socket , message ) {
939
+ Agent . prototype . _establishNewConnection = function ( ) {
927
940
var self = this ;
928
941
assert ( this . sockets . length < this . maxSockets ) ;
929
942
930
943
// Grab a new "socket". Depending on the implementation of _getConnection
931
944
// this could either be a raw TCP socket or a TLS stream.
932
945
var socket = this . _getConnection ( this . host , this . port , function ( ) {
946
+ debug ( "Agent _getConnection callback" ) ;
933
947
self . _cycle ( ) ;
934
948
} ) ;
935
949
936
950
this . sockets . push ( socket ) ;
937
951
952
+ // Add a parser to the socket.
953
+ var parser = parsers . alloc ( ) ;
954
+ parser . reinitialize ( 'response' ) ;
955
+ parser . socket = socket ;
956
+
957
+ socket . ondata = function ( d , start , end ) {
958
+ var ret = parser . execute ( d , start , end - start ) ;
959
+ if ( ret instanceof Error ) {
960
+ debug ( 'parse error' ) ;
961
+ socket . destroy ( ret ) ;
962
+ } else if ( parser . incoming && parser . incoming . upgrade ) {
963
+ var bytesParsed = ret ;
964
+ socket . ondata = null ;
965
+ socket . onend = null ;
966
+
967
+ var res = parser . incoming ;
968
+
969
+ // This is start + byteParsed + 1 due to the error of getting \n
970
+ // in the upgradeHead from the closing lines of the headers
971
+ var upgradeHead = d . slice ( start + bytesParsed + 1 , end ) ;
972
+
973
+ if ( self . listeners ( 'upgrade' ) . length ) {
974
+ self . emit ( 'upgrade' , res , res . socket , upgradeHead ) ;
975
+ } else {
976
+ // Got upgrade header, but have no handler.
977
+ socket . destroy ( ) ;
978
+ }
979
+ }
980
+ } ;
981
+
982
+ socket . onend = function ( ) {
983
+ parser . finish ( ) ;
984
+ socket . destroy ( ) ;
985
+ } ;
986
+
938
987
// When the socket closes remove it from the list of available sockets.
939
988
socket . on ( 'close' , function ( ) {
940
989
var i = self . sockets . indexOf ( socket ) ;
941
990
if ( i >= 0 ) self . sockets . splice ( i , 1 ) ;
991
+ // unref the parser for easy gc
992
+ parsers . free ( parser ) ;
942
993
} ) ;
994
+
995
+ parser . onIncoming = function ( res , shouldKeepAlive ) {
996
+ debug ( 'AGENT incoming response!' ) ;
997
+
998
+ var req = socket . _httpMessage ;
999
+ assert ( req ) ;
1000
+
1001
+ // Responses to HEAD requests are AWFUL. Ask Ryan.
1002
+ // A major oversight in HTTP. Hence this nastiness.
1003
+ var isHeadResponse = req . method == 'HEAD' ;
1004
+ debug ( 'AGENT isHeadResponse ' + isHeadResponse ) ;
1005
+
1006
+ if ( res . statusCode == 100 ) {
1007
+ // restart the parser, as this is a continue message.
1008
+ req . emit ( 'continue' ) ;
1009
+ return true ;
1010
+ }
1011
+
1012
+ if ( req . shouldKeepAlive && res . headers . connection === 'close' ) {
1013
+ req . shouldKeepAlive = false ;
1014
+ }
1015
+
1016
+ res . addListener ( 'end' , function ( ) {
1017
+ debug ( 'AGENT request complete disconnecting.' ) ;
1018
+ // For the moment we reconnect for every request. FIXME!
1019
+ // All that should be required for keep-alive is to not reconnect,
1020
+ // but outgoingFlush instead.
1021
+ if ( ! req . shouldKeepAlive ) socket . end ( ) ;
1022
+
1023
+ req . detachSocket ( socket ) ;
1024
+ self . _cycle ( ) ;
1025
+ } ) ;
1026
+
1027
+ req . emit ( 'response' , res ) ;
1028
+
1029
+ return isHeadResponse ;
1030
+ } ;
943
1031
} ;
944
1032
945
1033
946
1034
// Sub-classes can overwrite this method with e.g. something that supplies
947
1035
// TLS streams.
948
1036
Agent . prototype . _getConnection = function ( host , port , cb ) {
1037
+ debug ( "Agent connected!" ) ;
949
1038
var c = net . createConnection ( port , host ) ;
950
1039
c . on ( 'connect' , cb ) ;
951
1040
return c ;
@@ -956,6 +1045,8 @@ Agent.prototype._getConnection = function(host, port, cb) {
956
1045
// waiting sockets. If a waiting socket cannot be found, it will
957
1046
// start the process of establishing one.
958
1047
Agent . prototype . _cycle = function ( ) {
1048
+ debug ( "Agent _cycle" ) ;
1049
+
959
1050
var first = this . queue [ 0 ] ;
960
1051
if ( ! first ) return ;
961
1052
@@ -965,6 +1056,7 @@ Agent.prototype._cycle = function() {
965
1056
// If the socket doesn't already have a message it's sending out
966
1057
// and the socket is available for writing...
967
1058
if ( ! socket . _httpMessage && ( socket . writable && socket . readable ) ) {
1059
+ debug ( "Agent found socket, shift" ) ;
968
1060
// We found an available connection!
969
1061
this . queue . shift ( ) ; // remove first from queue.
970
1062
first . assignSocket ( socket ) ;
0 commit comments