[Backport 5.18] PR 8028,8878,8810,8880 #8894

Merged
2 changes: 2 additions & 0 deletions .gitignore
@@ -23,6 +23,8 @@ config-local.js
*.sublime*
.DS_Store
heapdump-*
.cache
.clangd

## PRIVATE
*.pem
48 changes: 48 additions & 0 deletions config.js
@@ -869,6 +869,44 @@ config.NSFS_GLACIER_EXPIRY_TZ = 'LOCAL';
// the request will be used
config.NSFS_GLACIER_EXPIRY_TIME_OF_DAY = '';

// If set to true, NooBaa will attempt to read DMAPI
// xattrs
config.NSFS_GLACIER_DMAPI_ENABLE = false;

// NSFS_GLACIER_DMAPI_IMPLICIT_RESTORE_STATUS if enabled, NooBaa
// will derive the restore status of files from the DMAPI xattrs
// IF there are no explicit restore status attributes on the file.
config.NSFS_GLACIER_DMAPI_IMPLICIT_RESTORE_STATUS = false;

// If set to true then NooBaa will consider DMAPI extended attributes
// in conjunction with NooBaa's `user.storage_class` extended attribute
// to determine the state of an object.
//
// NOTE: NSFS_GLACIER_DMAPI_ENABLE should be enabled to use this.
config.NSFS_GLACIER_DMAPI_IMPLICIT_SC = false;

// NSFS_GLACIER_DMAPI_ALLOW_NOOBAA_TAKEOVER allows NooBaa to take over lifecycle
// management of an object which was originally NOT managed by NooBaa.
//
// NOTE: NSFS_GLACIER_DMAPI_ENABLE and NSFS_GLACIER_USE_DMAPI should be enabled to use this.
config.NSFS_GLACIER_DMAPI_ALLOW_NOOBAA_TAKEOVER = false;

// NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER_ENABLE if true will add additional HTTP headers
// (named after `config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER`) based on the `dmapi.IBMTPS` EA.
//
// NOTE: NSFS_GLACIER_DMAPI_ENABLE should be enabled to use this.
config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER_ENABLE = false;
config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER = 'x-tape-meta-copy';

// NSFS_GLACIER_DMAPI_PMIG_DAYS controls the "virtual"/fake expiry
// days that will be shown if we detect a glacier object whose
// lifecycle NSFS doesn't control.
//
// This is initialized to be the same as S3_RESTORE_REQUEST_MAX_DAYS
// but can be overridden to any numerical value.
config.NSFS_GLACIER_DMAPI_PMIG_DAYS = config.S3_RESTORE_REQUEST_MAX_DAYS;
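
For reference, a minimal sketch of how these DMAPI flags fit together (illustrative only; the override file name and the flag combination are assumptions, not something this PR mandates):

// hypothetical local override, e.g. in config-local.js
config.NSFS_GLACIER_DMAPI_ENABLE = true;                  // prerequisite for every flag below
config.NSFS_GLACIER_DMAPI_IMPLICIT_RESTORE_STATUS = true; // derive restore status from DMAPI xattrs
config.NSFS_GLACIER_DMAPI_IMPLICIT_SC = true;             // derive storage class from DMAPI xattrs
config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER_ENABLE = true;  // surface dmapi.IBMTPS as HTTP headers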

config.NSFS_STATFS_CACHE_SIZE = 10000;
config.NSFS_STATFS_CACHE_EXPIRY_MS = 1 * 1000;

@@ -897,6 +935,16 @@ config.NSFS_LOW_FREE_SPACE_MB_UNLEASH = 10 * 1024;
// operations safely.
config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;

// NSFS_GLACIER_FORCE_EXPIRE_ON_GET if set to true then any restored item in the GLACIER
// storage class will expire as soon as the first GET request is received for it or
// when the previous restore time has been exceeded, whichever comes earlier.
config.NSFS_GLACIER_FORCE_EXPIRE_ON_GET = false;

// NSFS_GLACIER_MIGRATE_LOG_THRESHOLD controls how big the migration log file may grow
// (50 KiB by default). Once this size is exceeded, migrate calls kick in regardless of
// the configured interval (see migrate_log_exceeds_threshold in manage_nsfs_glacier.js below).
config.NSFS_GLACIER_MIGRATE_LOG_THRESHOLD = 50 * 1024;

// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

8 changes: 8 additions & 0 deletions src/endpoint/s3/s3_utils.js
@@ -2,6 +2,7 @@
'use strict';

const _ = require('lodash');
const querystring = require('querystring');

const dbg = require('../../util/debug_module')(__filename);
const S3Error = require('./s3_errors').S3Error;
@@ -325,6 +326,13 @@ function set_response_object_md(res, object_md) {

res.setHeader('x-amz-restore', restore);
}
if (config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER_ENABLE) {
object_md.restore_status?.tape_info?.forEach?.((meta, idx) => {
// @ts-ignore - for some reason the TS check doesn't like "meta" being passed to querystring
const header = querystring.stringify(meta);
res.setHeader(`${config.NSFS_GLACIER_DMAPI_TPS_HTTP_HEADER}-${idx}`, header);
});
}
}
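
To illustrate what the new header loop emits, a standalone sketch (the tape_info entries and their field names are invented for the example; the PR does not define the EA's schema):

'use strict';
const querystring = require('querystring');

// Hypothetical restore_status.tape_info as it might arrive from the DMAPI layer.
const tape_info = [
    { volser: 'TAPE01', lib: 'lib1' },
    { volser: 'TAPE02', lib: 'lib2' },
];

// Mirrors the loop above, using the default header prefix 'x-tape-meta-copy'.
tape_info.forEach((meta, idx) => {
    console.log(`x-tape-meta-copy-${idx}: ${querystring.stringify(meta)}`);
});
// Prints:
//   x-tape-meta-copy-0: volser=TAPE01&lib=lib1
//   x-tape-meta-copy-1: volser=TAPE02&lib=lib2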

/** set_response_headers_get_object_attributes is based on set_response_object_md
77 changes: 61 additions & 16 deletions src/manage_nsfs/manage_nsfs_glacier.js
@@ -5,8 +5,7 @@ const path = require('path');
const { PersistentLogger } = require('../util/persistent_logger');
const config = require('../../config');
const nb_native = require('../util/nb_native');
const { GlacierBackend } = require('../sdk/nsfs_glacier_backend/backend');
const { getGlacierBackend } = require('../sdk/nsfs_glacier_backend/helper');
const { Glacier } = require('../sdk/glacier');
const native_fs_utils = require('../util/native_fs_utils');

const CLUSTER_LOCK = 'cluster.lock';
@@ -16,14 +15,15 @@ async function process_migrations() {
const fs_context = native_fs_utils.get_process_fs_context();

await lock_and_run(fs_context, CLUSTER_LOCK, async () => {
const backend = getGlacierBackend();
const backend = Glacier.getBackend();

if (
await backend.low_free_space() ||
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, GlacierBackend.MIGRATE_TIMESTAMP_FILE)
await time_exceeded(fs_context, config.NSFS_GLACIER_MIGRATE_INTERVAL, Glacier.MIGRATE_TIMESTAMP_FILE) ||
await migrate_log_exceeds_threshold()
) {
await run_glacier_migrations(fs_context, backend);
await record_current_time(fs_context, GlacierBackend.MIGRATE_TIMESTAMP_FILE);
await record_current_time(fs_context, Glacier.MIGRATE_TIMESTAMP_FILE);
}
});
}
@@ -32,56 +32,56 @@ async function process_migrations() {
* run_tape_migrations reads the migration WALs and attempts to migrate the
* files mentioned in the WAL.
* @param {nb.NativeFSContext} fs_context
* @param {import('../sdk/nsfs_glacier_backend/backend').GlacierBackend} backend
* @param {import('../sdk/glacier').Glacier} backend
*/
async function run_glacier_migrations(fs_context, backend) {
await run_glacier_operation(fs_context, GlacierBackend.MIGRATE_WAL_NAME, backend.migrate.bind(backend));
await run_glacier_operation(fs_context, Glacier.MIGRATE_WAL_NAME, backend.migrate.bind(backend));
}

async function process_restores() {
const fs_context = native_fs_utils.get_process_fs_context();

await lock_and_run(fs_context, CLUSTER_LOCK, async () => {
const backend = getGlacierBackend();
const backend = Glacier.getBackend();

if (
await backend.low_free_space() ||
!(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, GlacierBackend.RESTORE_TIMESTAMP_FILE))
!(await time_exceeded(fs_context, config.NSFS_GLACIER_RESTORE_INTERVAL, Glacier.RESTORE_TIMESTAMP_FILE))
) return;


await run_glacier_restore(fs_context, backend);
await record_current_time(fs_context, GlacierBackend.RESTORE_TIMESTAMP_FILE);
await record_current_time(fs_context, Glacier.RESTORE_TIMESTAMP_FILE);
});
}

/**
* run_tape_restore reads the restore WALs and attempts to restore the
* files mentioned in the WAL.
* @param {nb.NativeFSContext} fs_context
* @param {import('../sdk/nsfs_glacier_backend/backend').GlacierBackend} backend
* @param {nb.NativeFSContext} fs_context
* @param {import('../sdk/glacier').Glacier} backend
*/
async function run_glacier_restore(fs_context, backend) {
await run_glacier_operation(fs_context, GlacierBackend.RESTORE_WAL_NAME, backend.restore.bind(backend));
await run_glacier_operation(fs_context, Glacier.RESTORE_WAL_NAME, backend.restore.bind(backend));
}

async function process_expiry() {
const fs_context = native_fs_utils.get_process_fs_context();

await lock_and_run(fs_context, SCAN_LOCK, async () => {
const backend = getGlacierBackend();
const backend = Glacier.getBackend();
if (
await backend.low_free_space() ||
await is_desired_time(
fs_context,
new Date(),
config.NSFS_GLACIER_EXPIRY_RUN_TIME,
config.NSFS_GLACIER_EXPIRY_RUN_DELAY_LIMIT_MINS,
GlacierBackend.EXPIRY_TIMESTAMP_FILE,
Glacier.EXPIRY_TIMESTAMP_FILE,
)
) {
await backend.expiry(fs_context);
await record_current_time(fs_context, GlacierBackend.EXPIRY_TIMESTAMP_FILE);
await record_current_time(fs_context, Glacier.EXPIRY_TIMESTAMP_FILE);
}
});
}
@@ -168,6 +168,27 @@ async function record_current_time(fs_context, timestamp_file) {
);
}

/**
* migrate_log_exceeds_threshold returns true if the size of the
* migrate log has exceeded the given size threshold.
* @param {number} [threshold]
* @returns {Promise<boolean>}
*/
async function migrate_log_exceeds_threshold(threshold = config.NSFS_GLACIER_MIGRATE_LOG_THRESHOLD) {
const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, Glacier.MIGRATE_WAL_NAME, { locking: null });
let log_size = Number.MAX_SAFE_INTEGER;
try {
const fh = await log._open();

const { size } = await fh.stat(log.fs_context);
log_size = size;
} catch (error) {
console.error("failed to get size of", Glacier.MIGRATE_WAL_NAME, error);
}

return log_size > threshold;
}

/**
* run_glacier_operations takes a log_namespace and a callback and executes the
* callback on each log file in that namespace. It will also generate a failure
@@ -178,6 +199,8 @@ async function record_current_time(fs_context, timestamp_file) {
*/
async function run_glacier_operation(fs_context, log_namespace, cb) {
const log = new PersistentLogger(config.NSFS_GLACIER_LOGS_DIR, log_namespace, { locking: 'EXCLUSIVE' });

fs_context = prepare_glacier_fs_context(fs_context);
try {
await log.process(async (entry, failure_recorder) => cb(fs_context, entry, failure_recorder));
} catch (error) {
@@ -230,6 +253,28 @@ async function lock_and_run(fs_context, lockfilename, cb) {
}
}

/**
* prepare_glacier_fs_context returns a shallow copy of the given
* fs_context with the backend set to 'GPFS'.
*
* NOTE: The function will throw an error if it detects that libgpfs
* isn't loaded.
*
* @param {nb.NativeFSContext} fs_context
* @returns {nb.NativeFSContext}
*/
function prepare_glacier_fs_context(fs_context) {
if (config.NSFS_GLACIER_DMAPI_ENABLE) {
if (!nb_native().fs.gpfs) {
throw new Error('cannot use DMAPI xattrs: libgpfs not loaded');
}

return { ...fs_context, backend: 'GPFS', use_dmapi: true };
}

return { ...fs_context };
}
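
For context, a sketch of how the prepared context would surface the DMAPI attributes through a native stat call (the call shape follows how nb_native is used elsewhere in NooBaa, but treat it as an assumption, not part of this PR):

// hypothetical usage
const dmapi_fs_context = prepare_glacier_fs_context(native_fs_utils.get_process_fs_context());
const stat = await nb_native().fs.stat(dmapi_fs_context, '/gpfs/fs1/bucket/obj');
// With use_dmapi set, stat.xattr may now also carry 'dmapi.IBMObj',
// 'dmapi.IBMPMig' and 'dmapi.IBMTPS' (see GPFS_DMAPI_XATTRS in fs_napi.cpp below).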

exports.process_migrations = process_migrations;
exports.process_restores = process_restores;
exports.process_expiry = process_expiry;
36 changes: 28 additions & 8 deletions src/native/fs/fs_napi.cpp
@@ -46,6 +46,13 @@
#define GPFS_XATTR_PREFIX "gpfs"
#define GPFS_DOT_ENCRYPTION_EA "Encryption"
#define GPFS_ENCRYPTION_XATTR_NAME GPFS_XATTR_PREFIX "." GPFS_DOT_ENCRYPTION_EA
#define GPFS_DMAPI_XATTR_PREFIX "dmapi"
#define GPFS_DMAPI_DOT_IBMOBJ_EA "IBMObj"
#define GPFS_DMAPI_DOT_IBMPMIG_EA "IBMPMig"
#define GPFS_DMAPI_DOT_IBMTPS_EA "IBMTPS"
#define GPFS_DMAPI_XATTR_TAPE_INDICATOR GPFS_DMAPI_XATTR_PREFIX "." GPFS_DMAPI_DOT_IBMOBJ_EA
#define GPFS_DMAPI_XATTR_TAPE_PREMIG GPFS_DMAPI_XATTR_PREFIX "." GPFS_DMAPI_DOT_IBMPMIG_EA
#define GPFS_DMAPI_XATTR_TAPE_TPS GPFS_DMAPI_XATTR_PREFIX "." GPFS_DMAPI_DOT_IBMTPS_EA

// This macro should be used after opening a file
// it will autoclose the file using AutoCloser and will throw an error in case of failures
@@ -244,6 +251,11 @@ parse_open_flags(std::string flags)
}

const static std::vector<std::string> GPFS_XATTRS{ GPFS_ENCRYPTION_XATTR_NAME };
const static std::vector<std::string> GPFS_DMAPI_XATTRS{
GPFS_DMAPI_XATTR_TAPE_INDICATOR,
GPFS_DMAPI_XATTR_TAPE_PREMIG,
GPFS_DMAPI_XATTR_TAPE_TPS,
};
const static std::vector<std::string> USER_XATTRS{
"user.content_type",
"user.content_md5",
@@ -286,8 +298,7 @@ build_gpfs_get_ea_request(gpfsRequest_t* reqP, std::string key)
reqP->payload.structLen = reqP->header.totalLength - sizeof(reqP->header);
reqP->payload.structType = GPFS_FCNTL_GET_XATTR;
reqP->payload.nameLen = nameLen;
// bufferLen is the size of buffer - roundingup of the attribute name to 8 chars
reqP->payload.bufferLen = bufLen - ROUNDUP(nameLen, 8);
reqP->payload.bufferLen = bufLen - nameLen;
reqP->payload.flags = GPFS_FCNTL_XATTRFLAG_NONE;
memcpy(&reqP->payload.buffer[0], key.c_str(), nameLen);
}
@@ -461,9 +472,14 @@ get_fd_xattr(int fd, XattrMap& xattr, const std::vector<std::string>& xattr_keys
}

static int
get_fd_gpfs_xattr(int fd, XattrMap& xattr, int& gpfs_error)
get_fd_gpfs_xattr(int fd, XattrMap& xattr, int& gpfs_error, bool use_dmapi)
{
for (auto const& key : GPFS_XATTRS) {
auto gpfs_xattrs { GPFS_XATTRS };
if (use_dmapi) {
gpfs_xattrs.insert(gpfs_xattrs.end(), GPFS_DMAPI_XATTRS.begin(), GPFS_DMAPI_XATTRS.end());
}

for (auto const& key : gpfs_xattrs) {
gpfsRequest_t gpfsGetXattrRequest;
build_gpfs_get_ea_request(&gpfsGetXattrRequest, key);
int r = dlsym_gpfs_fcntl(fd, &gpfsGetXattrRequest);
@@ -472,7 +488,7 @@ get_fd_gpfs_xattr(int fd, XattrMap& xattr, int& gpfs_error)
if (gpfs_error == GPFS_FCNTL_ERR_NONE) {
int name_len = gpfsGetXattrRequest.payload.nameLen;
int buffer_len = gpfsGetXattrRequest.payload.bufferLen;
xattr[key] = std::string(gpfsGetXattrRequest.buffer[ROUNDUP(name_len, 8)], buffer_len);
xattr[key] = std::string((char*)gpfsGetXattrRequest.buffer + name_len, buffer_len);
} else if (gpfs_error != GPFS_FCNTL_ERR_NO_ATTR) {
LOG("get_fd_gpfs_xattr: get GPFS xattr with fcntl failed with error." << DVAL(gpfs_error));
return gpfs_error;
@@ -603,6 +619,8 @@ struct FSWorker : public Napi::AsyncWorker
// NOTE: If _do_ctime_check = false, then some functions will fallback to using mtime check
bool _do_ctime_check;

bool _use_dmapi;

FSWorker(const Napi::CallbackInfo& info)
: AsyncWorker(info.Env())
, _deferred(Napi::Promise::Deferred::New(info.Env()))
Expand All @@ -616,6 +634,7 @@ struct FSWorker : public Napi::AsyncWorker
, _should_add_thread_capabilities(false)
, _supplemental_groups()
, _do_ctime_check(false)
, _use_dmapi(false)
{
for (int i = 0; i < (int)info.Length(); ++i) _args_ref.Set(i, info[i]);
if (info[0].ToBoolean()) {
@@ -635,6 +654,7 @@
_report_fs_stats = Napi::Persistent(fs_context.Get("report_fs_stats").As<Napi::Function>());
}
_do_ctime_check = fs_context.Get("do_ctime_check").ToBoolean();
_use_dmapi = fs_context.Get("use_dmapi").ToBoolean();
}
}
void Begin(std::string desc)
@@ -793,7 +813,7 @@ struct Stat : public FSWorker
if (!_use_lstat) {
SYSCALL_OR_RETURN(get_fd_xattr(fd, _xattr, _xattr_get_keys));
if (use_gpfs_lib()) {
GPFS_FCNTL_OR_RETURN(get_fd_gpfs_xattr(fd, _xattr, gpfs_error));
GPFS_FCNTL_OR_RETURN(get_fd_gpfs_xattr(fd, _xattr, gpfs_error, _use_dmapi));
}
}

@@ -1221,7 +1241,7 @@ struct Readfile : public FSWorker
if (_read_xattr) {
SYSCALL_OR_RETURN(get_fd_xattr(fd, _xattr, _xattr_get_keys));
if (use_gpfs_lib()) {
GPFS_FCNTL_OR_RETURN(get_fd_gpfs_xattr(fd, _xattr, gpfs_error));
GPFS_FCNTL_OR_RETURN(get_fd_gpfs_xattr(fd, _xattr, gpfs_error, _use_dmapi));
}
}

@@ -1752,7 +1772,7 @@ struct FileStat : public FSWrapWorker<FileWrap>
SYSCALL_OR_RETURN(fstat(fd, &_stat_res));
SYSCALL_OR_RETURN(get_fd_xattr(fd, _xattr, _xattr_get_keys));
if (use_gpfs_lib()) {
GPFS_FCNTL_OR_RETURN(get_fd_gpfs_xattr(fd, _xattr, gpfs_error));
GPFS_FCNTL_OR_RETURN(get_fd_gpfs_xattr(fd, _xattr, gpfs_error, _use_dmapi));
}

if (_do_ctime_check) {