10 changes: 9 additions & 1 deletion README.md
@@ -63,6 +63,14 @@ After the role is created, add permission by creating an inline policy as
"arn:aws:bedrock:us-west-2::foundation-model/*",
"arn:aws:dynamodb:us-west-2:**YOURACCOUNTID**:table/Nlq*"
]
},
{
"Sid": "SecretsManagerAccess",
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": "arn:aws:secretsmanager:us-west-2:**YOURACCOUNTID**:secret:*"
}
]
}
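
Reviewer note: the new `SecretsManagerAccess` statement lets the application read credentials at runtime. A minimal sketch of the corresponding call, assuming an illustrative secret name (`nlq/opensearch` is hypothetical, not from this PR):

```python
# Minimal sketch: read a secret that the new policy statement permits.
# The secret name "nlq/opensearch" is illustrative, not from this PR.
import json
import boto3

client = boto3.client("secretsmanager", region_name="us-west-2")
resp = client.get_secret_value(SecretId="nlq/opensearch")
secret = json.loads(resp["SecretString"])  # e.g. {"username": "...", "password": "..."}
```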
@@ -88,7 +96,7 @@ sudo su - ec2-user

```bash
# Install components
sudo yum install docker python3-pip git -y && pip3 install -U awscli && pip3 install docker-compose
sudo dnf install docker python3-pip git -y && pip3 install -U awscli && pip3 install docker-compose

# For Amazon Linux 2, use yum instead of dnf
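
# Hedged sketch (not part of this PR): pick whichever package manager exists,
# so the same install line works on Amazon Linux 2 (yum) and Amazon Linux 2023 (dnf).
PKG=$(command -v dnf >/dev/null 2>&1 && echo dnf || echo yum)
sudo $PKG install docker python3-pip git -y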

5 changes: 3 additions & 2 deletions application/.env.cntemplate
@@ -18,10 +18,11 @@ AOS_PASSWORD=admin
BEDROCK_REGION=cn-north-1
RDS_REGION_NAME=cn-north-1
AWS_DEFAULT_REGION=cn-north-1

DYNAMODB_AWS_REGION=cn-north-1

SAGEMAKER_ENDPOINT_EMBEDDING=embedding-bge-m3-3ab71
SAGEMAKER_ENDPOINT_INTENT=llm-internlm2-chat-7b-3ab71
SAGEMAKER_ENDPOINT_SQL=sql-sqlcoder-7b-2-7e5b6
SAGEMAKER_ENDPOINT_EXPLAIN=llm-internlm2-chat-7b-3ab71
SAGEMAKER_ENDPOINT_EXPLAIN=llm-internlm2-chat-7b-3ab71

EMBEDDING_DIMENSION=1024
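
Reviewer note: a hedged sketch of how the embedding endpoint named above is typically invoked (the JSON payload and response shape are assumptions; the project's real helper, `create_vector_embedding_with_sagemaker`, appears in `opensearch_deploy.py` further down):

```python
# Hedged sketch: call the embedding endpoint named in this template.
# The payload/response JSON shape is an assumption, not taken from this PR.
import json
import os
import boto3

client = boto3.client("sagemaker-runtime", region_name=os.getenv("AWS_DEFAULT_REGION", "cn-north-1"))
resp = client.invoke_endpoint(
    EndpointName=os.getenv("SAGEMAKER_ENDPOINT_EMBEDDING", ""),
    ContentType="application/json",
    Body=json.dumps({"inputs": "some text"}),
)
vector = json.loads(resp["Body"].read())  # expected length: EMBEDDING_DIMENSION=1024
```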
4 changes: 3 additions & 1 deletion application/.env.template
@@ -18,5 +18,7 @@ AOS_PASSWORD=admin
BEDROCK_REGION=us-west-2
RDS_REGION_NAME=us-west-2
AWS_DEFAULT_REGION=us-west-2

DYNAMODB_AWS_REGION=us-west-2

EMBEDDING_DIMENSION=1024
BEDROCK_EMBEDDING_MODEL=amazon.titan-embed-text-v1
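
Reviewer note: a minimal sketch of how these two values are consumed; the `inputText` payload matches the Titan call visible in `opensearch_deploy.py` below:

```python
# Minimal sketch: create a Titan embedding using the model/region from this template.
import json
import os
import boto3

client = boto3.client("bedrock-runtime", region_name=os.getenv("BEDROCK_REGION", "us-west-2"))
resp = client.invoke_model(
    modelId=os.getenv("BEDROCK_EMBEDDING_MODEL", "amazon.titan-embed-text-v1"),
    contentType="application/json",
    accept="application/json",
    body=json.dumps({"inputText": "some text"}),
)
vector = json.loads(resp["body"].read())["embedding"]  # EMBEDDING_DIMENSION=1024 floats
```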
32 changes: 16 additions & 16 deletions application/nlq/business/vector_store.py
@@ -3,7 +3,7 @@
import boto3
import json
from nlq.data_access.opensearch import OpenSearchDao
from utils.env_var import BEDROCK_REGION, AOS_HOST, AOS_PORT, AOS_USER, AOS_PASSWORD
from utils.env_var import BEDROCK_REGION, AOS_HOST, AOS_PORT, AOS_USER, AOS_PASSWORD, opensearch_info

logger = logging.getLogger(__name__)

@@ -15,7 +15,7 @@ class VectorStore:
@classmethod
def get_all_samples(cls, profile_name):
logger.info(f'get all samples for {profile_name}...')
samples = cls.opensearch_dao.retrieve_samples('uba', profile_name)
samples = cls.opensearch_dao.retrieve_samples(opensearch_info['sql_index'], profile_name)

sample_list = []
for sample in samples:
@@ -30,7 +30,7 @@ def get_all_entity_samples(cls, profile_name):
@classmethod
def get_all_entity_samples(cls, profile_name):
logger.info(f'get all entity samples for {profile_name}...')
samples = cls.opensearch_dao.retrieve_entity_samples('uba_ner', profile_name)
samples = cls.opensearch_dao.retrieve_entity_samples(opensearch_info['ner_index'], profile_name)

sample_list = []
if samples is None:
@@ -48,7 +48,7 @@ def get_all_agent_cot_samples(cls, profile_name):
@classmethod
def get_all_agent_cot_samples(cls, profile_name):
logger.info(f'get all agent cot samples for {profile_name}...')
samples = cls.opensearch_dao.retrieve_agent_cot_samples('uba_agent', profile_name)
samples = cls.opensearch_dao.retrieve_agent_cot_samples(opensearch_info['agent_index'], profile_name)

sample_list = []
if samples is None:
@@ -67,30 +67,30 @@ def get_all_agent_cot_samples(cls, profile_name):
def add_sample(cls, profile_name, question, answer):
logger.info(f'add sample question: {question} to profile {profile_name}')
embedding = cls.create_vector_embedding_with_bedrock(question)
has_same_sample = cls.search_same_query(profile_name, 1, 'uba', embedding)
has_same_sample = cls.search_same_query(profile_name, 1, opensearch_info['sql_index'], embedding)
if has_same_sample:
logger.info(f'duplicate sample found, deleting existing sample: {question} from profile {profile_name}')
if cls.opensearch_dao.add_sample('uba', profile_name, question, answer, embedding):
if cls.opensearch_dao.add_sample(opensearch_info['sql_index'], profile_name, question, answer, embedding):
logger.info('Sample added')

@classmethod
def add_entity_sample(cls, profile_name, entity, comment):
logger.info(f'add sample entity: {entity} to profile {profile_name}')
embedding = cls.create_vector_embedding_with_bedrock(entity)
has_same_sample = cls.search_same_query(profile_name, 1, 'uba_ner', embedding)
has_same_sample = cls.search_same_query(profile_name, 1, opensearch_info['ner_index'], embedding)
if has_same_sample:
logger.info(f'duplicate entity found, deleting existing entity sample: {entity} from profile {profile_name}')
if cls.opensearch_dao.add_entity_sample('uba_ner', profile_name, entity, comment, embedding):
if cls.opensearch_dao.add_entity_sample(opensearch_info['ner_index'], profile_name, entity, comment, embedding):
logger.info('Sample added')

@classmethod
def add_agent_cot_sample(cls, profile_name, entity, comment):
logger.info(f'add agent sample query: {entity} to profile {profile_name}')
embedding = cls.create_vector_embedding_with_bedrock(entity)
has_same_sample = cls.search_same_query(profile_name, 1, 'uba_agent', embedding)
has_same_sample = cls.search_same_query(profile_name, 1, opensearch_info['agent_index'], embedding)
if has_same_sample:
logger.info(f'duplicate agent sample found, deleting existing agent sample: {entity} from profile {profile_name}')
if cls.opensearch_dao.add_agent_cot_sample('uba_agent', profile_name, entity, comment, embedding):
if cls.opensearch_dao.add_agent_cot_sample(opensearch_info['agent_index'], profile_name, entity, comment, embedding):
logger.info('Sample added')

@classmethod
@@ -113,19 +113,19 @@ def create_vector_embedding_with_bedrock(cls, text):
@classmethod
def delete_sample(cls, profile_name, doc_id):
logger.info(f'delete sample question id: {doc_id} from profile {profile_name}')
ret = cls.opensearch_dao.delete_sample('uba', profile_name, doc_id)
ret = cls.opensearch_dao.delete_sample(opensearch_info['sql_index'], profile_name, doc_id)
print(ret)

@classmethod
def delete_entity_sample(cls, profile_name, doc_id):
logger.info(f'delete entity sample id: {doc_id} from profile {profile_name}')
ret = cls.opensearch_dao.delete_sample('uba_ner', profile_name, doc_id)
ret = cls.opensearch_dao.delete_sample(opensearch_info['ner_index'], profile_name, doc_id)
print(ret)

@classmethod
def delete_agent_cot_sample(cls, profile_name, doc_id):
logger.info(f'delete agent cot sample id: {doc_id} from profile {profile_name}')
ret = cls.opensearch_dao.delete_sample('uba_agent', profile_name, doc_id)
ret = cls.opensearch_dao.delete_sample(opensearch_info['agent_index'], profile_name, doc_id)
print(ret)

@classmethod
@@ -147,13 +147,13 @@ def search_same_query(cls, profile_name, top_k, index_name, embedding):
similarity_score = similarity_sample["_score"]
similarity_id = similarity_sample['_id']
if similarity_score == 1.0:
if index_name == "uba":
if index_name == opensearch_info['sql_index']:
cls.delete_sample(profile_name, similarity_id)
return True
elif index_name == "uba_ner":
elif index_name == opensearch_info['ner_index']:
cls.delete_entity_sample(profile_name, similarity_id)
return True
elif index_name == "uba_agent":
elif index_name == opensearch_info['agent_index']:
cls.delete_agent_cot_sample(profile_name, similarity_id)
return True
else:
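
Reviewer note: `opensearch_info` is imported from `utils.env_var`, which is not part of this diff. A plausible shape, inferred from the lookups above and from the connection fields used in `opensearch_deploy.py` below; the defaults are the literals this PR replaces:

```python
# Hypothetical sketch of opensearch_info in utils/env_var.py (not shown in this PR).
# Keys come from the lookups in this diff; defaults from the replaced literals.
import os

opensearch_info = {
    "domain": os.getenv("AOS_DOMAIN", "llm-data-analytics"),
    "host": os.getenv("AOS_HOST", ""),
    "port": os.getenv("AOS_PORT", 9200),
    "username": os.getenv("AOS_USER", "admin"),
    "password": os.getenv("AOS_PASSWORD", "admin"),
    "region": os.getenv("AOS_REGION", ""),
    "sql_index": os.getenv("AOS_INDEX", "uba"),
    "ner_index": os.getenv("AOS_INDEX_NER", "uba_ner"),
    "agent_index": os.getenv("AOS_INDEX_AGENT", "uba_agent"),
}
```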
169 changes: 13 additions & 156 deletions application/opensearch_deploy.py
@@ -1,105 +1,27 @@
import json
from utils import opensearch
from opensearchpy import OpenSearch
from dotenv import load_dotenv
import os
import boto3
import sys
import logging

from nlq.business.vector_store import VectorStore
from utils.opensearch import get_opensearch_cluster_client, opensearch_index_init
from utils.env_var import opensearch_info

logger = logging.getLogger(__name__)

load_dotenv()

SAGEMAKER_ENDPOINT_EMBEDDING = os.getenv('SAGEMAKER_ENDPOINT_EMBEDDING', '')

AOS_HOST = os.getenv('AOS_HOST', '')
AOS_PORT = os.getenv('AOS_PORT', 9200)
AOS_USER = os.getenv('AOS_USER', 'admin')
AOS_PASSWORD = os.getenv('AOS_PASSWORD', 'admin')
AOS_DOMAIN = os.getenv('AOS_DOMAIN', 'llm-data-analytics')
AOS_REGION = os.getenv('AOS_REGION')
AOS_INDEX = os.getenv('AOS_INDEX', 'uba')
AOS_INDEX_NER = os.getenv('AOS_INDEX_NER', 'uba_ner')
AOS_INDEX_AGENT = os.getenv('AOS_INDEX_AGENT', 'uba_agent')
AOS_TYPE = os.getenv('AOS_TYPE', 'uba')
BEDROCK_REGION = os.getenv('BEDROCK_REGION')

REGION_NAME = AOS_REGION
early_stop_record_count = 100
index_name = AOS_INDEX
index_name_ner = AOS_INDEX_NER
index_name_agent = AOS_INDEX_AGENT
opensearch_user = AOS_USER
opensearch_password = AOS_PASSWORD
# create opensearch domain
domain = AOS_DOMAIN


def index_to_opensearch():
create_index = True
if len(sys.argv) == 1:
from deployment.default_index_data import bulk_questions
logger.info(f'found {len(bulk_questions)} questions in default collection')
elif 2 <= len(sys.argv) < 4:
from deployment.custom_index_data_sample import custom_bulk_questions
bulk_questions = custom_bulk_questions[sys.argv[1]]
logger.info(f'found {len(bulk_questions)} questions in {sys.argv[1]} collection')
if len(sys.argv) == 3 and sys.argv[2] == 'false':
create_index = False
else:
logger.info('Usage: python3 opensearch_deploy.py <collection_name> <create_index:bool>')
logger.info(' create_index: true (default) or false')
return

if AOS_HOST == '':
# add a new opensearch domain named llm-data-analytics in us-west-2
client = boto3.client('opensearch', region_name=REGION_NAME)
client.create_domain(
DomainName='llm-data-analytics',
EngineVersion='OpenSearch_2.7',
NodeToNodeEncryptionOptions={
'Enabled': True
},
EncryptionAtRestOptions={
'Enabled': True
},
AdvancedSecurityOptions={
'Enabled': True,
'InternalUserDatabaseEnabled': True,
'MasterUserOptions': {
'MasterUserName': 'admin',
'MasterUserPassword': 'Admin&123'
}
},
DomainEndpointOptions={
'EnforceHTTPS': True
},
EBSOptions={
'EBSEnabled': True,
'VolumeType': 'gp2',
'VolumeSize': 10
}
)

# initiate AWS OpenSearch client and insert new data into the index
opensearch_client = opensearch.get_opensearch_cluster_client(domain, opensearch_user, opensearch_password,
REGION_NAME,
index_name)
else:
auth = (opensearch_user, opensearch_password)
host = AOS_HOST
port = AOS_PORT
# Create the client with SSL/TLS enabled, but hostname verification disabled.
opensearch_client = OpenSearch(
hosts=[{'host': host, 'port': port}],
http_compress=True, # enables gzip compression for request bodies
http_auth=auth,
use_ssl=True,
verify_certs=False,
ssl_assert_hostname=False,
ssl_show_warn=False
)
opensearch_client = get_opensearch_cluster_client(opensearch_info["domain"], opensearch_info["host"], opensearch_info["port"],
opensearch_info["username"], opensearch_info["password"], opensearch_info["region"])

def create_vector_embedding_with_bedrock(text, index_name, bedrock_client):
payload = {"inputText": f"{text}"}
@@ -141,10 +63,6 @@ def get_sagemaker_client():
sagemaker_client = boto3.client("sagemaker-runtime")
return sagemaker_client


# Check if to delete OpenSearch index with the argument passed to the script --recreate 1
# response = opensearch.delete_opensearch_index(opensearch_client, name)

# Initialize
if SAGEMAKER_ENDPOINT_EMBEDDING:
dimension = 1024
@@ -153,75 +71,14 @@ def get_sagemaker_client():
dimension = 1536
bedrock_client = get_bedrock_client(BEDROCK_REGION)

exists = opensearch.check_opensearch_index(opensearch_client, index_name)
if not exists:
logger.info("Creating OpenSearch index")
success = opensearch.create_index(opensearch_client, index_name)
if success:
logger.info("Creating OpenSearch index mapping")
success = opensearch.create_index_mapping(opensearch_client, index_name, dimension)
logger.info(f"OpenSearch Index mapping created")
opensearch_index_flag = opensearch_index_init()
if not opensearch_index_flag:
logger.info("OpenSearch Index Create Fail")
else:
if create_index:
logger.info("Index already exists. Exit with 0 now.")

all_records = bulk_questions

# Vector embedding using Amazon Bedrock Titan text embedding
all_json_records = []
logger.info(f"Creating embeddings for records")

# using the arg --early-stop
i = 0
for record in all_records:
i += 1
if SAGEMAKER_ENDPOINT_EMBEDDING:
records_with_embedding = create_vector_embedding_with_sagemaker(record['question'], index_name,
sagemaker_client)
else:
records_with_embedding = create_vector_embedding_with_bedrock(record['question'], index_name,
bedrock_client)
logger.info(f"Embedding for record {i} created")
records_with_embedding['sql'] = record['sql']
records_with_embedding['profile'] = record.get('profile', 'default')
all_json_records.append(records_with_embedding)
if i % 500 == 0 or i == len(all_records):
# Bulk put all records to OpenSearch
success, failed = opensearch.put_bulk_in_opensearch(all_json_records, opensearch_client)
all_json_records = []
logger.info(f"Documents saved {success}, documents failed to save {failed}")

logger.info("Finished creating records using Amazon Bedrock Titan text embedding")

# init index_name_ner
if AOS_HOST == '':
opensearch_client = opensearch.get_opensearch_cluster_client(domain, opensearch_user, opensearch_password,
REGION_NAME,
index_name_ner)

exists_ner = opensearch.check_opensearch_index(opensearch_client, index_name_ner)
if not exists_ner:
logger.info("Creating OpenSearch Ner index")
success = opensearch.create_index(opensearch_client, index_name_ner)
if success:
logger.info("Creating OpenSearch Ner Index mapping")
success = opensearch.create_index_mapping(opensearch_client, index_name_ner, dimension)
logger.info(f"OpenSearch Ner Index mapping created")

# init index_name_agent
if AOS_HOST == '':
opensearch_client = opensearch.get_opensearch_cluster_client(domain, opensearch_user, opensearch_password,
REGION_NAME,
index_name_agent)

exists_agent = opensearch.check_opensearch_index(opensearch_client, index_name_agent)
if not exists_agent:
logger.info("Create OpenSearch Agent index")
success = opensearch.create_index(opensearch_client, index_name_agent)
if success:
logger.info("Creating OpenSearch Agent Index mapping")
success = opensearch.create_index_mapping(opensearch_client, index_name_agent, dimension)
logger.info(f"OpenSearch Agent Index mapping created")
current_profile = "entity_insert_test"
entity = "环比"
comment = "环比增长率是指本期和上期相比较的增长率,计算公式为:环比增长率 =(本期数-上期数)/ 上期数 ×100%"
VectorStore.add_entity_sample(current_profile, entity, comment)
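
(For non-Chinese readers: the test entity 环比 is the period-over-period growth rate; the comment string states its definition and formula, growth rate = (current period − prior period) / prior period × 100%.)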


if __name__ == "__main__":