infra: clean up pickle.load logic in integ tests #1611

Merged
merged 6 commits on Jun 23, 2020
27 changes: 27 additions & 0 deletions tests/integ/datasets.py
@@ -0,0 +1,27 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import gzip
import os
import pickle

from tests.integ import DATA_DIR


def one_p_mnist():
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
with gzip.open(data_path, "rb") as f:
training_set, _, _ = pickle.load(f, encoding="latin1")

return training_set
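
The helper above centralizes the gzip/pickle boilerplate that each integ test previously repeated. As a rough sketch of the cleanup (based on the kmeans call site in test_airflow_config.py below, not an exact copy of every test), a call site goes from

    # before: each test located, unzipped, and unpickled the dataset itself
    data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
    pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}
    with gzip.open(data_path, "rb") as f:
        train_set, _, _ = pickle.load(f, **pickle_args)
    records = kmeans.record_set(train_set[0][:100])

to

    # after: the shared helper hides the file path and the pickle details
    records = kmeans.record_set(datasets.one_p_mnist()[0][:100])
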
78 changes: 20 additions & 58 deletions tests/integ/test_airflow_config.py
@@ -12,15 +12,17 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import gzip
import os
import pickle
import sys
import pytest
import tests.integ

import airflow
import pytest
import numpy as np
from airflow import DAG
from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator
from six.moves.urllib.parse import urlparse

import tests.integ
from sagemaker import (
KMeans,
FactorizationMachines,
@@ -40,21 +42,13 @@
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.sklearn import SKLearn
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow import airflow as sm_airflow
from sagemaker.utils import sagemaker_timestamp

import airflow
from airflow import DAG
from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator

from sagemaker.workflow import airflow as sm_airflow
from sagemaker.xgboost import XGBoost
from tests.integ import DATA_DIR, PYTHON_VERSION
from tests.integ import datasets, DATA_DIR, PYTHON_VERSION
from tests.integ.record_set import prepare_record_set_from_local_files
from tests.integ.timeout import timeout

from six.moves.urllib.parse import urlparse

PYTORCH_MNIST_DIR = os.path.join(DATA_DIR, "pytorch_mnist")
PYTORCH_MNIST_SCRIPT = os.path.join(PYTORCH_MNIST_DIR, "mnist.py")
AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS = 10
@@ -101,13 +95,6 @@ def test_byo_airflow_config_uploads_data_source_to_s3_when_inputs_provided(
@pytest.mark.canary_quick
def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

kmeans = KMeans(
role=ROLE,
train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -126,7 +113,7 @@ def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_
kmeans.center_factor = 1
kmeans.eval_metrics = ["ssd", "msd"]

records = kmeans.record_set(train_set[0][:100])
records = kmeans.record_set(datasets.one_p_mnist()[0][:100])

training_config = _build_airflow_workflow(
estimator=kmeans, instance_type=cpu_instance_type, inputs=records
@@ -140,13 +127,6 @@ def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_

def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

fm = FactorizationMachines(
role=ROLE,
train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -160,7 +140,8 @@ def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_inst
sagemaker_session=sagemaker_session,
)

records = fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32"))
training_set = datasets.one_p_mnist()
records = fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32"))

training_config = _build_airflow_workflow(
estimator=fm, instance_type=cpu_instance_type, inputs=records
@@ -206,13 +187,6 @@ def test_ipinsights_airflow_config_uploads_data_source_to_s3(sagemaker_session,

def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

knn = KNN(
role=ROLE,
train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -223,7 +197,8 @@ def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
sagemaker_session=sagemaker_session,
)

records = knn.record_set(train_set[0][:200], train_set[1][:200].astype("float32"))
training_set = datasets.one_p_mnist()
records = knn.record_set(training_set[0][:200], training_set[1][:200].astype("float32"))

training_config = _build_airflow_workflow(
estimator=knn, instance_type=cpu_instance_type, inputs=records
@@ -277,16 +252,10 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
sagemaker_session, cpu_instance_type
):
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

train_set[1][:100] = 1
train_set[1][100:200] = 0
train_set = train_set[0], train_set[1].astype(np.dtype("float32"))
training_set = datasets.one_p_mnist()
training_set[1][:100] = 1
training_set[1][100:200] = 0
training_set = training_set[0], training_set[1].astype(np.dtype("float32"))

ll = LinearLearner(
ROLE,
@@ -331,7 +300,7 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
ll.early_stopping_tolerance = 0.0001
ll.early_stopping_patience = 3

records = ll.record_set(train_set[0][:200], train_set[1][:200])
records = ll.record_set(training_set[0][:200], training_set[1][:200])

training_config = _build_airflow_workflow(
estimator=ll, instance_type=cpu_instance_type, inputs=records
@@ -380,13 +349,6 @@ def test_ntm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
@pytest.mark.canary_quick
def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

pca = PCA(
role=ROLE,
train_instance_count=SINGLE_INSTANCE_COUNT,
Expand All @@ -399,7 +361,7 @@ def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
pca.subtract_mean = True
pca.extra_components = 5

records = pca.record_set(train_set[0][:100])
records = pca.record_set(datasets.one_p_mnist()[0][:100])

training_config = _build_airflow_workflow(
estimator=pca, instance_type=cpu_instance_type, inputs=records
30 changes: 10 additions & 20 deletions tests/integ/test_byo_estimator.py
@@ -12,19 +12,16 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import gzip
import json
import os
import pickle
import sys

import pytest

import sagemaker
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator
from sagemaker.utils import unique_name_from_base
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, datasets
from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name


@@ -33,6 +30,11 @@ def region(sagemaker_session):
return sagemaker_session.boto_session.region_name


@pytest.fixture
def training_set():
return datasets.one_p_mnist()


def fm_serializer(data):
js = {"instances": []}
for row in data:
@@ -41,7 +43,7 @@ def fm_serializer(data):


@pytest.mark.canary_quick
def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
def test_byo_estimator(sagemaker_session, region, cpu_instance_type, training_set):
"""Use Factorization Machines algorithm as an example here.

First we need to prepare data for training. We take standard data set, convert it to the
@@ -57,12 +59,6 @@ def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
job_name = unique_name_from_base("byo")

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

prefix = "test_byo_estimator"
key = "recordio-pb-data"

@@ -92,26 +88,20 @@ def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
predictor.content_type = "application/json"
predictor.deserializer = sagemaker.predictor.json_deserializer

result = predictor.predict(train_set[0][:10])
result = predictor.predict(training_set[0][:10])

assert len(result["predictions"]) == 10
for prediction in result["predictions"]:
assert prediction["score"] is not None


def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type):
def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type, training_set):
image_name = get_image_uri(region, "factorization-machines")
endpoint_name = unique_name_from_base("byo")
training_data_path = os.path.join(DATA_DIR, "dummy_tensor")
job_name = unique_name_from_base("byo")

with timeout(minutes=5):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

prefix = "test_byo_estimator"
key = "recordio-pb-data"

@@ -144,7 +134,7 @@ def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type):
predictor.content_type = "application/json"
predictor.deserializer = sagemaker.predictor.json_deserializer

result = predictor.predict(train_set[0][:10])
result = predictor.predict(training_set[0][:10])

assert len(result["predictions"]) == 10
for prediction in result["predictions"]:
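
Where a file's tests share the data across several test functions, the diff switches them to a module-level pytest fixture instead of calling the helper inline. A minimal sketch of that pattern follows (the test name here is only illustrative; the real fixtures live in test_byo_estimator.py and test_factorization_machines.py):

    import pytest

    from tests.integ import datasets


    @pytest.fixture
    def training_set():
        return datasets.one_p_mnist()


    def test_example(training_set):
        # pytest injects the loaded MNIST tuple: index 0 holds the features,
        # index 1 the labels, matching how the tests below slice it
        features = training_set[0][:200]
        labels = training_set[1][:200].astype("float32")
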
39 changes: 14 additions & 25 deletions tests/integ/test_factorization_machines.py
@@ -12,29 +12,25 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import gzip
import os
import pickle
import sys
import time

import pytest

from sagemaker import FactorizationMachines, FactorizationMachinesModel
from sagemaker.utils import unique_name_from_base
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name


def test_factorization_machines(sagemaker_session, cpu_instance_type):
job_name = unique_name_from_base("fm")
@pytest.fixture
def training_set():
return datasets.one_p_mnist()

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)
def test_factorization_machines(sagemaker_session, cpu_instance_type, training_set):
job_name = unique_name_from_base("fm")

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
fm = FactorizationMachines(
role="SageMakerRole",
train_instance_count=1,
@@ -50,7 +46,7 @@ def test_factorization_machines(sagemaker_session, cpu_instance_type):

# training labels must be 'float32'
fm.fit(
fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
job_name=job_name,
)

@@ -59,24 +55,17 @@ def test_factorization_machines(sagemaker_session, cpu_instance_type):
fm.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
)
predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
result = predictor.predict(train_set[0][:10])
result = predictor.predict(training_set[0][:10])

assert len(result) == 10
for record in result:
assert record.label["score"] is not None


def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
def test_async_factorization_machines(sagemaker_session, cpu_instance_type, training_set):
job_name = unique_name_from_base("fm")

with timeout(minutes=5):
data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
pickle_args = {} if sys.version_info.major == 2 else {"encoding": "latin1"}

# Load the data into memory as numpy arrays
with gzip.open(data_path, "rb") as f:
train_set, _, _ = pickle.load(f, **pickle_args)

fm = FactorizationMachines(
role="SageMakerRole",
train_instance_count=1,
@@ -92,7 +81,7 @@ def test_async_factorization_machines(sagemaker_session, cpu_instance_type):

# training labels must be 'float32'
fm.fit(
fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
job_name=job_name,
wait=False,
)
Expand All @@ -109,7 +98,7 @@ def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
estimator.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
)
predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
result = predictor.predict(train_set[0][:10])
result = predictor.predict(training_set[0][:10])

assert len(result) == 10
for record in result: