Merged
6 changes: 6 additions & 0 deletions lib/include/duckdb/web/webdb.h
@@ -57,6 +57,12 @@ class WebDB {
/// The connection
duckdb::Connection connection_;

/// The statements extracted from the text passed to PendingQuery
std::vector<duckdb::unique_ptr<duckdb::SQLStatement>> current_pending_statements_;
/// The index of the currently-running statement (in the above list)
size_t current_pending_statement_index_ = 0;
/// The value of allow_stream_result passed to PendingQuery
bool current_allow_stream_result_ = false;
/// The current pending query result (if any)
duckdb::unique_ptr<duckdb::PendingQueryResult> current_pending_query_result_ = nullptr;
/// The current pending query was canceled
40 changes: 35 additions & 5 deletions lib/src/webdb.cc
@@ -172,9 +172,20 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::RunQuery(std::s
arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::PendingQuery(std::string_view text,
bool allow_stream_result) {
try {
// Send the query
auto result = connection_.PendingQuery(std::string{text}, allow_stream_result);
if (result->HasError()) return arrow::Status{arrow::StatusCode::ExecutionError, std::move(result->GetError())};
auto statements = connection_.ExtractStatements(std::string{text});
if (statements.size() == 0) {
return arrow::Status{arrow::StatusCode::ExecutionError, "no statements"};
}
current_pending_statements_ = std::move(statements);
current_pending_statement_index_ = 0;
current_allow_stream_result_ = allow_stream_result;
// Send the first query
auto result = connection_.PendingQuery(std::move(current_pending_statements_[current_pending_statement_index_]),
current_allow_stream_result_);
if (result->HasError()) {
current_pending_statements_.clear();
return arrow::Status{arrow::StatusCode::ExecutionError, std::move(result->GetError())};
}
current_pending_query_result_ = std::move(result);
current_pending_query_was_canceled_ = false;
current_query_result_.reset();
@@ -204,8 +215,25 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> WebDB::Connection::PollPendingQuer
do {
switch (current_pending_query_result_->ExecuteTask()) {
case PendingExecutionResult::EXECUTION_FINISHED:
case PendingExecutionResult::RESULT_READY:
return StreamQueryResult(current_pending_query_result_->Execute());
case PendingExecutionResult::RESULT_READY: {
auto result = current_pending_query_result_->Execute();
current_pending_statement_index_++;
// If this was the last statement, then return the result
if (current_pending_statement_index_ == current_pending_statements_.size()) {
return StreamQueryResult(std::move(result));
}
// Otherwise, start the next statement
auto pending_result =
connection_.PendingQuery(std::move(current_pending_statements_[current_pending_statement_index_]),
current_allow_stream_result_);
if (pending_result->HasError()) {
current_pending_query_result_.reset();
current_pending_statements_.clear();
return arrow::Status{arrow::StatusCode::ExecutionError, std::move(pending_result->GetError())};
}
current_pending_query_result_ = std::move(pending_result);
break;
}
case PendingExecutionResult::BLOCKED:
case PendingExecutionResult::NO_TASKS_AVAILABLE:
return nullptr;
@@ -214,6 +242,7 @@
case PendingExecutionResult::EXECUTION_ERROR: {
auto err = current_pending_query_result_->GetError();
current_pending_query_result_.reset();
current_pending_statements_.clear();
return arrow::Status{arrow::StatusCode::ExecutionError, err};
}
}
@@ -228,6 +257,7 @@ bool WebDB::Connection::CancelPendingQuery() {
if (current_pending_query_result_ != nullptr && current_query_result_ == nullptr) {
current_pending_query_was_canceled_ = true;
current_pending_query_result_.reset();
current_pending_statements_.clear();
return true;
} else {
return false;
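Taken together, the webdb.cc changes above mean a single pending query may now span several semicolon-separated statements: each statement is executed in turn, intermediate results are discarded, and only the final statement's result is streamed back to the caller. A minimal TypeScript sketch of that behavior, mirroring the conn.send() usage from the pivot test below (illustrative only; the table and statements here are made up and not part of this PR):

import * as duckdb from '../src/';

// Illustrative only: exercises the multi-statement PendingQuery path.
// The CREATE TABLE and INSERT statements are executed and their results
// discarded by PollPendingQuery; only the final SELECT is streamed back.
async function multiStatementSend(conn: duckdb.DuckDBConnection) {
    const reader = await conn.send(`
        CREATE TABLE t (v INTEGER);
        INSERT INTO t VALUES (1), (2), (3);
        SELECT sum(v) AS total FROM t;
    `);
    return reader.readAll();
}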
2 changes: 2 additions & 0 deletions packages/duckdb-wasm/test/index_browser.ts
@@ -111,6 +111,7 @@ import { testUDF } from './udf.test';
import { longQueries } from './long_queries.test';
//import { testEXCEL } from './excel.test';
//import { testJSON } from './json.test';
import { testPivot } from './pivot.test';

const baseURL = window.location.origin;
const dataURL = `${baseURL}/data`;
@@ -140,3 +141,4 @@ testTokenization(() => db!);
testTokenizationAsync(() => adb!);
//testEXCEL(() => db!);
//testJSON(() => db!);
testPivot(() => db!);
2 changes: 2 additions & 0 deletions packages/duckdb-wasm/test/index_node.ts
@@ -79,6 +79,7 @@ import { testUDF } from './udf.test';
import { longQueries } from './long_queries.test';
import { testRegressionAsync } from './regression';
import { testFTS } from './fts.test';
import { testPivot } from './pivot.test';

testUDF(() => db!);
longQueries(() => adb!);
@@ -101,3 +102,4 @@ testCSVInsertAsync(() => adb!);
testTokenization(() => db!);
testTokenizationAsync(() => adb!);
testFTS(() => db!);
testPivot(() => db!, { skipValuesCheck: true });
56 changes: 56 additions & 0 deletions packages/duckdb-wasm/test/pivot.test.ts
@@ -0,0 +1,56 @@
import * as duckdb from '../src/';

export function testPivot(db: () => duckdb.DuckDBBindings, options?: { skipValuesCheck: boolean }): void {
let conn: duckdb.DuckDBConnection;
beforeEach(() => {
conn = db().connect();
});

afterEach(() => {
conn.close();
db().flushFiles();
db().dropFiles();
});

describe('PIVOT', () => {
it('with send', async () => {
conn.query(`
CREATE TABLE cities (
country VARCHAR, name VARCHAR, year INTEGER, population INTEGER
);`);
conn.query(`
INSERT INTO cities VALUES
('NL', 'Amsterdam', 2000, 1005),
('NL', 'Amsterdam', 2010, 1065),
('NL', 'Amsterdam', 2020, 1158),
('US', 'Seattle', 2000, 564),
('US', 'Seattle', 2010, 608),
('US', 'Seattle', 2020, 738),
('US', 'New York City', 2000, 8015),
('US', 'New York City', 2010, 8175),
('US', 'New York City', 2020, 8772);`);

const reader = await conn.send(`PIVOT cities ON year USING sum(population);`);
const batches = reader.readAll();
expect(batches.length).toBe(1);
const batch = batches[0];
expect(batch.numCols).toBe(5);
expect(batch.numRows).toBe(3);
expect(batch.getChildAt(0)?.toArray()).toEqual(['NL', 'US', 'US']);
expect(batch.getChildAt(1)?.toArray()).toEqual(['Amsterdam', 'Seattle', 'New York City']);
// On Node, the types of these columns are inconsistent in different builds, so we skip the check.
if (!options?.skipValuesCheck) {
// Pivoted columns are int128
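// (Arrow JS exposes each 128-bit value as four 32-bit words, low word first,
// which is why every sum below is followed by three zeros.)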
expect(batch.getChildAt(2)?.toArray()).toEqual(
new Uint32Array([1005, 0, 0, 0, 564, 0, 0, 0, 8015, 0, 0, 0]),
);
expect(batch.getChildAt(3)?.toArray()).toEqual(
new Uint32Array([1065, 0, 0, 0, 608, 0, 0, 0, 8175, 0, 0, 0]),
);
expect(batch.getChildAt(4)?.toArray()).toEqual(
new Uint32Array([1158, 0, 0, 0, 738, 0, 0, 0, 8772, 0, 0, 0]),
);
}
});
});
}