diff --git a/app/README.md b/app/README.md index 3f92e91..47ef8ad 100644 --- a/app/README.md +++ b/app/README.md @@ -7,24 +7,49 @@ This is an add-on powered by the Splunk Add-on Builder. The Databricks Add-on for Splunk is used to query Databricks data and execute Databricks notebooks from Splunk. * Author - Databricks, Inc. -* Version - 1.2.0 +* Version - 1.4.1 * Creates Index - False * Prerequisites - - * This application requires appropriate credentials to query data from Databricks platform. For Details refer to Configuration > Add Databricks Credentials section. + * This application requires appropriate credentials to query data from the Databricks platform. For Details refer to Configuration > Add Databricks Credentials section. * Compatible with: - * Splunk Enterprise version: 8.2.x and 9.0.x + * Splunk Enterprise version: 9.2.x, 9.1.x and 9.0.x * REST API: 1.2 and 2.0 * OS: Platform independent * Browser: Safari, Chrome and Firefox +# RELEASE NOTES VERSION 1.4.1 +* Modified timestamp format for log messages. +* Minor bugfix. + +# RELEASE NOTES VERSION 1.4.0 +* Introduced threading mechanism to fetch query results faster. Thread count can be configured from 'Addidtional Parameters' page. +* Databricks Warehouse will now starts automatically if not running while running databricksquery command. + +# RELEASE NOTES VERSION 1.3.1 +* Added backward compatibility of Interactive Cluster along with DBSQL Warehouse for databricksquery command. +* "Query Result Limit" configuration is supported only for DBSQL Warehouse for using databricksquery command. + +# RELEASE NOTES VERSION 1.3.0 + +* Introduced Run cancelation feature. Cancel the run from the "Databricks Job Execution Details" dashboard by clicking on the "Cancel Run" button and cancel the query execution by clicking on the Splunk search stop button. +* Introduced "Command Timeout Value", "Query Result Limit" and "Index" option configurations on the "Configuration" page. +* Introduced the "Execution Status" column on the "Databricks Job Execution Details" dashboard to display the current status of the triggered run. The status will be updated every 5 minutes. +* Introduced the "UID" column on the "Databricks Job Execution Details" dashboard to track more details of respective runs. +* Introduced DBSQL Warehouse to overcome limitation of number of results obtained in databricksquery command. +* Added "Run Name" parameter in the "Launch Notebook" dashboard. +* Execution details will be now stored in the index instead of the KV Store lookup. +* Updated DNS Resolution help text on the Proxy configuration screen. +* Enhanced logs and introduced UID in logs to distinguish each command execution logs. +* Removed the table command dependency from the custom command searches. + # RELEASE NOTES VERSION 1.2.0 * Updated the Add-on to allow non-admin users to execute the custom commands. -* Added support for creation of multiple accounts. +* Added support for the creation of multiple accounts. * Added a mechanism to use Proxy just for AAD authentication. # RELEASE NOTES VERSION 1.1.0 * Added support for authentication through Azure Active Directory for Azure Databricks instance. -* Introduced an alert action to run a parameterized notebook on Databricks instance. +* Introduced an alert action to run a parameterized notebook on the Databricks instance. * Added saved search to delete old notebook run details from lookup. * Added macro to specify the retaining days in saved search. 
* Added a custom command databricksretiredrun to delete specific databricks notebook run details from lookup based on provided parameters. @@ -43,25 +68,25 @@ The Databricks Add-on for Splunk is used to query Databricks data and execute Da 2. **Distributed Environment**: * Install the Databricks Add-on for Splunk on the search head and configure an account to use the custom commands. * In case of deployment in the search head cluster environment use a deployer to push the apps. Follow the below steps to push the apps to search head cluster members: - * On deployer node, extract the app at $SPLUNK_HOME$/etc/shcluster/apps. - * Create a `shclustering` stanza at path $SPLUNK_HOME$/etc/shcluster/apps/TA-Databricks/local/server.conf and add following information to the stanza: conf_replication_include.ta_databricks_settings = true as shown below. + * On the deployer node, extract the app at $SPLUNK_HOME$/etc/shcluster/apps. + * Create a `shclustering` stanza at path $SPLUNK_HOME$/etc/shcluster/apps/TA-Databricks/local/server.conf and add the following information to the stanza: conf_replication_include.ta_databricks_settings = true as shown below. * `[shclustering]` * `conf_replication_include.ta_databricks_settings = true` * Push the bundle to search head members # INSTALLATION -Databricks Add-on for Splunk can be installed through UI using "Manage Apps" > "Install app from file" or by extracting tarball directly into $SPLUNK_HOME/etc/apps/ folder. +Databricks Add-on for Splunk can be installed through UI using "Manage Apps" > "Install the app from file" or by extracting tarball directly into $SPLUNK_HOME/etc/apps/ folder. # CAPABILITIES -* User with 'admin' role can do the Configuration of the Account and Proxy, whereas users without 'admin' role can't do the Configuration or view it. +* Users with an 'admin' role can do the Configuration of the Account and Proxy, whereas users without an 'admin' role can't do the Configuration or view it. * Any user can run the Custom commands. -* User who is not having 'admin' role, for them the Configuration Page won't load. +* User who does not have an 'admin' role, for them the Configuration Page won't load. # CONFIGURATION -Users will be required to have 'admin' role in order to configure Databricks Add-on for Splunk. This integration allows a user to configure multiple pair of Databricks Instances, its credentials, and Databricks Cluster Name at a time. In case a user is using the integration in search head cluster environment, configuration on all the search cluster nodes will be overwritten as and when a user changes some configuration on any one of the search head cluster members. Hence a user should configure the integration on only one of the search head cluster members. +Users will be required to have an 'admin' role to configure the Databricks Add-on for Splunk. This integration allows a user to configure multiple pairs of Databricks Instances, their credentials, and Databricks Cluster Name at a time. In case a user is using the integration in the search head cluster environment, configuration on all the search cluster nodes will be overwritten as and when a user changes some configuration on any one of the search head cluster members. Hence a user should configure the integration on only one of the search head cluster members. **Configuration pre-requisites**: To configure the Add-on with Azure Active Directory token authentication, you need to provision a service principal in Azure Portal and add it to the target Azure Databricks workspace. 
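The exact provisioning flow depends on your Azure environment; as a minimal sketch (assuming the Azure CLI is installed and logged in, and using a placeholder display name), the app registration can be created as shown below. The values in its output map to the Azure Active Directory fields described in the configuration table that follows.

```
# Sketch only: register an Azure AD application/service principal with the Azure CLI.
# The display name is a placeholder; store the generated secret securely.
az ad sp create-for-rbac --name "splunk-databricks-addon"

# From the command's JSON output:
#   appId    -> Client Id on the add-on configuration page
#   password -> Client Secret
#   tenant   -> Tenant Id
```

Remember to also add the service principal to the target Azure Databricks workspace before configuring the account in the add-on.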
@@ -78,7 +103,9 @@ To configure Databricks Add-on for Splunk, navigate to Databricks Add-on for Spl | ------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------- | | Account Name | Unique name for account. | Yes | Databricks Instance | Databricks Instance URL. | Yes +| 'databricksquery' to run on | Mode through which databricksquery command should execute | Yes | | Databricks Cluster Name | Name of the Databricks cluster to use for query and notebook execution. A user can override this value while executing the custom command. | No +| Databricks Warehouse ID | ID of the Databricks warehouse to use for query execution. A user can override this value while executing the custom command. | No | Authentication Method | SingleSelect: Authentication via Azure Active Directory or using a Personal Access Token | Yes | Databricks Access Token | [Auth: Personal Access Token] Databricks personal access token to use for authentication. Refer [Generate Databricks Access Token](https://docs.databricks.com/dev-tools/api/latest/authentication.html#generate-a-personal-access-token) document to generate the access token. | Yes | | Client Id | [Auth: Azure Active Directory] Azure Active Directory Client Id from your Azure portal.| Yes @@ -97,32 +124,41 @@ Navigate to Databricks Add-on for Splunk, click on "Configuration", go to the "P | Port | Port of proxy | Yes | | Username | Username for proxy authentication (Username and Password are inclusive fields) | No | | Password | Password for proxy authentication (Username and Password are inclusive fields) | No | -| Remote DNS resolution | Whether to resolve DNS or not | No | -| Use Proxy for OAuth | Check this box if you want to use proxy just for AAD token generation (https://login.microsoftonline.com/). All other network calls will skip the proxy even if it's enabled. | No | +| Remote DNS resolution | Enabling this option allows the proxy server to handle DNS resolution for clients, enhancing privacy and centralizing control over DNS requests. | No | +| Use Proxy for OAuth | Check this box if you want to use a proxy just for AAD token generation (https://login.microsoftonline.com/). All other network calls will skip the proxy even if it's enabled. | No | **Steps to configure an HTTPS proxy** * Select Proxy Type as "http" and provide the other required details for proxy configuration. -* To install proxy certificate in the Add-on , Go to folder $SPLUNK_HOME/etc/apps/TA-Databricks/bin/ta_databricks/aob_py3/certifi +* To install the proxy certificate in the Add-on , Go to folder $SPLUNK_HOME/etc/apps/TA-Databricks/bin/ta_databricks/aob_py3/certifi * Put the proxy certificate at the end of the file named cacert.pem -Once the above steps are completed, all the folllowing requests would be directed through the proxy. +Once the above steps are completed, all the following requests will be directed through the proxy. **Note**: $SPLUNK_HOME denotes the path where Splunk is installed. Ex: /opt/splunk -After enabling proxy, re-visit the "Databricks Credentials" tab, fill in the details, and click on "Save" to verify if the proxy is working. +After enabling the proxy, re-visit the "Databricks Credentials" tab, fill in the details, and click on "Save" to verify if the proxy is working. ## 3. 
Configure Logging (Optional) Navigate to Databricks Add-on for Splunk, click on "Configuration", go to the "Logging" tab, select the preferred "Log level" value from the dropdown, and click "Save". +## 4. Configure Additional Parameters (Optional) +Navigate to Databricks Add-on for Splunk, click on "Configuration", go to the "Additional Parameters" tab, fill in the details asked, and click "Save". Field descriptions are as below: + +| Field Name | Field Description | Required | +| ------------------- | --------------------------------------------------------------------------- | --------- | +| Command Timeout Value | Maximum value of command timeout to be allowed for databricksquery command. | Yes | +| Query Result Limit | Maximum limit of rows in query result for databricksquery command. | Yes | +| Index | Index in which you want to store the command execution details. | Yes | +| Max Thread Count | Maximum number of threads to be allowed for databricksquery command to fetch the results. | Yes | # CUSTOM COMMANDS: Any user will be able to execute the custom command. Once the admin user configures Databricks Add-on for Splunk successfully, they can execute custom commands. With custom commands, users can: * Query their data present in the Databricks table from Splunk. * Execute Databricks notebooks from Splunk. -Currently, Databricks Add-on for Splunk provides four custom commands. Users can open the Splunk search bar and can execute the commands. Below are the command details. +Currently, Databricks Add-on for Splunk provides four custom commands. Users can open the Splunk search bar and execute the commands. Below are the command details. ## 1. databricksquery This custom command helps users to query their data present in the Databricks table from Splunk. @@ -133,12 +169,14 @@ This custom command helps users to query their data present in the Databricks ta | --------------- | -------- | ---------------------------------------------------------------- | | account_name | Yes | Configured account name. | | query | Yes | SQL query to get data from Databricks delta table. | -| cluster | No | Name of the cluster to use for execution. | +| cluster | No | Name of the cluster to use for execution. | +| warehouse_id | No | Warehouse ID to use for execution. | +| limit | No | Limit of rows in SQL query execution result. Default value: 10000| | command_timeout | No | Time to wait in seconds for query completion. Default value: 300 | * Syntax - -| databricksquery account_name="" cluster="" query="" command_timeout= | table * + - Syntax 1 : | databricksquery account_name="" warehouse_id="" query="" command_timeout= limit= + - Syntax 2 : | databricksquery account_name="" cluster="" query="" command_timeout= * Output @@ -146,7 +184,9 @@ The command gives the output of the query in tabular format. It will return an e * Example -| databricksquery account_name="db_account" query="SELECT * FROM default.people WHERE age>30" cluster="test_cluster" command_timeout=60 | table * +| databricksquery account_name="db_account" query="SELECT * FROM default.people WHERE age>30" warehouse_id=12345a67 command_timeout=60 limit=500 + +| databricksquery query="SELECT * FROM default.people WHERE age>30" cluster="test_cluster" command_timeout=60 account_name="AAD_account" ## 2. databricksrun @@ -156,16 +196,16 @@ This custom command helps users to submit a one-time run without creating a job. 
| Parameter | Required | Overview | | ------------------ | -------- | ----------------------------------------------------------------------------------------------------------- | -| account_name | Yes | Configured account name. +| account_name | Yes | Configured account name. | | notebook_path | Yes | The absolute path of the notebook to be run in the Databricks workspace. This path must begin with a slash. | | run_name | No | Name of the submitted run. | -| cluster | No | Name of the cluster to use for execution. | +| cluster | No | Name of the cluster to use for execution. | | revision_timestamp | No | The epoch timestamp of the revision of the notebook. | | notebook_params | No | Parameters to pass while executing the run. Refer below example to view the format. | * Syntax -| databricksrun account_name="" notebook_path="" run_name="" cluster="" revision_timestamp= notebook_params="" | table * +| databricksrun account_name="" notebook_path="" run_name="" cluster="" revision_timestamp= notebook_params="" * Output @@ -173,27 +213,27 @@ The command will give the details about the executed run through job. * Example 1 -| databricksrun account_name="db_account" notebook_path="/path/to/test_notebook" run_name="run_comm" cluster="test_cluster" revision_timestamp=1609146477 notebook_params="key1=value1||key2=value2" | table * +| databricksrun account_name="db_account" notebook_path="/path/to/test_notebook" run_name="run_comm" cluster="test_cluster" revision_timestamp=1609146477 notebook_params="key1=value1||key2=value2" * Example 2 -| databricksrun account_name="db_account" notebook_path="/path/to/test_notebook" run_name="run_comm" cluster="test_cluster" revision_timestamp=1609146477 notebook_params="key1=value with \"double quotes\" in it||key2=value2" | table * +| databricksrun account_name="db_account" notebook_path="/path/to/test_notebook" run_name="run_comm" cluster="test_cluster" revision_timestamp=1609146477 notebook_params="key1=value with \"double quotes\" in it||key2=value2" ## 3. databricksjob -This custom command helps users to run an already created job now from Splunk. +This custom command helps users to run an already created job from Splunk. * Command Parameters | Parameter | Required | Overview | | --------------- | -------- | ------------------------------------------------------------------------------------------ | -| account_name | Yes | Configured account name. +| account_name | Yes | Configured account name. | | job_id | Yes | Job ID of your existing job in Databricks. | | notebook_params | No | Parameters to pass while executing the job. Refer below example to view the format. | * Syntax -| databricksjob account_name="" job_id= notebook_params="" | table * +| databricksjob account_name="" job_id= notebook_params="" * Output @@ -201,74 +241,38 @@ The command will give the details about the executed run through job. * Example 1 -| databricksjob account_name="db_account" job_id=2 notebook_params="key1=value1||key2=value2" | table * +| databricksjob account_name="db_account" job_id=2 notebook_params="key1=value1||key2=value2" * Example 2 -| databricksjob account_name="db_account" job_id=2 notebook_params="key1=value with \"double quotes\" in it||key2=value2" | table * - -## 4. databricksretiredrun - -This command is used to delete the records based on provided parameter from the submit_run_logs lookup, which maintains the details of notebook runs. To run the command at least one of the parameters is required. 
When all parameters are provided, it will delete the records matching all the parameters together. - -* Command Parameters - -| Parameter | Required | Overview | -| ------------------ | -------- | ----------------------------------------------------------------------------------------------------------- | -| account_name | Yes | Configured account name. -| days | No | The number of days, records older than which will be deleted from submit_run_log lookup | -| run_id | No | ID of the submitted run. | -| user | No | Name of an existing databricks user | - -* Syntax - -| databricksretiredrun account_name="" days="" run_id="" user="" - -* Output - -The command will delete the details of notebook runs from submit_run_log lookup. - -* Example 1 - -| databricksretiredrun account_name="db_account" days=90 - -* Example 2 - -| databricksretiredrun account_name="db_account" user="john doe" - -* Example 3 - -| databricksretiredrun account_name="db_account" run_id="12344" - -* Example 4 - -| databricksretiredrun account_name="db_account" days=90 user="john doe" run_id="12344" +| databricksjob account_name="db_account" job_id=2 notebook_params="key1=value with \"double quotes\" in it||key2=value2" # Macro -Macro `databricks_run_retiring_days` specifies the days, records older than which will be deleted from submit_run_log lookup using saved search `databricks_retire_run`. The default value configured is 90 days. +Macro `databricks_index_macro` specifies the index in which you want to store the command execution details. To modify Macro from Splunk UI, 1. Go to `Settings` -> `Advanced search` -> `Search Macros`. 2. Select `Databricks Add-on for Splunk` in the App context. -3. Configure the macro by clicking on the `Name` of the Macro, go to the `Definition` field and update it as per requirements. +3. Configure the macro by clicking on the `Name` of the Macro, go to the `Definition` field, and update it as per requirements. 4. Click on the `Save` button. # SAVED SEARCH -Saved search `databricks_retire_run` uses databricksretiredrun command to delete the records older than days specified in macro `databricks_run_retiring_days` from the submit_run_logs lookup. By default, it is invoked once every day at 1:00 hrs and deletes records older than 90 days. The `databricks_run_retiring_days` can be modified to change the default 90 days. +Saved search `databricks_update_run_execution_status` uses databricksrunstatus custom command to fetch run execution status and ingest updated details in Splunk for runs invoked through databricksrun and databricksjob command. +It runs every 5 minutes and ingests the data with updated execution status in Splunk. # DASHBOARDS This app contains the following dashboards: * Databricks Job Execution Details: The dashboard provides the details about the one-time runs and jobs executed using `databricksrun` and `databricksjob` custom commands respectively. -* Launch Notebook: The dashboard allows users to launch a notebook on their databricks cluster by providing the required parameters. The users can then navigate to the job results page on the databricks instance from the generated link on the dashboard. +* Launch Notebook: The dashboard allows users to launch a notebook on their Databricks cluster by providing the required parameters. The users can then navigate to the job results page on the Databricks instance from the generated link on the dashboard. The dashboards will be accessible to all the users. 
A user with admin_all_objects capability can navigate to “:/en-US/app/TA-Databricks/dashboards” to modify the permissions for “Databricks Job Execution Details” dashboard. # ALERT ACTIONS -The `Launch Notebook` alert action is used to launch a parameterized notebook based on the provided parameters. The alert can be scheduled or run as ad-hoc. It can also be used as Adaptive response action in "Enterprise Security> Incident review dashboard". -When this alert action is run as Adaptive response action from "Enterprise Security > Incident review dashboard", a `launch_notebook` link will be visible in the Adaptive Responses table in the Incident review dashboard which will redirect to the Launch Notebook dashboard. +The `Launch Notebook` alert action is used to launch a parameterized notebook based on the provided parameters. The alert can be scheduled or run as ad-hoc. It can also be used as an Adaptive response action in "Enterprise Security> Incident review dashboard". +When this alert action is run as an Adaptive response action from "Enterprise Security > Incident review dashboard", a `launch_notebook` link will be visible in the Adaptive Responses table in the Incident review dashboard which will redirect to the Launch Notebook dashboard. **Note**: The redirection will work properly only when the status is in Sucess state. @@ -282,21 +286,37 @@ When this alert action is run as Adaptive response action from "Enterprise Secur * Click on Upload. * Restart Splunk. +## Upgrade from Databricks Add-On for Splunk v1.4.0 to v1.4.1 +* Follow the General upgrade steps section. +* No additional steps are required. + +## Upgrade from Databricks Add-On for Splunk v1.3.1 to v1.4.0 +* Follow the General upgrade steps section. +* No additional steps are required. + +## Upgrade from Databricks Add-On for Splunk v1.3.0 to v1.3.1 +* Follow the General upgrade steps section. +* No additional steps are required. + +## Upgrade from Databricks Add-On for Splunk v1.2.0 to v1.3.0 +* Follow the General upgrade steps section. +* No additional steps are required. + ## Upgrade from Databricks Add-On for Splunk v1.1.0 to v1.2.0 Follow the below steps to upgrade the Add-on to 1.2.0 -* Follow the General upgrade steps section.. -* Login with the user having 'admin' role. +* Follow the General upgrade steps section. +* Login with the user having an 'admin' role. * Navigate to Databricks Add-on for Splunk > Configuration. -* Click on Add button, and reconfigure account with required information. -* The logged in user will now able to execute any custom commands. -* Login with the user without 'admin' role. -* The logged in user will now able to execute any custom commands. +* Click on the Add button, and reconfigure the account with the required information. +* The logged-in user will now be able to execute any custom commands. +* Login with the user without an 'admin' role. +* The logged-in user will now be able to execute any custom commands. ## Upgrade from Databricks Add-On for Splunk v1.0.0 to v1.1.0 -No special steps required. Upload and install v1.1.0 of the add-on normally. +No special steps are required. Upload and install v1.1.0 of the add-on normally. 
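For the general upgrade steps above, the upload and restart can also be performed from the command line; one possible sequence is sketched below (the package path, version, and credentials are placeholders).

```
# Sketch only: install/upgrade the add-on package via the Splunk CLI.
# Replace the package path and admin credentials with your own values.
$SPLUNK_HOME/bin/splunk install app /tmp/TA-Databricks-1.4.1.tgz -update 1 -auth admin:changeme
$SPLUNK_HOME/bin/splunk restart
```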
# OPEN SOURCE COMPONENTS AND LICENSES @@ -306,22 +326,20 @@ Some of the components included in "Databricks Add-on for Splunk" are licensed u * pycryptodome version 3.16.0 https://pypi.org/project/pycryptodome/ (LICENSE https://github.com/Legrandin/pycryptodome/blob/master/LICENSE.rst) # KNOWN ISSUES -* When the commands fail, sometimes an indistinct/unclear error message is displayed in UI, not giving a precise reason for the failure. To troubleshoot such cases, please check the logs at $SPLUNK_HOME/var/log/TA-Databricks/_command.log to get the precise reason for the failure. +* When the commands fail, sometimes an indistinct/unclear error message is displayed in the UI, not giving a precise reason for the failure. To troubleshoot such cases, please check the logs at $SPLUNK_HOME/var/log/TA-Databricks/_command.log to get the precise reason for the failure. * When the Adaptive response action `Launch Notebook` is run more than once for the same notable event in Enterprise Security security, clicking on any of the `launch_notebook` links will redirect to the Launch Notebook dashboard with the latest run details. # LIMITATIONS -* The Databricks API used in the `databricksquery` custom command has a limit on the number of results to be returned. Hence, sometimes the results obtained from this custom command may not be complete. +* Only if `databricksquery` custom command is executed using clusters : The Databricks API used in the `databricksquery` custom command has a limit on the number of results to be returned. Hence, sometimes the results obtained from this custom command may not be complete. # TROUBLESHOOTING * Authentication Failure: Check the network connectivity and verify that the configuration details provided are correct. * For any other unknown failure, please check the log files $SPLUNK_HOME/var/log/ta_databricks*.log to get more details on the issue. * The Add-on does not require a restart after the installation for all functionalities to work. However, the icons will be visible after one Splunk restart post-installation. -* If all custom commands/notebooks fail to run with https response code [403] then most probably the client secret has expired. Please regenerate your client secret in this case on your Azure portal and configure the add-on again with the new client secret. Set the client secret's expiration time to a custom value that you seem fit. Refer this [guide](https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app#add-a-client-secret) for setting a client secret in Azure Active Directory. -* If proxy is enabled and Use Proxy for OAuth is checked, and custom commands fail to run and throw the below mentioned error. +* If all custom commands/notebooks fail to run with the https response code [403] then most probably the client secret has expired. Please regenerate your client secret in this case on your Azure portal and configure the add-on again with the new client secret. Set the client secret's expiration time to a custom value that you see fit. Refer this [guide](https://docs.microsoft.com/en-us/azure/active-directory/develop/quickstart-register-app#add-a-client-secret) for setting a client secret in Azure Active Directory. +* If the proxy is enabled and Use Proxy for OAuth is checked, and custom commands fail to run and throw the below mentioned error. 
HTTPSConnectionPool(host=, port=443): Max retries exceeded with url: (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 110] Connection timed out')) -In this case, uncheck 'Use Proxy for OAuth' and save the Proxy configuration and re-run the custom command again. - - +In this case, uncheck 'Use Proxy for OAuth' and save the Proxy configuration, and re-run the custom command again. **Note**: $SPLUNK_HOME denotes the path where Splunk is installed. Ex: /opt/splunk @@ -330,7 +348,7 @@ In this case, uncheck 'Use Proxy for OAuth' and save the Proxy configuration and * Remove $SPLUNK_HOME/etc/apps/TA-Databricks/ * Remove $SPLUNK_HOME/var/log/TA-Databricks/ * Remove $SPLUNK_HOME/var/log/splunk/**ta_databricks*.log** -* To reflect the cleanup changes in UI, restart Splunk instance. Refer [Start Splunk](https://docs.splunk.com/Documentation/Splunk/8.0.6/Admin/StartSplunk) documentation to get information on how to restart Splunk. +* To reflect the cleanup changes in UI, restart the Splunk instance. Refer [Start Splunk](https://docs.splunk.com/Documentation/Splunk/8.0.6/Admin/StartSplunk) documentation to get information on how to restart Splunk. **Note**: $SPLUNK_HOME denotes the path where Splunk is installed. Ex: /opt/splunk @@ -338,4 +356,4 @@ In this case, uncheck 'Use Proxy for OAuth' and save the Proxy configuration and * This app is not officially supported by Databricks. Please send an email to cybersecurity@databricks.com for help. # COPYRIGHT -© Databricks 2023. All rights reserved. Apache, Apache Spark, Spark and the Spark logo are trademarks of the Apache Software Foundation. \ No newline at end of file +© Databricks 2024. All rights reserved. Apache, Apache Spark, Spark and the Spark logo are trademarks of the Apache Software Foundation. \ No newline at end of file diff --git a/app/README/alert_actions.conf.spec b/app/README/alert_actions.conf.spec index f080b30..f9f0f55 100644 --- a/app/README/alert_actions.conf.spec +++ b/app/README/alert_actions.conf.spec @@ -7,8 +7,9 @@ param.paramTwo = Field Name for Parameter Two [launch_notebook] python.version = python3 -param.notebook_path = Notebook path. It's a required parameter. -param.revision_timestamp = Revision timestamp. -param.notebook_parameters = Notebook parameters. -param.cluster_name = Cluster name. +param.notebook_path = Notebook Path. It's a required parameter. +param.revision_timestamp = Revision Timestamp. +param.notebook_parameters = Notebook Parameters. +param.cluster_name = Cluster Name. +param.run_name = Run Name. param._cam = Active response parameters. 
\ No newline at end of file diff --git a/app/README/ta_databricks_account.conf.spec b/app/README/ta_databricks_account.conf.spec index 057388d..2eec475 100644 --- a/app/README/ta_databricks_account.conf.spec +++ b/app/README/ta_databricks_account.conf.spec @@ -4,5 +4,7 @@ aad_client_id = aad_tenant_id = aad_client_secret = aad_access_token = +config_for_dbquery = cluster_name = +warehouse_id = databricks_pat = \ No newline at end of file diff --git a/app/README/ta_databricks_settings.conf.spec b/app/README/ta_databricks_settings.conf.spec index 7df5ca0..7d3d6fd 100644 --- a/app/README/ta_databricks_settings.conf.spec +++ b/app/README/ta_databricks_settings.conf.spec @@ -9,4 +9,9 @@ proxy_rdns = use_for_oauth = [logging] -loglevel = \ No newline at end of file +loglevel = + +[additional_parameters] +admin_command_timeout = +query_result_limit = +index = \ No newline at end of file diff --git a/app/app.manifest b/app/app.manifest index 6f462c4..eeab219 100644 --- a/app/app.manifest +++ b/app/app.manifest @@ -5,7 +5,7 @@ "id": { "group": null, "name": "TA-Databricks", - "version": "1.2.0" + "version": "1.4.1" }, "author": [ { diff --git a/app/appserver/static/cancel_button.css b/app/appserver/static/cancel_button.css new file mode 100644 index 0000000..96b9b99 --- /dev/null +++ b/app/appserver/static/cancel_button.css @@ -0,0 +1,107 @@ +.cancel_button{ + display: inline-block; + padding: 4px 12px; + margin-bottom: 0; + border-radius: 100px; + text-align: center; + font-size: 17px; + line-height: 16px; + cursor: pointer; + background-color: #00A36C; + color: #fff; +} + +.loading{ + display: inline-block; + padding: 4px 12px; + margin-bottom: 0; + border-radius: 100px; + text-align: center; + font-size: 17px; + line-height: 16px; + cursor: pointer; + background-color: #0096FF; + color: #fff; +} + +.cancel_button_disabled{ + display: inline-block; + padding: 4px 12px; + margin-bottom: 0; + border-radius: 100px; + text-align: center; + font-size: 17px; + line-height: 16px; + cursor: pointer; + background-color: grey; + color: #fff; +} + +.popup-container-for-no-cancelation { + position: fixed; + top: 0; + left: 0; + width: 100%; + height: 100%; + display: flex; + align-items: center; + justify-content: center; + background-color: rgba(0, 0, 0, 0.5); +} + +.popup-content-for-no-cancelation { + background-color: black; + color: white; + padding: 20px; + border-radius: 8px; + box-shadow: 0 2px 6px rgba(0, 0, 0, 0.2); + max-width: 80%; + text-align: center; + font-size: 18px; +} + +.popup-container-for-successful-cancelation { + position: fixed; + top: 0; + left: 0; + width: 100%; + height: 100%; + display: flex; + align-items: center; + justify-content: center; + background-color: rgba(0, 0, 0, 0.5); +} + +.popup-content-for-successful-cancelation { + background-color: green; + color: white; + padding: 20px; + border-radius: 8px; + box-shadow: 0 2px 6px rgba(0, 0, 0, 0.2); + max-width: 80%; + text-align: center; + font-size: 18px; +} + +.popup-container-for-err-in-cancelation { + position: fixed; + top: 0; + left: 0; + width: 100%; + height: 100%; + display: flex; + align-items: center; + justify-content: center; + background-color: rgba(0, 0, 0, 0.5); +} + +.popup-content-for-err-in-cancelation { + background-color: red; + color: white; + padding: 20px; + border-radius: 8px; + box-shadow: 0 2px 6px rgba(0, 0, 0, 0.2); + max-width: 80%; + text-align: center; + font-size: 18px; +} \ No newline at end of file diff --git a/app/appserver/static/cancel_button.js b/app/appserver/static/cancel_button.js 
new file mode 100644 index 0000000..7b81b53 --- /dev/null +++ b/app/appserver/static/cancel_button.js @@ -0,0 +1,102 @@ +require([ + 'underscore', + 'splunkjs/mvc', + 'splunkjs/mvc/tableview', + 'splunkjs/mvc/searchmanager', + 'splunkjs/mvc/simplexml/ready!' +], function(_, mvc, TableView, SearchManager) { + + var runDetailsTable = mvc.Components.get("databricks_run_table"); + var jobDetailsTable = mvc.Components.get("databricks_job_table"); + customRenderer = TableView.BaseCellRenderer.extend({ + canRender: function (cell) { + return _(['Cancel Run']).contains(cell.field); + }, + render: function ($td, cell) { + if(cell.field == "Cancel Run"){ + let isCancelled = false; + var run_details = cell.value; + var array = run_details.split("||"); + var run_ex_status = array[2]; + + if(run_ex_status !== "Running" && run_ex_status !== "Initiated" && run_ex_status !== "Pending"){ + $td.html("
Cancel Run
"); + $td.on('click', function(event) { + var popupContainer = document.createElement('div'); + popupContainer.className = 'popup-container-for-no-cancelation'; + var popupContent = document.createElement('div'); + popupContent.className = 'popup-content-for-no-cancelation'; + popupContent.innerHTML = 'This can not be canceled as Execution Status is not in Running, Pending or Initiated mode!'; + popupContainer.appendChild(popupContent); + document.body.appendChild(popupContainer); + popupContainer.addEventListener('click', function() { + document.body.removeChild(popupContainer); + }); + }); + } + else{ + $td.html("
Cancel Run
"); + $td.on('click', function (event) { + if (isCancelled){ + return; + } + $td.html("
Canceling..
"); + var fields = {} + fields['run_id'] = array[0] + var ENDPOINT_URL = '/services/cancel_run' + var service = mvc.createService({ owner: "nobody" }); + fields['account_name'] = array[1] + fields['uid'] = array[3] + $td.css("pointer-events", "none"); + + service.post(ENDPOINT_URL, fields, function (err, response) { + if (response != undefined && (response.data != null || response.data != undefined)) { + canceled_response = response.data['canceled'] + if (canceled_response == "Success"){ + var popupContainer = document.createElement('div'); + popupContainer.className = 'popup-container-for-successful-cancelation'; + var popupContent = document.createElement('div'); + popupContent.className = 'popup-content-for-successful-cancelation'; + popupContent.innerHTML = 'Successfully Canceled the run! An updated event with canceled execution status will be ingested in Splunk in few minutes.'; + popupContainer.appendChild(popupContent); + document.body.appendChild(popupContainer); + popupContainer.addEventListener('click', function() { + document.body.removeChild(popupContainer); + }); + $td.html("
Canceled
"); + isCancelled = true; + } + } + else { + var popupContainer = document.createElement('div'); + popupContainer.className = 'popup-container-for-err-in-cancelation'; + var popupContent = document.createElement('div'); + popupContent.className = 'popup-content-for-err-in-cancelation'; + popupContent.innerHTML = 'Error while Canceling the run! Please try after sometime!'; + popupContainer.appendChild(popupContent); + document.body.appendChild(popupContainer); + popupContainer.addEventListener('click', function() { + document.body.removeChild(popupContainer); + }); + $td.html("
Cancel Run
"); + } + $td.css("pointer-events", "auto"); + }); + }); + } + } + } + }); + if (runDetailsTable !== undefined) { + runDetailsTable.getVisualization(function (tableView) { + tableView.table.addCellRenderer(new customRenderer()); + tableView.table.render(); + }); + } + if (jobDetailsTable !== undefined) { + jobDetailsTable.getVisualization(function (tableView) { + tableView.table.addCellRenderer(new customRenderer()); + tableView.table.render(); + }); + } +}) \ No newline at end of file diff --git a/app/appserver/static/dashboard.css b/app/appserver/static/dashboard.css new file mode 100644 index 0000000..77d02a2 --- /dev/null +++ b/app/appserver/static/dashboard.css @@ -0,0 +1,62 @@ +#run_name_text.input.input-text::after { + content: "Run Name to identify the run execution."; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; + } + +#path_text.input.input-text::after { + content: "[Required] Absolute path of notebook to be run in Databricks. eg: /Users/user_1/notebook_1"; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; + } + +#revision_timestamp_text.input.input-text::after { + content: "The timestamp of the revision of the notebook."; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; +} + +#params_text.input.input-text::after { + content: "Parameters to pass while executing the notebook. eg: key1=value1||key2=value2"; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; +} + +#cluster_text.input.input-text::after { + content: "Name of Databricks cluster to use for execution."; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; +} + +#acct_dropdown.input.input-dropdown::after { + content: "[Required] Name of Databricks Account to use for execution."; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; +} + +#auto_fwd.input.input-dropdown::after { + content: "Whether to directly redirect to Databricks portal."; + display: inline-block; + color: black; + margin-left: 5px; + font-size: 12px; + font-style: italic; +} diff --git a/app/appserver/static/img/DB_config.png b/app/appserver/static/img/DB_config.png index bdfa884..0a4ada1 100644 Binary files a/app/appserver/static/img/DB_config.png and b/app/appserver/static/img/DB_config.png differ diff --git a/app/appserver/static/img/databricks_databricksjob.png b/app/appserver/static/img/databricks_databricksjob.png deleted file mode 100644 index be92e86..0000000 Binary files a/app/appserver/static/img/databricks_databricksjob.png and /dev/null differ diff --git a/app/appserver/static/img/databricks_databricksquery.png b/app/appserver/static/img/databricks_databricksquery.png deleted file mode 100644 index c91fdc4..0000000 Binary files a/app/appserver/static/img/databricks_databricksquery.png and /dev/null differ diff --git a/app/appserver/static/img/databricks_databricksrun.png b/app/appserver/static/img/databricks_databricksrun.png deleted file mode 100644 index 955f923..0000000 Binary files a/app/appserver/static/img/databricks_databricksrun.png and /dev/null differ diff --git a/app/appserver/static/img/databricks_job.png b/app/appserver/static/img/databricks_job.png new file mode 100644 index 0000000..bf9a914 Binary files /dev/null and b/app/appserver/static/img/databricks_job.png differ diff --git 
a/app/appserver/static/img/databricks_query_cluster.png b/app/appserver/static/img/databricks_query_cluster.png new file mode 100644 index 0000000..0388383 Binary files /dev/null and b/app/appserver/static/img/databricks_query_cluster.png differ diff --git a/app/appserver/static/img/databricks_query_dbsql.png b/app/appserver/static/img/databricks_query_dbsql.png new file mode 100644 index 0000000..8695205 Binary files /dev/null and b/app/appserver/static/img/databricks_query_dbsql.png differ diff --git a/app/appserver/static/img/databricks_run.png b/app/appserver/static/img/databricks_run.png new file mode 100644 index 0000000..42f66ef Binary files /dev/null and b/app/appserver/static/img/databricks_run.png differ diff --git a/app/appserver/static/img/databricksquery_run_mode.png b/app/appserver/static/img/databricksquery_run_mode.png new file mode 100644 index 0000000..b550bc1 Binary files /dev/null and b/app/appserver/static/img/databricksquery_run_mode.png differ diff --git a/app/appserver/static/img/dbrun_cc_example.png b/app/appserver/static/img/dbrun_cc_example.png deleted file mode 100644 index 955f923..0000000 Binary files a/app/appserver/static/img/dbrun_cc_example.png and /dev/null differ diff --git a/app/appserver/static/img/launch_notebook_ar.webm b/app/appserver/static/img/launch_notebook_ar.webm deleted file mode 100644 index 72d9abb..0000000 Binary files a/app/appserver/static/img/launch_notebook_ar.webm and /dev/null differ diff --git a/app/appserver/static/img/launch_notebook_dashboard.webm b/app/appserver/static/img/launch_notebook_dashboard.webm deleted file mode 100644 index e40bd65..0000000 Binary files a/app/appserver/static/img/launch_notebook_dashboard.webm and /dev/null differ diff --git a/app/appserver/static/img/launch_notebook_from_AR.webm b/app/appserver/static/img/launch_notebook_from_AR.webm new file mode 100644 index 0000000..ed213f9 Binary files /dev/null and b/app/appserver/static/img/launch_notebook_from_AR.webm differ diff --git a/app/appserver/static/img/launch_notebook_from_dashboard.webm b/app/appserver/static/img/launch_notebook_from_dashboard.webm new file mode 100644 index 0000000..859a1e6 Binary files /dev/null and b/app/appserver/static/img/launch_notebook_from_dashboard.webm differ diff --git a/app/appserver/static/js/build/custom/auth_select_hook.js b/app/appserver/static/js/build/custom/auth_select_hook.js index d116862..9fbdae0 100644 --- a/app/appserver/static/js/build/custom/auth_select_hook.js +++ b/app/appserver/static/js/build/custom/auth_select_hook.js @@ -15,6 +15,13 @@ class AuthSelectHook { this.toggleAADFields(false); } } + if (field == 'config_for_dbquery') { + if (value == 'interactive_cluster') { + this.hideWarehouseField(false); + } else { + this.hideWarehouseField(true); + } + } } onRender() { @@ -24,7 +31,14 @@ class AuthSelectHook { } else { this.toggleAADFields(false); } + } + hideWarehouseField(state) { + this.util.setState((prevState) => { + let data = {...prevState.data }; + data.warehouse_id.display = state; + return { data } + }); } toggleAADFields(state) { diff --git a/app/appserver/static/js/build/globalConfig.json b/app/appserver/static/js/build/globalConfig.json index fb6133b..4b3e7d4 100644 --- a/app/appserver/static/js/build/globalConfig.json +++ b/app/appserver/static/js/build/globalConfig.json @@ -2,7 +2,7 @@ "meta": { "name": "TA-Databricks", "displayName": "Databricks Add-on For Splunk", - "version": "1.2.0", + "version": "1.4.1", "apiVersion": "3.0.0", "restRoot": "TA_Databricks" }, @@ -83,6 +83,26 @@ 
"placeholder": "community.cloud.databricks.com" } }, + { + "field": "config_for_dbquery", + "label": "'databricksquery' to run on", + "type": "singleSelect", + "help": "", + "required": true, + "defaultValue": "dbsql", + "options": { + "autoCompleteFields": [ + { + "value": "dbsql", + "label": "DBSQL (Recommended)" + }, + { + "value": "interactive_cluster", + "label": "Interactive Cluster" + } + ] + } + }, { "field": "cluster_name", "label": "Databricks Cluster Name", @@ -97,6 +117,20 @@ "errorMsg": "Max length of text input is 500" }] }, + { + "field": "warehouse_id", + "label": "Databricks Warehouse ID", + "type": "text", + "help": "ID of the Databricks warehouse to use for query execution. A user can override this value while executing the custom command.", + "required": false, + "defaultValue": "", + "validators": [{ + "type": "string", + "minLength": 0, + "maxLength": 500, + "errorMsg": "Max length of text input is 500" + }] + }, { "field": "auth_type", "label": "Authentication Method", @@ -189,7 +223,10 @@ "placeholder": "required" } } - ] + ], + "options": { + "saveValidator": "function(formData) { if(formData.name === 'proxy' || formData.name === 'logging' || formData.name === 'additional_parameters' || formData.name === 'additional_settings') { return 'Can not create account named anyone of [proxy, logging, additional_parameters, additional_settings]. These are reserved keywords in Splunk.' ; } return true; }" + } }, { "name": "proxy", @@ -274,7 +311,8 @@ { "field": "proxy_rdns", "label": "Remote DNS resolution", - "type": "checkbox" + "type": "checkbox", + "help": "Enabling this option allows the proxy server to handle DNS resolution for clients, enhancing privacy and centralizing control over DNS requests." }, { "field": "use_for_oauth", @@ -325,6 +363,78 @@ "options": { "saveValidator": "function(formData) {if(!formData.loglevel) { return 'Log level cannot be empty'; } return true; }" } + }, + { + "name": "additional_parameters", + "title": "Additional Parameters", + "entity": [{ + "field": "admin_command_timeout", + "label": "Command Timeout Value", + "defaultValue": 300, + "options": { + "placeholder": "Required" + }, + "type": "text", + "help": "Enter the maximum value of command timeout to be allowed for databricksquery command.", + "required": true, + "encrypted": false, + "validators": [{ + "type": "regex", + "pattern": "^[1-9]\\d*$", + "errorMsg": "Command Timeout Value must be a positive integer." + } + ] + }, + { + "field": "query_result_limit", + "label": "Query Result Limit", + "defaultValue": 10000, + "options": { + "placeholder": "Required" + }, + "type": "text", + "help": "Enter the maximum limit of rows in query result for databricksquery command.", + "required": true, + "encrypted": false, + "validators": [{ + "type": "regex", + "pattern": "^[1-9]\\d*$", + "errorMsg": "Query Result Limit must be a positive integer." 
+ } + ] + }, + { + "field": "index", + "label": "Index", + "type": "singleSelect", + "defaultValue": "main", + "help": "Select the index in which you want to store the command execution details.", + "options": { + "endpointUrl": "data/indexes", + "placeholder": "Required", + "disableSearch": true, + "denyList": "^_.*$" + }, + "required": true + }, + { + "field": "thread_count", + "label": "Max Thread Count", + "defaultValue": 5, + "type": "text", + "help": "Maximum number of threads to be allowed for databricksquery command to fetch the results.", + "required": true, + "encrypted": false, + "options": { + "placeholder": "Required" + }, + "validators": [{ + "type": "regex", + "pattern": "^[1-9]\\d*$", + "errorMsg": "Max Thread Count value must be a positive integer." + } + ] + }] } ] } diff --git a/app/bin/TA_Databricks_rh_account.py b/app/bin/TA_Databricks_rh_account.py index 2d3d9cb..5bd4542 100644 --- a/app/bin/TA_Databricks_rh_account.py +++ b/app/bin/TA_Databricks_rh_account.py @@ -69,6 +69,13 @@ default='', validator=None ), + field.RestField( + 'config_for_dbquery', + required=True, + encrypted=False, + default='dbsql', + validator=None + ), field.RestField( 'cluster_name', required=False, @@ -79,6 +86,16 @@ max_len=500, ) ), + field.RestField( + 'warehouse_id', + required=False, + encrypted=False, + default='', + validator=validator.String( + min_len=0, + max_len=500, + ) + ), field.RestField( 'aad_access_token', required=False, diff --git a/app/bin/TA_Databricks_rh_settings.py b/app/bin/TA_Databricks_rh_settings.py index 22c86c1..7d39c34 100755 --- a/app/bin/TA_Databricks_rh_settings.py +++ b/app/bin/TA_Databricks_rh_settings.py @@ -2,6 +2,8 @@ from email.policy import default import ta_databricks_declare from databricks_validators import ValidateDatabricksInstance +from splunktaucclib.rest_handler.endpoint.validator import Validator +from databricks_common_utils import IndexMacroManager from splunktaucclib.rest_handler.endpoint import ( field, validator, @@ -10,9 +12,18 @@ ) from splunktaucclib.rest_handler import admin_external, util from splunk_aoblib.rest_migration import ConfigMigrationHandler +import os util.remove_http_proxy_env_vars() +class ValidateThread(Validator): + def validate(self, value, data): + thread_count_value = data.get("thread_count") + cpu_core = os.cpu_count() + if int(thread_count_value) > 2*int(cpu_core): + self.put_msg('Suggested Value for Max Thread Count is within twice of CPU Count. CPU Count is {}. 
Please enter a value equal to or lesser than {}.'.format(cpu_core, 2*int(cpu_core))) + return False + return True fields_proxy = [ field.RestField( @@ -98,11 +109,42 @@ ] model_logging = RestModel(fields_logging, name='logging') +fields_additional_parameters = [ + field.RestField( + 'admin_command_timeout', + required=True, + default=300, + validator=None + ), + field.RestField( + 'query_result_limit', + required=True, + default=10000, + validator=None + ), + field.RestField( + 'index', + required=True, + default='main', + encrypted=False, + validator=IndexMacroManager() + ), + field.RestField( + 'thread_count', + required=True, + default=5, + encrypted=False, + validator=ValidateThread() + ) +] +model_additional_parameters = RestModel(fields_additional_parameters, name='additional_parameters') + endpoint = MultipleModel( 'ta_databricks_settings', models=[ model_proxy, - model_logging + model_logging, + model_additional_parameters ], ) diff --git a/app/bin/cancel_run.py b/app/bin/cancel_run.py new file mode 100644 index 0000000..8d100be --- /dev/null +++ b/app/bin/cancel_run.py @@ -0,0 +1,84 @@ +"""This module contain class and method related to updating the finding state.""" +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(__file__, '..'))) + +import ta_databricks_declare # noqa: F401, E402 +import json # noqa: E402 +import databricks_com as com # noqa: E402 +from splunk.persistconn.application import PersistentServerConnectionApplication # noqa: E402 +import databricks_const as const # noqa: E402 +from log_manager import setup_logging # noqa: E402 + +APP_NAME = const.APP_NAME +_LOGGER = setup_logging("ta_databricks_cancel_run") + + +class CancelRunningExecution(PersistentServerConnectionApplication): + """Run Cancelation Handler.""" + + def __init__(self, _command_line, _command_arg): + """Initialize object with given parameters.""" + self.run_id = None + self.account_name = None + self.uid = None + self.payload = {} + self.status = None + self.session_key = None + super(PersistentServerConnectionApplication, self).__init__() + + # Handle a synchronous from splunkd. + def handle(self, in_string): + """ + After user clicks on Cancel Run button, Called for a simple synchronous request. + + @param in_string: request data passed in + @rtype: string or dict + @return: String to return in response. If a dict was passed in, + it will automatically be JSON encoded before being returned. + """ + try: + req_data = json.loads(in_string) + form_data = dict(req_data.get("form")) + self.run_id = form_data.get("run_id") + self.account_name = form_data.get("account_name") + self.uid = form_data.get("uid") + LOG_PREFIX = "[UID: {}] Run ID: {}.".format(self.uid, self.run_id) + _LOGGER.info("{} Initiating cancelation request.".format(LOG_PREFIX)) + session = dict(req_data.get("session")) + self.session_key = session.get("authtoken") + client_ = com.DatabricksClient(self.account_name, self.session_key) + payload = { + "run_id": self.run_id, + } + try: + resp, status_code = client_.databricks_api("post", const.CANCEL_JOB_RUN_ENDPOINT, data=payload) + if status_code == 200: + _LOGGER.info("{} Successfully canceled.".format(LOG_PREFIX)) + _LOGGER.info("{} An updated event with canceled execution status will be ingested in Splunk " + "in few minutes.".format(LOG_PREFIX)) + self.payload['canceled'] = "Success" + self.status = 200 + else: + _LOGGER.info("{} Unable to cancel. Response returned from API: {}. 
Status Code: {}" + .format(LOG_PREFIX, resp, status_code)) + self.payload['canceled'] = "Failed" + self.status = 500 + except Exception as e: + _LOGGER.error("{} Error while canceling. Error: {}".format(LOG_PREFIX, str(e))) + self.payload['canceled'] = "Failed" + self.status = 500 + + except Exception as err: + _LOGGER.error("{} Error while canceling. Error: {}".format(LOG_PREFIX, str(err))) + self.payload['canceled'] = "Failed" + self.status = 500 + return {'payload': self.payload, 'status': self.status} + + def handleStream(self, handle, in_string): + """For future use.""" + raise NotImplementedError("PersistentServerConnectionApplication.handleStream") + + def done(self): + """Virtual method which can be optionally overridden to receive a callback after the request completes.""" + pass diff --git a/app/bin/databricks_com.py b/app/bin/databricks_com.py index 4ef1119..a6e732c 100755 --- a/app/bin/databricks_com.py +++ b/app/bin/databricks_com.py @@ -24,11 +24,7 @@ def __init__(self, account_name, session_key): """ databricks_configs = utils.get_databricks_configs(session_key, account_name) if not databricks_configs: - raise Exception( - "Account '{}' not found. Please provide valid Databricks account.".format( - account_name - ) - ) + raise Exception("Account '{}' not found. Please provide valid Databricks account.".format(account_name)) self.account_name = account_name databricks_instance = databricks_configs.get("databricks_instance") self.auth_type = databricks_configs.get("auth_type") @@ -46,7 +42,7 @@ def __init__(self, account_name, session_key): self.session.verify = const.VERIFY_SSL self.session.timeout = const.TIMEOUT - if self.auth_type == 'PAT': + if self.auth_type == "PAT": self.databricks_token = databricks_configs.get("databricks_pat") else: self.databricks_token = databricks_configs.get("aad_access_token") @@ -55,9 +51,7 @@ def __init__(self, account_name, session_key): self.aad_client_secret = databricks_configs.get("aad_client_secret") if not all([databricks_instance, self.databricks_token]): - raise Exception( - "Addon is not configured. Navigate to addon's configuration page to configure the addon." - ) + raise Exception("Addon is not configured. Navigate to addon's configuration page to configure the addon.") self.databricks_instance_url = "{}{}".format("https://", databricks_instance.strip("/")) self.request_headers = { "Authorization": "Bearer {}".format(self.databricks_token), @@ -67,6 +61,16 @@ def __init__(self, account_name, session_key): _LOGGER.debug( "Request made to the Databricks from Splunk user: {}".format(utils.get_current_user(self.session_key)) ) + + # Separate session to call external APIs + self.external_session = self.get_requests_retry_session() + self.external_session.proxies = self.session.proxies + self.external_session.verify = self.session.verify and False + # Setting timeout in session does not work but kept here for sake of + # consistency. Reference: https://requests.readthedocs.io/en/latest/api/#sessionapi + self.external_session.timeout = self.session.timeout + + # Set session headers with auth tokens self.session.headers.update(self.request_headers) if self.session.proxies: _LOGGER.info("Proxy is configured. 
Using proxy to execute the request.") @@ -119,29 +123,32 @@ def databricks_api(self, method, endpoint, data=None, args=None): response = None run_again = False _LOGGER.info("Refreshing AAD token.") - proxy_settings = utils.get_proxy_uri(self.session_key) # Reinitializing the proxy + databricks_configs = utils.get_databricks_configs(self.session_key, self.account_name) + proxy_config = databricks_configs.get("proxy_uri") db_token = utils.get_aad_access_token( self.session_key, self.account_name, self.aad_tenant_id, self.aad_client_id, self.aad_client_secret, - proxy_settings, # Using the reinit proxy. As proxy is getting updated on Line no: 43, 45 + proxy_config, # Using the reinit proxy. As proxy is getting updated on Line no: 43, 45 retry=const.RETRIES, # based on the condition and for this call we will always need proxy. + conf_update=True, # By passing True, the AAD access token will be updated in conf ) if isinstance(db_token, tuple): raise Exception(db_token[0]) else: self.databricks_token = db_token - self.request_headers["Authorization"] = "Bearer {}".format( - self.databricks_token - ) + self.request_headers["Authorization"] = "Bearer {}".format(self.databricks_token) self.session.headers.update(self.request_headers) elif status_code != 200: response.raise_for_status() else: break - return response.json() + if "cancel" in endpoint: + return response.json(), status_code + else: + return response.json() except Exception as e: msg = ( "Unable to request Databricks instance. " @@ -174,9 +181,7 @@ def get_cluster_id(self, cluster_name): resp = self.databricks_api("get", const.CLUSTER_ENDPOINT) response = resp.get("clusters") if response is None: - raise Exception( - "No cluster found with name {}. Provide a valid cluster name.".format(cluster_name) - ) + raise Exception("No cluster found with name {}. Provide a valid cluster name.".format(cluster_name)) for r in response: @@ -187,10 +192,36 @@ def get_cluster_id(self, cluster_name): return cluster_id raise Exception( - "Ensure that the cluster is in running state. Current cluster state is {}.".format( - r.get("state") - ) + "Ensure that the cluster is in running state. Current cluster state is {}.".format(r.get("state")) ) - raise Exception( - "No cluster found with name {}. Provide a valid cluster name.".format(cluster_name) - ) + raise Exception("No cluster found with name {}. Provide a valid cluster name.".format(cluster_name)) + + def external_api(self, method, url, data=None, args=None): + """ + Common method to request data from external APIs. 
+ + :param method: "get" or "post" + :param url: URL to get the data from + :param data: Payload to be send over post call + :param args: Arguments to be add into the url + :return: response in the form of dictionary + """ + # Request arguments + kwargs = { + "timeout": self.external_session.timeout, + } + if args: + kwargs["params"] = args + if data: + kwargs["json"] = data + + # Call APIs + if method.lower() == "get": + _LOGGER.info("Executing REST call: {}.".format(url)) + response = self.external_session.get(url, **kwargs) + elif method.lower() == "post": + _LOGGER.info("Executing REST call: {} Payload: {}.".format(url, str(data))) + response = self.external_session.post(url, **kwargs) + response.raise_for_status() + + return response.json() diff --git a/app/bin/databricks_common_utils.py b/app/bin/databricks_common_utils.py index 2118176..e59b449 100755 --- a/app/bin/databricks_common_utils.py +++ b/app/bin/databricks_common_utils.py @@ -1,14 +1,23 @@ import ta_databricks_declare # noqa: F401 import json import requests +import os import traceback import re from urllib.parse import urlencode import databricks_const as const from log_manager import setup_logging +import splunklib.client as client_ +from splunktaucclib.rest_handler.endpoint.validator import Validator +from splunktaucclib.rest_handler.endpoint import ( + validator +) +import splunk.admin as admin +import splunk.clilib.cli_common import splunk.rest as rest from six.moves.urllib.parse import quote +from splunklib.binding import HTTPError from solnlib.utils import is_true from solnlib.credentials import CredentialManager, CredentialNotExistException import splunklib.results as results @@ -188,54 +197,6 @@ def get_proxy_uri(session_key): return None -def update_kv_store_collection(splunkd_uri, kv_collection_name, session_key, kv_log_info): - """ - Create and update KV store collection. - - :param splunkd_uri: Splunk management URI - :param kv_collection_name: KV Store collection to create/update - :param session_key: Splunk Session Key - :param kv_log_info: Information that needs to be updated - :return: Dictionary with updated value of KV Store update status - """ - header = { - "Authorization": "Bearer {}".format(session_key), - "Content-Type": "application/json", - "User-Agent": "{}".format(const.USER_AGENT_CONST), - } - - # Add the log of record into the KV Store - _LOGGER.info( - "Adding the command log info to KV Store. Command Log Info: {}".format(kv_log_info) - ) - - kv_update_url = "{}/servicesNS/nobody/{}/storage/collections/data/{}".format( - splunkd_uri, - const.APP_NAME, - kv_collection_name, - ) - - _LOGGER.info( - "Executing REST call, URL: {}, Payload: {}.".format(kv_update_url, str(kv_log_info)) - ) - response = requests.post( - kv_update_url, - headers=header, - data=json.dumps(kv_log_info), - verify=const.INTERNAL_VERIFY_SSL, - timeout=const.TIMEOUT - ) - - if response.status_code in {200, 201}: - _LOGGER.info("KV Store updated successfully.") - kv_log_info.update({"kv_status": "KV Store updated successfully"}) - else: - _LOGGER.info("Error occurred while updating KV Store.") - kv_log_info.update({"kv_status": "Error occurred while updating KV Store"}) - - return kv_log_info - - def format_to_json_parameters(params): """ Split the provided string by `||` and make dictionary of that splitted key-value pair string. 
@@ -282,7 +243,7 @@ def get_mgmt_port(session_key, logger): try: content = json.loads(content) content = re.findall(r':(\d+)', content["entry"][0]["content"]["mgmtHostPort"])[0] - logger.info("Databricks Info: Get managemant port from web.conf is {} ".format(content)) + logger.info("Databricks Info: Get management port from web.conf is {} ".format(content)) except Exception as e: logger.error("Databricks Error: Error while parsing" " web.conf file. Error: " + str(e)) logger.debug( @@ -336,6 +297,7 @@ def get_aad_access_token( aad_client_secret, proxy_settings=None, retry=1, + conf_update=False, ): """ Method to acquire a new AAD access token. @@ -371,7 +333,7 @@ def get_aad_access_token( resp.raise_for_status() response = resp.json() aad_access_token = response.get("access_token") - if not all([aad_tenant_id, aad_client_id, aad_client_secret]): + if conf_update: save_databricks_aad_access_token( account_name, session_key, aad_access_token, aad_client_secret ) @@ -407,3 +369,72 @@ def get_aad_access_token( def get_user_agent(): """Method to get user agent.""" return "{}".format(const.USER_AGENT_CONST) + + +class GetSessionKey(admin.MConfigHandler): + """To get Splunk session key.""" + + def __init__(self): + """Initialize.""" + self.session_key = self.getSessionKey() + + +def create_service(sessionkey=None): + """Create Service to communicate with splunk.""" + mgmt_port = splunk.clilib.cli_common.getMgmtUri().split(":")[-1] + if not sessionkey: + sessionkey = GetSessionKey().session_key + service = client.connect(port=mgmt_port, token=sessionkey, app=APP_NAME) + return service + + +class IndexMacroManager(Validator): + """Class provides methods for handling Macros.""" + + def __init__(self, *args, **kwargs): + """Initialize the parameters.""" + super(IndexMacroManager, self).__init__(*args, **kwargs) + self._validator = validator + self._args = args + self._kwargs = kwargs + self.path = os.path.abspath(__file__) + + def update_macros(self, service, macro_name, index_string): + """Update macro with the selected index.""" + service.post("properties/macros/{}".format(macro_name), definition=index_string) + _LOGGER.info("Macro: {} is updated Successfully with defintion: {}.".format(macro_name, index_string)) + + def validate(self, value, data): + """Update the macros with the selected index.""" + try: + service = create_service() + selected_index = data.get("index") + response_string = "index IN ({})".format(selected_index) + self.update_macros(service, "databricks_index_macro", response_string) + return True + except HTTPError: + _LOGGER.error("Error while updating Macros: {}".format(traceback.format_exc())) + self.put_msg("Error while updating Macros. 
Kindly check log file for more details.") + return False + except Exception as e: + msg = "Unrecognized error: {}".format(str(e)) + _LOGGER.error(msg) + self.put_msg(msg) + _LOGGER.error(traceback.format_exc()) + return False + + +def ingest_data_to_splunk(data, session_key, provided_index, sourcetype): + """Method to ingest data to Splunk.""" + json_string = json.dumps(data, ensure_ascii=False).replace('"', '\\"') + port = get_mgmt_port(session_key, _LOGGER) + searchquery = '| makeresults | eval _raw="{}" | collect index={} sourcetype={}'\ + .format(json_string, provided_index, sourcetype) + service = client_.connect( + host="localhost", + port=port, + scheme="https", + app=APP_NAME, + token=session_key + ) + service.jobs.oneshot(searchquery) diff --git a/app/bin/databricks_const.py b/app/bin/databricks_const.py index 61acbed..ed7e60e 100755 --- a/app/bin/databricks_const.py +++ b/app/bin/databricks_const.py @@ -6,11 +6,19 @@ CONTEXT_DESTROY_ENDPOINT = "/api/1.2/contexts/destroy" COMMAND_ENDPOINT = "/api/1.2/commands/execute" STATUS_ENDPOINT = "/api/1.2/commands/status" +EXECUTE_QUERY_ENDPOINT = "/api/2.0/sql/statements/" +QUERY_STATUS_ENDPOINT = "/api/2.0/sql/statements/{statement_id}" +CANCEL_QUERY_ENDPOINT_CLUSTER = "/api/1.2/commands/cancel" +CANCEL_QUERY_ENDPOINT_DBSQL = "/api/2.0/sql/statements/{statement_id}/cancel" GET_RUN_ENDPOINT = "/api/2.0/jobs/runs/get" RUN_SUBMIT_ENDPOINT = "/api/2.0/jobs/runs/submit" EXECUTE_JOB_ENDPOINT = "/api/2.0/jobs/run-now" GET_JOB_ENDPOINT = "/api/2.0/jobs/get" +CANCEL_JOB_RUN_ENDPOINT = "/api/2.0/jobs/runs/cancel" AAD_TOKEN_ENDPOINT = "https://login.microsoftonline.com/{}/oauth2/v2.0/token" +WAREHOUSE_STATUS_ENDPOINT = "/api/2.0/sql/warehouses" +WAREHOUSE_START_ENDPOINT = "/api/2.0/sql/warehouses/{}/start" +SPECIFIC_WAREHOUSE_STATUS_ENDPOINT = "/api/2.0/sql/warehouses/{}" # Azure Databricks scope SCOPE = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default" @@ -18,17 +26,16 @@ # App Name APP_NAME = __file__.split(os.sep)[-3] -# KV Store collection name -KV_COLLECTION_NAME_SUBMIT_RUN = "databricks_submit_run_log" -KV_COLLECTION_NAME_EXECUTE_JOB = "databricks_execute_job_log" - -REQUIRED_ROLES = ['databricks_user', 'databricks_admin'] - # Command execution configs -COMMAND_TIMEOUT_IN_SECONDS = 300 COMMAND_SLEEP_INTERVAL_IN_SECONDS = 3 -USER_AGENT_CONST = "Databricks-AddOnFor-Splunk-1.2.0" +SPLUNK_SEARCH_STATUS_CHECK_INTERVAL = 10 + +MINIMUM_COMMAND_TIMEOUT_VALUE = 30 + +MINIMUM_QUERY_ROW_LIMIT = 1 + +USER_AGENT_CONST = "Databricks-AddOnFor-Splunk-1.4.1" VERIFY_SSL = True INTERNAL_VERIFY_SSL = False @@ -39,12 +46,12 @@ # Error codes and message ERROR_CODE = { - '700016': 'Invalid Client ID provided.', - '900023': 'Invalid Tenant ID provided.', - '7000215': 'Invalid Client Secret provided.', - '403': 'Client secret may have expired. Please configure a valid Client secret.', - '404': 'Invalid API endpoint.', - '500': 'Internal server error.', - '400': 'Bad request. The request is malformed.', - '429': 'API limit exceeded. Please try again after some time.' + "700016": "Invalid Client ID provided.", + "900023": "Invalid Tenant ID provided.", + "7000215": "Invalid Client Secret provided.", + "403": "Client secret may have expired. Please configure a valid Client secret.", + "404": "Invalid API endpoint.", + "500": "Internal server error.", + "400": "Bad request. The request is malformed.", + "429": "API limit exceeded. 
Please try again after some time.", } diff --git a/app/bin/databricks_get_credentials.py b/app/bin/databricks_get_credentials.py index 7a16789..03b84f2 100644 --- a/app/bin/databricks_get_credentials.py +++ b/app/bin/databricks_get_credentials.py @@ -79,7 +79,9 @@ def handle(self, in_string): 'aad_tenant_id': None, 'aad_client_secret': None, 'aad_access_token': None, + 'config_for_dbquery': None, 'cluster_name': None, + 'warehouse_id': None, 'databricks_pat': None, 'auth_type': None, 'proxy_enabled': None, @@ -89,7 +91,11 @@ def handle(self, in_string): 'proxy_username': None, 'proxy_password': None, 'proxy_rdns': None, - 'use_for_oauth': None + 'use_for_oauth': None, + 'admin_command_timeout': None, + 'query_result_limit': None, + 'index': None, + 'thread_count': None } try: _LOGGER.info("Retrieving account and settings configurations.") @@ -109,7 +115,9 @@ def handle(self, in_string): config_dict['auth_type'] = account_config.get('auth_type') config_dict['databricks_instance'] = account_config.get('databricks_instance') + config_dict['config_for_dbquery'] = account_config.get('config_for_dbquery') config_dict['cluster_name'] = account_config.get('cluster_name') + config_dict['warehouse_id'] = account_config.get('warehouse_id') # Get clear account password from passwords.conf account_manager = CredentialManager( @@ -160,6 +168,23 @@ def handle(self, in_string): config_dict['proxy_rdns'] = proxy_config.get('proxy_rdns') config_dict['use_for_oauth'] = proxy_config.get('use_for_oauth') + # Get additional settings from conf + _, additional_settings_response_content = rest.simpleRequest( + "/servicesNS/nobody/{}/TA_Databricks_settings/additional_parameters".format( + APP_NAME + ), + sessionKey=self.admin_session_key, + getargs={"output_mode": "json"}, + raiseAllErrors=True, + ) + additional_settings_json = json.loads(additional_settings_response_content) + additional_settings_config = additional_settings_json.get("entry")[0].get("content") + _LOGGER.debug("Additional parameters configurations read successfully from settings.conf") + config_dict['admin_command_timeout'] = additional_settings_config.get("admin_command_timeout") + config_dict['query_result_limit'] = additional_settings_config.get("query_result_limit") + config_dict['index'] = additional_settings_config.get("index") + config_dict['thread_count'] = additional_settings_config.get("thread_count") + self.status = 200 return { 'payload': config_dict, diff --git a/app/bin/databricksjob.py b/app/bin/databricksjob.py index dfc73f9..83ce87b 100755 --- a/app/bin/databricksjob.py +++ b/app/bin/databricksjob.py @@ -2,6 +2,8 @@ import sys import time import traceback +import json +import uuid import databricks_com as com import databricks_const as const @@ -16,10 +18,12 @@ validators, ) -_LOGGER = setup_logging("ta_databricksjob_command") +APP_NAME = const.APP_NAME +UID = str(uuid.uuid4()) +_LOGGER = setup_logging("ta_databricksjob_command", UID) -@Configuration(type="events") +@Configuration(type="reporting") class DatabricksJobCommand(GeneratingCommand): """Custom Command of databricksjob.""" @@ -30,23 +34,32 @@ class DatabricksJobCommand(GeneratingCommand): def generate(self): """Generating custom command.""" - _LOGGER.info("Initiating databricksjob command") - kv_log_info = { + _LOGGER.info("Initiating databricksjob command.") + _LOGGER.info("Job ID: {}".format(self.job_id)) + _LOGGER.info("Notebook Params: {}".format(self.notebook_params)) + info_to_process = { "user": self._metadata.searchinfo.username, "account_name": self.account_name, 
"created_time": time.time(), "param": self._metadata.searchinfo.args, "run_id": "-", + "run_execution_status": "-", "output_url": "-", "result_url": "-", - "command_status": "Failed", + "command_submission_status": "Failed", "error": "-", + "uid": UID } session_key = self._metadata.searchinfo.session_key try: - + databricks_configs = utils.get_databricks_configs(session_key, self.account_name) + if not databricks_configs: + ERR_MSG = \ + "Account '{}' not found. Please provide valid Databricks account.".format(self.account_name) + raise Exception(ERR_MSG) + provided_index = databricks_configs.get("index") # Get job details client = com.DatabricksClient(self.account_name, session_key) @@ -82,9 +95,11 @@ def generate(self): _LOGGER.info("Submitting job for execution.") response = client.databricks_api("post", const.EXECUTE_JOB_ENDPOINT, data=payload) - kv_log_info.update(response) + info_to_process.update(response) run_id = response["run_id"] - _LOGGER.info("Successfully executed the job with ID: {}.".format(self.job_id)) + if run_id: + _LOGGER.info("run ID returned: {}".format(run_id)) + _LOGGER.info("Successfully executed the job with job ID: {}.".format(self.job_id)) # Request to get the run_id details _LOGGER.info("Fetching details for run ID: {}.".format(run_id)) @@ -94,27 +109,34 @@ def generate(self): output_url = response.get("run_page_url") if output_url: result_url = output_url.rstrip("/") + "/resultsOnly" - kv_log_info["output_url"] = output_url - kv_log_info["result_url"] = result_url - kv_log_info["command_status"] = "Success" + info_to_process["output_url"] = output_url + info_to_process["result_url"] = result_url + info_to_process["command_submission_status"] = "Success" + info_to_process["run_execution_status"] = "Initiated" _LOGGER.info("Output url returned: {}".format(output_url)) + _LOGGER.info("Successfully executed databricksjob command.") + except Exception as e: _LOGGER.error(e) _LOGGER.error(traceback.format_exc()) - kv_log_info["error"] = str(e) + info_to_process["error"] = str(e) self.write_error(str(e)) exit(1) finally: - updated_kv_info = utils.update_kv_store_collection( - self._metadata.searchinfo.splunkd_uri, - const.KV_COLLECTION_NAME_EXECUTE_JOB, - session_key, - kv_log_info, - ) - - yield updated_kv_info + try: + _LOGGER.info("Ingesting the data into Splunk index: {}".format(provided_index)) + indented_json = json.dumps(info_to_process, indent=4) + _LOGGER.info("Data to be ingested in Splunk:\n{}".format(indented_json)) + utils.ingest_data_to_splunk( + info_to_process, session_key, provided_index, "databricks:databricksjob" + ) + _LOGGER.info("Successfully ingested the data into Splunk index: {}.".format(provided_index)) + except Exception: + _LOGGER.error("Error occured while ingesting data into Splunk. 
Error: {}" + .format(traceback.format_exc())) + yield info_to_process dispatch(DatabricksJobCommand, sys.argv, sys.stdin, sys.stdout, __name__) diff --git a/app/bin/databricksquery.py b/app/bin/databricksquery.py index 6a0ad68..8769c50 100755 --- a/app/bin/databricksquery.py +++ b/app/bin/databricksquery.py @@ -1,7 +1,10 @@ import ta_databricks_declare # noqa: F401 import sys -import traceback +import threading import time +import uuid +import xml.etree.ElementTree as ET +from concurrent.futures import ThreadPoolExecutor import databricks_com as com import databricks_const as const @@ -15,161 +18,555 @@ Option, validators, ) +from solnlib.splunkenv import get_splunkd_uri +from splunk import rest -_LOGGER = setup_logging("ta_databricksquery_command") +UID = str(uuid.uuid4()) +_LOGGER = setup_logging("ta_databricksquery_command", UID) -@Configuration(type="events") +@Configuration(type="reporting") class DatabricksQueryCommand(GeneratingCommand): """Custom Command of databricksquery.""" # Take input from user using parameters + warehouse_id = Option(require=False) cluster = Option(require=False) query = Option(require=True) account_name = Option(require=True) command_timeout = Option(require=False, validate=validators.Integer(minimum=1)) + limit = Option(require=False, validate=validators.Integer(minimum=1)) + + def cancel_query(self, search_sid, session_key, client, cancel_endpoint, data_for_cancelation): + """Method to cancel query execution based on splunk search status.""" + while True: + try: + URL = "{}/services/search/jobs/{}".format(get_splunkd_uri(), search_sid) + _, content = rest.simpleRequest( + URL, sessionKey=session_key, method="GET", raiseAllErrors=True, getargs=None + ) + namespaces = { + "s": "http://dev.splunk.com/ns/rest", + } + root = ET.fromstring(content) + dispatch_state = root.find(".//s:key[@name='dispatchState']", namespaces).text + is_finalized = root.find(".//s:key[@name='isFinalized']", namespaces).text + + if dispatch_state == "FINALIZING" and is_finalized in [1, "1"]: + _LOGGER.info( + "Stop button of Splunk search has been clicked by User. Canceling the query execution." + ) + response, status_code = client.databricks_api( + "post", cancel_endpoint, data=data_for_cancelation + ) + if status_code == 200: + _LOGGER.info("Successfully canceled the query execution.") + else: + _LOGGER.error("Error while attempting to cancel the query execution." + " Response returned from API : {}" + .format(response)) + break + else: + time.sleep(const.SPLUNK_SEARCH_STATUS_CHECK_INTERVAL) + except Exception as e: + if "unknown sid" in str(e).lower(): + _LOGGER.debug("Query execution can not be canceled anymore as Splunk's search " + "ID does not exist. Error: {}".format(str(e))) + else: + _LOGGER.debug("Unknown error occured. 
Error: {}".format(str(e))) + break def generate(self): """Generating custom command.""" - _LOGGER.info("Initiating databricksquery command") - command_timeout_in_seconds = self.command_timeout or const.COMMAND_TIMEOUT_IN_SECONDS - _LOGGER.info("Setting command timeout to {} seconds.".format(command_timeout_in_seconds)) - - # Get session key + _LOGGER.info("Initiating databricksquery command.") + _LOGGER.info("Warehouse ID: {}".format(self.warehouse_id)) + _LOGGER.info("Cluster: {}".format(self.cluster)) + _LOGGER.info("Query: {}".format(self.query)) + _LOGGER.info("Command Timeout: {}".format(self.command_timeout)) + _LOGGER.info("Limit: {}".format(self.limit)) + + # Get session key and sid session_key = self._metadata.searchinfo.session_key + search_sid = self._metadata.searchinfo.sid try: - - # Fetching cluster name - self.cluster = self.cluster or utils.get_databricks_configs( - session_key, self.account_name - ).get("cluster_name") - if not self.cluster: - raise Exception( - "Databricks cluster is required to execute this custom command. " - "Provide a cluster parameter or configure the cluster in the TA's configuration page." + if self.command_timeout and self.command_timeout < const.MINIMUM_COMMAND_TIMEOUT_VALUE: + self.write_error( + "Command Timeout value must be greater than or equal to {} seconds.".format( + const.MINIMUM_COMMAND_TIMEOUT_VALUE + ) + ) + _LOGGER.warning( + "Command Timeout value must be greater than or equal to {} seconds." + " Exiting the command.".format(const.MINIMUM_COMMAND_TIMEOUT_VALUE) ) + sys.exit(0) - client = com.DatabricksClient(self.account_name, session_key) + def handle_invalid_limit_value(): + if self.limit and self.limit < const.MINIMUM_QUERY_ROW_LIMIT: + self.write_error( + "Limit value must be greater than or equal to {} rows.".format(const.MINIMUM_QUERY_ROW_LIMIT) + ) + _LOGGER.error( + "Limit value must be greater than or equal to {} rows." + " Exiting the command.".format(const.MINIMUM_QUERY_ROW_LIMIT) + ) + sys.exit(0) - # Request to get cluster ID - _LOGGER.info("Requesting cluster ID for cluster: {}.".format(self.cluster)) - cluster_id = client.get_cluster_id(self.cluster) - _LOGGER.info("Cluster ID received: {}.".format(cluster_id)) - - # Request to create context - _LOGGER.info("Creating Context in cluster.") - payload = {"language": "sql", "clusterId": cluster_id} - response = client.databricks_api("post", const.CONTEXT_ENDPOINT, data=payload) - - context_id = response.get("id") - _LOGGER.info("Context created: {}.".format(context_id)) - - # Request to execute command - _LOGGER.info("Submitting SQL query for execution.") - payload["contextId"] = context_id - payload["command"] = self.query - response = client.databricks_api("post", const.COMMAND_ENDPOINT, data=payload) - - command_id = response.get("id") - _LOGGER.info("Query submitted, command id: {}.".format(command_id)) - - # pulling mechanism - _LOGGER.info("Fetching query execution status.") - status = None - args = { - "clusterId": cluster_id, - "contextId": context_id, - "commandId": command_id, - } - - total_wait_time = 0 - while total_wait_time <= command_timeout_in_seconds: - response = client.databricks_api("get", const.STATUS_ENDPOINT, args=args) - - status = response.get("status") - _LOGGER.info("Query execution status: {}.".format(status)) - - if status in ("Cancelled", "Error"): - raise Exception( - "Could not complete the query execution. 
Status: {}.".format(status) - ) - - elif status == "Finished": - if response["results"]["resultType"] == "error": - msg = response["results"].get( - "summary", "Error encountered while executing query." - ) - raise Exception(str(msg)) + # Fetching TA configurations + databricks_configs = utils.get_databricks_configs(session_key, self.account_name) + if not databricks_configs: + self.write_error( + "Account '{}' not found. Please provide valid Databricks account.".format(self.account_name) + ) + _LOGGER.error( + "Account '{}' not found. Please provide valid Databricks account." + " Exiting the command.".format(self.account_name) + ) + sys.exit(0) - if response["results"]["resultType"] != "table": - raise Exception( - "Encountered unknown result type, terminating the execution." + # Fetching timeout value + admin_com_timeout = databricks_configs.get("admin_command_timeout") + if (self.command_timeout and self.command_timeout > int(admin_com_timeout)) or not self.command_timeout: + command_timeout_in_seconds = int(admin_com_timeout) + else: + command_timeout_in_seconds = self.command_timeout + if self.command_timeout and self.command_timeout > int(admin_com_timeout): + _LOGGER.warning( + "Provided value of Command Timeout ({} seconds) by the user is greater than the maximum" + " allowed/permitted value. Using the maximum allowed/permitted value ({} seconds).".format( + self.command_timeout, int(admin_com_timeout) + ) + ) + self.write_warning( + "Setting Command Timeout to maximum allowed/permitted value ({} seconds) as a" + " greater value has been specified ({} seconds) in search.".format( + admin_com_timeout, self.command_timeout + ) + ) + else: + if self.command_timeout: + _LOGGER.info( + "Provided value of Command Timeout ({} seconds) by the user is within the maximum" + " allowed/permitted value ({} seconds).".format(self.command_timeout, int(admin_com_timeout)) + ) + else: + _LOGGER.info( + "No value for Command Timeout is provided. " + "Using the maximum allowed value ({} seconds).".format(admin_com_timeout) + ) + _LOGGER.info("Setting Command Timeout to {} seconds.".format(command_timeout_in_seconds)) + + def fetch_limit_value(): + # Fetching limit value + query_result_limit = databricks_configs.get("query_result_limit") + if not self.limit or self.limit > int(query_result_limit): + row_limit = int(query_result_limit) + else: + row_limit = self.limit + if self.limit and self.limit > int(query_result_limit): + _LOGGER.warning( + "Provided value of Result Limit ({} rows) by the user is greater than the maximum" + " allowed/permitted value. Using the maximum allowed/permitted value ({} rows).".format( + self.limit, int(query_result_limit) ) - - if response["results"].get("truncated", True): - self.write_warning( - "Results are truncated due to Databricks API limitations." + ) + self.write_warning( + "Setting Result Limit to maximum allowed/permitted value ({} rows) as a" + " greater value has been specified ({} rows) in search.".format(query_result_limit, self.limit) + ) + else: + if self.limit: + _LOGGER.info( + "Provided value of Result Limit ({} rows) by the user is within the maximum" + " allowed/permitted value ({} rows).".format(self.limit, int(query_result_limit)) ) + else: + _LOGGER.info( + "No value for Result Limit is provided. " + "Using the maximum allowed value ({} rows).".format(query_result_limit) + ) + _LOGGER.info("Setting Result Limit to {} rows.".format(row_limit)) + return row_limit - _LOGGER.info("Query execution successful. 
Preparing data.") + client = com.DatabricksClient(self.account_name, session_key) - # Prepare list of Headers - headers = response["results"]["schema"] - schema = [] - for header in headers: - field = header.get("name") - schema.append(field) + def handle_cluster_method(): + # Request to get cluster ID + _LOGGER.info("Requesting cluster ID for cluster: {}.".format(self.cluster)) + cluster_id = client.get_cluster_id(self.cluster) + _LOGGER.info("Cluster ID received: {}.".format(cluster_id)) + + # Request to create context + _LOGGER.info("Creating Context in cluster.") + payload = {"language": "sql", "clusterId": cluster_id} + response = client.databricks_api("post", const.CONTEXT_ENDPOINT, data=payload) + + context_id = response.get("id") + _LOGGER.info("Context created: {}.".format(context_id)) + + # Request to execute command + _LOGGER.info("Submitting SQL query for execution.") + payload["contextId"] = context_id + payload["command"] = self.query + response = client.databricks_api("post", const.COMMAND_ENDPOINT, data=payload) + + command_id = response.get("id") + _LOGGER.info("Query submitted, command id: {}.".format(command_id)) + + # pulling mechanism + _LOGGER.info("Fetching query execution status.") + status = None + args = { + "clusterId": cluster_id, + "contextId": context_id, + "commandId": command_id, + } + cancel_endpoint = const.CANCEL_QUERY_ENDPOINT_CLUSTER + cancel_method_thread = threading.Thread( + target=self.cancel_query, + args=(search_sid, session_key, client, cancel_endpoint, args), + name="cancel_method_thread" + ) + cancel_method_thread.start() - # Fetch Data - data = response["results"]["data"] + total_wait_time = 0 + while total_wait_time <= command_timeout_in_seconds: + response = client.databricks_api("get", const.STATUS_ENDPOINT, args=args) + status = response.get("status") + _LOGGER.info("Query execution status: {}.".format(status)) - for d in data: - yield dict(zip(schema, d)) + if status in ("Canceled", "Cancelled", "Error"): + raise Exception( + "Could not complete the query execution. Status: {}.".format(status) + ) - _LOGGER.info("Data parsed successfully.") - break + elif status == "Finished": + if response["results"]["resultType"] == "error": + if response["results"].get("cause") and \ + "CommandCancelledException" in response["results"]["cause"]: + raise Exception("Search Canceled!") + msg = response["results"].get( + "summary", "Error encountered while executing query." + ) + raise Exception(str(msg)) + + if response["results"]["resultType"] != "table": + raise Exception( + "Encountered unknown result type, terminating the execution." + ) + + if response["results"].get("truncated"): + _LOGGER.info("Results are truncated due to Databricks API limitations.") + self.write_warning( + "Results are truncated due to Databricks API limitations." + ) + + _LOGGER.info("Query execution successful. 
Preparing data.") + + # Prepare list of Headers + headers = response["results"]["schema"] + schema = [] + for header in headers: + field = header.get("name") + schema.append(field) + + # Fetch Data + data = response["results"]["data"] + count_of_result = len(data) if data else 0 + _LOGGER.info("Total number of rows obtained in query's result: {}".format(count_of_result)) + for d in data: + yield dict(zip(schema, d)) + + _LOGGER.info("Data parsed successfully.") + break + + seconds_to_timeout = command_timeout_in_seconds - total_wait_time + + if seconds_to_timeout < const.COMMAND_SLEEP_INTERVAL_IN_SECONDS: + + if not seconds_to_timeout: + total_wait_time += 1 + continue + + _LOGGER.info( + "Query execution in progress, will retry after {} seconds.".format( + str(seconds_to_timeout) + ) + ) + time.sleep(seconds_to_timeout) + total_wait_time += seconds_to_timeout + continue - seconds_to_timeout = command_timeout_in_seconds - total_wait_time + _LOGGER.info( + "Query execution in progress, will retry after {} seconds.".format( + str(const.COMMAND_SLEEP_INTERVAL_IN_SECONDS) + ) + ) + time.sleep(const.COMMAND_SLEEP_INTERVAL_IN_SECONDS) + total_wait_time += const.COMMAND_SLEEP_INTERVAL_IN_SECONDS + else: + # Timeout scenario + msg = "Command execution timed out. Last status: {}.".format(status) + _LOGGER.info(msg) + _LOGGER.info("Canceling the query execution") + resp_, status_code = client.databricks_api("post", const.CANCEL_QUERY_ENDPOINT_CLUSTER, data=args) + if status_code == 200: + _LOGGER.info("Successfully canceled the query execution.") + self.write_error("Canceled the execution as command execution timed out") + + # Destroy the context to free-up space in Databricks + if context_id: + _LOGGER.info("Deleting context.") + payload = {"contextId": context_id, "clusterId": cluster_id} + _ = client.databricks_api("post", const.CONTEXT_DESTROY_ENDPOINT, data=payload) + _LOGGER.info("Context deleted successfully.") + _LOGGER.info("Successfully executed databricksquery command.") + + def handle_dbsql_method(row_limit, thread_count): + + def fetch_warehouse_status(id_of_warehouse): + while True: + warehouse_resp = client.databricks_api( + "get", + const.SPECIFIC_WAREHOUSE_STATUS_ENDPOINT.format(id_of_warehouse) + ) + if warehouse_resp.get("state").lower() == "starting": + time.sleep(30) + elif warehouse_resp.get("state").lower() == "running": + _LOGGER.info("Warehouse started successfully.") + break + else: + err = "Warehouse is not in RUNNING or STARTING state. Current SQL warehouse state is {}." + raise Exception(err.format(warehouse_resp.get("state"))) + + # Check whether SQL Warehouse exists. If yes, check its status. + warehouse_exist = False + list_of_links = [] + list_of_chunk_number = [] + resp = client.databricks_api("get", const.WAREHOUSE_STATUS_ENDPOINT) + response = resp.get("warehouses") + for res in response: + if res.get("id") == self.warehouse_id: + warehouse_exist = True + if res.get("state").lower() != "running": + try: + if res.get("state").lower() == "starting": + _LOGGER.info("Warehouse is not in RUNNING state. It is in STARTING state.") + time.sleep(30) + fetch_warehouse_status(self.warehouse_id) + else: + _LOGGER.info("Warehouse is not in RUNNING or STARTING state. " + "Starting the warehouse.") + client.databricks_api( + "post", const.WAREHOUSE_START_ENDPOINT.format(self.warehouse_id) + ) + fetch_warehouse_status(self.warehouse_id) + except Exception as err: + raise Exception(err) + break + if not warehouse_exist: + raise Exception("No SQL warehouse found with ID: {}. 
Provide a valid SQL warehouse ID." + .format(self.warehouse_id)) + + # SQL statement execution payload + payload = { + "warehouse_id": self.warehouse_id, + "statement": self.query, + "schema": "tpch", + "disposition": "EXTERNAL_LINKS", + "format": "JSON_ARRAY", + "row_limit": row_limit, + } + + # Request to execute statement + _LOGGER.info("Submitting SQL query for execution.") + response = client.databricks_api("post", const.EXECUTE_QUERY_ENDPOINT, data=payload) + + statement_id = response.get("statement_id") + _LOGGER.info("Query submitted, statement id: {}.".format(statement_id)) + + cancel_endpoint = const.CANCEL_QUERY_ENDPOINT_DBSQL.format(statement_id=statement_id) + + # Check for Splunk search cancellation + cancel_method_thread = threading.Thread( + target=self.cancel_query, + args=(search_sid, session_key, client, cancel_endpoint, None), + name="cancel_method_thread", + ) + cancel_method_thread.start() - if seconds_to_timeout < const.COMMAND_SLEEP_INTERVAL_IN_SECONDS: + # Pulling mechanism + _LOGGER.info("Fetching query execution status.") + status = None - if not seconds_to_timeout: - total_wait_time += 1 + total_wait_time = 0 + while total_wait_time <= command_timeout_in_seconds: + response = client.databricks_api( + "get", + const.QUERY_STATUS_ENDPOINT.format(statement_id=statement_id) + ) + status = response.get("status", {}).get("state") + _LOGGER.info("Query execution status: {}.".format(status)) + + if status in ("CANCELED", "CLOSED", "FAILED"): + err_message = "Could not complete the query execution. Status: {}.".format(status) + if status == "FAILED": + err_message += " Error: {}".format(response["status"].get("error", {}).get("message")) + raise Exception(err_message) + + elif status == "SUCCEEDED": + _LOGGER.info("Query execution successful. 
Preparing data.") + + if response["manifest"].get("truncated"): + _LOGGER.info("Result row limit exceeded, hence results are truncated.") + self.write_warning("Result limit exceeded, hence results are truncated.") + + total_row_count = response["manifest"]["total_row_count"] + _LOGGER.info("Total number of rows obtained in query's result: {}".format(total_row_count)) + if int(total_row_count) == 0: + _LOGGER.info("Successfully executed databricksquery command.") + sys.exit(0) + + # Prepare list of Headers + headers = response["manifest"]["schema"]["columns"] + schema = [] + for header in headers: + field = header.get("name") + schema.append(field) + + _LOGGER.info("Result table schema: {}".format(schema)) + + # Method to fetch data of every chunk + def fetch_data_executor(args): + external_link, chunk_index = args + # Fetch Data + response = client.external_api("get", external_link) + + _LOGGER.info( + "Total number of rows obtained in chunk-{} of query result: {}".format( + chunk_index, len(response) + ) + ) + return response + + def parse_data(response, schema): + for row in response: + yield dict(zip(schema, row)) + + # Get external link of first chunk + external_links = response["result"].get("external_links") + if not external_links: + raise Exception("No data returned from execution of this query.") + next_chunk_internal_link = external_links[0].get("next_chunk_internal_link") + + list_of_links.append(external_links[0]["external_link"]) + list_of_chunk_number.append(external_links[0]["chunk_index"]) + + while next_chunk_internal_link: + response = client.databricks_api("get", next_chunk_internal_link) + external_links = response["external_links"] + next_chunk_internal_link = external_links[0].get("next_chunk_internal_link") + list_of_links.append(external_links[0]["external_link"]) + list_of_chunk_number.append(external_links[0]["chunk_index"]) + + combined_args = zip(list_of_links, list_of_chunk_number) + with ThreadPoolExecutor(max_workers=int(thread_count)) as executor: + results = executor.map(fetch_data_executor, combined_args) + + for res in results: + yield from parse_data(res, schema) + + _LOGGER.info("Data parsed successfully.") + break + + # If statement execution is in ["RUNNING", "PENDING"] state + seconds_to_timeout = command_timeout_in_seconds - total_wait_time + + if seconds_to_timeout < const.COMMAND_SLEEP_INTERVAL_IN_SECONDS: + + if not seconds_to_timeout: + total_wait_time += 1 + continue + + _LOGGER.info( + "Query execution in progress, will retry after {} seconds.".format(str(seconds_to_timeout)) + ) + time.sleep(seconds_to_timeout) + total_wait_time += seconds_to_timeout continue _LOGGER.info( "Query execution in progress, will retry after {} seconds.".format( - str(seconds_to_timeout) + str(const.COMMAND_SLEEP_INTERVAL_IN_SECONDS) ) ) - time.sleep(seconds_to_timeout) - total_wait_time += seconds_to_timeout - continue - - _LOGGER.info( - "Query execution in progress, will retry after {} seconds.".format( - str(const.COMMAND_SLEEP_INTERVAL_IN_SECONDS) + time.sleep(const.COMMAND_SLEEP_INTERVAL_IN_SECONDS) + total_wait_time += const.COMMAND_SLEEP_INTERVAL_IN_SECONDS + else: + # Timeout scenario + msg = "Command execution timed out. 
Last status: {}.".format(status) + _LOGGER.info(msg) + _LOGGER.info("Canceling the query execution") + resp_, status_code = client.databricks_api( + "post", const.CANCEL_QUERY_ENDPOINT_DBSQL.format(statement_id=statement_id) ) - ) - time.sleep(const.COMMAND_SLEEP_INTERVAL_IN_SECONDS) - total_wait_time += const.COMMAND_SLEEP_INTERVAL_IN_SECONDS - else: - # Timeout scenario - msg = "Command execution timed out. Last status: {}.".format(status) - _LOGGER.info(msg) - self.write_error(msg) - - # Destroy the context to free-up space in Databricks - if context_id: - _LOGGER.info("Deleting context.") - payload = {"contextId": context_id, "clusterId": cluster_id} - _ = client.databricks_api("post", const.CONTEXT_DESTROY_ENDPOINT, data=payload) - _LOGGER.info("Context deleted successfully.") + if status_code == 200: + _LOGGER.info("Successfully canceled the query execution.") + self.write_error("Canceled the execution as command execution timed out") + + _LOGGER.info("Successfully executed databricksquery command.") + + if not self.cluster and not self.warehouse_id: + dbquery_type = databricks_configs.get("config_for_dbquery") + if dbquery_type == "dbsql": + handle_invalid_limit_value() + self.warehouse_id = databricks_configs.get("warehouse_id") + if not self.warehouse_id: + raise Exception( + "Databricks warehouse_id is required to execute this custom command. " + "Provide a warehouse_id parameter or configure the Warehouse ID " + "in the TA's configuration page." + ) + row_limit = fetch_limit_value() + for event in handle_dbsql_method(row_limit, databricks_configs.get("thread_count")): + yield event + elif ( + dbquery_type == "interactive_cluster" + or (dbquery_type is None and databricks_configs.get("cluster_name")) + ): + self.cluster = databricks_configs.get("cluster_name") + if not self.cluster: + raise Exception( + "Databricks cluster is required to execute this custom command. " + "Provide a cluster parameter or configure the cluster in the TA's configuration page." + ) + for event in handle_cluster_method(): + yield event + else: + msg = ( + "No configuration found for Cluster Name or Warehouse ID on the TA's configuration page. " + "Provide Cluster Name or Warehouse ID on TA's Configuration page or in Search." + ) + raise Exception(msg) + + elif self.cluster and self.warehouse_id: + _LOGGER.error("Provide only one of Cluster or Warehouse ID. 
Exiting the script.") + raise Exception("Provide only one of Cluster or Warehouse ID") + elif self.cluster and not self.warehouse_id: + for event in handle_cluster_method(): + yield event + elif self.warehouse_id and not self.cluster: + handle_invalid_limit_value() + row_limit = fetch_limit_value() + for event in handle_dbsql_method(row_limit, databricks_configs.get("thread_count")): + yield event except Exception as e: - _LOGGER.error(e) - _LOGGER.error(traceback.format_exc()) + if str(e) == "Search Canceled!": + _LOGGER.info("Query execution has been canceled!") + else: + _LOGGER.exception(e) self.write_error(str(e)) diff --git a/app/bin/databricksretiredrun.py b/app/bin/databricksretiredrun.py deleted file mode 100644 index 359d56f..0000000 --- a/app/bin/databricksretiredrun.py +++ /dev/null @@ -1,106 +0,0 @@ -import ta_databricks_declare # noqa: F401 - -import sys -import traceback -import json - -import databricks_const as const - -from datetime import datetime, timedelta -from log_manager import setup_logging -from splunklib.client import connect -from solnlib.splunkenv import get_splunkd_access_info -from splunklib.searchcommands import ( - dispatch, - GeneratingCommand, - Configuration, - Option, - validators, -) - -_LOGGER = setup_logging("ta_databricksretiredrun_command") - - -@Configuration() -class DatabricksRetiredRunCommand(GeneratingCommand): - """Custom Command of databricksretiredrun.""" - - days = Option( - name='days', require=False, validate=validators.Integer(minimum=1) - ) - - run_id = Option( - name='run_id', require=False - ) - - user = Option( - name='user', require=False - ) - - def generate(self): - """Generate method of Generating Command.""" - try: - _LOGGER.info("Initiating databricksretiredrun command.") - if False: - yield - - session_key = self._metadata.searchinfo.session_key - - current_time = datetime.utcnow() - if not any((self.days, self.run_id, self.user)): - msg = "No parameters provided. 
Please provide at least one of the parameters" - self.write_error(msg) - _LOGGER.error(msg) - exit(1) - - conditions_list = [] - - if self.days: - lastcreatedtime = ( - current_time - - timedelta( - days=self.days, - hours=current_time.hour, - minutes=current_time.minute, - seconds=current_time.second, - microseconds=current_time.microsecond, - ) - ).timestamp() - lastcreated_dict = {"created_time": {"$lt": lastcreatedtime}} - conditions_list.append(lastcreated_dict) - - if self.run_id and self.run_id.strip(): - run_id_dict = {"run_id": self.run_id.strip()} - conditions_list.append(run_id_dict) - - if self.user and self.user.strip(): - user_dict = {"user": self.user.strip()} - conditions_list.append(user_dict) - - endpoint_query = {"$and": conditions_list} - _, host, mgmt_port = get_splunkd_access_info() - session_key = self.search_results_info.auth_token - service = connect(app=const.APP_NAME, owner="nobody", - port=mgmt_port, token=session_key) - if const.KV_COLLECTION_NAME_SUBMIT_RUN not in self.service.kvstore: - msg = "Could not find collection {}.".format(const.KV_COLLECTION_NAME_SUBMIT_RUN) - self.write_error(msg) - raise Exception(msg) - collection = service.kvstore[const.KV_COLLECTION_NAME_SUBMIT_RUN] - _LOGGER.info("Deleting retired run details from databricks_submit_run_log...") - # Responsible to delete data from the databricks_submit_run_log lookup - query = json.dumps(endpoint_query) - collection.data.delete(query) - - except Exception as e: - _LOGGER.error(e) - _LOGGER.error(traceback.format_exc()) - _LOGGER.info("Time taken - {} seconds.".format((datetime.utcnow() - current_time).total_seconds())) - _LOGGER.info("Completed the execution of databricksretiredrun command") - - def __init__(self): - """Initialize custom command class.""" - super(DatabricksRetiredRunCommand, self).__init__() - - -dispatch(DatabricksRetiredRunCommand, sys.argv, sys.stdin, sys.stdout, __name__) diff --git a/app/bin/databricksrun.py b/app/bin/databricksrun.py index f3af378..aaa9b0b 100755 --- a/app/bin/databricksrun.py +++ b/app/bin/databricksrun.py @@ -2,6 +2,8 @@ import sys import time import traceback +import json +import uuid import databricks_com as com import databricks_const as const @@ -15,10 +17,12 @@ Option, ) -_LOGGER = setup_logging("ta_databricksrun_command") +APP_NAME = const.APP_NAME +UID = str(uuid.uuid4()) +_LOGGER = setup_logging("ta_databricksrun_command", UID) -@Configuration(type="events") +@Configuration(type="reporting") class DatabricksRunCommand(GeneratingCommand): """Custom Command of databricksrun.""" @@ -33,34 +37,47 @@ class DatabricksRunCommand(GeneratingCommand): def generate(self): """Generating custom command.""" - _LOGGER.info("Initiating databricksrun command") - kv_log_info = { + _LOGGER.info("Initiating databricksrun command.") + _LOGGER.info("Notebook Path: {}".format(self.notebook_path if self.notebook_path else None)) + _LOGGER.info("Notebook Revision Timestamp: {}" + .format(self.revision_timestamp if self.revision_timestamp else None)) + _LOGGER.info("Run Name: {}".format(self.run_name if self.run_name else None)) + _LOGGER.info("Cluster: {}".format(self.cluster if self.cluster else None)) + _LOGGER.info("Notebook Params: {}".format(self.notebook_params if self.notebook_params else None)) + _LOGGER.info("Identifier: {}".format(self.identifier if self.identifier else None)) + + info_to_process = { "user": self._metadata.searchinfo.username, "account_name": self.account_name, "created_time": time.time(), "param": self._metadata.searchinfo.args, "run_id": "-", 
+ "run_execution_status": "-", "output_url": "-", "result_url": "-", - "command_status": "Failed", + "command_submission_status": "Failed", "error": "-", "identifier": "-", + "uid": UID } if not (self.notebook_path and self.notebook_path.strip()): self.write_error('Please provide value for the parameter "notebook_path"') exit(1) if self.identifier and self.identifier.strip(): - kv_log_info["identifier"] = self.identifier.strip() + info_to_process["identifier"] = self.identifier.strip() session_key = self._metadata.searchinfo.session_key self.run_name = self.run_name or const.APP_NAME try: - + databricks_configs = utils.get_databricks_configs(session_key, self.account_name) + if not databricks_configs: + ERR_MSG = \ + "Account '{}' not found. Please provide valid Databricks account.".format(self.account_name) + raise Exception(ERR_MSG) + provided_index = databricks_configs.get("index") # Fetching cluster name - self.cluster = (self.cluster and self.cluster.strip()) or utils.get_databricks_configs( - session_key, self.account_name - ).get("cluster_name") + self.cluster = (self.cluster and self.cluster.strip()) or databricks_configs.get("cluster_name") if not self.cluster: raise Exception( "Databricks cluster is required to execute this custom command. " @@ -93,8 +110,10 @@ def generate(self): _LOGGER.info("Submitting the run") response = client.databricks_api("post", const.RUN_SUBMIT_ENDPOINT, data=payload) - kv_log_info.update(response) + info_to_process.update(response) run_id = response["run_id"] + if run_id: + _LOGGER.info("run ID returned: {}".format(run_id)) _LOGGER.info("Successfully submitted the run with ID: {}".format(run_id)) # Request to get the run_id details @@ -105,27 +124,34 @@ def generate(self): output_url = response.get("run_page_url") if output_url: result_url = output_url.rstrip("/") + "/resultsOnly" - kv_log_info["output_url"] = output_url - kv_log_info["result_url"] = result_url - kv_log_info["command_status"] = "Success" + info_to_process["output_url"] = output_url + info_to_process["result_url"] = result_url + info_to_process["command_submission_status"] = "Success" + info_to_process["run_execution_status"] = "Initiated" _LOGGER.info("Output url returned: {}".format(output_url)) + _LOGGER.info("Successfully executed databricksrun command.") + except Exception as e: _LOGGER.error(e) _LOGGER.error(traceback.format_exc()) - kv_log_info["error"] = str(e) + info_to_process["error"] = str(e) self.write_error(str(e)) exit(1) finally: - updated_kv_info = utils.update_kv_store_collection( - self._metadata.searchinfo.splunkd_uri, - const.KV_COLLECTION_NAME_SUBMIT_RUN, - session_key, - kv_log_info, - ) - - yield updated_kv_info + try: + _LOGGER.info("Ingesting the data into Splunk index: {}".format(provided_index)) + indented_json = json.dumps(info_to_process, indent=4) + _LOGGER.info("Data to be ingested in Splunk:\n{}".format(indented_json)) + utils.ingest_data_to_splunk( + info_to_process, session_key, provided_index, "databricks:databricksrun" + ) + _LOGGER.info("Successfully ingested the data into Splunk index: {}.".format(provided_index)) + except Exception: + _LOGGER.error("Error occured while ingesting data into Splunk. 
Error: {}" + .format(traceback.format_exc())) + yield info_to_process dispatch(DatabricksRunCommand, sys.argv, sys.stdin, sys.stdout, __name__) diff --git a/app/bin/databricksrunstatus.py b/app/bin/databricksrunstatus.py new file mode 100644 index 0000000..7a6399c --- /dev/null +++ b/app/bin/databricksrunstatus.py @@ -0,0 +1,94 @@ +import ta_databricks_declare # noqa: F401 + +import traceback +import sys +import time + +import splunklib.results as Results # noqa +import splunk.Intersplunk + +import databricks_com as com +import databricks_const as const +import databricks_common_utils as utils +from log_manager import setup_logging + +APP_NAME = const.APP_NAME +_LOGGER = setup_logging("ta_databricksrunstatus_command") + + +if __name__ == "__main__": + try: + _LOGGER.info("Initiating databricksrunstatus command.") + results, d_results, settings = splunk.Intersplunk.getOrganizedResults() + session_key = str(settings.get('sessionKey')) + if len(results) == 0: + _LOGGER.info("No data found to update.") + sys.exit(0) + + for each in results: + try: + to_ingest = False + if each.get("param"): + each["param"] = each["param"].split('\n') + run_id = each['run_id'] + account_name = each['account_name'] + index = each["index"] + sourcetype = each["sourcetype"] + if sourcetype == "databricks:databricksjob": + each.pop("identifier", None) + each.pop("index", None) + each.pop("sourcetype", None) + APPEND_RUN_ID_IN_LOG = "[UID: {}] Run ID: {}.".format(each.get("uid", "-"), str(run_id)) + client_ = com.DatabricksClient(account_name, session_key) + try: + run_id = int(run_id) + except ValueError: + continue + args = {"run_id": run_id} + response = client_.databricks_api("get", const.GET_RUN_ENDPOINT, args=args) + if response and response["state"]["life_cycle_state"] == "RUNNING": + _LOGGER.info("{} Execution is in Running state.".format(APPEND_RUN_ID_IN_LOG)) + if each["run_execution_status"] != "Running": + to_ingest = True + each["run_execution_status"] = "Running" + + elif response and response["state"]["life_cycle_state"] == "PENDING": + _LOGGER.info("{} Execution is in Pending state.".format(APPEND_RUN_ID_IN_LOG)) + if each["run_execution_status"] != "Pending": + to_ingest = True + each["run_execution_status"] = "Pending" + + elif response and response["state"]["life_cycle_state"] == "TERMINATED": + if response["state"]["result_state"] == "SUCCESS": + _LOGGER.info("{} Execution is successfully completed.".format(APPEND_RUN_ID_IN_LOG)) + each["run_execution_status"] = "Success" + to_ingest = True + elif response["state"]["result_state"] == "FAILED": + _LOGGER.info("{} Execution is failed.".format(APPEND_RUN_ID_IN_LOG)) + each["run_execution_status"] = "Failed" + to_ingest = True + elif response["state"]["result_state"] == "CANCELED": + _LOGGER.info("{} Execution is canceled.".format(APPEND_RUN_ID_IN_LOG)) + each["run_execution_status"] = "Canceled" + to_ingest = True + else: + if response: + res_state = response["state"]["result_state"] + _LOGGER.info("{} Execution status is {}.".format(APPEND_RUN_ID_IN_LOG, res_state)) + if each["run_execution_status"] != res_state: + to_ingest = True + each["run_execution_status"] = res_state + + if to_ingest: + each["created_time"] = time.time() + utils.ingest_data_to_splunk(dict(each), session_key, index, sourcetype) + _LOGGER.info("{} Updated Execution details successfully ingested into Splunk." 
+ .format(APPEND_RUN_ID_IN_LOG)) + except Exception: + _LOGGER.error("{} Error: {}".format(APPEND_RUN_ID_IN_LOG, traceback.format_exc())) + continue + _LOGGER.info("Completed execution of databricksrunstatus command.") + except Exception: + _LOGGER.error("Error occured in executing databricksrunstatus command. Error: {}" + .format(traceback.format_exc())) + sys.exit(0) diff --git a/app/bin/log_manager.py b/app/bin/log_manager.py index 75770dd..2babc33 100755 --- a/app/bin/log_manager.py +++ b/app/bin/log_manager.py @@ -1,5 +1,6 @@ # Standard library imports import os +import datetime import logging import logging.handlers import databricks_const as const @@ -12,7 +13,7 @@ DEFAULT_LOG_LEVEL = logging.INFO -def setup_logging(log_name): +def setup_logging(log_name, uid_value=None): """ Get a logger object with specified log level. @@ -23,6 +24,7 @@ def setup_logging(log_name): log_file = make_splunkhome_path(["var", "log", "splunk", "%s.log" % log_name]) # Get directory in which log file is present log_dir = os.path.dirname(log_file) + uid = uid_value if uid_value else None # Create directory at the required path to store log file, if not found if not os.path.exists(log_dir): os.makedirs(log_dir) @@ -46,11 +48,21 @@ def setup_logging(log_name): file_handler = logging.handlers.RotatingFileHandler( log_file, mode="a", maxBytes=10485760, backupCount=10 ) + current_time = datetime.datetime.now(datetime.timezone.utc).astimezone() + time_str = current_time.strftime("%m-%d-%Y %H:%M:%S.%f")[:-3] + tz_offset = current_time.strftime("%z") + formatted_time = "{} {}".format(time_str, tz_offset) # Format logs - fmt_str = ( - "%(asctime)s %(levelname)s pid=%(process)d tid=%(threadName)s " - "file=%(filename)s:%(funcName)s:%(lineno)d | %(message)s" - ) + if uid: + fmt_str = ( + "{} %(levelname)s pid=%(process)d tid=%(threadName)s " + "file=%(filename)s:%(funcName)s:%(lineno)d | [UID: {}] %(message)s" + ).format(formatted_time, uid) + else: + fmt_str = ( + "{} %(levelname)s pid=%(process)d tid=%(threadName)s " + "file=%(filename)s:%(funcName)s:%(lineno)d | %(message)s" + ).format(formatted_time) formatter = logging.Formatter(fmt_str) file_handler.setFormatter(formatter) logger.addHandler(file_handler) diff --git a/app/bin/notebook.py b/app/bin/notebook.py index ca45aac..bfec4fa 100644 --- a/app/bin/notebook.py +++ b/app/bin/notebook.py @@ -136,7 +136,7 @@ def dowork(self, result): self.message("Submitting the run", status="working") # , level=logging.INFO) response = com.databricks_api("post", const.RUN_SUBMIT_ENDPOINT, data=payload) - # kv_log_info.update(response) + # info_to_process.update(response) run_id = response["run_id"] self.message( "Successfully submitted the run with ID: {}".format(run_id) diff --git a/app/bin/ta_databricks/aob_py3/splunklib/__init__.py b/app/bin/ta_databricks/aob_py3/splunklib/__init__.py old mode 100755 new mode 100644 index b370003..31787bd --- a/app/bin/ta_databricks/aob_py3/splunklib/__init__.py +++ b/app/bin/ta_databricks/aob_py3/splunklib/__init__.py @@ -31,5 +31,5 @@ def setup_logging(level, log_format=DEFAULT_LOG_FORMAT, date_format=DEFAULT_DATE format=log_format, datefmt=date_format) -__version_info__ = (1, 7, 1) +__version_info__ = (1, 7, 3) __version__ = ".".join(map(str, __version_info__)) diff --git a/app/bin/ta_databricks/aob_py3/splunklib/binding.py b/app/bin/ta_databricks/aob_py3/splunklib/binding.py old mode 100755 new mode 100644 index 7806bee..85cb8d1 --- a/app/bin/ta_databricks/aob_py3/splunklib/binding.py +++ 
b/app/bin/ta_databricks/aob_py3/splunklib/binding.py @@ -39,6 +39,7 @@ from io import BytesIO from xml.etree.ElementTree import XML +from splunklib import __version__ from splunklib import six from splunklib.six.moves import urllib @@ -346,7 +347,8 @@ def _authority(scheme=DEFAULT_SCHEME, host=DEFAULT_HOST, port=DEFAULT_PORT): "http://splunk.utopia.net:471" """ - if ':' in host: + # check if host is an IPv6 address and not enclosed in [ ] + if ':' in host and not (host.startswith('[') and host.endswith(']')): # IPv6 addresses must be enclosed in [ ] in order to be well # formed. host = '[' + host + ']' @@ -1434,7 +1436,7 @@ def request(url, message, **kwargs): head = { "Content-Length": str(len(body)), "Host": host, - "User-Agent": "splunk-sdk-python/1.7.1", + "User-Agent": "splunk-sdk-python/%s" % __version__, "Accept": "*/*", "Connection": "Close", } # defaults diff --git a/app/bin/ta_databricks/aob_py3/splunklib/client.py b/app/bin/ta_databricks/aob_py3/splunklib/client.py old mode 100755 new mode 100644 index 85c559d..33156bb --- a/app/bin/ta_databricks/aob_py3/splunklib/client.py +++ b/app/bin/ta_databricks/aob_py3/splunklib/client.py @@ -421,6 +421,7 @@ def __init__(self, **kwargs): super(Service, self).__init__(**kwargs) self._splunk_version = None self._kvstore_owner = None + self._instance_type = None @property def apps(self): @@ -572,7 +573,7 @@ def parse(self, query, **kwargs): :type kwargs: ``dict`` :return: A semantic map of the parsed search query. """ - if self.splunk_version >= (9,): + if not self.disable_v2_api: return self.post("search/v2/parser", q=query, **kwargs) return self.get("search/parser", q=query, **kwargs) @@ -695,6 +696,22 @@ def splunk_version(self): self._splunk_version = tuple([int(p) for p in self.info['version'].split('.')]) return self._splunk_version + @property + def splunk_instance(self): + if self._instance_type is None : + splunk_info = self.info; + if hasattr(splunk_info, 'instance_type') : + self._instance_type = splunk_info['instance_type'] + else: + self._instance_type = '' + return self._instance_type + + @property + def disable_v2_api(self): + if self.splunk_instance.lower() == 'cloud': + return self.splunk_version < (9,0,2209) + return self.splunk_version < (9,0,2) + @property def kvstore_owner(self): """Returns the KVStore owner for this instance of Splunk. @@ -1199,6 +1216,36 @@ def reload(self): self.post("_reload") return self + def acl_update(self, **kwargs): + """To update Access Control List (ACL) properties for an endpoint. + + :param kwargs: Additional entity-specific arguments (required). + + - "owner" (``string``): The Splunk username, such as "admin". A value of "nobody" means no specific user (required). + + - "sharing" (``string``): A mode that indicates how the resource is shared. The sharing mode can be "user", "app", "global", or "system" (required). + + :type kwargs: ``dict`` + + **Example**:: + + import splunklib.client as client + service = client.connect(...) + saved_search = service.saved_searches["name"] + saved_search.acl_update(sharing="app", owner="nobody", app="search", **{"perms.read": "admin, nobody"}) + """ + if "body" not in kwargs: + kwargs = {"body": kwargs} + + if "sharing" not in kwargs["body"]: + raise ValueError("Required argument 'sharing' is missing.") + if "owner" not in kwargs["body"]: + raise ValueError("Required argument 'owner' is missing.") + + self.post("acl", **kwargs) + self.refresh() + return self + @property def state(self): """Returns the entity's state record. 
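# --- Hedged illustration (standalone sketch, not part of splunklib or of this patch) ---
# The disable_v2_api property added above gates the v2 search job endpoints on the Splunk
# build: Splunk Cloud needs at least the 9.0.2209 line, other instance types at least 9.0.2.
# A self-contained sketch of that comparison, using version tuples as in splunk_version:
def use_v2_search_api(instance_type, splunk_version):
    """Return True when the v2 search job endpoints should be used."""
    if (instance_type or "").lower() == "cloud":
        return splunk_version >= (9, 0, 2209)
    return splunk_version >= (9, 0, 2)


# Examples (assumed inputs):
# use_v2_search_api("cloud", (9, 0, 2305))  -> True
# use_v2_search_api("", (9, 0, 1))          -> False
# use_v2_search_api("", (9, 2, 0))          -> True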
@@ -2722,7 +2769,7 @@ def __init__(self, service, sid, **kwargs): # Default to v2 in Splunk Version 9+ path = "{path}{sid}" # Formatting path based on the Splunk Version - if service.splunk_version < (9,): + if service.disable_v2_api: path = path.format(path=PATH_JOBS, sid=sid) else: path = path.format(path=PATH_JOBS_V2, sid=sid) @@ -2782,7 +2829,7 @@ def events(self, **kwargs): kwargs['segmentation'] = kwargs.get('segmentation', 'none') # Search API v1(GET) and v2(POST) - if self.service.splunk_version < (9,): + if self.service.disable_v2_api: return self.get("events", **kwargs).body return self.post("events", **kwargs).body @@ -2874,7 +2921,7 @@ def results(self, **query_params): query_params['segmentation'] = query_params.get('segmentation', 'none') # Search API v1(GET) and v2(POST) - if self.service.splunk_version < (9,): + if self.service.disable_v2_api: return self.get("results", **query_params).body return self.post("results", **query_params).body @@ -2919,7 +2966,7 @@ def preview(self, **query_params): query_params['segmentation'] = query_params.get('segmentation', 'none') # Search API v1(GET) and v2(POST) - if self.service.splunk_version < (9,): + if self.service.disable_v2_api: return self.get("results_preview", **query_params).body return self.post("results_preview", **query_params).body @@ -3011,7 +3058,7 @@ class Jobs(Collection): collection using :meth:`Service.jobs`.""" def __init__(self, service): # Splunk 9 introduces the v2 endpoint - if service.splunk_version >= (9,): + if not service.disable_v2_api: path = PATH_JOBS_V2 else: path = PATH_JOBS @@ -3662,13 +3709,20 @@ class KVStoreCollections(Collection): def __init__(self, service): Collection.__init__(self, service, 'storage/collections/config', item=KVStoreCollection) - def create(self, name, indexes = {}, fields = {}, **kwargs): + def __getitem__(self, item): + res = Collection.__getitem__(self, item) + for k, v in res.content.items(): + if "accelerated_fields" in k: + res.content[k] = json.loads(v) + return res + + def create(self, name, accelerated_fields={}, fields={}, **kwargs): """Creates a KV Store Collection. :param name: name of collection to create :type name: ``string`` - :param indexes: dictionary of index definitions - :type indexes: ``dict`` + :param accelerated_fields: dictionary of accelerated_fields definitions + :type accelerated_fields: ``dict`` :param fields: dictionary of field definitions :type fields: ``dict`` :param kwargs: a dictionary of additional parameters specifying indexes and field definitions @@ -3676,10 +3730,10 @@ def create(self, name, indexes = {}, fields = {}, **kwargs): :return: Result of POST request """ - for k, v in six.iteritems(indexes): + for k, v in six.iteritems(accelerated_fields): if isinstance(v, dict): v = json.dumps(v) - kwargs['index.' + k] = v + kwargs['accelerated_fields.' + k] = v for k, v in six.iteritems(fields): kwargs['field.' + k] = v return self.post(name=name, **kwargs) @@ -3693,18 +3747,20 @@ def data(self): """ return KVStoreCollectionData(self) - def update_index(self, name, value): - """Changes the definition of a KV Store index. + def update_accelerated_field(self, name, value): + """Changes the definition of a KV Store accelerated_field. 
- :param name: name of index to change + :param name: name of accelerated_fields to change :type name: ``string`` - :param value: new index definition - :type value: ``dict`` or ``string`` + :param value: new accelerated_fields definition + :type value: ``dict`` :return: Result of POST request """ kwargs = {} - kwargs['index.' + name] = value if isinstance(value, six.string_types) else json.dumps(value) + if isinstance(value, dict): + value = json.dumps(value) + kwargs['accelerated_fields.' + name] = value return self.post(**kwargs) def update_field(self, name, value): diff --git a/app/bin/ta_databricks/aob_py3/splunklib/data.py b/app/bin/ta_databricks/aob_py3/splunklib/data.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/__init__.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/__init__.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/argument.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/argument.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/event.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/event.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/event_writer.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/event_writer.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/input_definition.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/input_definition.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/scheme.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/scheme.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/script.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/script.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/utils.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/utils.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/modularinput/validation_definition.py b/app/bin/ta_databricks/aob_py3/splunklib/modularinput/validation_definition.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/results.py b/app/bin/ta_databricks/aob_py3/splunklib/results.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/__init__.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/__init__.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/decorators.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/decorators.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/environment.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/environment.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/eventing_command.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/eventing_command.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/external_search_command.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/external_search_command.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/generating_command.py 
b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/generating_command.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/internals.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/internals.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/reporting_command.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/reporting_command.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/search_command.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/search_command.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/streaming_command.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/streaming_command.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/validators.py b/app/bin/ta_databricks/aob_py3/splunklib/searchcommands/validators.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/aob_py3/splunklib/six.py b/app/bin/ta_databricks/aob_py3/splunklib/six.py old mode 100755 new mode 100644 diff --git a/app/bin/ta_databricks/modalert_launch_notebook_helper.py b/app/bin/ta_databricks/modalert_launch_notebook_helper.py index c59b6ba..a368d39 100644 --- a/app/bin/ta_databricks/modalert_launch_notebook_helper.py +++ b/app/bin/ta_databricks/modalert_launch_notebook_helper.py @@ -20,6 +20,7 @@ def process_event(helper, *args, **kwargs): notebook_parameters = helper.get_param("notebook_parameters") cluster_name = helper.get_param("cluster_name") account_name = helper.get_param("account_name") + run_name = helper.get_param("run_name") if not (notebook_path and notebook_path.strip()): helper.log_error("Notebook path is a required parameter which is not provided.") @@ -35,6 +36,8 @@ def process_event(helper, *args, **kwargs): search_string = search_string + " notebook_params=\"" + notebook_parameters.strip() + "\"" if cluster_name: search_string = search_string + " cluster=\"" + cluster_name.strip() + "\"" + if run_name: + search_string = search_string + " run_name=\"" + run_name.strip() + "\"" try: if helper.action_mode == "adhoc": sid = helper.orig_sid diff --git a/app/default/alert_actions.conf b/app/default/alert_actions.conf index a3b5532..7ca0629 100644 --- a/app/default/alert_actions.conf +++ b/app/default/alert_actions.conf @@ -24,4 +24,5 @@ param.revision_timestamp = param.notebook_parameters = param.cluster_name = param.account_name = +param.run_name = TA-Databricks param._cam = {"technology": [{"vendor": "Databricks", "product": "Databricks", "version": ["1.1.0"]}], "supports_adhoc": true, "task": ["create"], "drilldown_uri": "databricks-launch-notebook?form.identifier=$rid$:$sid$&redirected=True", "subject": ["process"], "category": ["Information Conveyance"]} diff --git a/app/default/app.conf b/app/default/app.conf index 05f13d7..8888704 100644 --- a/app/default/app.conf +++ b/app/default/app.conf @@ -3,11 +3,11 @@ state_change_requires_restart = false is_configured = false state = enabled -build = 34 +build = 35 [launcher] author = Databricks, Inc. -version = 1.2.0 +version = 1.4.1 description = The Databricks Add-on for Splunk is used to query Databricks data, and execute Databricks notebooks from Splunk. 
[ui] diff --git a/app/default/collections.conf b/app/default/collections.conf deleted file mode 100644 index 647082d..0000000 --- a/app/default/collections.conf +++ /dev/null @@ -1,24 +0,0 @@ -[databricks_submit_run_log] -enforceTypes = true -field.created_time = time -field.error = string -field.output_url = string -field.result_url = string -field.param = string -field.run_id = string -field.command_status = string -field.user = string -field.account_name = string -field.identifier = string - -[databricks_execute_job_log] -enforceTypes = true -field.created_time = time -field.error = string -field.output_url = string -field.result_url = string -field.param = string -field.run_id = string -field.command_status = string -field.user = string -field.account_name = string diff --git a/app/default/commands.conf b/app/default/commands.conf index de285ad..6a6de09 100644 --- a/app/default/commands.conf +++ b/app/default/commands.conf @@ -13,7 +13,7 @@ filename = databricksjob.py python.version = python3 chunked = true -[databricksretiredrun] -filename = databricksretiredrun.py +[databricksrunstatus] +filename = databricksrunstatus.py python.version = python3 -chunked = true \ No newline at end of file +passauth = true \ No newline at end of file diff --git a/app/default/data/ui/alerts/launch_notebook.html b/app/default/data/ui/alerts/launch_notebook.html index 9cb4b3b..c95c5ca 100644 --- a/app/default/data/ui/alerts/launch_notebook.html +++ b/app/default/data/ui/alerts/launch_notebook.html @@ -7,28 +7,35 @@
- + +
+ + Run Name to identify the run execution. +
+
+
+
Absolute path of the notebook to be run in Databricks, e.g., /Users/user_1/notebook_1
- +
The timestamp of the revision of the notebook.
- +
Parameters to pass while executing the notebook, e.g., key1=value1||key2=value2
- +
Name of Databricks cluster to use for execution. diff --git a/app/default/data/ui/views/databricks-intro.xml b/app/default/data/ui/views/databricks-intro.xml index f2144fa..03cc9c5 100644 --- a/app/default/data/ui/views/databricks-intro.xml +++ b/app/default/data/ui/views/databricks-intro.xml @@ -36,9 +36,13 @@
Add Screenshot here
+
+ Add Screenshot here +

To configure the Databricks Add-on, navigate to the Configuration page from the navigation bar above, click the "Add" button, and then follow these steps:

  1. Enter the Databricks instance URL in the "Databricks Instance" field without a scheme (http or https)
  2. +
  3. Select the mode through which the databricksquery command should be executed from the "'databricksquery' to run on" dropdown.
  4. Select the authentication type from the dropdown.
  5. Based on the authentication type, enter the required credentials. (Refer to the image above)
  6. If you select the "Personal Access Token" authentication type, enter the personal access token for the same instance in the "Databricks Access Token" field. Otherwise, provide the "Client Id", "Tenant Id" and "Client Secret".
  7. @@ -62,7 +66,7 @@
    @@ -83,17 +87,22 @@ Description - Notebook path + Run Name + No + Run Name to identify the run execution. + + + Notebook Path Yes - The timestamp of the revision of the notebook. + The absolute path of the notebook to be run in the Databricks workspace. This path must begin with a slash. - Notebook revision timestamp + Notebook Revision Timestamp No - The absolute path of the notebook to be run in the Databricks workspace. This path must begin with a slash. + The timestamp of the revision of the notebook. - Notebook parameters + Notebook Parameters No Parameters to pass while executing the notebook. In the form of “key1=value1||key2=value2||...” @@ -122,7 +131,7 @@
    @@ -147,7 +156,7 @@ Launching a Notebook through Splunk search
    - Add Screenshot here + Add Screenshot here

    To launch a notebook via Splunk search, follow these steps:

      @@ -164,17 +173,19 @@

      The app provides three custom commands. Users can execute them from the Splunk search bar. Below are the command details, followed by a short usage sketch after the list.

      1. databricksquery: used to query data present in Databricks tables from Splunk. Refer to the docs for the command parameter details. -
        Syntax: | databricksquery cluster="<cluster_name>" query="<SQL_query>" command_timeout=<timeout_in_seconds> account_name="<account_name>" | table * -
        +
        Syntax 1: | databricksquery warehouse_id="<warehouse_id>" limit=<query_result_limit> query="<SQL_query>" command_timeout=<timeout_in_seconds> account_name="<account_name>" +
        Syntax 2: | databricksquery cluster="<cluster_name>" query="<SQL_query>" command_timeout=<timeout_in_seconds> account_name="<account_name>" +
        +

      2. databricksrun: used to submit a one-time run for a notebook without creating a job. Refer to the docs for the command parameter details. -
        Syntax: | databricksrun notebook_path="<path_to_notebook>" run_name="<run_name>" cluster="<cluster_name>" revision_timestamp=<revision_timestamp> notebook_params="<params_for_job_execution>" account_name="<account_name>" | table * -
        +
        Syntax: | databricksrun notebook_path="<path_to_notebook>" run_name="<run_name>" cluster="<cluster_name>" revision_timestamp=<revision_timestamp> notebook_params="<params_for_job_execution>" account_name="<account_name>" +
      3. databricksjob: used to trigger an already created job from Splunk. Refer to the docs for the command parameter details. -
        Syntax: | databricksjob job_id=<job_id> notebook_params="<params_for_job_execution>" account_name="<account_name>" | table * -
        +
        Syntax: | databricksjob job_id=<job_id> notebook_params="<params_for_job_execution>" account_name="<account_name>" +

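The command syntaxes listed above can be tried straight from the Splunk search bar; for completeness, here is a minimal sketch of invoking databricksquery programmatically through the Splunk SDK bundled with the add-on. The host, credentials, warehouse_id, and account_name values are placeholders, and the search assumes the add-on and an account are already configured.

```python
import splunklib.client as client
import splunklib.results as results

# Placeholder connection details -- adjust to your environment.
service = client.connect(host="localhost", port=8089,
                         username="admin", password="changeme")

# databricksquery run as a oneshot search (warehouse_id/account_name are examples).
spl = ('| databricksquery warehouse_id="a0b1c2" '
       'query="SELECT * FROM default.people WHERE age>30" '
       'command_timeout=60 account_name="my_account"')

stream = service.jobs.oneshot(spl, output_mode="json")
for item in results.JSONResultsReader(stream):
    if isinstance(item, dict):
        print(item)                                      # a result row
    else:
        print("{}: {}".format(item.type, item.message))  # a diagnostic message
```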
      diff --git a/app/default/data/ui/views/databricks-launch-notebook.xml b/app/default/data/ui/views/databricks-launch-notebook.xml index 64c1318..902e178 100644 --- a/app/default/data/ui/views/databricks-launch-notebook.xml +++ b/app/default/data/ui/views/databricks-launch-notebook.xml @@ -1,7 +1,7 @@ - + - | databricksrun notebook_path=$notebook|s$ revision_timestamp=$revision_timestamp|s$ notebook_params=$params|s$ cluster=$cluster|s$ account_name=$account_name|s$ + | databricksrun notebook_path=$notebook|s$ revision_timestamp=$revision_timestamp|s$ notebook_params=$params|s$ cluster=$cluster|s$ account_name=$account_name|s$ run_name=$run_name|s$ @@ -25,7 +25,9 @@ - |inputlookup submit_run_logs where identifier=$form.identifier|s$|eventstats max(created_time) as max_created_time | where created_time=max_created_time + `databricks_index_macro` sourcetype="databricks:databricksrun" identifier=$form.identifier|s$|eventstats max(created_time) as max_created_time | where created_time=max_created_time | table param{} account_name command_submission_status created_time error identifier number_in_job output_url result_url run_execution_status run_id uid user | rename param{} as param + 0 + now @@ -37,14 +39,17 @@ mvfind(split($split_params$,","),"revision_timestamp=") mvfind(split($split_params$,","),"notebook_params=") mvfind(split($split_params$,","),"cluster=") + mvfind(split($split_params$,","),"run_name=") mvindex(split($split_params$,","),$notebook_index$) mvindex(split($split_params$,","),$param_index$) mvindex(split($split_params$,","),$revision_index$) mvindex(split($split_params$,","),$cluster_index$) + mvindex(split($split_params$,","),$run_index$) substr($notebook_value$,15) substr($revision_value$,20) substr($param_value$,17) substr($cluster_value$,9) + substr($run_value$,10) $result.result_url$ @@ -59,14 +64,17 @@ mvfind(split($split_params$,","),"revision_timestamp=") mvfind(split($split_params$,","),"notebook_params=") mvfind(split($split_params$,","),"cluster=") + mvfind(split($split_params$,","),"run_name=") mvindex(split($split_params$,","),$notebook_index$) mvindex(split($split_params$,","),$param_index$) mvindex(split($split_params$,","),$revision_index$) mvindex(split($split_params$,","),$cluster_index$) + mvindex(split($split_params$,","),$run_index$) substr($notebook_value$,15) substr($revision_value$,20) substr($param_value$,17) substr($cluster_value$,9) + substr($run_value$,10) $result.error$ @@ -78,22 +86,26 @@
      - - + + + TA-Databricks - - + + + + + - + - + - + title title @@ -103,7 +115,7 @@ - + No Yes @@ -153,7 +165,7 @@ Result URL -
      Job running, click here to redirect.
      +
      Job submitted, click here to redirect.
      diff --git a/app/default/data/ui/views/databricks_job_execution_details.xml b/app/default/data/ui/views/databricks_job_execution_details.xml index ffc1560..f01da64 100644 --- a/app/default/data/ui/views/databricks_job_execution_details.xml +++ b/app/default/data/ui/views/databricks_job_execution_details.xml @@ -1,4 +1,4 @@ - +
      @@ -9,7 +9,7 @@ - + All Failed Success @@ -20,7 +20,7 @@ user user - | inputlookup submit_run_logs | append [| inputlookup execute_job_logs] | dedup user | table user + `databricks_index_macro` sourcetype=databricks* | dedup user | table user $creation_time_filter.earliest$ $creation_time_filter.latest$ @@ -31,21 +31,27 @@ Databricks Run Details - +
      - | inputlookup submit_run_logs | sort -created_time + `databricks_index_macro` sourcetype="databricks:databricksrun" | sort -created_time +| dedup run_id | addinfo | where info_min_time<=created_time AND (info_max_time>=created_time OR info_max_time="+Infinity") -| eval created_time=strftime(created_time,"%Y-%m-%dT%H:%M:%S.%Q") | search user="$user_filter$" command_status="$command_execution_status_filter$" -| table created_time user account_name param run_id command_status output_url result_url error +| eval created_time=strftime(created_time,"%Y-%m-%dT%H:%M:%S.%Q") +| eval cancel = run_id + "||" + account_name + "||" + run_execution_status + "||" + uid +| search user="$user_filter$" command_submission_status="$command_execution_status_filter$" +| table uid created_time user account_name param{} run_id command_submission_status run_execution_status output_url result_url error cancel | rename created_time as "Created Time" | rename error as "Error Message" | rename output_url as "Output URL" | rename result_url as "Result URL" -| rename param as "Notebook Params" +| rename param{} as "Notebook Params" | rename run_id as "Run ID" -| rename command_status as "Submission Status" +| rename command_submission_status as "Command Submission Status" +| rename run_execution_status as "Run Execution Status" | rename user as "User" +| rename cancel as "Cancel Run" +| rename uid as UID | rename account_name as "Databricks Account" $creation_time_filter.earliest$ $creation_time_filter.latest$ @@ -70,6 +76,9 @@ + + search?q=`databricks_index_macro`%20sourcetype=databricks:databricksrun%20uid%3D%22$row.UID$%22&earliest=0&latest=now + @@ -80,22 +89,28 @@ Databricks Job Details -
      +
      - | inputlookup execute_job_logs | sort -created_time + `databricks_index_macro` sourcetype="databricks:databricksjob" | sort -created_time +| dedup run_id | addinfo | eval info_max_time=if(isnum(info_max_time), info_max_time, now()) | where info_min_time<=created_time AND (info_max_time>=created_time OR info_max_time="+Infinity") -| search user="$user_filter$" command_status="$command_execution_status_filter$" | eval created_time=strftime(created_time,"%Y-%m-%dT%H:%M:%S.%Q") -| table created_time user account_name param run_id command_status output_url result_url error +| search user="$user_filter$" command_submission_status="$command_execution_status_filter$" +| eval created_time=strftime(created_time,"%Y-%m-%dT%H:%M:%S.%Q") +| eval cancel = run_id + "||" + account_name + "||" + run_execution_status + "||" + uid +| table uid created_time user account_name param{} run_id command_submission_status run_execution_status output_url result_url error cancel | rename created_time as "Created Time" | rename error as "Error Message" | rename output_url as "Output URL" | rename result_url as "Result URL" -| rename param as "Notebook Params" +| rename param{} as "Notebook Params" | rename run_id as "Run ID" -| rename command_status as "Submission Status" +| rename command_submission_status as "Command Submission Status" +| rename run_execution_status as "Job Execution Status" | rename user as "User" +| rename cancel as "Cancel Run" +| rename uid as UID | rename account_name as "Databricks Account" $creation_time_filter.earliest$ $creation_time_filter.latest$ @@ -120,6 +135,9 @@ + + search?q=`databricks_index_macro`%20sourcetype=databricks:databricksjob%20uid%3D%22$row.UID$%22&earliest=0&latest=now + diff --git a/app/default/macros.conf b/app/default/macros.conf index 5ff3e7e..59eddab 100644 --- a/app/default/macros.conf +++ b/app/default/macros.conf @@ -1,4 +1,2 @@ -[databricks_run_retiring_days] -description = Retire Run after given days -definition = 90 -iseval = 0 \ No newline at end of file +[databricks_index_macro] +definition = index IN ("main") \ No newline at end of file diff --git a/app/default/props.conf b/app/default/props.conf index 8ee2ea7..794f93b 100644 --- a/app/default/props.conf +++ b/app/default/props.conf @@ -7,4 +7,6 @@ EXTRACT-output_url = output_url="(?.*?)" [source::...ta_databricks*.log*] SHOULD_LINEMERGE = true -sourcetype = tadatabricks:log \ No newline at end of file +sourcetype = tadatabricks:log +EXTRACT-log_fields = ^(?:\d{4}-\d{2}-\d{2}|\d{2}-\d{2}-\d{4})\s+\d{1,2}:\d{1,2}:\d{1,2}(?:[,\.]\d{1,3})?(?:\s+[\+\-]\d{4})?\s+(?INFO|WARNING|DEBUG|ERROR|CRITICAL)\s?pid=\w+\s?tid=[\w]+\s?file=[^|]+\s?\|\s*(\[UID: (?[^\]]+)\])?\s*(?.+) + diff --git a/app/default/restmap.conf b/app/default/restmap.conf index ec8d3b8..0ca0143 100644 --- a/app/default/restmap.conf +++ b/app/default/restmap.conf @@ -25,3 +25,10 @@ python.version = python3 handler = databricks_get_credentials.DatabricksGetCredentials passSession = true passSystemAuth = true + +[script:cancel_run] +match = /cancel_run +script = cancel_run.py +scripttype = persist +python.version = python3 +handler = cancel_run.CancelRunningExecution diff --git a/app/default/savedsearches.conf b/app/default/savedsearches.conf index d94a764..d162015 100644 --- a/app/default/savedsearches.conf +++ b/app/default/savedsearches.conf @@ -1,11 +1,12 @@ -[databricks_retire_run] -cron_schedule = 0 1 * * * -description = Retire Run after specified days -dispatch.earliest_time = -1m +[databricks_update_run_execution_status] +cron_schedule = 
*/5 * * * * +description = Gets run execution status every 5 minutes +dispatch.earliest_time = 0 dispatch.latest_time = now +display.page.search.mode = fast enableSched = 1 realtime_schedule = 0 request.ui_dispatch_app = TA-Databricks request.ui_dispatch_view = TA-Databricks -search = | databricksretiredrun days=`databricks_run_retiring_days` +search = `databricks_index_macro` sourcetype=*databricks:databricks* | dedup run_id | where run_execution_status IN ("Initiated", "Pending", "Running") | table index sourcetype account_name command_submission_status created_time error number_in_job output_url param{} result_url user run_id run_execution_status identifier uid | rename param{} as param | databricksrunstatus disabled = 0 \ No newline at end of file diff --git a/app/default/searchbnf.conf b/app/default/searchbnf.conf index ad9cd05..956a45d 100644 --- a/app/default/searchbnf.conf +++ b/app/default/searchbnf.conf @@ -1,27 +1,36 @@ [databricksquery-command] -syntax = databricksquery cluster="" query="" command_timeout= account_name= | table * +syntax = databricksquery warehouse_id="" cluster="" query="" command_timeout= limit= account_name= description = This command helps users to query their data present in the Databricks table from Splunk. shortdesc = Query Databricks table from Splunk. -example1 = | databricksquery query="SELECT * FROM default.people WHERE age>30" cluster="test_cluster" command_timeout=60 account_name="AAD_account" | table * +example1 = | databricksquery query="SELECT * FROM default.people WHERE age>30" warehouse_id="a0b1c2" command_timeout=60 account_name="AAD_account" +example2 = | databricksquery query="SELECT * FROM default.people WHERE age>30" cluster="test_cluster" command_timeout=60 account_name="AAD_account" comment1 = Retrieve the data from people table. usage = public appears-in = 1.0.0 catagory = generating maintainer = Databricks, Inc. +[warehouse_id] +syntax = +description = Warehouse to use for execution. + [SQL_query] syntax = description = SQL query to be executed. [timeout_in_seconds] syntax = -description = SQL qurty execution timeout in seconds. +description = SQL query execution timeout in seconds. + +[query_result_limit] +syntax = +description = Limit of rows in SQL query execution result. [databricksrun-command] -syntax = databricksrun cluster="" notebook_path="" revision_timestamp= notebook_params="" run_name="" account_name="" | table * +syntax = databricksrun cluster="" notebook_path="" revision_timestamp= notebook_params="" run_name="" account_name="" description = This custom command helps users to submit a one-time run without creating a job. shortdesc = Submit run without creating job. -example1 = | databricksrun notebook_path="/path/to/test_notebook" run_name="run_comm" cluster="test_cluster" revision_timestamp=1609146477 notebook_params="key1=value1||key2=value2" account_name="PAT_account" | table * +example1 = | databricksrun notebook_path="/path/to/test_notebook" run_name="run_comm" cluster="test_cluster" revision_timestamp=1609146477 notebook_params="key1=value1||key2=value2" account_name="PAT_account" comment1 = Display information such as command status and output URL. usage = public appears-in = 1.0.0 @@ -45,10 +54,10 @@ syntax = description = Name of the run. [databricksjob-command] -syntax = databricksjob job_id= notebook_params="" account_name="" | table * +syntax = databricksjob job_id= notebook_params="" account_name="" description = This custom command helps users to run an already created job from Splunk. 
shortdesc = Trigger the existing job from Splunk. -example1 = | databricksjob job_id=2 notebook_params="key1=value1||key2=value2" account_name="A1" | table * +example1 = | databricksjob job_id=2 notebook_params="key1=value1||key2=value2" account_name="A1" comment1 = Display information such as command status and output URL. usage = public appears-in = 1.0.0 diff --git a/app/default/server.conf b/app/default/server.conf new file mode 100644 index 0000000..1c66bb3 --- /dev/null +++ b/app/default/server.conf @@ -0,0 +1,3 @@ +[shclustering] +conf_replication_include.ta_databricks_account = true +conf_replication_include.ta_databricks_settings = true diff --git a/app/default/ta_databricks_settings.conf b/app/default/ta_databricks_settings.conf index ead33bb..3d0349b 100644 --- a/app/default/ta_databricks_settings.conf +++ b/app/default/ta_databricks_settings.conf @@ -2,3 +2,8 @@ [logging] +[additional_parameters] +admin_command_timeout = 300 +query_result_limit = 10000 +index = main +thread_count = 5 \ No newline at end of file diff --git a/app/default/transforms.conf b/app/default/transforms.conf deleted file mode 100644 index 445c60e..0000000 --- a/app/default/transforms.conf +++ /dev/null @@ -1,9 +0,0 @@ -[submit_run_logs] -external_type = kvstore -collection = databricks_submit_run_log -fields_list = created_time, output_url, result_url, param, run_id, command_status, user, account_name, error, identifier - -[execute_job_logs] -external_type = kvstore -collection = databricks_execute_job_log -fields_list = created_time output_url result_url param run_id command_status user account_name error diff --git a/app/default/web.conf b/app/default/web.conf index 34be4e4..2e7e23f 100644 --- a/app/default/web.conf +++ b/app/default/web.conf @@ -22,3 +22,7 @@ methods = GET [expose:get_credentials] pattern = databricks_get_credentials methods = GET + +[expose:run_cancelation] +pattern = cancel_run +methods = POST diff --git a/app/metadata/default.meta b/app/metadata/default.meta index 3e8dfcb..b57b64e 100644 --- a/app/metadata/default.meta +++ b/app/metadata/default.meta @@ -7,14 +7,6 @@ export = system access = read : [ * ], write : [ admin, sc_admin ] export = none -[collections/databricks_submit_run_log] -access = read : [ * ], write : [ * ] -export = none - -[collections/databricks_execute_job_log] -access = read : [ * ], write : [ * ] -export = none - [views] access = read : [ * ], write : [ admin, sc_admin ] export = none
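Finally, to make the reworked log layout (log_manager.py earlier in this diff, plus the EXTRACT-log_fields addition in props.conf) easier to picture: each record now starts with a locally rendered timestamp and UTC offset, and carries a [UID: ...] marker when a UID is passed to setup_logging. The snippet below is only an illustrative reconstruction of that format with a placeholder UID; it is not the add-on module itself.

```python
import datetime
import logging
import sys

uid = "1234-abcd"  # placeholder; the add-on generates a UID per command execution
now = datetime.datetime.now(datetime.timezone.utc).astimezone()
formatted_time = "{} {}".format(now.strftime("%m-%d-%Y %H:%M:%S.%f")[:-3],
                                now.strftime("%z"))

# Same shape as the uid branch of setup_logging(): fixed timestamp prefix,
# standard logging fields, then the [UID: ...] marker before the message.
fmt_str = ("{} %(levelname)s pid=%(process)d tid=%(threadName)s "
           "file=%(filename)s:%(funcName)s:%(lineno)d | [UID: {}] %(message)s"
           ).format(formatted_time, uid)

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(fmt_str))
logger = logging.getLogger("ta_databricks_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("Completed execution of databricksrunstatus command.")
```

A record rendered this way follows the timestamp, level, and UID layout that the new props.conf field extraction is written against.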