Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion src/server/object_services/md_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ class MDStore {
* @property {1|-1} [order]
* @property {boolean} [pagination]
*
* @returns {nb.ObjectMD[]}
* @returns {Promise<nb.ObjectMD[]>}
*/
async find_objects({
bucket_id,
Expand Down Expand Up @@ -695,6 +695,80 @@ class MDStore {
});
}

/**
* TODO add support for versioning or add another function to support versioning.
* @typedef {Object} DeleteObjectsParams
* @property {nb.ID} bucket_id
* @property {RegExp} key
* @property {number} [max_create_time]
* @property {number} [max_size]
* @property {number} [min_size]
* @property {Array<{ key: string; value: string; }>} [tagging]
* @property {number} [limit]
* @property {boolean} [return_results]
*
* @param {DeleteObjectsParams} params
* @returns {Promise<nb.ObjectMD[]>}
*/
async delete_objects_by_query({
bucket_id,
key,
max_create_time,
max_size,
min_size,
tagging,
limit,
return_results = false,
}) {
const params = [new Date()];
const sql_conditions = [];
if (key) {
params.push(key.source);
sql_conditions.push(`data->>'key' ~ $${params.length}`);
}
if (max_size !== undefined) {
params.push(max_size);
sql_conditions.push(`(data->>'size')::BIGINT < $${params.length}`);
}
if (min_size !== undefined) {
params.push(min_size);
sql_conditions.push(`(data->>'size')::BIGINT > $${params.length}`);
}
if (tagging && tagging.length) {
params.push(JSON.stringify(tagging));
sql_conditions.push(`(data->>'tagging')::jsonb @> $${params.length}::jsonb`);
}
if (max_create_time) {
params.push(new Date(moment.unix(max_create_time).toISOString()).toISOString());
sql_conditions.push(`data->>'create_time' < $${params.length}`);
}

const sql_limit = limit === undefined ? "" : `LIMIT ${limit}`;

let query = `
WITH rows AS (
SELECT _id
FROM ${this._objects.name}
WHERE
${sql_and_conditions(
`data->>'bucket' = '${bucket_id}'`,
...sql_conditions,
`data->'deleted' IS NULL`,
`data->'upload_started' IS NULL`,
`data->'version_enabled' IS NULL`,
)}
${sql_limit}
)
UPDATE ${this._objects.name}
SET data = jsonb_set(data, '{deleted}', to_jsonb($1::text), true)
WHERE _id IN (
SELECT rows._id FROM rows
)`;
query += return_results ? ' RETURNING *;' : ';';
const result = await this._objects.executeSQL(query, params);
return return_results ? result.rows : [];
}

async find_unreclaimed_objects(limit) {
const results = await this._objects.find({
deleted: { $exists: true },
Expand Down
36 changes: 22 additions & 14 deletions src/server/object_services/object_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ async function delete_multiple_objects_by_filter(req) {
const key = new RegExp('^' + _.escapeRegExp(req.rpc_params.prefix));
const bucket_id = req.bucket._id;
const reply_objects = req.rpc_params.reply_objects;
// TODO: change it to perform changes in batch. Won't scale.

const query = {
bucket_id,
key,
Expand All @@ -983,18 +983,26 @@ async function delete_multiple_objects_by_filter(req) {
min_size: req.rpc_params.size_greater,
limit: req.rpc_params.limit,
};

const objects = await MDStore.instance().find_objects(query);

const delete_results = await delete_multiple_objects(_.assign(req, {
rpc_params: {
bucket: req.bucket.name,
objects: _.map(objects, obj => ({
key: obj.key,
version_id: req.rpc_params.delete_version ? MDStore.instance().get_object_version_id(obj) : '',
}))
}
}));
let delete_results;
let objects;
// TODO: Add support to delete_objects_by_query also for versioning or add another function to support versioning.
if (req.bucket.versioning === 'DISABLED' && config.DB_TYPE !== 'mongodb') /* only for postgres */ {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need config.DB_TYPE !== 'mongodb' ?
Is it possible to have DB_TYPE as mongodb?

Maybe it is used for the tests.

If it is, we should probably remove it.

@jackyalbo jackyalbo Aug 13, 2025

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's wait until we stop supporting MongoDB; then we will delete it, and it will be easy to find that we need to update this place. It is only for testing for now.

query.return_results = true; // we want to return the objects that were deleted
objects = await MDStore.instance().delete_objects_by_query(query);
} else {
// TODO: change it to perform changes in batch. Won't scale
objects = await MDStore.instance().find_objects(query);

delete_results = await delete_multiple_objects(_.assign(req, {
rpc_params: {
bucket: req.bucket.name,
objects: _.map(objects, obj => ({
key: obj.key,
version_id: req.rpc_params.delete_version ? MDStore.instance().get_object_version_id(obj) : '',
}))
}
}));
}

const reply = { num_objects_deleted: objects.length };
if (reply_objects) {
Expand All @@ -1004,7 +1012,7 @@ async function delete_multiple_objects_by_filter(req) {
//or incude the error if deletion failed
reply.deleted_objects = [];
for (let i = 0; i < objects.length; ++i) {
if (delete_results[i].err_code) {
if (delete_results && delete_results[i].err_code) {
reply.deleted_objects[i] = {
err_code: delete_results[i].err_code,
err_message: delete_results[i].err_message
Expand Down
Loading