@@ -27,9 +27,13 @@
import com.amazonaws.athena.connector.lambda.data.writers.extractors.Extractor;
import com.amazonaws.athena.connector.lambda.domain.Split;
import com.amazonaws.athena.connector.lambda.domain.predicate.Constraints;
import com.amazonaws.athena.connector.lambda.domain.predicate.QueryPlan;
import com.amazonaws.athena.connector.lambda.exceptions.AthenaConnectorException;
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.substrait.SubstraitRelUtils;
import com.amazonaws.athena.connector.substrait.model.SubstraitRelModel;
import com.amazonaws.athena.connectors.dynamodb.credentials.CrossAccountCredentialsProviderV2;
import com.amazonaws.athena.connectors.dynamodb.qpt.DDBQueryPassthrough;
import com.amazonaws.athena.connectors.dynamodb.resolver.DynamoDBFieldResolver;
@@ -42,10 +46,14 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.substrait.proto.FetchRel;
import io.substrait.proto.Plan;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
@@ -201,9 +209,20 @@ protected void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recor
logger.info("ColumnNameMapping isEmpty: " + recordMetadata.getColumnNameMapping().isEmpty());
logger.info("Resolving disableProjectionAndCasing to: " + disableProjectionAndCasing);
}
QueryPlan queryPlan = recordsRequest.getConstraints().getQueryPlan();
FederatedIdentity federatedIdentity = recordsRequest.getIdentity();
AwsRequestOverrideConfiguration overrideConfig = getRequestOverrideConfig(federatedIdentity.getConfigOptions());
Plan plan = null;
if (queryPlan != null) {
plan = SubstraitRelUtils.deserializeSubstraitPlan(queryPlan.getSubstraitPlan());
}

Iterator<Map<String, AttributeValue>> itemIterator = getIterator(split, tableName, recordsRequest.getSchema(), recordsRequest.getConstraints(), disableProjectionAndCasing);
writeItemsToBlock(spiller, recordsRequest, queryStatusChecker, recordMetadata, itemIterator, disableProjectionAndCasing);
Iterator<Map<String, AttributeValue>> itemIterator =
getIterator(split, tableName, recordsRequest.getSchema(), recordsRequest.getConstraints(),
disableProjectionAndCasing, plan, overrideConfig);
// Determine whether a limit can be applied and, if so, what the limit value is.
Pair<Boolean, Integer> limitPair = getLimit(plan, recordsRequest.getConstraints());
writeItemsToBlock(spiller, recordsRequest, queryStatusChecker, recordMetadata, itemIterator, disableProjectionAndCasing, limitPair);
}

private void handleQueryPassthroughPartiQLQuery(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker)
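A note on the plan plumbing added above: readWithConstraint now deserializes the optional Substrait QueryPlan once, up front, so a single Plan instance can feed both the iterator construction and the limit computation. A minimal sketch of what that deserialization amounts to, assuming the plan travels as a Base64-encoded protobuf (this diff does not show SubstraitRelUtils' actual encoding, so treat the decode step as an assumption; imports would be java.util.Base64, io.substrait.proto.Plan, and com.google.protobuf.InvalidProtocolBufferException):

// Sketch only: the assumed shape of SubstraitRelUtils.deserializeSubstraitPlan.
static Plan decodePlan(String serializedPlan) throws InvalidProtocolBufferException
{
    // Assumption: the QueryPlan carries the Substrait plan as Base64 text.
    byte[] planBytes = Base64.getDecoder().decode(serializedPlan);
    return Plan.parseFrom(planBytes);
}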
@@ -224,7 +243,8 @@ private void handleQueryPassthroughPartiQLQuery(BlockSpiller spiller, ReadRecord
ExecuteStatementResponse response = ddbClient.executeStatement(executeStatementRequest);

Iterator<Map<String, AttributeValue>> itemIterator = response.items().iterator();
writeItemsToBlock(spiller, recordsRequest, queryStatusChecker, recordMetadata, itemIterator, false);

writeItemsToBlock(spiller, recordsRequest, queryStatusChecker, recordMetadata, itemIterator, false, Pair.of(false, -1));
}

private void writeItemsToBlock(
@@ -233,7 +253,8 @@ private void writeItemsToBlock(
QueryStatusChecker queryStatusChecker,
DDBRecordMetadata recordMetadata,
Iterator<Map<String, AttributeValue>> itemIterator,
boolean disableProjectionAndCasing)
boolean disableProjectionAndCasing,
Pair<Boolean, Integer> limitPair)
{
DynamoDBFieldResolver resolver = new DynamoDBFieldResolver(recordMetadata);

@@ -256,7 +277,6 @@ private void writeItemsToBlock(

GeneratedRowWriter rowWriter = rowWriterBuilder.build();
long numRows = 0;
boolean canApplyLimit = canApplyLimit(recordsRequest.getConstraints());
while (itemIterator.hasNext()) {
if (!queryStatusChecker.isQueryRunning()) {
// we can stop processing because the query waiting for this data has already terminated
@@ -271,16 +291,30 @@
}
spiller.writeRows((Block block, int rowNum) -> rowWriter.writeRow(block, rowNum, item) ? 1 : 0);
numRows++;
if (canApplyLimit && numRows >= recordsRequest.getConstraints().getLimit()) {
// If a limit applies and we have fetched at least that many rows, we can stop execution.
if (limitPair.getLeft() && numRows >= limitPair.getRight()) {
return;
}
}
logger.info("readWithConstraint: numRows[{}]", numRows);
}

private boolean canApplyLimit(Constraints constraints)
private Optional<Integer> canApplyLimit(Constraints constraints,
SubstraitRelModel substraitRelModel,
boolean useQueryPlan)
{
return constraints.hasLimit() && !constraints.hasNonEmptyOrderByClause();
if (useQueryPlan) {
if (substraitRelModel.getSortRel() == null && substraitRelModel.getFetchRel() != null) {
FetchRel fetchRel = substraitRelModel.getFetchRel();
int limit = (int) fetchRel.getCount();
return Optional.of(limit);
}
return Optional.empty();
}
if (constraints.hasLimit() && !constraints.hasNonEmptyOrderByClause()) {
return Optional.of((int) constraints.getLimit());
}
return Optional.empty();
}

private boolean rangeFilterHasIn(String rangeKeyFilter)
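For readers tracing the new canApplyLimit contract: when a query plan is present, a limit is returned only if the relation tree contains a FetchRel and no SortRel, because DynamoDB's Limit parameter caps the items read per request and would otherwise truncate rows before any ordering is applied. A unit-style sketch of the plan-side path, assuming the standard protobuf builders for io.substrait.proto (Rel, FetchRel, ReadRel); the SubstraitRelModel accessors are taken from this diff:

// Sketch: FetchRel(count = 10) directly over a ReadRel, with no SortRel in the
// tree, so the limit is safe to push down.
Rel fetchOverRead = Rel.newBuilder()
        .setFetch(FetchRel.newBuilder()
                .setCount(10)
                .setInput(Rel.newBuilder().setRead(ReadRel.newBuilder())))
        .build();
SubstraitRelModel model = SubstraitRelModel.buildSubstraitRelModel(fetchOverRead);
// model.getSortRel() == null && model.getFetchRel() != null,
// so canApplyLimit(constraints, model, true) would yield Optional.of(10).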
@@ -314,7 +348,11 @@ private boolean isQueryRequest(Split split)
/*
Converts a split into a Query
*/
private QueryRequest buildQueryRequest(Split split, String tableName, Schema schema, Constraints constraints, boolean disableProjectionAndCasing, Map<String, AttributeValue> exclusiveStartKey)
private QueryRequest buildQueryRequest(Split split, String tableName, Schema schema,
boolean disableProjectionAndCasing,
Map<String, AttributeValue> exclusiveStartKey,
AwsRequestOverrideConfiguration overrideConfiguration,
Pair<Boolean, Integer> limitPair)
{
validateExpectedMetadata(split.getProperties());
// prepare filters
@@ -372,17 +410,24 @@ private QueryRequest buildQueryRequest(Split split, String tableName, Schema sch
.expressionAttributeNames(expressionAttributeNames)
.expressionAttributeValues(expressionAttributeValues)
.projectionExpression(projectionExpression)
.overrideConfiguration(overrideConfiguration)
.exclusiveStartKey(exclusiveStartKey);
if (canApplyLimit(constraints)) {
queryRequestBuilder.limit((int) constraints.getLimit());

boolean limitPresent = limitPair.getLeft();
if (limitPresent) {
queryRequestBuilder.limit(limitPair.getRight());
}
return queryRequestBuilder.build();
}

/*
Converts a split into a Scan Request
*/
private ScanRequest buildScanRequest(Split split, String tableName, Schema schema, Constraints constraints, boolean disableProjectionAndCasing, Map<String, AttributeValue> exclusiveStartKey)
private ScanRequest buildScanRequest(Split split, String tableName, Schema schema,
boolean disableProjectionAndCasing,
Map<String, AttributeValue> exclusiveStartKey,
AwsRequestOverrideConfiguration overrideConfiguration,
Pair<Boolean, Integer> limitPair)
{
validateExpectedMetadata(split.getProperties());
// prepare filters
@@ -422,17 +467,21 @@ private ScanRequest buildScanRequest(Split split, String tableName, Schema schem
.expressionAttributeNames(expressionAttributeNames.isEmpty() ? null : expressionAttributeNames)
.expressionAttributeValues(expressionAttributeValues.isEmpty() ? null : expressionAttributeValues)
.projectionExpression(projectionExpression)
.overrideConfiguration(overrideConfiguration)
.exclusiveStartKey(exclusiveStartKey);
if (canApplyLimit(constraints)) {
scanRequestBuilder.limit((int) constraints.getLimit());
boolean limitPresent = limitPair.getLeft();
if (limitPresent) {
scanRequestBuilder.limit(limitPair.getRight());
}
return scanRequestBuilder.build();
}

/*
Creates an iterator that can iterate through a Query or Scan, sending paginated requests as necessary
*/
private Iterator<Map<String, AttributeValue>> getIterator(Split split, String tableName, Schema schema, Constraints constraints, boolean disableProjectionAndCasing)
private Iterator<Map<String, AttributeValue>> getIterator(Split split, String tableName, Schema schema,
Constraints constraints, boolean disableProjectionAndCasing,
Plan plan, AwsRequestOverrideConfiguration requestOverrideConfiguration)
{
return new Iterator<Map<String, AttributeValue>>() {
AtomicReference<Map<String, AttributeValue>> lastKeyEvaluated = new AtomicReference<>();
@@ -452,17 +501,21 @@ public Map<String, AttributeValue> next()
if (currentPageIterator.get() != null && currentPageIterator.get().hasNext()) {
return currentPageIterator.get().next();
}
// Determine whether a limit can be applied and, if so, what the limit value is.
Pair<Boolean, Integer> limitPair = getLimit(plan, constraints);
Iterator<Map<String, AttributeValue>> iterator;
try {
if (isQueryRequest(split)) {
QueryRequest request = buildQueryRequest(split, tableName, schema, constraints, disableProjectionAndCasing, lastKeyEvaluated.get());
QueryRequest request = buildQueryRequest(split, tableName, schema,
disableProjectionAndCasing, lastKeyEvaluated.get(), requestOverrideConfiguration, limitPair);
logger.info("Invoking DDB with Query request: {}", request);
QueryResponse response = invokerCache.get(tableName).invoke(() -> ddbClient.query(request));
lastKeyEvaluated.set(response.lastEvaluatedKey());
iterator = response.items().iterator();
}
else {
ScanRequest request = buildScanRequest(split, tableName, schema, constraints, disableProjectionAndCasing, lastKeyEvaluated.get());
ScanRequest request = buildScanRequest(split, tableName, schema,
disableProjectionAndCasing, lastKeyEvaluated.get(), requestOverrideConfiguration, limitPair);
logger.info("Invoking DDB with Scan request: {}", request);
ScanResponse response = invokerCache.get(tableName).invoke(() -> ddbClient.scan(request));
lastKeyEvaluated.set(response.lastEvaluatedKey());
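The anonymous iterator above hand-rolls DynamoDB pagination: each page's lastEvaluatedKey is fed back as the next request's exclusiveStartKey until no key is returned. For orientation, a minimal standalone sketch of the same loop against the AWS SDK v2 (the process callback is a hypothetical placeholder, not part of this PR):

// Sketch of the pagination contract the iterator implements.
Map<String, AttributeValue> startKey = null;
do {
    ScanRequest.Builder builder = ScanRequest.builder().tableName(tableName);
    if (startKey != null) {
        builder.exclusiveStartKey(startKey);
    }
    ScanResponse page = ddbClient.scan(builder.build());
    page.items().forEach(item -> process(item)); // hypothetical consumer
    // An absent lastEvaluatedKey means the scan is complete.
    startKey = page.hasLastEvaluatedKey() ? page.lastEvaluatedKey() : null;
} while (startKey != null);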
@@ -500,4 +553,19 @@ private void validateExpectedMetadata(Map<String, String> metadata)
checkArgument(metadata.containsKey(EXPRESSION_VALUES_METADATA), "Split missing expected metadata [%s] when filters are present", EXPRESSION_VALUES_METADATA);
}
}

private Pair<Boolean, Integer> getLimit(Plan plan, Constraints constraints)
{
SubstraitRelModel substraitRelModel = null;
boolean useQueryPlan = false;
if (plan != null) {
substraitRelModel = SubstraitRelModel.buildSubstraitRelModel(plan.getRelations(0).getRoot().getInput());
useQueryPlan = true;
}
Optional<Integer> optionalLimit = canApplyLimit(constraints, substraitRelModel, useQueryPlan);
if (optionalLimit.isPresent()) {
return Pair.of(true, optionalLimit.get());
}
return Pair.of(false, -1);
}
}