Skip to content

Commit 465005a

Browse files
authored
Bugfix/Remove postgres vector store data when deletion (#5536)
Remove postgres vector store data when deletion - Introduced a new `doc_id` column in MySQL, Postgres, and SQLite record managers to support document identification. - Updated the `update` method to handle both string and object formats for keys, allowing for better flexibility in document updates. - Enhanced `listKeys` method to filter by `doc_id` when provided in options. - Updated vector store integrations to utilize the new `doc_id` filtering capability
1 parent e6e0c2d commit 465005a

File tree

20 files changed

+619
-216
lines changed

20 files changed

+619
-216
lines changed

packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ class MySQLRecordManager_RecordManager implements INode {
6262
label: 'Namespace',
6363
name: 'namespace',
6464
type: 'string',
65-
description: 'If not specified, chatflowid will be used',
6665
additionalParams: true,
6766
optional: true
6867
},
@@ -219,7 +218,16 @@ class MySQLRecordManager implements RecordManagerInterface {
219218
unique key \`unique_key_namespace\` (\`key\`,
220219
\`namespace\`));`)
221220

222-
const columns = [`updated_at`, `key`, `namespace`, `group_id`]
221+
// Add doc_id column if it doesn't exist (migration for existing tables)
222+
const checkColumn = await queryRunner.manager.query(
223+
`SELECT COUNT(1) ColumnExists FROM INFORMATION_SCHEMA.COLUMNS
224+
WHERE table_schema=DATABASE() AND table_name='${tableName}' AND column_name='doc_id';`
225+
)
226+
if (checkColumn[0].ColumnExists === 0) {
227+
await queryRunner.manager.query(`ALTER TABLE \`${tableName}\` ADD COLUMN \`doc_id\` longtext;`)
228+
}
229+
230+
const columns = [`updated_at`, `key`, `namespace`, `group_id`, `doc_id`]
223231
for (const column of columns) {
224232
// MySQL does not support 'IF NOT EXISTS' function for Index
225233
const Check = await queryRunner.manager.query(
@@ -261,7 +269,7 @@ class MySQLRecordManager implements RecordManagerInterface {
261269
}
262270
}
263271

264-
async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {
272+
async update(keys: Array<{ uid: string; docId: string }> | string[], updateOptions?: UpdateOptions): Promise<void> {
265273
if (keys.length === 0) {
266274
return
267275
}
@@ -277,23 +285,23 @@ class MySQLRecordManager implements RecordManagerInterface {
277285
throw new Error(`Time sync issue with database ${updatedAt} < ${timeAtLeast}`)
278286
}
279287

280-
const groupIds = _groupIds ?? keys.map(() => null)
288+
// Handle both new format (objects with uid and docId) and old format (strings)
289+
const isNewFormat = keys.length > 0 && typeof keys[0] === 'object' && 'uid' in keys[0]
290+
const keyStrings = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.uid) : (keys as string[])
291+
const docIds = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.docId) : keys.map(() => null)
292+
293+
const groupIds = _groupIds ?? keyStrings.map(() => null)
281294

282-
if (groupIds.length !== keys.length) {
283-
throw new Error(`Number of keys (${keys.length}) does not match number of group_ids (${groupIds.length})`)
295+
if (groupIds.length !== keyStrings.length) {
296+
throw new Error(`Number of keys (${keyStrings.length}) does not match number of group_ids (${groupIds.length})`)
284297
}
285298

286-
const recordsToUpsert = keys.map((key, i) => [
287-
key,
288-
this.namespace,
289-
updatedAt,
290-
groupIds[i] ?? null // Ensure groupIds[i] is null if undefined
291-
])
299+
const recordsToUpsert = keyStrings.map((key, i) => [key, this.namespace, updatedAt, groupIds[i] ?? null, docIds[i] ?? null])
292300

293301
const query = `
294-
INSERT INTO \`${tableName}\` (\`key\`, \`namespace\`, \`updated_at\`, \`group_id\`)
295-
VALUES (?, ?, ?, ?)
296-
ON DUPLICATE KEY UPDATE \`updated_at\` = VALUES(\`updated_at\`)`
302+
INSERT INTO \`${tableName}\` (\`key\`, \`namespace\`, \`updated_at\`, \`group_id\`, \`doc_id\`)
303+
VALUES (?, ?, ?, ?, ?)
304+
ON DUPLICATE KEY UPDATE \`updated_at\` = VALUES(\`updated_at\`), \`doc_id\` = VALUES(\`doc_id\`)`
297305

298306
// To handle multiple files upsert
299307
try {
@@ -349,13 +357,13 @@ class MySQLRecordManager implements RecordManagerInterface {
349357
}
350358
}
351359

352-
async listKeys(options?: ListKeyOptions): Promise<string[]> {
360+
async listKeys(options?: ListKeyOptions & { docId?: string }): Promise<string[]> {
353361
const dataSource = await this.getDataSource()
354362
const queryRunner = dataSource.createQueryRunner()
355363
const tableName = this.sanitizeTableName(this.tableName)
356364

357365
try {
358-
const { before, after, limit, groupIds } = options ?? {}
366+
const { before, after, limit, groupIds, docId } = options ?? {}
359367
let query = `SELECT \`key\` FROM \`${tableName}\` WHERE \`namespace\` = ?`
360368
const values: (string | number | string[])[] = [this.namespace]
361369

@@ -382,6 +390,11 @@ class MySQLRecordManager implements RecordManagerInterface {
382390
values.push(...groupIds.filter((gid): gid is string => gid !== null))
383391
}
384392

393+
if (docId) {
394+
query += ` AND \`doc_id\` = ?`
395+
values.push(docId)
396+
}
397+
385398
query += ';'
386399

387400
// Directly using try/catch with async/await for cleaner flow

packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@ class PostgresRecordManager_RecordManager implements INode {
7878
label: 'Namespace',
7979
name: 'namespace',
8080
type: 'string',
81-
description: 'If not specified, chatflowid will be used',
8281
additionalParams: true,
8382
optional: true
8483
},
@@ -241,6 +240,19 @@ class PostgresRecordManager implements RecordManagerInterface {
241240
CREATE INDEX IF NOT EXISTS namespace_index ON "${tableName}" (namespace);
242241
CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
243242

243+
// Add doc_id column if it doesn't exist (migration for existing tables)
244+
await queryRunner.manager.query(`
245+
DO $$
246+
BEGIN
247+
IF NOT EXISTS (
248+
SELECT 1 FROM information_schema.columns
249+
WHERE table_name = '${tableName}' AND column_name = 'doc_id'
250+
) THEN
251+
ALTER TABLE "${tableName}" ADD COLUMN doc_id TEXT;
252+
CREATE INDEX IF NOT EXISTS doc_id_index ON "${tableName}" (doc_id);
253+
END IF;
254+
END $$;`)
255+
244256
await queryRunner.release()
245257
} catch (e: any) {
246258
// This error indicates that the table already exists
@@ -286,7 +298,7 @@ class PostgresRecordManager implements RecordManagerInterface {
286298
return `(${placeholders.join(', ')})`
287299
}
288300

289-
async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {
301+
async update(keys: Array<{ uid: string; docId: string }> | string[], updateOptions?: UpdateOptions): Promise<void> {
290302
if (keys.length === 0) {
291303
return
292304
}
@@ -302,17 +314,22 @@ class PostgresRecordManager implements RecordManagerInterface {
302314
throw new Error(`Time sync issue with database ${updatedAt} < ${timeAtLeast}`)
303315
}
304316

305-
const groupIds = _groupIds ?? keys.map(() => null)
317+
// Handle both new format (objects with uid and docId) and old format (strings)
318+
const isNewFormat = keys.length > 0 && typeof keys[0] === 'object' && 'uid' in keys[0]
319+
const keyStrings = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.uid) : (keys as string[])
320+
const docIds = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.docId) : keys.map(() => null)
321+
322+
const groupIds = _groupIds ?? keyStrings.map(() => null)
306323

307-
if (groupIds.length !== keys.length) {
308-
throw new Error(`Number of keys (${keys.length}) does not match number of group_ids ${groupIds.length})`)
324+
if (groupIds.length !== keyStrings.length) {
325+
throw new Error(`Number of keys (${keyStrings.length}) does not match number of group_ids ${groupIds.length})`)
309326
}
310327

311-
const recordsToUpsert = keys.map((key, i) => [key, this.namespace, updatedAt, groupIds[i]])
328+
const recordsToUpsert = keyStrings.map((key, i) => [key, this.namespace, updatedAt, groupIds[i], docIds[i]])
312329

313330
const valuesPlaceholders = recordsToUpsert.map((_, j) => this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)).join(', ')
314331

315-
const query = `INSERT INTO "${tableName}" (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;`
332+
const query = `INSERT INTO "${tableName}" (key, namespace, updated_at, group_id, doc_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at, doc_id = EXCLUDED.doc_id;`
316333
try {
317334
await queryRunner.manager.query(query, recordsToUpsert.flat())
318335
await queryRunner.release()
@@ -351,8 +368,8 @@ class PostgresRecordManager implements RecordManagerInterface {
351368
}
352369
}
353370

354-
async listKeys(options?: ListKeyOptions): Promise<string[]> {
355-
const { before, after, limit, groupIds } = options ?? {}
371+
async listKeys(options?: ListKeyOptions & { docId?: string }): Promise<string[]> {
372+
const { before, after, limit, groupIds, docId } = options ?? {}
356373
const tableName = this.sanitizeTableName(this.tableName)
357374

358375
let query = `SELECT key FROM "${tableName}" WHERE namespace = $1`
@@ -383,6 +400,12 @@ class PostgresRecordManager implements RecordManagerInterface {
383400
index += 1
384401
}
385402

403+
if (docId) {
404+
values.push(docId)
405+
query += ` AND doc_id = $${index}`
406+
index += 1
407+
}
408+
386409
query += ';'
387410

388411
const dataSource = await this.getDataSource()

packages/components/nodes/recordmanager/SQLiteRecordManager/SQLiteRecordManager.ts

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ class SQLiteRecordManager_RecordManager implements INode {
5151
label: 'Namespace',
5252
name: 'namespace',
5353
type: 'string',
54-
description: 'If not specified, chatflowid will be used',
5554
additionalParams: true,
5655
optional: true
5756
},
@@ -198,6 +197,15 @@ CREATE INDEX IF NOT EXISTS key_index ON "${tableName}" (key);
198197
CREATE INDEX IF NOT EXISTS namespace_index ON "${tableName}" (namespace);
199198
CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
200199

200+
// Add doc_id column if it doesn't exist (migration for existing tables)
201+
const checkColumn = await queryRunner.manager.query(
202+
`SELECT COUNT(*) as count FROM pragma_table_info('${tableName}') WHERE name='doc_id';`
203+
)
204+
if (checkColumn[0].count === 0) {
205+
await queryRunner.manager.query(`ALTER TABLE "${tableName}" ADD COLUMN doc_id TEXT;`)
206+
await queryRunner.manager.query(`CREATE INDEX IF NOT EXISTS doc_id_index ON "${tableName}" (doc_id);`)
207+
}
208+
201209
await queryRunner.release()
202210
} catch (e: any) {
203211
// This error indicates that the table already exists
@@ -228,7 +236,7 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
228236
}
229237
}
230238

231-
async update(keys: string[], updateOptions?: UpdateOptions): Promise<void> {
239+
async update(keys: Array<{ uid: string; docId: string }> | string[], updateOptions?: UpdateOptions): Promise<void> {
232240
if (keys.length === 0) {
233241
return
234242
}
@@ -243,23 +251,23 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
243251
throw new Error(`Time sync issue with database ${updatedAt} < ${timeAtLeast}`)
244252
}
245253

246-
const groupIds = _groupIds ?? keys.map(() => null)
254+
// Handle both new format (objects with uid and docId) and old format (strings)
255+
const isNewFormat = keys.length > 0 && typeof keys[0] === 'object' && 'uid' in keys[0]
256+
const keyStrings = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.uid) : (keys as string[])
257+
const docIds = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.docId) : keys.map(() => null)
258+
259+
const groupIds = _groupIds ?? keyStrings.map(() => null)
247260

248-
if (groupIds.length !== keys.length) {
249-
throw new Error(`Number of keys (${keys.length}) does not match number of group_ids (${groupIds.length})`)
261+
if (groupIds.length !== keyStrings.length) {
262+
throw new Error(`Number of keys (${keyStrings.length}) does not match number of group_ids (${groupIds.length})`)
250263
}
251264

252-
const recordsToUpsert = keys.map((key, i) => [
253-
key,
254-
this.namespace,
255-
updatedAt,
256-
groupIds[i] ?? null // Ensure groupIds[i] is null if undefined
257-
])
265+
const recordsToUpsert = keyStrings.map((key, i) => [key, this.namespace, updatedAt, groupIds[i] ?? null, docIds[i] ?? null])
258266

259267
const query = `
260-
INSERT INTO "${tableName}" (key, namespace, updated_at, group_id)
261-
VALUES (?, ?, ?, ?)
262-
ON CONFLICT (key, namespace) DO UPDATE SET updated_at = excluded.updated_at`
268+
INSERT INTO "${tableName}" (key, namespace, updated_at, group_id, doc_id)
269+
VALUES (?, ?, ?, ?, ?)
270+
ON CONFLICT (key, namespace) DO UPDATE SET updated_at = excluded.updated_at, doc_id = excluded.doc_id`
263271

264272
try {
265273
// To handle multiple files upsert
@@ -314,8 +322,8 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
314322
}
315323
}
316324

317-
async listKeys(options?: ListKeyOptions): Promise<string[]> {
318-
const { before, after, limit, groupIds } = options ?? {}
325+
async listKeys(options?: ListKeyOptions & { docId?: string }): Promise<string[]> {
326+
const { before, after, limit, groupIds, docId } = options ?? {}
319327
const tableName = this.sanitizeTableName(this.tableName)
320328

321329
let query = `SELECT key FROM "${tableName}" WHERE namespace = ?`
@@ -344,6 +352,11 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${tableName}" (group_id);`)
344352
values.push(...groupIds.filter((gid): gid is string => gid !== null))
345353
}
346354

355+
if (docId) {
356+
query += ` AND doc_id = ?`
357+
values.push(docId)
358+
}
359+
347360
query += ';'
348361

349362
const dataSource = await this.getDataSource()

packages/components/nodes/vectorstores/Chroma/Chroma.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,11 @@ class Chroma_VectorStores implements INode {
186186
const vectorStoreName = collectionName
187187
await recordManager.createSchema()
188188
;(recordManager as any).namespace = (recordManager as any).namespace + '_' + vectorStoreName
189-
const keys: string[] = await recordManager.listKeys({})
189+
const filterKeys: ICommonObject = {}
190+
if (options.docId) {
191+
filterKeys.docId = options.docId
192+
}
193+
const keys: string[] = await recordManager.listKeys(filterKeys)
190194

191195
const chromaStore = new ChromaExtended(embeddings, obj)
192196

packages/components/nodes/vectorstores/Elasticsearch/Elasticsearch.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,11 @@ class Elasticsearch_VectorStores implements INode {
198198
const vectorStoreName = indexName
199199
await recordManager.createSchema()
200200
;(recordManager as any).namespace = (recordManager as any).namespace + '_' + vectorStoreName
201-
const keys: string[] = await recordManager.listKeys({})
201+
const filterKeys: ICommonObject = {}
202+
if (options.docId) {
203+
filterKeys.docId = options.docId
204+
}
205+
const keys: string[] = await recordManager.listKeys(filterKeys)
202206

203207
await vectorStore.delete({ ids: keys })
204208
await recordManager.deleteKeys(keys)

packages/components/nodes/vectorstores/Pinecone/Pinecone.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,11 @@ class Pinecone_VectorStores implements INode {
212212
const vectorStoreName = pineconeNamespace
213213
await recordManager.createSchema()
214214
;(recordManager as any).namespace = (recordManager as any).namespace + '_' + vectorStoreName
215-
const keys: string[] = await recordManager.listKeys({})
215+
const filterKeys: ICommonObject = {}
216+
if (options.docId) {
217+
filterKeys.docId = options.docId
218+
}
219+
const keys: string[] = await recordManager.listKeys(filterKeys)
216220

217221
await pineconeStore.delete({ ids: keys })
218222
await recordManager.deleteKeys(keys)

packages/components/nodes/vectorstores/Postgres/Postgres.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class Postgres_VectorStores implements INode {
4949
constructor() {
5050
this.label = 'Postgres'
5151
this.name = 'postgres'
52-
this.version = 7.0
52+
this.version = 7.1
5353
this.type = 'Postgres'
5454
this.icon = 'postgres.svg'
5555
this.category = 'Vector Stores'
@@ -173,6 +173,15 @@ class Postgres_VectorStores implements INode {
173173
additionalParams: true,
174174
optional: true
175175
},
176+
{
177+
label: 'Upsert Batch Size',
178+
name: 'batchSize',
179+
type: 'number',
180+
step: 1,
181+
description: 'Upsert in batches of size N',
182+
additionalParams: true,
183+
optional: true
184+
},
176185
{
177186
label: 'Additional Configuration',
178187
name: 'additionalConfig',
@@ -232,6 +241,7 @@ class Postgres_VectorStores implements INode {
232241
const docs = nodeData.inputs?.document as Document[]
233242
const recordManager = nodeData.inputs?.recordManager
234243
const isFileUploadEnabled = nodeData.inputs?.fileUpload as boolean
244+
const _batchSize = nodeData.inputs?.batchSize
235245
const vectorStoreDriver: VectorStoreDriver = Postgres_VectorStores.getDriverFromConfig(nodeData, options)
236246

237247
const flattenDocs = docs && docs.length ? flatten(docs) : []
@@ -265,7 +275,15 @@ class Postgres_VectorStores implements INode {
265275

266276
return res
267277
} else {
268-
await vectorStoreDriver.fromDocuments(finalDocs)
278+
if (_batchSize) {
279+
const batchSize = parseInt(_batchSize, 10)
280+
for (let i = 0; i < finalDocs.length; i += batchSize) {
281+
const batch = finalDocs.slice(i, i + batchSize)
282+
await vectorStoreDriver.fromDocuments(batch)
283+
}
284+
} else {
285+
await vectorStoreDriver.fromDocuments(finalDocs)
286+
}
269287

270288
return { numAdded: finalDocs.length, addedDocs: finalDocs }
271289
}
@@ -285,7 +303,11 @@ class Postgres_VectorStores implements INode {
285303
const vectorStoreName = tableName
286304
await recordManager.createSchema()
287305
;(recordManager as any).namespace = (recordManager as any).namespace + '_' + vectorStoreName
288-
const keys: string[] = await recordManager.listKeys({})
306+
const filterKeys: ICommonObject = {}
307+
if (options.docId) {
308+
filterKeys.docId = options.docId
309+
}
310+
const keys: string[] = await recordManager.listKeys(filterKeys)
289311

290312
await vectorStore.delete({ ids: keys })
291313
await recordManager.deleteKeys(keys)

0 commit comments

Comments
 (0)