-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: Add support for DynamoDB online_read in batches #2371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
feast-ci-bot
merged 25 commits into
feast-dev:master
from
TremaMiguel:feat/dynamo_db_online_write_read
Mar 23, 2022
Merged
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
c34bf3e
feat: dynamodb onlin read in batches
TremaMiguel 56d2d32
run linters and format
TremaMiguel 7b98faf
feat: batch_size parameter
TremaMiguel 94abfb6
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel b5c1a3d
docs: typo in batch_size description
TremaMiguel 5a12856
trailing white space
TremaMiguel 8bd2a84
fix: batch_size is last argument
TremaMiguel fb6eacb
test: dynamodb online store online_read in batches
TremaMiguel 1bbd5dc
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia 307bab9
test: mock dynamodb behavior
TremaMiguel e52a895
feat: batch_size value must be less than 40
TremaMiguel 29b5cf6
Merge branch 'master' into feat/dynamo_db_online_write_read
adchia 97fd71f
feat: batch_size defaults to 40
TremaMiguel ddb3f0a
Merge branch 'feat/dynamo_db_online_write_read' of github.com:TremaMi…
TremaMiguel 12064e4
feat: sort dynamodb responses
TremaMiguel 449f60d
merge branch master into feat/dynamo_db_online_write_read
TremaMiguel 3f72228
resolve merge conflicts
TremaMiguel 7a4edbd
test online response proto with redshift:dynamodb
TremaMiguel 3843a02
feat: consistency in batch_size process
TremaMiguel 88e183e
fix: return batch_size times None
TremaMiguel 23cb49a
remove debug code
TremaMiguel 44f97e7
Merge branch 'feast-dev:master' into feat/dynamo_db_online_write_read
TremaMiguel eaf4940
typo in docstring
TremaMiguel 5bc54d3
batch_size in onlineconfigstore
TremaMiguel c7ab086
Merge branch 'master' into feat/dynamo_db_online_write_read
TremaMiguel File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
57 changes: 57 additions & 0 deletions
57
sdk/python/tests/unit/online_store/test_dynamodb_online_store.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
from dataclasses import dataclass | ||
|
||
import pytest | ||
from moto import mock_dynamodb2 | ||
|
||
from feast.infra.offline_stores.file import FileOfflineStoreConfig | ||
from feast.infra.online_stores.dynamodb import ( | ||
DynamoDBOnlineStore, | ||
DynamoDBOnlineStoreConfig, | ||
) | ||
from feast.repo_config import RepoConfig | ||
from tests.utils.online_store_utils import ( | ||
_create_n_customer_test_samples, | ||
_create_test_table, | ||
_insert_data_test_table, | ||
) | ||
|
||
REGISTRY = "s3://test_registry/registry.db" | ||
PROJECT = "test_aws" | ||
PROVIDER = "aws" | ||
TABLE_NAME = "dynamodb_online_store" | ||
REGION = "us-west-2" | ||
|
||
|
||
@dataclass | ||
class MockFeatureView: | ||
name: str | ||
|
||
|
||
@pytest.fixture | ||
def repo_config(): | ||
return RepoConfig( | ||
registry=REGISTRY, | ||
project=PROJECT, | ||
provider=PROVIDER, | ||
online_store=DynamoDBOnlineStoreConfig(region=REGION), | ||
offline_store=FileOfflineStoreConfig(), | ||
) | ||
|
||
|
||
@mock_dynamodb2 | ||
@pytest.mark.parametrize("n_samples", [5, 50, 100]) | ||
def test_online_read(repo_config, n_samples): | ||
"""Test DynamoDBOnlineStore online_read method.""" | ||
_create_test_table(PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) | ||
data = _create_n_customer_test_samples(n=n_samples) | ||
_insert_data_test_table(data, PROJECT, f"{TABLE_NAME}_{n_samples}", REGION) | ||
|
||
entity_keys, features = zip(*data) | ||
dynamodb_store = DynamoDBOnlineStore() | ||
returned_items = dynamodb_store.online_read( | ||
config=repo_config, | ||
table=MockFeatureView(name=f"{TABLE_NAME}_{n_samples}"), | ||
entity_keys=entity_keys, | ||
) | ||
assert len(returned_items) == len(data) | ||
assert [item[1] for item in returned_items] == list(features) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
from datetime import datetime | ||
|
||
import boto3 | ||
|
||
from feast import utils | ||
from feast.infra.online_stores.helpers import compute_entity_id | ||
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto | ||
from feast.protos.feast.types.Value_pb2 import Value as ValueProto | ||
|
||
|
||
def _create_n_customer_test_samples(n=10): | ||
return [ | ||
( | ||
EntityKeyProto( | ||
join_keys=["customer"], entity_values=[ValueProto(string_val=str(i))] | ||
), | ||
{ | ||
"avg_orders_day": ValueProto(float_val=1.0), | ||
"name": ValueProto(string_val="John"), | ||
"age": ValueProto(int64_val=3), | ||
}, | ||
) | ||
for i in range(n) | ||
] | ||
|
||
|
||
def _create_test_table(project, tbl_name, region): | ||
client = boto3.client("dynamodb", region_name=region) | ||
client.create_table( | ||
TableName=f"{project}.{tbl_name}", | ||
KeySchema=[{"AttributeName": "entity_id", "KeyType": "HASH"}], | ||
AttributeDefinitions=[{"AttributeName": "entity_id", "AttributeType": "S"}], | ||
BillingMode="PAY_PER_REQUEST", | ||
) | ||
|
||
|
||
def _delete_test_table(project, tbl_name, region): | ||
client = boto3.client("dynamodb", region_name=region) | ||
client.delete_table(TableName=f"{project}.{tbl_name}") | ||
|
||
|
||
def _insert_data_test_table(data, project, tbl_name, region): | ||
dynamodb_resource = boto3.resource("dynamodb", region_name=region) | ||
table_instance = dynamodb_resource.Table(f"{project}.{tbl_name}") | ||
for entity_key, features in data: | ||
entity_id = compute_entity_id(entity_key) | ||
with table_instance.batch_writer() as batch: | ||
batch.put_item( | ||
Item={ | ||
"entity_id": entity_id, | ||
"event_ts": str(utils.make_tzaware(datetime.utcnow())), | ||
"values": {k: v.SerializeToString() for k, v in features.items()}, | ||
} | ||
) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.