Skip to content

Commit 5c3681d

Browse files
jymdmanjamesdaniels
authored andcommitted
fix(database): Update to rxjs pipeable operators (#1622)
1 parent 97b26e3 commit 5c3681d

15 files changed

+115
-108
lines changed

src/database/interfaces.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Reference, DataSnapshot, ThenableReference, Query } from '@firebase/database-types';
2-
import { Observable } from 'rxjs/Observable';
2+
import { Observable } from 'rxjs';
33

44
export type FirebaseOperation = string | Reference | DataSnapshot;
55

src/database/list/audit-trail.spec.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
33
import { AngularFireDatabase, AngularFireDatabaseModule, auditTrail, ChildEvent } from 'angularfire2/database';
44
import { TestBed, inject } from '@angular/core/testing';
55
import { COMMON_CONFIG } from '../test-config';
6-
import 'rxjs/add/operator/skip';
6+
import { skip } from 'rxjs/operators';
77

88
// generate random string to test fidelity of naming
99
const rando = () => (Math.random() + 1).toString(36).substring(7);
@@ -41,13 +41,13 @@ describe('auditTrail', () => {
4141
app.delete().then(done, done.fail);
4242
});
4343

44-
function prepareAuditTrail(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
45-
const { events, skip } = opts;
44+
function prepareAuditTrail(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
45+
const { events, skipnumber } = opts;
4646
const aref = createRef(rando());
4747
aref.set(batch);
4848
const changes = auditTrail(aref, events);
4949
return {
50-
changes: changes.skip(skip),
50+
changes: changes.pipe(skip(skipnumber)),
5151
ref: aref
5252
};
5353
}

src/database/list/audit-trail.ts

+36-32
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import { DatabaseQuery, ChildEvent, DatabaseSnapshot, AngularFireAction, SnapshotAction } from '../interfaces';
22
import { stateChanges } from './state-changes';
3-
import { Observable } from 'rxjs/Observable';
3+
import { Observable } from 'rxjs';
44
import { DataSnapshot } from '@firebase/database-types';
55
import { fromRef } from '../observable/fromRef';
66
import { AngularFireDatabase } from '../database';
77

8-
import 'rxjs/add/operator/skipWhile';
9-
import 'rxjs/add/operator/withLatestFrom';
10-
import 'rxjs/add/operator/map';
8+
import { skipWhile, withLatestFrom, map, scan } from 'rxjs/operators';
119

1210
export function createAuditTrail(query: DatabaseQuery, afDatabase: AngularFireDatabase) {
1311
return (events?: ChildEvent[]) => afDatabase.scheduler.keepUnstableUntilFirst(
@@ -19,7 +17,9 @@ export function createAuditTrail(query: DatabaseQuery, afDatabase: AngularFireDa
1917

2018
export function auditTrail(query: DatabaseQuery, events?: ChildEvent[]): Observable<SnapshotAction[]> {
2119
const auditTrail$ = stateChanges(query, events)
22-
.scan((current, action) => [...current, action], []);
20+
.pipe(
21+
scan<SnapshotAction>((current, action) => [...current, action], [])
22+
);
2323
return waitForLoaded(query, auditTrail$);
2424
}
2525

@@ -33,36 +33,40 @@ function loadedData(query: DatabaseQuery): Observable<LoadedMetadata> {
3333
// known dataset. This will allow us to know what key to
3434
// emit the "whole" array at when listening for child events.
3535
return fromRef(query, 'value')
36-
.map(data => {
37-
// Store the last key in the data set
38-
let lastKeyToLoad;
39-
// Loop through loaded dataset to find the last key
40-
data.payload.forEach(child => {
41-
lastKeyToLoad = child.key; return false;
42-
});
43-
// return data set and the current last key loaded
44-
return { data, lastKeyToLoad };
45-
});
36+
.pipe(
37+
map(data => {
38+
// Store the last key in the data set
39+
let lastKeyToLoad;
40+
// Loop through loaded dataset to find the last key
41+
data.payload.forEach(child => {
42+
lastKeyToLoad = child.key; return false;
43+
});
44+
// return data set and the current last key loaded
45+
return { data, lastKeyToLoad };
46+
})
47+
);
4648
}
4749

4850
function waitForLoaded(query: DatabaseQuery, action$: Observable<SnapshotAction[]>) {
4951
const loaded$ = loadedData(query);
5052
return loaded$
51-
.withLatestFrom(action$)
52-
// Get the latest values from the "loaded" and "child" datasets
53-
// We can use both datasets to form an array of the latest values.
54-
.map(([loaded, actions]) => {
55-
// Store the last key in the data set
56-
let lastKeyToLoad = loaded.lastKeyToLoad;
57-
// Store all child keys loaded at this point
58-
const loadedKeys = actions.map(snap => snap.key);
59-
return { actions, lastKeyToLoad, loadedKeys }
60-
})
61-
// This is the magical part, only emit when the last load key
62-
// in the dataset has been loaded by a child event. At this point
63-
// we can assume the dataset is "whole".
64-
.skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1)
65-
// Pluck off the meta data because the user only cares
66-
// to iterate through the snapshots
67-
.map(meta => meta.actions);
53+
.pipe(
54+
withLatestFrom(action$),
55+
// Get the latest values from the "loaded" and "child" datasets
56+
// We can use both datasets to form an array of the latest values.
57+
map(([loaded, actions]) => {
58+
// Store the last key in the data set
59+
let lastKeyToLoad = loaded.lastKeyToLoad;
60+
// Store all child keys loaded at this point
61+
const loadedKeys = actions.map(snap => snap.key);
62+
return { actions, lastKeyToLoad, loadedKeys }
63+
}),
64+
// This is the magical part, only emit when the last load key
65+
// in the dataset has been loaded by a child event. At this point
66+
// we can assume the dataset is "whole".
67+
skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1),
68+
// Pluck off the meta data because the user only cares
69+
// to iterate through the snapshots
70+
map(meta => meta.actions)
71+
);
6872
}

src/database/list/changes.spec.ts

+9-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
33
import { AngularFireDatabase, AngularFireDatabaseModule, listChanges } from 'angularfire2/database';
44
import { TestBed, inject } from '@angular/core/testing';
55
import { COMMON_CONFIG } from '../test-config';
6-
import 'rxjs/add/operator/skip';
6+
import { skip, take } from 'rxjs/operators';
77

88
// generate random string to test fidelity of naming
99
const rando = () => (Math.random() + 1).toString(36).substring(7);
@@ -46,7 +46,7 @@ describe('listChanges', () => {
4646
it('should stream value at first', (done) => {
4747
const someRef = ref(rando());
4848
const obs = listChanges(someRef, ['child_added']);
49-
const sub = obs.take(1).subscribe(changes => {
49+
const sub = obs.pipe(take(1)).subscribe(changes => {
5050
const data = changes.map(change => change.payload.val());
5151
expect(data).toEqual(items);
5252
}).add(done);
@@ -56,7 +56,7 @@ describe('listChanges', () => {
5656
it('should process a new child_added event', done => {
5757
const aref = ref(rando());
5858
const obs = listChanges(aref, ['child_added']);
59-
const sub = obs.skip(1).take(1).subscribe(changes => {
59+
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
6060
const data = changes.map(change => change.payload.val());
6161
expect(data[3]).toEqual({ name: 'anotha one' });
6262
}).add(done);
@@ -67,7 +67,7 @@ describe('listChanges', () => {
6767
it('should stream in order events', (done) => {
6868
const aref = ref(rando());
6969
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
70-
const sub = obs.take(1).subscribe(changes => {
70+
const sub = obs.pipe(take(1)).subscribe(changes => {
7171
const names = changes.map(change => change.payload.val().name);
7272
expect(names[0]).toEqual('one');
7373
expect(names[1]).toEqual('two');
@@ -79,7 +79,7 @@ describe('listChanges', () => {
7979
it('should stream in order events w/child_added', (done) => {
8080
const aref = ref(rando());
8181
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
82-
const sub = obs.skip(1).take(1).subscribe(changes => {
82+
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
8383
const names = changes.map(change => change.payload.val().name);
8484
expect(names[0]).toEqual('anotha one');
8585
expect(names[1]).toEqual('one');
@@ -93,7 +93,7 @@ describe('listChanges', () => {
9393
it('should stream events filtering', (done) => {
9494
const aref = ref(rando());
9595
const obs = listChanges(aref.orderByChild('name').equalTo('zero'), ['child_added']);
96-
obs.skip(1).take(1).subscribe(changes => {
96+
obs.pipe(skip(1),take(1)).subscribe(changes => {
9797
const names = changes.map(change => change.payload.val().name);
9898
expect(names[0]).toEqual('zero');
9999
expect(names[1]).toEqual('zero');
@@ -105,7 +105,7 @@ describe('listChanges', () => {
105105
it('should process a new child_removed event', done => {
106106
const aref = ref(rando());
107107
const obs = listChanges(aref, ['child_added','child_removed']);
108-
const sub = obs.skip(1).take(1).subscribe(changes => {
108+
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
109109
const data = changes.map(change => change.payload.val());
110110
expect(data.length).toEqual(items.length - 1);
111111
}).add(done);
@@ -118,7 +118,7 @@ describe('listChanges', () => {
118118
it('should process a new child_changed event', (done) => {
119119
const aref = ref(rando());
120120
const obs = listChanges(aref, ['child_added','child_changed'])
121-
const sub = obs.skip(1).take(1).subscribe(changes => {
121+
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
122122
const data = changes.map(change => change.payload.val());
123123
expect(data[1].name).toEqual('lol');
124124
}).add(done);
@@ -131,7 +131,7 @@ describe('listChanges', () => {
131131
it('should process a new child_moved event', (done) => {
132132
const aref = ref(rando());
133133
const obs = listChanges(aref, ['child_added','child_moved'])
134-
const sub = obs.skip(1).take(1).subscribe(changes => {
134+
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
135135
const data = changes.map(change => change.payload.val());
136136
// We moved the first item to the last item, so we check that
137137
// the new result is now the last result

src/database/list/changes.ts

+13-14
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,22 @@
11
import { fromRef } from '../observable/fromRef';
2-
import { Observable } from 'rxjs/Observable';
2+
import { Observable } from 'rxjs';
3+
import { of } from 'rxjs/observable/of';
4+
import { merge } from 'rxjs/observable/merge';
5+
36
import { DatabaseQuery, ChildEvent, AngularFireAction, SnapshotAction } from '../interfaces';
47
import { isNil } from '../utils';
58

6-
import 'rxjs/add/operator/scan';
7-
import 'rxjs/add/observable/merge';
8-
import 'rxjs/add/observable/of';
9-
import 'rxjs/add/operator/switchMap';
10-
import 'rxjs/add/operator/filter';
11-
import 'rxjs/add/operator/delay';
12-
import 'rxjs/add/operator/distinctUntilChanged';
9+
import { switchMap, distinctUntilChanged, scan } from 'rxjs/operators';
1310

1411
export function listChanges<T>(ref: DatabaseQuery, events: ChildEvent[]): Observable<SnapshotAction[]> {
15-
return fromRef(ref, 'value', 'once').switchMap(snapshotAction => {
16-
const childEvent$ = [Observable.of(snapshotAction)];
17-
events.forEach(event => childEvent$.push(fromRef(ref, event)));
18-
return Observable.merge(...childEvent$).scan(buildView, [])
19-
})
20-
.distinctUntilChanged();
12+
return fromRef(ref, 'value', 'once').pipe(
13+
switchMap(snapshotAction => {
14+
const childEvent$ = [of(snapshotAction)];
15+
events.forEach(event => childEvent$.push(fromRef(ref, event)));
16+
return merge(...childEvent$).pipe(scan(buildView, []))
17+
}),
18+
distinctUntilChanged()
19+
);
2120
}
2221

2322
function positionFor(changes: SnapshotAction[], key) {

src/database/list/create-reference.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { createAuditTrail } from './audit-trail';
55
import { createDataOperationMethod } from './data-operation';
66
import { createRemoveMethod } from './remove';
77
import { AngularFireDatabase } from '../database';
8+
import { map } from 'rxjs/operators';
89

910
export function createListReference<T>(query: DatabaseQuery, afDatabase: AngularFireDatabase): AngularFireList<T> {
1011
return {
@@ -29,7 +30,9 @@ export function createListReference<T>(query: DatabaseQuery, afDatabase: Angular
2930
afDatabase.scheduler.runOutsideAngular(
3031
snapshotChanges$
3132
)
32-
).map(actions => actions.map(a => a.payload.val()));
33+
).pipe(
34+
map(actions => actions.map(a => a.payload.val()))
35+
);
3336
}
3437
}
3538
}

src/database/list/snapshot-changes.spec.ts

+15-15
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
33
import { AngularFireDatabase, AngularFireDatabaseModule, snapshotChanges, ChildEvent } from 'angularfire2/database';
44
import { TestBed, inject } from '@angular/core/testing';
55
import { COMMON_CONFIG } from '../test-config';
6-
import 'rxjs/add/operator/skip';
7-
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
6+
import { BehaviorSubject } from 'rxjs';
7+
import { skip, take, switchMap } from 'rxjs/operators';
88

99
// generate random string to test fidelity of naming
1010
const rando = () => (Math.random() + 1).toString(36).substring(7);
@@ -42,12 +42,12 @@ describe('snapshotChanges', () => {
4242
app.delete().then(done, done.fail);
4343
});
4444

45-
function prepareSnapshotChanges(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
46-
const { events, skip } = opts;
45+
function prepareSnapshotChanges(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
46+
const { events, skipnumber } = opts;
4747
const aref = createRef(rando());
4848
const snapChanges = snapshotChanges(aref, events);
4949
return {
50-
snapChanges: snapChanges.skip(skip),
50+
snapChanges: snapChanges.pipe(skip(skipnumber)),
5151
ref: aref
5252
};
5353
}
@@ -64,7 +64,7 @@ describe('snapshotChanges', () => {
6464
it('should handle multiple subscriptions (hot)', (done) => {
6565
const { snapChanges, ref } = prepareSnapshotChanges();
6666
const sub = snapChanges.subscribe(() => {}).add(done);
67-
snapChanges.take(1).subscribe(actions => {
67+
snapChanges.pipe(take(1)).subscribe(actions => {
6868
const data = actions.map(a => a.payload!.val());
6969
expect(data).toEqual(items);
7070
}).add(sub);
@@ -73,8 +73,8 @@ describe('snapshotChanges', () => {
7373

7474
it('should handle multiple subscriptions (warm)', done => {
7575
const { snapChanges, ref } = prepareSnapshotChanges();
76-
snapChanges.take(1).subscribe(() => {}).add(() => {
77-
snapChanges.take(1).subscribe(actions => {
76+
snapChanges.pipe(take(1)).subscribe(() => {}).add(() => {
77+
snapChanges.pipe(take(1)).subscribe(actions => {
7878
const data = actions.map(a => a.payload!.val());
7979
expect(data).toEqual(items);
8080
}).add(done);
@@ -83,8 +83,8 @@ describe('snapshotChanges', () => {
8383
});
8484

8585
it('should listen to only child_added events', (done) => {
86-
const { snapChanges, ref } = prepareSnapshotChanges({ events: ['child_added'], skip: 0 });
87-
snapChanges.take(1).subscribe(actions => {
86+
const { snapChanges, ref } = prepareSnapshotChanges({ events: ['child_added'], skipnumber: 0 });
87+
snapChanges.pipe(take(1)).subscribe(actions => {
8888
const data = actions.map(a => a.payload!.val());
8989
expect(data).toEqual(items);
9090
}).add(done);
@@ -94,10 +94,10 @@ describe('snapshotChanges', () => {
9494
it('should listen to only child_added, child_changed events', (done) => {
9595
const { snapChanges, ref } = prepareSnapshotChanges({
9696
events: ['child_added', 'child_changed'],
97-
skip: 1
97+
skipnumber: 1
9898
});
9999
const name = 'ligatures';
100-
snapChanges.take(1).subscribe(actions => {
100+
snapChanges.pipe(take(1)).subscribe(actions => {
101101
const data = actions.map(a => a.payload!.val());;
102102
const copy = [...items];
103103
copy[0].name = name;
@@ -112,7 +112,7 @@ describe('snapshotChanges', () => {
112112
it('should handle empty sets', done => {
113113
const aref = createRef(rando());
114114
aref.set({});
115-
snapshotChanges(aref).take(1).subscribe(data => {
115+
snapshotChanges(aref).pipe(take(1)).subscribe(data => {
116116
expect(data.length).toEqual(0);
117117
}).add(done);
118118
});
@@ -124,10 +124,10 @@ describe('snapshotChanges', () => {
124124
let namefilter$ = new BehaviorSubject<number|null>(null);
125125
const aref = createRef(rando());
126126
aref.set(batch);
127-
namefilter$.switchMap(name => {
127+
namefilter$.pipe(switchMap(name => {
128128
const filteredRef = name ? aref.child('name').equalTo(name) : aref
129129
return snapshotChanges(filteredRef);
130-
}).take(2).subscribe(data => {
130+
}),take(2)).subscribe(data => {
131131
count = count + 1;
132132
// the first time should all be 'added'
133133
if(count === 1) {

src/database/list/snapshot-changes.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import { Observable } from 'rxjs/Observable';
1+
import { Observable } from 'rxjs';
22
import { listChanges } from './changes';
33
import { DatabaseQuery, ChildEvent, SnapshotAction } from '../interfaces';
44
import { validateEventsArray } from './utils';
5-
import 'rxjs/add/operator/map';
65

76
export function snapshotChanges(query: DatabaseQuery, events?: ChildEvent[]): Observable<SnapshotAction[]> {
87
events = validateEventsArray(events);

0 commit comments

Comments
 (0)