Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
619 changes: 75 additions & 544 deletions dev/src/pipelines/expression.ts

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions dev/src/pipelines/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,11 @@ export {
ifError,
isError,
isNan,
arrayConcat,
substring,
documentId,
isNull,
arrayContainsAll,
constant,
constantVector,
Field,
Constant,
sum,
Expand All @@ -95,9 +93,9 @@ export {
multiply,
cond,
Ordering,
AggregateWithAlias,
AliasedAggregate,
endsWith,
ExprWithAlias,
AliasedExpr,
mapMerge,
mapRemove,
byteLength,
Expand Down
86 changes: 0 additions & 86 deletions dev/src/pipelines/pipeline-options.ts

This file was deleted.

48 changes: 35 additions & 13 deletions dev/src/pipelines/pipeline-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import './expression';
import Firestore, {
CollectionReference,
DocumentReference,
FieldValue,
Timestamp,
VectorValue,
} from '../index';
Expand Down Expand Up @@ -58,7 +59,6 @@ import {
map,
array,
Constant,
constantVector,
field,
Ordering,
gt,
Expand Down Expand Up @@ -140,14 +140,23 @@ export class ExecutionUtil {
return Date.now() - startTime >= totalTimeout;
}

stream(pipeline: Pipeline): NodeJS.ReadableStream {
// TODO(pipeline) implement options for stream
const structuredPipeline = new StructuredPipeline(pipeline, {}, {});
const responseStream = this._stream(structuredPipeline);
stream(
structuredPipeline: StructuredPipeline,
transactionOrReadTime?: Uint8Array | Timestamp | api.ITransactionOptions
): NodeJS.ReadableStream {
const responseStream = this._stream(
structuredPipeline,
transactionOrReadTime
);
const transform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
callback(undefined, chunk.result);
transform(chunk: Array<PipelineStreamElement>, encoding, callback) {
chunk.forEach(item => {
if (item.result) {
this.push(item.result);
}
});
callback();
},
});

Expand Down Expand Up @@ -200,10 +209,19 @@ export class ExecutionUtil {
QualifiedResourcePath.fromSlashSeparatedString(result.name)
)
: undefined;

if (!result.fields) {
logger(
'_stream',
null,
'Unexpected state: `result.fields` was falsey. Using an empty map.'
);
}

output.result = new PipelineResult(
this._serializer,
result.fields || {},
ref,
result.fields || undefined,
Timestamp.fromProto(proto.executionTime!),
result.createTime
? Timestamp.fromProto(result.createTime!)
Expand Down Expand Up @@ -570,10 +588,10 @@ export function isOrdering(val: unknown): val is firestore.Pipelines.Ordering {
);
}

export function isAggregateWithAlias(
export function isAliasedAggregate(
val: unknown
): val is firestore.Pipelines.AggregateWithAlias {
const candidate = val as firestore.Pipelines.AggregateWithAlias;
): val is firestore.Pipelines.AliasedAggregate {
const candidate = val as firestore.Pipelines.AliasedAggregate;
return (
isString(candidate.alias) &&
candidate.aggregate instanceof AggregateFunction
Expand Down Expand Up @@ -645,8 +663,12 @@ export function vectorToExpr(
): Expr {
if (value instanceof Expr) {
return value;
} else if (value instanceof VectorValue || Array.isArray(value)) {
const result = constantVector(value);
} else if (value instanceof VectorValue) {
const result = constant(value);
result._createdFromLiteral = true;
return result;
} else if (Array.isArray(value)) {
const result = constant(FieldValue.vector(value));
result._createdFromLiteral = true;
return result;
} else {
Expand Down
Loading
Loading