Skip to content

Commit b668b21

Browse files
VenkatasivareddyTRburhan94fal-bharadwaj
authored
Added pagination for Db2 connector (#2772)
Co-authored-by: burhan94 <[email protected]> Co-authored-by: Fal Bharadwaj <[email protected]>
1 parent 373ee56 commit b668b21

File tree

3 files changed

+87
-1
lines changed

3 files changed

+87
-1
lines changed

athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2Constants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ public class Db2Constants
3535
"from syscat.tables " +
3636
"where type in ('T', 'U', 'V', 'W') and " +
3737
"tabschema = ? order by tabname";
38+
public static final String LIST_PAGINATED_TABLES_QUERY = "select tabschema AS \"TABLE_SCHEM\", tabname AS \"TABLE_NAME\" " +
39+
"from syscat.tables where type in ('T', 'U', 'V', 'W') and tabschema = ? " +
40+
"ORDER BY tabname OFFSET ? ROWS FETCH NEXT ? ROWS ONLY";
3841

3942
static final String PARTITION_QUERY = "SELECT DATAPARTITIONID FROM SYSCAT.DATAPARTITIONS WHERE TABSCHEMA = ? AND TABNAME = ? AND SEQNO > 0";
4043
static final String COLUMN_INFO_QUERY = "select colname, typename from syscat.columns where tabschema = ? AND tabname = ?";

athena-db2/src/main/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandler.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,15 @@
3737
import com.amazonaws.athena.connector.lambda.metadata.GetTableLayoutRequest;
3838
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasRequest;
3939
import com.amazonaws.athena.connector.lambda.metadata.ListSchemasResponse;
40+
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
41+
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
4042
import com.amazonaws.athena.connector.lambda.metadata.optimizations.DataSourceOptimizations;
4143
import com.amazonaws.athena.connector.lambda.metadata.optimizations.OptimizationSubType;
4244
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.ComplexExpressionPushdownSubType;
4345
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.FilterPushdownSubType;
4446
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.LimitPushdownSubType;
4547
import com.amazonaws.athena.connector.lambda.metadata.optimizations.pushdown.TopNPushdownSubType;
48+
import com.amazonaws.athena.connector.util.PaginationHelper;
4649
import com.amazonaws.athena.connectors.db2.resolver.Db2JDBCCaseResolver;
4750
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionConfig;
4851
import com.amazonaws.athena.connectors.jdbc.connection.DatabaseConnectionInfo;
@@ -80,6 +83,7 @@
8083
import java.util.stream.Collectors;
8184

8285
import static com.amazonaws.athena.connector.lambda.domain.predicate.functions.StandardFunctions.NULLIF_FUNCTION_NAME;
86+
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
8387
import static com.amazonaws.athena.connectors.db2.Db2Constants.PARTITION_NUMBER;
8488

8589
public class Db2MetadataHandler extends JdbcMetadataHandler
@@ -170,6 +174,37 @@ protected List<TableName> listTables(Connection connection, String schemaName) t
170174
.collect(Collectors.toList());
171175
}
172176

