Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 73 additions & 48 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ export default class HttpConnection extends BaseConnection {
}

debug('Starting a new request', params)

// tracking response.end, request.finish and the value of the returnable response object here is necessary:
// we only know a request is truly finished when one of the following is true:
// - request.finish and response.end have both fired (success)
// - request.error has fired (failure)
// - response.close has fired (failure)
let responseEnded = false
let requestFinished = false
let connectionRequestResponse: ConnectionRequestResponse | ConnectionRequestResponseAsStream

let request: http.ClientRequest
try {
request = this.makeRequest(requestParams)
Expand All @@ -135,7 +145,6 @@ export default class HttpConnection extends BaseConnection {

const onResponse = (response: http.IncomingMessage): void => {
cleanListeners()
this._openRequests--

if (options.asStream === true) {
return resolve({
Expand Down Expand Up @@ -189,29 +198,25 @@ export default class HttpConnection extends BaseConnection {
}
}

const onEnd = (err: Error): void => {
const onEnd = (): void => {
response.removeListener('data', onData)
response.removeListener('end', onEnd)
response.removeListener('error', onEnd)
request.removeListener('error', noop)

if (err != null) {
// @ts-expect-error
if (err.message === 'aborted' && err.code === 'ECONNRESET') {
response.destroy()
return reject(new ConnectionError('Response aborted while reading the body'))
}
if (err.name === 'RequestAbortedError') {
return reject(err)
}
return reject(new ConnectionError(err.message))
}

resolve({
responseEnded = true

connectionRequestResponse = {
body: isCompressed || bodyIsBinary ? Buffer.concat(payload as Buffer[]) : payload as string,
statusCode: response.statusCode as number,
headers: response.headers
})
}

if (requestFinished) {
return resolve(connectionRequestResponse)
}
}

const onResponseClose = (): void => {
return reject(new ConnectionError('Response aborted while reading the body'))
}

if (!isCompressed && !bodyIsBinary) {
Expand All @@ -220,30 +225,36 @@ export default class HttpConnection extends BaseConnection {

this.diagnostic.emit('deserialization', null, options)
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
response.on('close', onResponseClose)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see the onResponseClose() is throwing Response aborted while reading the body, which is also emitted on an EPIPE (shown below in line 255). These two messages should be distinct, since we won't know which one caused this message.

It also appears we don't remove this listener. It seems like we should remove it in the same places we remove data and end, most specifically on onEnd(). I'm wondering if the listener is going off when the socket closes, making it seem like an otherwise successful call failed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're both EPIPE errors, which is why I gave them the same error message. It's the same symptom, just happening under slightly different circumstances: one gets a response body back, the other doesn't. That difference requires you to inspect the error's meta property to tell the difference, but I can use a slightly different message to help distinguish them further.

While the 'close' event should be implicitly cleaned up during garbage collection, it won't hurt to explicitly remove those listeners at the end of the lifecycle.

I've made both of these changes in #266. Please take a look!

}

const onTimeout = (): void => {
cleanListeners()
this._openRequests--
request.once('error', () => {}) // we need to catch the request aborted error
request.once('error', noop) // we need to catch the request aborted error
request.destroy()
reject(new TimeoutError('Request timed out'))
return reject(new TimeoutError('Request timed out'))
}

const onError = (err: Error): void => {
// @ts-expect-error
let { name, message, code } = err

// ignore this error, it means we got a response body for a request that didn't expect a body (e.g. HEAD)
// rather than failing, let it return a response with an empty string as body
if (code === 'HPE_INVALID_CONSTANT' && message.startsWith('Parse Error: Expected HTTP/')) return

cleanListeners()
this._openRequests--
let message = err.message
if (err.name === 'RequestAbortedError') {
if (name === 'RequestAbortedError') {
return reject(err)
}
// @ts-expect-error
if (err.code === 'ECONNRESET') {

if (code === 'ECONNRESET') {
message += ` - Local: ${request.socket?.localAddress ?? 'unknown'}:${request.socket?.localPort ?? 'unknown'}, Remote: ${request.socket?.remoteAddress ?? 'unknown'}:${request.socket?.remotePort ?? 'unknown'}`
} else if (code === 'EPIPE') {
message = 'Response aborted while reading the body'
}
reject(new ConnectionError(message))
return reject(new ConnectionError(message))
}

const onSocket = (socket: TLSSocket): void => {
Expand All @@ -269,9 +280,42 @@ export default class HttpConnection extends BaseConnection {
}
}

const onFinish = (): void => {
requestFinished = true

if (responseEnded) {
if (connectionRequestResponse != null) {
return resolve(connectionRequestResponse)
} else {
return reject(new Error('No response body received'))
}
}
}

const cleanListeners = (): void => {
if (cleanedListeners) return

this._openRequests--

// we do NOT stop listening to request.error here
// all errors we care about in the request/response lifecycle will bubble up to request.error, and may occur even after the request has been sent
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('socket', onSocket)
if (options.signal != null) {
if ('removeEventListener' in options.signal) {
options.signal.removeEventListener('abort', abortListener)
} else {
options.signal.removeListener('abort', abortListener)
}
}
cleanedListeners = true
}

request.on('response', onResponse)
request.on('timeout', onTimeout)
request.on('error', onError)
request.on('finish', onFinish)
if (this[kCaFingerprint] != null && requestParams.protocol === 'https:') {
request.on('socket', onSocket)
}
Expand All @@ -285,31 +329,12 @@ export default class HttpConnection extends BaseConnection {
/* istanbul ignore if */
if (err != null && !cleanedListeners) {
cleanListeners()
this._openRequests--
reject(err)
return reject(err)
}
})
} else {
request.end(params.body)
}

return request

function cleanListeners (): void {
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('error', onError)
request.on('error', noop)
request.removeListener('socket', onSocket)
if (options.signal != null) {
if ('removeEventListener' in options.signal) {
options.signal.removeEventListener('abort', abortListener)
} else {
options.signal.removeListener('abort', abortListener)
}
}
cleanedListeners = true
}
})
}

Expand Down
90 changes: 90 additions & 0 deletions test/unit/http-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,96 @@ test('Socket destroyed while reading the body', async t => {
server.stop()
})

test('Connection closed while sending the request body as stream (EPIPE)', async t => {
t.plan(2)

let dataCounter = 0
function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
dataCounter++
if (dataCounter === 1) {
res.writeHead(413, { 'Connection': 'close' });
res.end('Payload Too Large');
} else {
t.fail('Request should stop trying to send data')
}
}
const [{ port }, server] = await buildServer(handler)

const connection = new HttpConnection({
url: new URL(`http://localhost:${port}`),
})

const body = new Readable({
async read (_size: number) {
await setTimeout(500)
// run one large request where data will be received by socket in multiple chunks
this.push('x'.repeat(99999999))
await setTimeout(500)
this.push(null) // EOF
}
})

try {
// run one large request where data will be received by socket in multiple chunks
await connection.request({
path: '/hello',
method: 'POST',
body
}, options)
t.fail('ConnectionError should have been caught')
} catch (err: any) {
t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`)
t.ok(
err.message === 'Response aborted while reading the body' ||
err.message.startsWith('write ECONNRESET - Local:') ||
err.message.startsWith('read ECONNRESET - Local:'),
`Unexpected error message: ${err.message}`
)
}

server.stop()
})

test('Connection closed while sending the request body as string (EPIPE)', async t => {
t.plan(2)

let dataCounter = 0
function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
dataCounter++
if (dataCounter === 1) {
res.writeHead(413, { 'Connection': 'close' });
res.end('Payload Too Large');
} else {
t.fail('Request should stop trying to send data')
}
}
const [{ port }, server] = await buildServer(handler)

const connection = new HttpConnection({
url: new URL(`http://localhost:${port}`),
})

try {
// run one large request where data will be received by socket in multiple chunks
await connection.request({
path: '/hello',
method: 'POST',
body: 'x'.repeat(99999999)
}, options)
t.fail('ConnectionError should have been caught')
} catch (err: any) {
t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`)
t.ok(
err.message === 'Response aborted while reading the body' ||
err.message.startsWith('write ECONNRESET - Local:') ||
err.message.startsWith('read ECONNRESET - Local:'),
`Unexpected error message: ${err.message}`
)
}

server.stop()
})

test('Compressed response should return a buffer as body (gzip)', async t => {
t.plan(2)

Expand Down
Loading