
Commit d837d2d

MSCetin37, pre-commit-ci[bot], and ashahba authored and committed
Data Ingestion and Retrieval with custom index_name (opea-project#1439)
* CodeGen initial
* code update
* code update
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* adding index_name variable
* update unit tests
* update the tests
* code update & add ingest_with_index test
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci
* update redis test
* update retrievers
* update redis-delete_files
* [pre-commit.ci] auto fixes from pre-commit.com hooks; for more information, see https://pre-commit.ci

---------

Signed-off-by: Mustafa <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Abolfazl Shahbazi <[email protected]>
Signed-off-by: Anthony Mahanna <[email protected]>
1 parent 6f5af84 commit d837d2d

File tree

8 files changed: +168 −39 lines changed

comps/cores/proto/api_protocol.py

Lines changed: 1 addition & 0 deletions

@@ -262,6 +262,7 @@ class ChatCompletionRequest(BaseModel):
     lambda_mult: float = 0.5
     score_threshold: float = 0.2
     retrieved_docs: Union[List[RetrievalResponseData], List[Dict[str, Any]]] = Field(default_factory=list)
+    index_name: Optional[str] = None
 
     # reranking
     top_n: int = 1
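
With this field on `ChatCompletionRequest`, a retrieval-backed chat request can target a specific Redis index per call. A minimal sketch of such a request, assuming a ChatQnA-style gateway at http://localhost:8888/v1/chatqna (host, port, and path are illustrative, not part of this commit):

    import requests

    # Hypothetical gateway endpoint; adjust to your deployment.
    URL = "http://localhost:8888/v1/chatqna"

    payload = {
        "messages": "What does the Q3 report say about revenue?",
        # New in this commit: route retrieval to a custom index
        # instead of the component's default INDEX_NAME.
        "index_name": "quarterly_reports",
    }

    resp = requests.post(URL, json=payload, timeout=60)
    print(resp.json())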

comps/cores/proto/docarray.py

Lines changed: 2 additions & 0 deletions

@@ -102,6 +102,7 @@ class EmbedDoc(BaseDoc):
     lambda_mult: float = 0.5
     score_threshold: float = 0.2
     constraints: Optional[Union[Dict[str, Any], List[Dict[str, Any]], None]] = None
+    index_name: Optional[str] = None
 
 
 class EmbedMultimodalDoc(EmbedDoc):
@@ -225,6 +226,7 @@ class LLMParams(BaseDoc):
     repetition_penalty: float = 1.03
     stream: bool = True
     language: str = "auto"  # can be "en", "zh"
+    index_name: Optional[str] = None
 
     chat_template: Optional[str] = Field(
         default=None,
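
Because `EmbedDoc` and `LLMParams` now carry `index_name`, the target index travels through the pipeline with the document itself rather than through a side channel. A small sketch of constructing such a doc (the embedding values are dummies, not the output of a real embedder):

    from comps.cores.proto.docarray import EmbedDoc

    doc = EmbedDoc(
        text="OPEA supports per-request Redis indices.",
        embedding=[0.1, 0.2, 0.3],  # dummy vector; a real one comes from the embedding service
        index_name="quarterly_reports",  # None falls back to the default index
    )
    print(doc.index_name)  # quarterly_reports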

comps/dataprep/src/integrations/redis.py

Lines changed: 52 additions & 24 deletions

@@ -187,14 +187,17 @@ async def delete_by_id(client, id):
     return True
 
 
-async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
+async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder, index_name: str):
     if logflag:
         logger.info(f"[ redis ingest chunks ] file name: {file_name}")
 
     # Batch size
     batch_size = 32
     num_chunks = len(chunks)
 
+    # if data will be saved to a different index name than the default one
+    ingest_index_name = index_name if index_name else INDEX_NAME
+
     file_ids = []
     for i in range(0, num_chunks, batch_size):
         if logflag:
@@ -206,7 +209,7 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
             Redis.from_texts_return_keys,
             texts=batch_texts,
             embedding=embedder,
-            index_name=INDEX_NAME,
+            index_name=ingest_index_name,
             redis_url=REDIS_URL,
         )
         if logflag:
@@ -222,15 +225,15 @@ async def ingest_chunks_to_redis(file_name: str, chunks: List, embedder):
         await create_index(client)
 
     try:
-        await store_by_id(client, key=file_name, value="#".join(file_ids))
+        await store_by_id(client, key=encode_filename(ingest_index_name) + "_" + file_name, value="#".join(file_ids))
     except Exception as e:
         if logflag:
             logger.info(f"[ redis ingest chunks ] {e}. Fail to store chunks of file {file_name}.")
         raise HTTPException(status_code=500, detail=f"Fail to store chunks of file {file_name}.")
     return True
 
 
-async def ingest_data_to_redis(doc_path: DocPath, embedder):
+async def ingest_data_to_redis(doc_path: DocPath, embedder, index_name):
     """Ingest document to Redis."""
     path = doc_path.path
     if logflag:
@@ -271,7 +274,7 @@ async def ingest_data_to_redis(doc_path: DocPath, embedder):
         logger.info(f"[ redis ingest data ] Done preprocessing. Created {len(chunks)} chunks of the given file.")
 
     file_name = doc_path.path.split("/")[-1]
-    return await ingest_chunks_to_redis(file_name, chunks, embedder)
+    return await ingest_chunks_to_redis(file_name, chunks, embedder, index_name)
 
 
 @OpeaComponentRegistry.register("OPEA_DATAPREP_REDIS")
@@ -360,6 +363,7 @@ async def ingest_files(
         process_table: bool = Form(False),
         table_strategy: str = Form("fast"),
         ingest_from_graphDB: bool = Form(False),
+        index_name: str = Form(None),
     ):
         """Ingest files/links content into redis database.
@@ -372,6 +376,7 @@ async def ingest_files(
             chunk_overlap (int, optional): The overlap between chunks. Defaults to Form(100).
             process_table (bool, optional): Whether to process tables in PDFs. Defaults to Form(False).
             table_strategy (str, optional): The strategy to process tables in PDFs. Defaults to Form("fast").
+            index_name (str, optional): The name of the index where data will be ingested.
         """
         if logflag:
             logger.info(f"[ redis ingest ] files:{files}")
@@ -384,7 +389,9 @@ async def ingest_files(
 
         for file in files:
             encode_file = encode_filename(file.filename)
-            doc_id = "file:" + encode_file
+            index_name_id = encode_filename(INDEX_NAME if index_name is None else index_name)
+            doc_id = "file:" + index_name_id + "_" + encode_file
+
             if logflag:
                 logger.info(f"[ redis ingest ] processing file {doc_id}")
 
@@ -400,7 +407,7 @@ async def ingest_files(
             if key_ids:
                 raise HTTPException(
                     status_code=400,
-                    detail=f"Uploaded file {file.filename} already exists. Please change file name.",
+                    detail=f"Uploaded file {file.filename} already exists. Please change file name or index name.",
                 )
 
             save_path = upload_folder + encode_file
@@ -414,6 +421,7 @@ async def ingest_files(
                     table_strategy=table_strategy,
                 ),
                 self.embedder,
+                index_name,
             )
             uploaded_files.append(save_path)
             if logflag:
@@ -430,7 +438,8 @@ async def ingest_files(
             raise HTTPException(status_code=400, detail=f"Link_list {link_list} should be a list.")
         for link in link_list:
             encoded_link = encode_filename(link)
-            doc_id = "file:" + encoded_link + ".txt"
+            index_name_id = encode_filename(INDEX_NAME if index_name is None else index_name)
+            doc_id = "file:" + index_name_id + "_" + encoded_link + ".txt"
             if logflag:
                 logger.info(f"[ redis ingest] processing link {doc_id}")
 
@@ -445,7 +454,8 @@ async def ingest_files(
                 logger.info(f"[ redis ingest] Link {link} does not exist. Keep storing.")
             if key_ids:
                 raise HTTPException(
-                    status_code=400, detail=f"Uploaded link {link} already exists. Please change another link."
+                    status_code=400,
+                    detail=f"Uploaded link {link} already exists. Please change another link or index_name.",
                 )
 
             save_path = upload_folder + encoded_link + ".txt"
@@ -460,6 +470,7 @@ async def ingest_files(
                     table_strategy=table_strategy,
                 ),
                 self.embedder,
+                index_name,
             )
             if logflag:
                 logger.info(f"[ redis ingest] Successfully saved link list {link_list}")
@@ -505,7 +516,7 @@ async def get_files(self):
             logger.info(f"[get] final file_list: {file_list}")
         return file_list
 
-    async def delete_files(self, file_path: str = Body(..., embed=True)):
+    async def delete_files(self, file_path: str = Body(..., embed=True), index_name: str = Body(None, embed=True)):
         """Delete file according to `file_path`.
 
         `file_path`:
@@ -531,17 +542,19 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
         else:
             logger.info(f"[ redis delete ] Index {KEY_INDEX_NAME} does not exits.")
 
-        # drop index INDEX_NAME
-        if await check_index_existance(self.data_index_client):
-            try:
-                assert drop_index(index_name=INDEX_NAME)
-            except Exception as e:
-                if logflag:
-                    logger.info(f"[ redis delete ] {e}. Fail to drop index {INDEX_NAME}.")
-                raise HTTPException(status_code=500, detail=f"Fail to drop index {INDEX_NAME}.")
+        if len(self.get_list_of_indices()) > 0:
+            for i in self.get_list_of_indices():
+                try:
+                    # drop index INDEX_NAME
+                    assert drop_index(index_name=i)
+                    logger.info(f"[ redis delete ] Index_name: {i} is deleted.")
+                except Exception as e:
+                    if logflag:
+                        logger.info(f"[ redis delete ] {e}. Fail to drop index {i}.")
+                    raise HTTPException(status_code=500, detail=f"Fail to drop index {i}.")
         else:
             if logflag:
-                logger.info(f"[ redis delete ] Index {INDEX_NAME} does not exits.")
+                logger.info("[ redis delete ] There is no index_name registered to redis db.")
 
         # delete files on local disk
         try:
@@ -563,7 +576,10 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
             logger.info(f"[ redis delete ] delete_path: {delete_path}")
 
         # partially delete files
-        doc_id = "file:" + encode_filename(file_path)
+        encode_file = encode_filename(file_path)
+        index_name = INDEX_NAME if index_name is None else index_name
+        index_name_id = encode_filename(index_name)
+        doc_id = "file:" + index_name_id + "_" + encode_file
         logger.info(f"[ redis delete ] doc id: {doc_id}")
 
         # determine whether this file exists in db KEY_INDEX_NAME
@@ -587,16 +603,16 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
                 logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {KEY_INDEX_NAME}.")
             raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for key index.")
 
-        # delete file content in db INDEX_NAME
+        # delete file content in db index_name
        for file_id in file_ids:
-            # determine whether this file exists in db INDEX_NAME
+            # determine whether this file exists in db index_name
            try:
                await search_by_id(self.data_index_client, file_id)
            except Exception as e:
                if logflag:
                    logger.info(f"[ redis delete ] {e}. File {file_path} does not exists.")
                raise HTTPException(
-                    status_code=404, detail=f"File not found in db {INDEX_NAME}. Please check file_path."
+                    status_code=404, detail=f"File not found in db {index_name}. Please check file_path."
                )
 
        # delete file content
@@ -605,7 +621,7 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
            assert res
        except Exception as e:
            if logflag:
-                logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {INDEX_NAME}")
+                logger.info(f"[ redis delete ] {e}. File {file_path} delete failed for db {index_name}")
            raise HTTPException(status_code=500, detail=f"File {file_path} delete failed for index.")
 
        # local file does not exist (restarted docker container)
@@ -627,3 +643,15 @@ async def delete_files(self, file_path: str = Body(..., embed=True)):
        if logflag:
            logger.info(f"[ redis delete ] Delete folder {file_path} is not supported for now.")
        raise HTTPException(status_code=404, detail=f"Delete folder {file_path} is not supported for now.")
+
+    def get_list_of_indices(self):
+        """Retrieves a list of all indices from the Redis client.
+
+        Returns:
+            A list of index names as strings.
+        """
+        # Execute the command to list all indices
+        indices = self.client.execute_command("FT._LIST")
+        # Decode each index name from bytes to string
+        indices_list = [item.decode("utf-8") for item in indices]
+        return indices_list
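
The net effect on the Redis side: vector data goes into a per-request index, the bookkeeping key is prefixed with the encoded index name, and indices are enumerable with RediSearch's FT._LIST. A standalone sketch of the same pattern in plain redis-py (the index and file names are made up, and the `encode_filename` stand-in only approximates the real helper in comps):

    import redis

    r = redis.Redis(host="localhost", port=6379)

    def encode_filename(name: str) -> str:
        # Simplified stand-in for comps' encode_filename helper.
        return name.replace("/", "_").replace(" ", "_")

    index_name = "quarterly_reports"
    file_name = "report_q3.pdf"

    # The bookkeeping key now embeds the index name, so the same file can
    # be ingested into two different indices without a doc_id collision:
    doc_id = "file:" + encode_filename(index_name) + "_" + encode_filename(file_name)
    print(doc_id)  # file:quarterly_reports_report_q3.pdf

    # get_list_of_indices() reduces to FT._LIST plus a bytes-to-str decode:
    indices = [item.decode("utf-8") for item in r.execute_command("FT._LIST")]
    print(indices)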

comps/dataprep/src/opea_dataprep_loader.py

Lines changed: 5 additions & 0 deletions

@@ -32,6 +32,11 @@ async def delete_files(self, *args, **kwargs):
         logger.info("[ dataprep loader ] delete files")
         return await self.component.delete_files(*args, **kwargs)
 
+    async def get_list_of_indices(self, *args, **kwargs):
+        if logflag:
+            logger.info("[ dataprep loader ] get indices")
+        return self.component.get_list_of_indices(*args, **kwargs)
+
 
 class OpeaDataprepMultiModalLoader(OpeaComponentLoader):
     def __init__(self, component_name, **kwargs):
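
Note that the loader method is declared `async` but awaits nothing: the Redis component's `get_list_of_indices` is synchronous, so returning its result from the coroutine is enough for callers that `await loader.get_list_of_indices()`. A minimal self-contained sketch of that pattern (the class names here are hypothetical):

    import asyncio

    class FakeComponent:
        def get_list_of_indices(self):
            return ["idx_a", "idx_b"]  # synchronous work

    class FakeLoader:
        def __init__(self, component):
            self.component = component

        async def get_list_of_indices(self):
            # No await needed: a plain return from a coroutine hands
            # the value straight to the awaiting caller.
            return self.component.get_list_of_indices()

    async def main():
        print(await FakeLoader(FakeComponent()).get_list_of_indices())

    asyncio.run(main())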

comps/dataprep/src/opea_dataprep_microservice.py

Lines changed: 71 additions & 5 deletions

@@ -57,6 +57,7 @@ async def ingest_files(
     process_table: bool = Form(False),
     table_strategy: str = Form("fast"),
     ingest_from_graphDB: bool = Form(False),
+    index_name: Optional[str] = Form(None),
 ):
     start = time.time()
 
@@ -66,9 +67,28 @@ async def ingest_files(
 
     try:
         # Use the loader to invoke the component
-        response = await loader.ingest_files(
-            files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
-        )
+        if dataprep_component_name == "OPEA_DATAPREP_REDIS":
+            response = await loader.ingest_files(
+                files,
+                link_list,
+                chunk_size,
+                chunk_overlap,
+                process_table,
+                table_strategy,
+                ingest_from_graphDB,
+                index_name,
+            )
+        else:
+            if index_name:
+                logger.error(
+                    'Error during dataprep ingest invocation: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+                )
+                raise
+
+            response = await loader.ingest_files(
+                files, link_list, chunk_size, chunk_overlap, process_table, table_strategy, ingest_from_graphDB
+            )
+
         # Log the result if logging is enabled
         if logflag:
             logger.info(f"[ ingest ] Output generated: {response}")
@@ -116,15 +136,25 @@ async def get_files():
     port=5000,
 )
 @register_statistics(names=["opea_service@dataprep"])
-async def delete_files(file_path: str = Body(..., embed=True)):
+async def delete_files(file_path: str = Body(..., embed=True), index_name: str = Body(None, embed=True)):
     start = time.time()
 
     if logflag:
         logger.info("[ delete ] start to delete ingested files")
 
     try:
         # Use the loader to invoke the component
-        response = await loader.delete_files(file_path)
+        if dataprep_component_name == "OPEA_DATAPREP_REDIS":
+            response = await loader.delete_files(file_path, index_name)
+        else:
+            if index_name:
+                logger.error(
+                    'Error during dataprep delete files: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+                )
+                raise
+            # Use the loader to invoke the component
+            response = await loader.delete_files(file_path)
+
         # Log the result if logging is enabled
         if logflag:
             logger.info(f"[ delete ] deleted result: {response}")
@@ -136,6 +166,42 @@ async def delete_files(file_path: str = Body(..., embed=True)):
         raise
 
 
+@register_microservice(
+    name="opea_service@dataprep",
+    service_type=ServiceType.DATAPREP,
+    endpoint="/v1/dataprep/indices",
+    host="0.0.0.0",
+    port=5000,
+)
+@register_statistics(names=["opea_service@dataprep"])
+async def get_list_of_indices():
+    start = time.time()
+    if logflag:
+        logger.info("[ get ] start to get list of indices.")
+
+    if dataprep_component_name != "OPEA_DATAPREP_REDIS":
+        logger.error(
+            'Error during dataprep - get list of indices: "index_name" option is supported if "DATAPREP_COMPONENT_NAME" environment variable is set to "OPEA_DATAPREP_REDIS". i.e: export DATAPREP_COMPONENT_NAME="OPEA_DATAPREP_REDIS"'
+        )
+        raise
+
+    try:
+        # Use the loader to invoke the component
+        response = await loader.get_list_of_indices()
+
+        # Log the result if logging is enabled
+        if logflag:
+            logger.info(f"[ get ] list of indices: {response}")
+
+        # Record statistics
+        statistics_dict["opea_service@dataprep"].append_latency(time.time() - start, None)
+
+        return response
+    except Exception as e:
+        logger.error(f"Error during dataprep get list of indices: {e}")
+        raise
+
+
 if __name__ == "__main__":
     logger.info("OPEA Dataprep Microservice is starting...")
     create_upload_folder(upload_folder)
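
Taken together, the new surface can be exercised end to end over HTTP. A sketch with requests, assuming the dataprep service listens on localhost:5000; /v1/dataprep/indices is registered in this commit, while the /v1/dataprep/ingest and /v1/dataprep/delete paths and the POST verbs follow the usual OPEA registration and may differ in a given deployment:

    import requests

    BASE = "http://localhost:5000/v1/dataprep"

    # Ingest a file into a custom index; the endpoint takes Form(...)
    # parameters, so this is a multipart form upload.
    with open("report_q3.pdf", "rb") as f:
        resp = requests.post(
            BASE + "/ingest",
            files={"files": f},
            data={"index_name": "quarterly_reports"},
        )
    print(resp.json())

    # Enumerate all RediSearch indices known to the service.
    print(requests.post(BASE + "/indices").json())

    # Delete the file from that index only; Body(..., embed=True)
    # fields arrive as a JSON object.
    resp = requests.post(
        BASE + "/delete",
        json={"file_path": "report_q3.pdf", "index_name": "quarterly_reports"},
    )
    print(resp.json())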