177+
@Override
178+
public ListTablesResponse listPaginatedTables(final Connection connection, final ListTablesRequest listTablesRequest) throws SQLException
179+
{
180+
LOGGER.debug("Starting listPaginatedTables for Db2.");
181+
int pageSize = listTablesRequest.getPageSize();
182+
int token = PaginationHelper.validateAndParsePaginationArguments(listTablesRequest.getNextToken(), pageSize);
183+
184+
if (pageSize == UNLIMITED_PAGE_SIZE_VALUE) {
185+
pageSize = Integer.MAX_VALUE;
186+
}
187+
188+
LOGGER.info("Starting pagination at {} with page size {}", token, pageSize);
189+
List<TableName> paginatedTables = getPaginatedTables(connection, listTablesRequest.getSchemaName(), token, pageSize);
190+
String nextToken = PaginationHelper.calculateNextToken(token, pageSize, paginatedTables);
191+
LOGGER.info("{} tables returned. Next token is {}", paginatedTables.size(), nextToken);
192+
193+
return new ListTablesResponse(listTablesRequest.getCatalogName(), paginatedTables, nextToken);
194+
}
195+
196+
@VisibleForTesting
197+
protected List<TableName> getPaginatedTables(Connection connection, String databaseName, int offset, int limit) throws SQLException
198+
{
199+
PreparedStatement preparedStatement = connection.prepareStatement(Db2Constants.LIST_PAGINATED_TABLES_QUERY);
200+
201+
preparedStatement.setString(1, databaseName);
202+
preparedStatement.setInt(2, offset);
203+
preparedStatement.setInt(3, limit);
204+
205+
return JDBCUtil.getTableMetadata(preparedStatement, TABLES_AND_VIEWS);
206+
}
207+
173208
/**
174209
* Creates Schema object with arrow compatible filed to frame the partition.
175210
*

athena-db2/src/test/java/com/amazonaws/athena/connectors/db2/Db2MetadataHandlerTest.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272

7373
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
7474
import static com.amazonaws.athena.connectors.db2.Db2Constants.PARTITION_NUMBER;
75-
import static org.mockito.ArgumentMatchers.any;
7675
import static org.mockito.ArgumentMatchers.nullable;
7776

7877
public class Db2MetadataHandlerTest extends TestBase {
@@ -352,5 +351,54 @@ public void doListTables() throws Exception {
352351
new TableName("TESTSCHEMA", "testTABLE")};
353352
Assert.assertEquals(Arrays.toString(expectedTables), listTablesResponse.getTables().toString());
354353
}
354+
355+
@Test
356+
public void doListPaginatedTables()
357+
throws Exception
358+
{
359+
//Test 1: Testing Single table returned in request of page size 1 and nextToken null
360+
Object[][] values = {{"testSchema", "testTable"}};
361+
TableName[] expected = {new TableName("testSchema", "testTable")};
362+
executePaginatedTableTest(values, expected, null, 1, "1");
363+
364+
// Test 2: Testing next table returned of page size 1 and nextToken 1
365+
executePaginatedTableTest(values, expected, "1", 1, "2");
366+
367+
// Test 3: Testing single table returned when requesting pageSize 2 signifying end of pagination where nextToken is null.
368+
executePaginatedTableTest(values, expected, "2", 2, null);
369+
370+
// Test 4: Testing unlimited page size (UNLIMITED_PAGE_SIZE_VALUE) which should set pageSize to Integer.MAX_VALUE
371+
values = new Object[][]{{"testSchema", "testTable1"}, {"testSchema", "testTable2"}};
372+
expected = new TableName[]{new TableName("testSchema", "testTable1"), new TableName("testSchema", "testTable2")};
373+
// Use a non-null token to force pagination path, which will trigger the listPaginatedTables method
374+
// With unlimited page size, there should be no next token (null) since all results are returned
375+
executePaginatedTableTest(values, expected, "0", UNLIMITED_PAGE_SIZE_VALUE, null);
376+
}
377+
378+
/**
379+
* Helper method to execute paginated table test and verify results
380+
* @param values the table data to return in the ResultSet
381+
* @param expected the expected TableName array
382+
* @param nextToken the nextToken to use in the request
383+
* @param pageSize the pageSize to use in the request
384+
* @param expectedNextToken the expected nextToken in the response
385+
* @throws Exception if there's an error during execution
386+
*/
387+
private void executePaginatedTableTest(Object[][] values,
388+
TableName[] expected, String nextToken, int pageSize,
389+
String expectedNextToken) throws Exception {
390+
BlockAllocator blockAllocator = new BlockAllocatorImpl();
391+
PreparedStatement preparedStatement = Mockito.mock(PreparedStatement.class);
392+
Mockito.when(this.connection.prepareStatement(Db2Constants.LIST_PAGINATED_TABLES_QUERY)).thenReturn(preparedStatement);
393+
String[] schema = {"TABLE_SCHEM", "TABLE_NAME"};
394+
ResultSet resultSet = mockResultSet(schema, values, new AtomicInteger(-1));
395+
Mockito.when(preparedStatement.executeQuery()).thenReturn(resultSet);
396+
ListTablesResponse listTablesResponse = this.db2MetadataHandler.doListTables(
397+
blockAllocator, new ListTablesRequest(this.federatedIdentity, "testQueryId",
398+
"testCatalog", "testSchema", nextToken, pageSize));
399+
400+
Assert.assertEquals(expectedNextToken, listTablesResponse.getNextToken());
401+
Assert.assertArrayEquals(expected, listTablesResponse.getTables().toArray());
402+
}
355403
}
356404

0 commit comments

Comments
 (0)