
Fix Environment Migration script for encryption #1732


Merged
merged 3 commits on Oct 12, 2018
120 changes: 120 additions & 0 deletions composer/tools/README.rst
@@ -0,0 +1,120 @@
.. This file is automatically generated. Do not edit this file directly.

Google Cloud Composer Python Samples
===============================================================================

.. image:: https://gstatic.com/cloudssh/images/open-btn.png
:target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=composer/tools/README.rst


This directory contains samples for Google Cloud Composer. `Google Cloud Composer`_ is a managed Apache Airflow service that helps you create, schedule, monitor and manage workflows. Cloud Composer automation helps you create Airflow environments quickly and use Airflow-native tools, such as the powerful Airflow web interface and command line tools, so you can focus on your workflows and not your infrastructure.




.. _Google Cloud Composer: https://cloud.google.com/composer/docs

Setup
-------------------------------------------------------------------------------


Authentication
++++++++++++++

This sample requires you to have authentication set up. Refer to the
`Authentication Getting Started Guide`_ for instructions on setting up
credentials for applications.

.. _Authentication Getting Started Guide:
https://cloud.google.com/docs/authentication/getting-started
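
As a quick check that credentials are discoverable, you can ask
``google.auth`` (already a dependency of this sample) for Application
Default Credentials. A minimal sketch, assuming
``GOOGLE_APPLICATION_CREDENTIALS`` points at a service account key file:

.. code-block:: python

    import google.auth

    # Resolves Application Default Credentials from the environment:
    # GOOGLE_APPLICATION_CREDENTIALS, gcloud user credentials, or the
    # GCE metadata server, in that order. The project may be None for
    # user credentials.
    credentials, project = google.auth.default()
    print("Authenticated for project: {}".format(project))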

Install Dependencies
++++++++++++++++++++

#. Clone python-docs-samples and change directory to the sample directory you want to use.

.. code-block:: bash

$ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions.

.. _Python Development Environment Setup Guide:
https://cloud.google.com/python/setup

#. Create a virtualenv. Samples are compatible with Python 2.7 and 3.4+.

.. code-block:: bash

$ virtualenv env
$ source env/bin/activate

#. Install the dependencies needed to run the samples.

.. code-block:: bash

$ pip install -r requirements.txt

.. _pip: https://pip.pypa.io/
.. _virtualenv: https://virtualenv.pypa.io/

Samples
-------------------------------------------------------------------------------

Create a new Composer environment based on an existing environment
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

.. image:: https://gstatic.com/cloudssh/images/open-btn.png
:target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=composer/tools/copy_environment.py,composer/tools/README.rst




To run this sample:

.. code-block:: bash

$ python copy_environment.py

usage: copy_environment.py [-h] [--running_as_service_account]
[--override_machine_type OVERRIDE_MACHINE_TYPE]
[--override_disk_size_gb OVERRIDE_DISK_SIZE_GB]
[--override_network OVERRIDE_NETWORK]
[--override_subnetwork OVERRIDE_SUBNETWORK]
project location existing_env_name new_env_name

Clone a composer environment.

positional arguments:
project Google Cloud Project containing existing Composer
Environment.
location Google Cloud region containing existing Composer
Environment. For example `us-central1`.
existing_env_name The name of the existing Composer Environment.
new_env_name The name to use for the new Composer Environment.

optional arguments:
-h, --help show this help message and exit
  --running_as_service_account
                        Set this flag if the script is running on a VM with
                        the same service account as used in the Composer
                        Environment. This avoids creating extra credentials.
--override_machine_type OVERRIDE_MACHINE_TYPE
Optional. Overrides machine type used for Cloud
Composer Environment. Must be a fully specified
machine type URI.
--override_disk_size_gb OVERRIDE_DISK_SIZE_GB
Optional. Overrides the disk size in GB used for Cloud
Composer Environment.
--override_network OVERRIDE_NETWORK
Optional. Overrides the network used for Cloud
Composer Environment.
--override_subnetwork OVERRIDE_SUBNETWORK
Optional. Overrides the subnetwork used for Cloud
Composer Environment.
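
For example, a hypothetical invocation that clones an environment named
``old-env`` in ``us-central1`` of project ``my-project`` into ``new-env``
(all four names are placeholders) would be
``python copy_environment.py my-project us-central1 old-env new-env``.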





.. _Google Cloud SDK: https://cloud.google.com/sdk/
26 changes: 26 additions & 0 deletions composer/tools/README.rst.in
@@ -0,0 +1,26 @@
# This file is used to generate README.rst

product:
name: Google Cloud Composer
short_name: Cloud Composer
url: https://cloud.google.com/composer/docs
description: >
`Google Cloud Composer`_ is a managed Apache Airflow service that helps
you create, schedule, monitor and manage workflows. Cloud Composer
automation helps you create Airflow environments quickly and use
Airflow-native tools, such as the powerful Airflow web interface and
command line tools, so you can focus on your workflows and not your
infrastructure.

setup:
- auth
- install_deps

samples:
- name: Create a new Composer environment based on an existing environment
file: copy_environment.py
show_help: True

cloud_client_library: false

folder: composer/tools
131 changes: 123 additions & 8 deletions composer/tools/copy_environment.py
@@ -30,7 +30,9 @@
import argparse
import ast
import base64
import contextlib
import json
import os
import re
import shutil
import subprocess
@@ -39,12 +41,15 @@
import time
import uuid

from cryptography import fernet
import google.auth
from google.cloud import storage
from google.oauth2 import service_account
from googleapiclient import discovery
from googleapiclient import errors
from kubernetes import client, config
from mysql import connector
import six
from six.moves import configparser

DEFAULT_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

@@ -370,8 +375,112 @@ def export_data(sql_client, project, instance, gcs_bucket_name, filename):
wait_sql_operation(sql_client, project, operation.get("name"))


def get_fernet_key(composer_env):
print("Retrieving fernet key for Composer Environment {}.".format(
composer_env.get('name', '')))
gke_cluster_resource = composer_env.get("config", {}).get("gkeCluster")
project_zone_cluster = re.match(
"projects/([^/]*)/zones/([^/]*)/clusters/([^/]*)", gke_cluster_resource
).groups()
tmp_dir_name = None
try:
print("Getting cluster credentials {} to retrieve fernet key.".format(
gke_cluster_resource))
tmp_dir_name = tempfile.mkdtemp()
kubeconfig_file = tmp_dir_name + "/config"
os.environ["KUBECONFIG"] = kubeconfig_file
if subprocess.call(
[
"gcloud",
"container",
"clusters",
"get-credentials",
project_zone_cluster[2],
"--zone",
project_zone_cluster[1],
"--project",
project_zone_cluster[0]
]
):
print("Failed to retrieve cluster credentials: {}.".format(
gke_cluster_resource))
sys.exit(1)

kubernetes_client = client.CoreV1Api(
api_client=config.new_client_from_config(
config_file=kubeconfig_file))
airflow_configmap = kubernetes_client.read_namespaced_config_map(
"airflow-configmap", "default")
config_str = airflow_configmap.data['airflow.cfg']
with contextlib.closing(six.StringIO(config_str)) as config_buffer:
config_parser = configparser.ConfigParser()
config_parser.readfp(config_buffer)
return config_parser.get("core", "fernet_key")
except Exception as exc:
print("Failed to get fernet key for cluster: {}.".format(str(exc)))
sys.exit(1)
finally:
if tmp_dir_name:
shutil.rmtree(tmp_dir_name)
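
The configmap lookup above ends by parsing the ``airflow.cfg`` text with
``configparser``. A standalone sketch of just that parsing step, using a toy
config string in place of one fetched from the cluster:

.. code-block:: python

    import six
    from six.moves import configparser

    # Toy airflow.cfg contents; the real string comes from the
    # airflow-configmap in the environment's GKE cluster.
    config_str = "[core]\nfernet_key = not-a-real-key\n"
    parser = configparser.ConfigParser()
    parser.readfp(six.StringIO(config_str))
    print(parser.get("core", "fernet_key"))  # -> not-a-real-key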


def reencrypt_variables_connections(old_fernet_key_str, new_fernet_key_str):
old_fernet_key = fernet.Fernet(old_fernet_key_str.encode("utf-8"))
new_fernet_key = fernet.Fernet(new_fernet_key_str.encode("utf-8"))
db = connector.connect(
host="127.0.0.1",
user="root",
database="airflow-db",
)
variable_cursor = db.cursor()
variable_cursor.execute("SELECT id, val, is_encrypted FROM variable")
rows = variable_cursor.fetchall()
for row in rows:
id = row[0]
val = row[1]
is_encrypted = row[2]
if is_encrypted:
updated_val = new_fernet_key.encrypt(
old_fernet_key.decrypt(bytes(val))).decode()
variable_cursor.execute(
"UPDATE variable SET val=%s WHERE id=%s", (updated_val, id))
db.commit()

conn_cursor = db.cursor()
conn_cursor.execute(
"SELECT id, password, extra, is_encrypted, is_extra_encrypted FROM "
"connection")
rows = conn_cursor.fetchall()
for row in rows:
id = row[0]
password = row[1]
extra = row[2]
is_encrypted = row[3]
is_extra_encrypted = row[4]
if is_encrypted:
updated_password = new_fernet_key.encrypt(
old_fernet_key.decrypt(bytes(password))).decode()
conn_cursor.execute(
"UPDATE connection SET password=%s WHERE id=%s",
(updated_password, id))
if is_extra_encrypted:
updated_extra = new_fernet_key.encrypt(
old_fernet_key.decrypt(bytes(extra))).decode()
conn_cursor.execute(
"UPDATE connection SET extra=%s WHERE id=%s",
(updated_extra, id))
db.commit()
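
The update above is standard Fernet key rotation: decrypt each value with the
old key, then re-encrypt it with the new one. A self-contained sketch, with
freshly generated keys standing in for the two environments' keys:

.. code-block:: python

    from cryptography import fernet

    old_key = fernet.Fernet(fernet.Fernet.generate_key())
    new_key = fernet.Fernet(fernet.Fernet.generate_key())

    token = old_key.encrypt(b"connection-password")    # as stored by the old env
    rotated = new_key.encrypt(old_key.decrypt(token))  # as the new env expects
    assert new_key.decrypt(rotated) == b"connection-password"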


def import_data(
    sql_client,
    service_account_key,
    project,
    instance,
    gcs_bucket,
    filename,
    old_fernet_key,
    new_fernet_key
):
tmp_dir_name = None
fuse_dir = None
@@ -383,7 +492,6 @@ def import_data(
if subprocess.call(["mkdir", fuse_dir]):
print("Failed to make temporary subdir {}.".format(fuse_dir))
sys.exit(1)
if subprocess.call(["gcsfuse", gcs_bucket, fuse_dir]):
print(
"Failed to fuse bucket {} with temp local directory {}".format(
@@ -424,9 +532,11 @@ def import_data(
):
print("Failed to import database.")
sys.exit(1)
print("Reencrypting variables and connections.")
reencrypt_variables_connections(old_fernet_key, new_fernet_key)
print("Database import succeeded.")
    except Exception as exc:
        print("Failed to copy database: {}".format(str(exc)))
sys.exit(1)
finally:
if proxy_subprocess:
@@ -522,6 +632,9 @@ def copy_database(project, existing_env, new_env, running_as_service_account):
gcs_db_dump_bucket.name,
"db_dump.sql",
)
print("Obtaining fernet keys for Composer Environments.")
old_fernet_key = get_fernet_key(existing_env)
new_fernet_key = get_fernet_key(new_env)
print("Preparing database import to new Environment.")
import_data(
sql_client,
Expand All @@ -530,6 +643,8 @@ def copy_database(project, existing_env, new_env, running_as_service_account):
new_sql_instance,
gcs_db_dump_bucket.name,
"db_dump.sql",
old_fernet_key,
new_fernet_key,
)
finally:
if gke_service_account_key:
@@ -542,7 +657,7 @@ def copy_database(project, existing_env, new_env, running_as_service_account):
)
if gcs_db_dump_bucket:
print("Deleting temporary Cloud Storage bucket.")
            delete_bucket(gcs_db_dump_bucket)


def copy_gcs_bucket(existing_env, new_env):
3 changes: 3 additions & 0 deletions composer/tools/requirements.txt
@@ -1,3 +1,6 @@
cryptography==2.3.1
google-api-python-client==1.6.4
google-auth==1.5.1
google-cloud-storage==1.11.0
kubernetes==7.0.0
mysql-connector-python==8.0.12