
Commit ae209de

Add changesets

1 parent e5ba9ff commit ae209de

8 files changed: +60 -6 lines changed

.changeset/early-hounds-marry.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+'@powersync/service-sync-rules': patch
+---
+
+Prepare new sync streams compiler.

packages/sync-rules/src/compiler/bucket_resolver.ts

Lines changed: 3 additions & 0 deletions
@@ -5,6 +5,9 @@ import { PointLookup, RowEvaluator, SourceRowProcessor } from './rows.js';
 import { RequestTableValuedResultSet } from './table.js';
 import { StreamOptions } from '../sync_plan/plan.js';
 
+/**
+ * Describes how to resolve a subscription to buckets.
+ */
 export class StreamResolver {
   constructor(
     readonly options: StreamOptions,

packages/sync-rules/src/compiler/compiler.ts

Lines changed: 11 additions & 1 deletion
@@ -1,5 +1,5 @@
 import { NodeLocation, parse, PGNode } from 'pgsql-ast-parser';
-import { HashSet, StableHasher } from './equality.js';
+import { HashSet } from './equality.js';
 import { PointLookup, RowEvaluator } from './rows.js';
 import { StreamResolver } from './bucket_resolver.js';
 import { StreamOptions, SyncPlan } from '../sync_plan/plan.js';
@@ -9,6 +9,16 @@ import { StreamQueryParser } from './parser.js';
 
 /**
  * State for compiling sync streams.
+ *
+ * The output of compiling all sync streams is a {@link SyncPlan}, a declarative description of the sync process that
+ * can be serialized to bucket storage. The compiler stores a mutable intermediate representation that is essentially a
+ * copy of the sync plan, except that we're using JavaScript classes with methods to compute hash codes and equality
+ * relations. This allows the compiler to efficiently de-duplicate parameters and buckets.
+ *
+ * Overall, the compilation process is as follows: Each data query for a stream is first parsed by
+ * {@link StreamQueryParser} into a canonicalized intermediate representation (see that class for details).
+ * Then, {@link QuerierGraphBuilder} analyzes a chain of `AND` expressions to identify parameters (as partition keys)
+ * and their instantiation, as well as static filters that need to be added to each row.
  */
 export class SyncStreamsCompiler {
   readonly output = new CompiledStreamQueries();
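
The de-duplication strategy this new doc comment describes can be sketched in isolation. Below is a minimal illustration, assuming IR nodes expose `hashCode()` and `equals()` methods; the names `StableHashable`, `InternSet`, and `PartitionKey` are hypothetical stand-ins, not the actual `HashSet`/`StableHasher` API from `equality.js`.

```typescript
// Hypothetical sketch of hash-based interning of IR nodes. Names are
// illustrative only; the real compiler uses HashSet/StableHasher internally.

interface StableHashable {
  hashCode(): number;
  equals(other: StableHashable): boolean;
}

class PartitionKey implements StableHashable {
  constructor(
    readonly table: string,
    readonly column: string
  ) {}

  hashCode(): number {
    // Simple deterministic string hash; a real implementation would use a stable hasher.
    let h = 17;
    for (const ch of `${this.table}.${this.column}`) {
      h = (h * 31 + ch.charCodeAt(0)) | 0;
    }
    return h;
  }

  equals(other: StableHashable): boolean {
    return other instanceof PartitionKey && other.table === this.table && other.column === this.column;
  }
}

/** Interns values so that structurally-equal instances are stored only once. */
class InternSet<T extends StableHashable> {
  private readonly buckets = new Map<number, T[]>();

  intern(value: T): T {
    const hash = value.hashCode();
    let bucket = this.buckets.get(hash);
    if (bucket === undefined) {
      this.buckets.set(hash, (bucket = []));
    }
    const existing = bucket.find((entry) => entry.equals(value));
    if (existing !== undefined) {
      return existing; // De-duplicated: reuse the canonical instance.
    }
    bucket.push(value);
    return value;
  }
}

const keys = new InternSet<PartitionKey>();
const first = keys.intern(new PartitionKey('issues', 'id'));
const second = keys.intern(new PartitionKey('issues', 'id'));
console.log(first === second); // true: structurally equal keys share one instance
```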

packages/sync-rules/src/compiler/ir_to_sync_plan.ts

Lines changed: 0 additions & 1 deletion
@@ -1,6 +1,5 @@
 import * as plan from '../sync_plan/plan.js';
 import * as resolver from './bucket_resolver.js';
-import { equalsIgnoringResultSet } from './compatibility.js';
 import { CompiledStreamQueries } from './compiler.js';
 import { Equality, HashMap, StableHasher, unorderedEquality } from './equality.js';
 import { ColumnInRow, SyncExpression } from './expression.js';

packages/sync-rules/src/compiler/querier_graph.ts

Lines changed: 25 additions & 0 deletions
@@ -88,6 +88,31 @@ export class QuerierGraphBuilder {
  * This works by first processing subterms related to the source set selected from. If the subterm is a row expression,
  * we add it as a filter to the row processor or parameter lookup. If it is a match expression, we create a partition
  * key and obtain the value by recursively applying this algorithm to resolve the other side.
+ *
+ * To visualize this algorithm, consider the following example query:
+ *
+ * ```SQL
+ * SELECT * FROM comments c, issues i, users u
+ * WHERE u.user_id = auth.user_id() AND c.issue_id = i.id AND u.id = i.owner_id AND u.is_admin
+ * ```
+ *
+ * First, {@link resolvePrimaryInput} is called, which calls {@link resolveResultSet} on `comments c`. While resolving a
+ * result set, we extract conditions mentioning that result set. In this case, the only such expression is
+ * `c.issue_id = i.id`. Because this is an {@link EqualsClause}, we know we need to introduce a parameter (in this case,
+ * `c.issue_id` because that's the half depending on the current result set). We then look at the other half and
+ * recursively resolve `issues i` (via {@link resolvePointLookup}). When we're done resolving that, we add `i.id` to
+ * {@link PendingExpandingLookup.usedOutputs}.
+ *
+ * To resolve `issues i`, we extract the only remaining expression mentioning it, `u.id = i.owner_id`. We once again
+ * recurse to resolve `users u` and will add `u.id` as a used output.
+ *
+ * Finally, we find `u.user_id = auth.user_id()` and `u.is_admin`. The first expression creates a parameter, but doesn't
+ * need to resolve any further result sets since the input depends on connection data. The second expression only
+ * depends on the row itself, so we add it as a static condition to only create parameter lookups for rows matching that
+ * condition.
+ *
+ * This algorithm gives us the bucket creator as well as parameter lookups with their partition keys and values, which
+ * is the sync plan.
  */
 class PendingQuerierPath {
   // Terms in the And that have not yet been handled.
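
To make the walkthrough in this new doc comment easier to follow, here is a sketch of the structures it could produce for the example query. The `ResolvedResultSet` shape and its field names are hypothetical and purely illustrative; the real `QuerierGraphBuilder` uses its own intermediate representation.

```typescript
// Hypothetical summary of what the walkthrough derives for the example query.
// Names like ResolvedResultSet/staticFilters are illustrative, not the real IR.

interface ResolvedResultSet {
  table: string;
  /** Row-only conditions attached as static filters (e.g. `u.is_admin`). */
  staticFilters: string[];
  /** Partition keys paired with where their values come from. */
  partitions: { key: string; valueFrom: string }[];
}

// comments c: `c.issue_id = i.id` introduces a partition key whose value is
// produced by recursively resolving `issues i` as a point lookup.
const comments: ResolvedResultSet = {
  table: 'comments',
  staticFilters: [],
  partitions: [{ key: 'c.issue_id', valueFrom: 'i.id via point lookup on issues' }]
};

// issues i: `u.id = i.owner_id` recursively resolves `users u` in turn.
const issues: ResolvedResultSet = {
  table: 'issues',
  staticFilters: [],
  partitions: [{ key: 'i.owner_id', valueFrom: 'u.id via point lookup on users' }]
};

// users u: `u.user_id = auth.user_id()` terminates the recursion (connection
// data), while `u.is_admin` stays behind as a static filter.
const users: ResolvedResultSet = {
  table: 'users',
  staticFilters: ['u.is_admin'],
  partitions: [{ key: 'u.user_id', valueFrom: 'auth.user_id() from the request' }]
};

console.log({ comments, issues, users });
```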

packages/sync-rules/src/sync_plan/serialize.ts

Lines changed: 7 additions & 0 deletions
@@ -15,6 +15,13 @@ import {
   SyncPlan
 } from './plan.js';
 
+/**
+ * Serializes a sync plan into a simple JSON object.
+ *
+ * While {@link SyncPlan}s are already serializable for the most part, they contain a graph of references from e.g.
+ * queriers to bucket creators. To represent this efficiently, we assign numbers to referenced elements while
+ * serializing instead of duplicating definitions.
+ */
 export function serializeSyncPlan(plan: SyncPlan): SerializedSyncPlanUnstable {
   const dataSourceIndex = new Map<StreamDataSource, number>();
   const bucketIndex = new Map<StreamBucketDataSource, number>();
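
The numbering scheme this doc comment describes can be sketched as follows, assuming shared nodes are emitted once and then referenced by index. `BucketSource`, `Querier`, and `serializeWithIndices` are hypothetical names for illustration, not the actual serializer API.

```typescript
// Minimal sketch: referenced elements get a number during serialization, so
// queriers can point at buckets by index instead of duplicating definitions.

interface BucketSource {
  name: string;
}

interface Querier {
  bucket: BucketSource;
}

function serializeWithIndices(buckets: BucketSource[], queriers: Querier[]) {
  // First pass: assign a stable number to every distinct bucket source.
  const bucketIndex = new Map<BucketSource, number>();
  const serializedBuckets = buckets.map((bucket, i) => {
    bucketIndex.set(bucket, i);
    return { name: bucket.name };
  });

  // Second pass: queriers reference buckets by their assigned index.
  const serializedQueriers = queriers.map((querier) => ({
    bucket: bucketIndex.get(querier.bucket)!
  }));

  return { buckets: serializedBuckets, queriers: serializedQueriers };
}

// Usage: two queriers sharing one bucket serialize to a single bucket entry.
const shared: BucketSource = { name: 'by_user' };
console.log(JSON.stringify(serializeWithIndices([shared], [{ bucket: shared }, { bucket: shared }])));
// → {"buckets":[{"name":"by_user"}],"queriers":[{"bucket":0},{"bucket":0}]}
```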

packages/sync-rules/test/src/compiler/errors.test.ts

Lines changed: 9 additions & 0 deletions
@@ -2,6 +2,15 @@ import { describe, expect, test } from 'vitest';
 import { compilationErrorsForSingleStream } from './utils.js';
 
 describe('compilation errors', () => {
+  test('not a select statement', () => {
+    expect(compilationErrorsForSingleStream("INSERT INTO users (id) VALUES ('foo')")).toStrictEqual([
+      {
+        message: 'Expected a SELECT statement',
+        source: "INSERT INTO users (id) VALUES ('foo'"
+      }
+    ]);
+  });
+
   test('IN operator with static left clause', () => {
     expect(
       compilationErrorsForSingleStream("SELECT * FROM issues WHERE 'static' IN (SELECT id FROM users WHERE is_admin)")

packages/sync-rules/test/src/compiler/utils.ts

Lines changed: 0 additions & 4 deletions
@@ -47,10 +47,6 @@ export function compilationErrorsForSingleStream(...sql: string[]): TranslationError[] {
   ])[0];
 }
 
-export function compilationErrors(inputs: SyncStreamInput[]): TranslationError[] {
-  return compileToSyncPlan(inputs)[0];
-}
-
 export function compileToSyncPlan(inputs: SyncStreamInput[]): [TranslationError[], SyncPlan] {
   const compiler = new SyncStreamsCompiler();
   const errors: TranslationError[] = [];
