Skip to content

Introduce advanced logging controls #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CallbackContext.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
'use strict';

const BeforeExitListener = require('./BeforeExitListener.js');
const { toFormatted, intoError } = require('./Errors');
const { structuredConsole } = require('./LogPatch');

/**
* Build the callback function and the part of the context which exposes
Expand All @@ -20,7 +20,7 @@ const { toFormatted, intoError } = require('./Errors');
*/
function _rawCallbackContext(client, id, scheduleNext) {
const postError = (err, callback) => {
console.error('Invoke Error', toFormatted(intoError(err)));
structuredConsole.logError('Invoke Error', err);
client.postInvocationError(err, id, callback);
};

Expand Down
204 changes: 169 additions & 35 deletions src/LogPatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,57 @@

const util = require('util');
const fs = require('fs');
const Errors = require('./Errors');

const levels = Object.freeze({
INFO: { name: 'INFO' },
DEBUG: { name: 'DEBUG' },
WARN: { name: 'WARN' },
ERROR: { name: 'ERROR' },
TRACE: { name: 'TRACE' },
FATAL: { name: 'FATAL' },
});
const structuredConsole = {};

const jsonErrorReplacer = (_, value) => {
if (value instanceof Error) {
let serializedErr = Object.assign(
{
errorType: value?.constructor?.name ?? 'UnknownError',
errorMessage: value.message,
stackTrace:
typeof value.stack === 'string'
? value.stack.split('\n')
: value.stack,
},
value,
);
return serializedErr;
}
return value;
};

function formatJsonMessage(requestId, timestamp, level, ...messageParams) {
let result = {
timestamp: timestamp,
level: level.name,
requestId: requestId,
};

if (messageParams.length === 1) {
result.message = messageParams[0];
try {
return JSON.stringify(result, jsonErrorReplacer);
} catch (_) {
result.message = util.format(result.message);
return JSON.stringify(result);
}
}

result.message = util.format(...messageParams);
for (const param of messageParams) {
if (param instanceof Error) {
result.errorType = param?.constructor?.name ?? 'UnknownError';
result.errorMessage = param.message;
result.stackTrace =
typeof param.stack === 'string' ? param.stack.split('\n') : [];
break;
}
}
return JSON.stringify(result);
}

/* Use a unique symbol to provide global access without risk of name clashes. */
const REQUEST_ID_SYMBOL = Symbol.for('aws.lambda.runtime.requestId');
Expand All @@ -26,10 +68,21 @@ let _currentRequestId = {
/**
* Write logs to stdout.
*/
let _logToStdout = (level, message) => {
let logTextToStdout = (level, message, ...params) => {
let time = new Date().toISOString();
let requestId = _currentRequestId.get();
let line = `${time}\t${requestId}\t${level.name}\t${util.format(
message,
...params,
)}`;
line = line.replace(/\n/g, '\r');
process.stdout.write(line + '\n');
};

let logJsonToStdout = (level, message, ...params) => {
let time = new Date().toISOString();
let requestId = _currentRequestId.get();
let line = `${time}\t${requestId}\t${level.name}\t${message}`;
let line = formatJsonMessage(requestId, time, level, message, ...params);
line = line.replace(/\n/g, '\r');
process.stdout.write(line + '\n');
};
Expand All @@ -46,15 +99,41 @@ let _logToStdout = (level, message) => {
* The next 8 bytes are the UNIX timestamp of the message with microseconds precision.
* The remaining bytes ar ethe message itself. Byte order is big-endian.
*/
let _logToFd = function (logTarget) {
let logTextToFd = function (logTarget) {
let typeAndLength = Buffer.alloc(16);
typeAndLength.writeUInt32BE(0xa55a0003, 0);
return (level, message, ...params) => {
let date = new Date();
let time = date.toISOString();
let requestId = _currentRequestId.get();
let enrichedMessage = `${time}\t${requestId}\t${level.name}\t${util.format(
message,
...params,
)}\n`;

return (level, message) => {
typeAndLength.writeUInt32BE((0xa55a0003 | level.tlvMask) >>> 0, 0);
let messageBytes = Buffer.from(enrichedMessage, 'utf8');
typeAndLength.writeInt32BE(messageBytes.length, 4);
typeAndLength.writeBigInt64BE(BigInt(date.valueOf()) * 1000n, 8);
fs.writeSync(logTarget, typeAndLength);
fs.writeSync(logTarget, messageBytes);
};
};

let logJsonToFd = function (logTarget) {
let typeAndLength = Buffer.alloc(16);
return (level, message, ...params) => {
let date = new Date();
let time = date.toISOString();
let requestId = _currentRequestId.get();
let enrichedMessage = `${time}\t${requestId}\t${level.name}\t${message}\n`;
let enrichedMessage = formatJsonMessage(
requestId,
time,
level,
message,
...params,
);

typeAndLength.writeUInt32BE((0xa55a0002 | level.tlvMask) >>> 0, 0);
let messageBytes = Buffer.from(enrichedMessage, 'utf8');
typeAndLength.writeInt32BE(messageBytes.length, 4);
typeAndLength.writeBigInt64BE(BigInt(date.valueOf()) * 1000n, 8);
Expand All @@ -66,45 +145,100 @@ let _logToFd = function (logTarget) {
/**
* Replace console functions with a log function.
* @param {Function(level, String)} log
* Apply log filters, based on `AWS_LAMBDA_LOG_LEVEL` env var
*/
function _patchConsoleWith(log) {
console.log = (msg, ...params) => {
log(levels.INFO, util.format(msg, ...params));
};
console.debug = (msg, ...params) => {
log(levels.DEBUG, util.format(msg, ...params));
};
console.info = (msg, ...params) => {
log(levels.INFO, util.format(msg, ...params));
};
console.warn = (msg, ...params) => {
log(levels.WARN, util.format(msg, ...params));
};
console.error = (msg, ...params) => {
log(levels.ERROR, util.format(msg, ...params));
};
console.trace = (msg, ...params) => {
log(levels.TRACE, util.format(msg, ...params));
};
const NopLog = (_message, ..._params) => {};
const levels = Object.freeze({
TRACE: { name: 'TRACE', priority: 1, tlvMask: 0b00100 },
DEBUG: { name: 'DEBUG', priority: 2, tlvMask: 0b01000 },
INFO: { name: 'INFO', priority: 3, tlvMask: 0b01100 },
WARN: { name: 'WARN', priority: 4, tlvMask: 0b10000 },
ERROR: { name: 'ERROR', priority: 5, tlvMask: 0b10100 },
FATAL: { name: 'FATAL', priority: 6, tlvMask: 0b11000 },
});
let awsLambdaLogLevel =
levels[process.env['AWS_LAMBDA_LOG_LEVEL']?.toUpperCase()] ?? levels.TRACE;

if (levels.TRACE.priority >= awsLambdaLogLevel.priority) {
console.trace = (msg, ...params) => {
log(levels.TRACE, msg, ...params);
};
} else {
console.trace = NopLog;
}
if (levels.DEBUG.priority >= awsLambdaLogLevel.priority) {
console.debug = (msg, ...params) => {
log(levels.DEBUG, msg, ...params);
};
} else {
console.debug = NopLog;
}
if (levels.INFO.priority >= awsLambdaLogLevel.priority) {
console.info = (msg, ...params) => {
log(levels.INFO, msg, ...params);
};
} else {
console.info = NopLog;
}
console.log = console.info;
if (levels.WARN.priority >= awsLambdaLogLevel.priority) {
console.warn = (msg, ...params) => {
log(levels.WARN, msg, ...params);
};
} else {
console.warn = NopLog;
}
if (levels.ERROR.priority >= awsLambdaLogLevel.priority) {
console.error = (msg, ...params) => {
log(levels.ERROR, msg, ...params);
};
} else {
console.error = NopLog;
}
console.fatal = (msg, ...params) => {
log(levels.FATAL, util.format(msg, ...params));
log(levels.FATAL, msg, ...params);
};
}

let _patchConsole = () => {
const JsonName = 'JSON',
TextName = 'TEXT';
let awsLambdaLogFormat =
process.env['AWS_LAMBDA_LOG_FORMAT']?.toUpperCase() === JsonName
? JsonName
: TextName;
let jsonErrorLogger = (_, err) => {
console.error(Errors.intoError(err));
},
textErrorLogger = (msg, err) => {
console.error(msg, Errors.toFormatted(Errors.intoError(err)));
};

/**
Resolve log format here, instead of inside log function.
This avoids conditional statements in the log function hot path.
**/
let logger;
if (
process.env['_LAMBDA_TELEMETRY_LOG_FD'] != null &&
process.env['_LAMBDA_TELEMETRY_LOG_FD'] != undefined
) {
let logFd = parseInt(process.env['_LAMBDA_TELEMETRY_LOG_FD']);
_patchConsoleWith(_logToFd(logFd));
delete process.env['_LAMBDA_TELEMETRY_LOG_FD'];
logger =
awsLambdaLogFormat === JsonName ? logJsonToFd(logFd) : logTextToFd(logFd);
} else {
_patchConsoleWith(_logToStdout);
logger =
awsLambdaLogFormat === JsonName ? logJsonToStdout : logTextToStdout;
}
_patchConsoleWith(logger);
structuredConsole.logError =
awsLambdaLogFormat === JsonName ? jsonErrorLogger : textErrorLogger;
};

module.exports = {
setCurrentRequestId: _currentRequestId.set,
patchConsole: _patchConsole,
structuredConsole: structuredConsole,
};
9 changes: 3 additions & 6 deletions src/StreamingContext.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
'use strict';

const BeforeExitListener = require('./BeforeExitListener.js');
const {
InvalidStreamingOperation,
toFormatted,
intoError,
} = require('./Errors');
const { InvalidStreamingOperation } = require('./Errors');
const { verbose, vverbose } = require('./VerboseLog.js').logger('STREAM');
const { tryCallFail } = require('./ResponseStream');
const { structuredConsole } = require('./LogPatch');

/**
* Construct the base-context object which includes the required flags and
Expand Down Expand Up @@ -67,7 +64,7 @@ module.exports.build = function (client, id, scheduleNext, options) {

return {
fail: (err, callback) => {
console.error('Invoke Error', toFormatted(intoError(err)));
structuredConsole.logError('Invoke Error', err);

tryCallFail(responseStream, err, callback);
},
Expand Down
4 changes: 2 additions & 2 deletions src/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ export async function run(appRootOrHandler, handler = '') {
};

process.on('uncaughtException', (error) => {
console.error('Uncaught Exception', Errors.toFormatted(error));
LogPatch.structuredConsole.logError('Uncaught Exception', error);
errorCallbacks.uncaughtException(error);
});

process.on('unhandledRejection', (reason, promise) => {
let error = new Errors.UnhandledPromiseRejection(reason, promise);
console.error('Unhandled Promise Rejection', Errors.toFormatted(error));
LogPatch.structuredConsole.logError('Unhandled Promise Rejection', error);
errorCallbacks.unhandledRejection(error);
});

Expand Down
37 changes: 33 additions & 4 deletions test/unit/FakeTelemetryTarget.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,16 @@ const fs = require('fs');
const path = require('path');
const assert = require('assert');

const _LOG_IDENTIFIER = Buffer.from('a55a0003', 'hex');
const levels = Object.freeze({
TRACE: { name: 'TRACE', tlvMask: 0b00100 },
DEBUG: { name: 'DEBUG', tlvMask: 0b01000 },
INFO: { name: 'INFO', tlvMask: 0b01100 },
WARN: { name: 'WARN', tlvMask: 0b10000 },
ERROR: { name: 'ERROR', tlvMask: 0b10100 },
FATAL: { name: 'FATAL', tlvMask: 0b11000 },
});

const TextName = 'TEXT';

/**
* A fake implementation of the multilne logging protocol.
Expand Down Expand Up @@ -55,7 +64,7 @@ module.exports = class FakeTelemetryTarget {
* - the prefix is malformed
* - there aren't enough bytes
*/
readLine() {
readLine(level = 'INFO', format = TextName, expectEmpty = false) {
let readLength = () => {
let logPrefix = Buffer.alloc(16);
let actualReadBytes = fs.readSync(
Expand All @@ -64,17 +73,34 @@ module.exports = class FakeTelemetryTarget {
0,
logPrefix.length,
);

if (expectEmpty) {
assert.strictEqual(
actualReadBytes,
0,
`Expected actualReadBytes[${actualReadBytes}] = 0`,
);
return 0;
}

assert.strictEqual(
actualReadBytes,
logPrefix.length,
`Expected actualReadBytes[${actualReadBytes}] = ${logPrefix.length}`,
);

var _tlvHeader;
if (format === TextName)
_tlvHeader = (0xa55a0003 | levels[level].tlvMask) >>> 0;
else _tlvHeader = (0xa55a0002 | levels[level].tlvMask) >>> 0;

let _logIdentifier = Buffer.from(_tlvHeader.toString(16), 'hex');
assert.strictEqual(
logPrefix.lastIndexOf(_LOG_IDENTIFIER),
logPrefix.lastIndexOf(_logIdentifier),
0,
`log prefix ${logPrefix.toString(
'hex',
)} should start with ${_LOG_IDENTIFIER.toString('hex')}`,
)} should start with ${_logIdentifier.toString('hex')}`,
);
let len = logPrefix.readUInt32BE(4);
// discard the timestamp
Expand All @@ -83,6 +109,9 @@ module.exports = class FakeTelemetryTarget {
};

let lineLength = readLength();
if (lineLength === 0) {
return '';
}
let lineBytes = Buffer.alloc(lineLength);
let actualLineSize = fs.readSync(
this.readTarget,
Expand Down
Loading