diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py index 836688bc5..ee061bc77 100644 --- a/moonstreamapi/moonstreamapi/actions.py +++ b/moonstreamapi/moonstreamapi/actions.py @@ -467,6 +467,9 @@ def get_all_entries_from_search( results: List[BugoutSearchResult] = [] + print(bc.spire_url) + print(journal_id) + try: existing_metods = bc.search( token=token, @@ -480,6 +483,7 @@ def get_all_entries_from_search( results.extend(existing_metods.results) except Exception as e: + logger.error(f"Error get entries: {str(e)}") reporter.error_report(e) if len(results) != existing_metods.total_results: diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index a765e8206..b4a1f184f 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -15,7 +15,7 @@ from ..settings import BUGOUT_BROOD_URL, BUGOUT_SPIRE_URL, MOONSTREAM_APPLICATION_ID from ..web3_provider import yield_web3_provider -from . import subscription_types, subscriptions, moonworm_tasks +from . import subscription_types, subscriptions, moonworm_tasks, queries from .migrations import ( checksum_address, update_dashboard_subscription_key, @@ -478,6 +478,75 @@ def main() -> None: parser_moonworm_tasks_add.set_defaults(func=moonworm_tasks_add_subscription_handler) + ### Add new cli here + + parser_queries = subcommands.add_parser( + "queries", description="Manage Moonstream queries journal and resources." + ) + parser_queries.set_defaults(func=lambda _: parser_queries.print_help()) + subcommands_queries = parser_queries.add_subparsers() + + parser_queries_ensure_tags = subcommands_queries.add_parser( + "ensure-canonical-tags", + description="Add connanical tags to query entries", + help="Add connanical tags to query entries", + ) + parser_queries_ensure_tags.set_defaults(func=queries.ensure_queries_tags) + + parser_queries_get_users = subcommands_queries.add_parser( + "get-users", + description="Return list of users with query count.", + help="Return list of users with query count.", + ) + parser_queries_get_users.set_defaults(func=queries.get_users) + + parser_queries_get_user_queries = subcommands_queries.add_parser( + "get-user-queries", + description="Return list of user queries.", + help="Return list of user queries.", + ) + + parser_queries_get_user_queries.add_argument( + "-u", + "--user-id", + type=str, + help="User id.", + ) + + parser_queries_get_user_queries.set_defaults(func=queries.get_user_queries) + + parser_queries_copy_user_queries = subcommands_queries.add_parser( + "copy-user-queries", + description="Copy user queries to another user.", + help="Copy user queries to another user.", + ) + + parser_queries_copy_user_queries.add_argument( + "-f", + "--from-user-id", + type=str, + required=True, + help="User id from wich account query will be copied.", + ) + + parser_queries_copy_user_queries.add_argument( + "-t", + "--to-user-token", + type=str, + required=True, + help="User id to wich account query will be copied.", + ) + + parser_queries_copy_user_queries.set_defaults(func=queries.copy_queries) + + parser_get_queries_tags = subcommands_queries.add_parser( + "get-queries-tags", + description="Return list of queries tags.", + help="Return list of queries tags.", + ) + + parser_get_queries_tags.set_defaults(func=queries.get_all_parameters_tags) + args = parser.parse_args() args.func(args) diff --git a/moonstreamapi/moonstreamapi/admin/queries.py b/moonstreamapi/moonstreamapi/admin/queries.py new file mode 100644 index 000000000..b9c745481 --- /dev/null +++ b/moonstreamapi/moonstreamapi/admin/queries.py @@ -0,0 +1,392 @@ +""" +Utilities for managing subscription type resources for a Moonstream application. +""" +import argparse +from collections import Counter +import json + +from bugout.data import BugoutResources +from bugout.exceptions import BugoutResponseException +from moonstream.client import Moonstream # type: ignore +import logging +from typing import Dict, Any +from sqlalchemy import text + + +from ..data import BUGOUT_RESOURCE_QUERY_RESOLVER +from ..settings import ( + BUGOUT_REQUEST_TIMEOUT_SECONDS, + MOONSTREAM_ADMIN_ACCESS_TOKEN, + MOONSTREAM_QUERIES_JOURNAL_ID, +) +from ..settings import bugout_client as bc +from ..actions import get_all_entries_from_search + + +logger = logging.getLogger(__name__) + + +def ensure_queries_tags(args: argparse.Namespace) -> None: + """ + Check all queries resources and check if they entry have all required tags. + """ + params = {"type": BUGOUT_RESOURCE_QUERY_RESOLVER} + resources: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params + ) + + for resource in resources.resources: + if "entry_id" not in resource.resource_data: + print(f"Missing entry_id for {resource.id}") + continue + if "name" not in resource.resource_data: + print(f"Missing name for {resource.id}") + continue + if "type" not in resource.resource_data: + print(f"Missing type for {resource.id}") + continue + if "user" not in resource.resource_data: + print(f"Missing user for {resource.id}") + continue + if "user_id" not in resource.resource_data: + print(f"Missing user_id for {resource.id}") + continue + + # get entry + try: + entry = bc.get_entry( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + entry_id=resource.resource_data["entry_id"], + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + except BugoutResponseException as e: + print(f"Error getting entry {resource.resource_data['entry_id']} err: {e}") + continue + + print( + f"Entry {entry.id} {entry.title} {entry.journal_url} user: {resource.resource_data['user']}" + ) + required_tags = ["user_name", "query_name", "user_id", "query_id"] + for tag in entry.tags: + tag_prefix = tag.split(":")[0] + + print(tag_prefix) + if tag_prefix in required_tags: + required_tags.remove(tag_prefix) + + print(f"Missing tags for {resource.resource_data['entry_id']}: {required_tags}") + + tags_for_update = [] + if len(required_tags) > 0: + for required_tag in required_tags: + if required_tag == "user_name": + tag_value = resource.resource_data["user"] + elif required_tag == "query_name": + tag_value = resource.resource_data["name"] + elif required_tag == "user_id": + tag_value = resource.resource_data["user_id"] + elif required_tag == "query_id": + tag_value = resource.resource_data["entry_id"] + + tag = f"{required_tag}:{tag_value}" + tags_for_update.append(tag) + + bc.update_tags( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + entry_id=resource.resource_data["entry_id"], + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + tags=tags_for_update, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + +def get_users(args: argparse.Namespace) -> Dict[str, Any]: + """ + Return all users with queries count. + """ + + params = {"type": BUGOUT_RESOURCE_QUERY_RESOLVER} + resources: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params + ) + + users = {} + for resource in resources.resources: + user_id = resource.resource_data["user_id"] + if user_id not in users: + users[user_id] = { + "user_id": user_id, + "user_name": resource.resource_data["user"], + "queries_count": 1, + } + else: + users[user_id]["queries_count"] += 1 + + logger.info(f"Users: {json.dumps(users, indent=4)}") + + return users + + +def get_user_queries(args: argparse.Namespace) -> Dict[str, Any]: + """ + Return all queries for user. + """ + + params = {"type": BUGOUT_RESOURCE_QUERY_RESOLVER} + resources: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params + ) + + queries = {} + + for resource in resources.resources: + user_id = resource.resource_data["user_id"] + if user_id == args.user_id: + queries[resource.resource_data["entry_id"]] = { + "query_name": resource.resource_data["name"] + } + + # get entries + for query_id in queries: + entry = bc.get_entry( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + entry_id=query_id, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + queries[query_id]["entry"] = entry + + if "preapprove" in entry.tags: + queries[query_id]["status"] = "preapprove" + elif "approved" in entry.tags: + queries[query_id]["status"] = "approved" + + for query_id in queries: + print( + f"{query_id} name: {queries[query_id]['entry'].title} status: {queries[query_id].get('status')}" + ) + + return queries + + +def copy_queries(args: argparse.Namespace) -> None: + """ + Copy all queries from one user to another. + """ + + from_user_id = args.from_user_id + + to_user_token = args.to_user_token + + params = {"type": BUGOUT_RESOURCE_QUERY_RESOLVER} + resources: BugoutResources = bc.list_resources( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, params=params + ) + + queries = {} + for resource in resources.resources: + user_id = resource.resource_data["user_id"] + if user_id == from_user_id: + queries[resource.resource_data["entry_id"]] = { + "query_id": resource.resource_data["entry_id"], + "query_name": resource.resource_data["name"], + "user_id": user_id, + "user_name": resource.resource_data["user"], + } + + # get entries + for query_id in queries: + entry = bc.get_entry( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + entry_id=query_id, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + queries[query_id]["entry"] = entry + + client = Moonstream() + + for query_id, query in queries.items(): + datatype_tags = [ + tag + for tag in query["entry"].tags + if not any( + tag.startswith(x) + for x in [ + "user_name", + "query_name", + "user_id", + "query_id", + "preapprove", + "approved", + ] + ) + ] + + # create query via bugout client + try: + created_query = client.create_query( + token=to_user_token, + name=query["query_name"], + query=query["entry"].content, + ) + + except Exception as e: + logger.error(f"Error getting entry {query_id} err: {e}") + continue + + logger.info(f"Created query {created_query.id} {created_query.name}") + + if ( + "preapprove" not in query["entry"].tags + and "approved" in query["entry"].tags + ): + # Delete preapprove tag + + try: + bc.delete_tag( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + entry_id=created_query.id, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + tag="preapprove", + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + except BugoutResponseException as e: + logger.error(f"Error in applind tags to query entry: {str(e)}") + continue + except Exception as e: + logger.error( + f"Error in applind tags to query entry: {str(e)} unknown error" + ) + continue + + print(f"Delete preapprove tag for {created_query.id}") + + try: + bc.create_tags( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + entry_id=created_query.id, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + tags=["approved"] + datatype_tags, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + except BugoutResponseException as e: + logger.error(f"Error in applind tags to query entry: {str(e)}") + continue + except Exception as e: + logger.error( + f"Error in applind tags to query entry: {str(e)} unknown error" + ) + continue + + logger.info(f"Add approved tag for {created_query.id}") + + +def get_all_parameters_tags(args: argparse.Namespace) -> None: + """ + Get all parameters tags for all queries. + Directly from queries journal. + """ + + entries = get_all_entries_from_search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + search_query="", # get all entries + limit=100, + ) + + parameters_tags = [] + + for entry in entries: + if entry.tags is None: + continue + + entry_parameters_tags = [ + tag + for tag in entry.tags + if not any( + tag.startswith(x) + for x in [ + "user_name", + "query_name", + "user_id", + "query_id", + "preapprove", + "approved", + ] + ) + ] + + parameters_tags.extend(entry_parameters_tags) + + print(Counter(parameters_tags)) + + +def search_unmapped_query(args: argparse.Namespace) -> None: + """ + Get all entries from queries journal and search for unmapped queries. + where query parameters not in tags. + """ + + entries = get_all_entries_from_search( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + search_query="", # get all entries + limit=100, + ) + + for entry in entries: + # Check if it can transform to TextClause + + if entry.content is None: + continue + + try: + query = text(entry.content) + except Exception as e: + logger.error( + f"Can't parse query {entry.id} to TextClause in drones /query_update endpoint, error: {e}" # type: ignore + ) + + # Get requried keys for query + expected_query_parameters = query._bindparams.keys() + + entry_parameters_tags = [ + tag + for tag in entry.tags + if not any( + tag.startswith(x) + for x in [ + "user_name", + "query_name", + "user_id", + "query_id", + "preapprove", + "approved", + ] + ) + ] + + # make keys from tags + + entry_parameters_tags_dict = { + tag.split(":")[0]: tag.split(":")[1] for tag in entry_parameters_tags + } + + # make ordered set from keys + + expected_parameters = list(expected_query_parameters).sort() + + entry_parameters = list(entry_parameters_tags_dict.keys()).sort() + + if expected_parameters != entry_parameters: + print(f"Query {entry.id} {entry.title} not mapped") # type: ignore + print(f"Expected parameters: {expected_parameters}") + print(f"Entry parameters: {entry_parameters}") + print(f"Entry parameters tags: {entry_parameters_tags_dict}") + # print(f"Query: {entry.content}") + print("=====================================") diff --git a/moonstreamapi/setup.py b/moonstreamapi/setup.py index dc940b792..f1a2c17fb 100644 --- a/moonstreamapi/setup.py +++ b/moonstreamapi/setup.py @@ -16,6 +16,7 @@ "bugout>=0.1.19", "moonstream-entity>=0.0.5", "fastapi", + "moonstream>=0.1.1", "moonstreamdb>=0.3.3", "humbug", "pydantic",