diff --git a/.flowconfig b/.flowconfig index a3067bf..1fed445 100644 --- a/.flowconfig +++ b/.flowconfig @@ -1,7 +1,6 @@ [ignore] [include] -src/ [libs] diff --git a/.travis.yml b/.travis.yml index f4008c7..4503a2e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,12 @@ language: node_js sudo: false script: +# Enable flow to have flow errors fail the build +# - npm -s run flow # lint errors fail the build -- npm run lint +- npm -s run lint # unit tests with coverage report -- npm run test:coverage +- npm -s run test:coverage node_js: - '8' - '7' diff --git a/package-lock.json b/package-lock.json index e473d6f..9f7ed29 100644 --- a/package-lock.json +++ b/package-lock.json @@ -520,6 +520,18 @@ "integrity": "sha1-nufoM3KQ2pUoggGmpX9BcDF4MN4=", "dev": true }, + "babel-plugin-syntax-flow": { + "version": "6.18.0", + "resolved": "https://registry.npmjs.org/babel-plugin-syntax-flow/-/babel-plugin-syntax-flow-6.18.0.tgz", + "integrity": "sha1-TDqyCiryaqIM0lmVw5jE63AxDI0=", + "dev": true + }, + "babel-plugin-syntax-object-rest-spread": { + "version": "6.13.0", + "resolved": "https://registry.npmjs.org/babel-plugin-syntax-object-rest-spread/-/babel-plugin-syntax-object-rest-spread-6.13.0.tgz", + "integrity": "sha1-/WU28rzhODb/o6VFjEkDpZe7O/U=", + "dev": true + }, "babel-plugin-syntax-trailing-function-commas": { "version": "6.22.0", "resolved": "https://registry.npmjs.org/babel-plugin-syntax-trailing-function-commas/-/babel-plugin-syntax-trailing-function-commas-6.22.0.tgz", @@ -816,6 +828,26 @@ "babel-runtime": "6.26.0" } }, + "babel-plugin-transform-flow-strip-types": { + "version": "6.22.0", + "resolved": "https://registry.npmjs.org/babel-plugin-transform-flow-strip-types/-/babel-plugin-transform-flow-strip-types-6.22.0.tgz", + "integrity": "sha1-hMtnKTXUNxT9wyvOhFaNh0Qc988=", + "dev": true, + "requires": { + "babel-plugin-syntax-flow": "6.18.0", + "babel-runtime": "6.26.0" + } + }, + "babel-plugin-transform-object-rest-spread": { + "version": "6.26.0", + "resolved": "https://registry.npmjs.org/babel-plugin-transform-object-rest-spread/-/babel-plugin-transform-object-rest-spread-6.26.0.tgz", + "integrity": "sha1-DzZpLVD+9rfi1LOsFHgTepY7ewY=", + "dev": true, + "requires": { + "babel-plugin-syntax-object-rest-spread": "6.13.0", + "babel-runtime": "6.26.0" + } + }, "babel-plugin-transform-regenerator": { "version": "6.26.0", "resolved": "https://registry.npmjs.org/babel-plugin-transform-regenerator/-/babel-plugin-transform-regenerator-6.26.0.tgz", @@ -905,6 +937,15 @@ "semver": "5.5.0" } }, + "babel-preset-flow": { + "version": "6.23.0", + "resolved": "https://registry.npmjs.org/babel-preset-flow/-/babel-preset-flow-6.23.0.tgz", + "integrity": "sha1-5xIYiHCFrpoktb5Baa/7WZgWxJ0=", + "dev": true, + "requires": { + "babel-plugin-transform-flow-strip-types": "6.22.0" + } + }, "babel-register": { "version": "6.26.0", "resolved": "https://registry.npmjs.org/babel-register/-/babel-register-6.26.0.tgz", @@ -1559,6 +1600,12 @@ "write": "0.2.1" } }, + "flow-bin": { + "version": "0.66.0", + "resolved": "https://registry.npmjs.org/flow-bin/-/flow-bin-0.66.0.tgz", + "integrity": "sha1-qW3ecBXcM0P9VSp7SWPAK+cFyiY=", + "dev": true + }, "for-in": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz", @@ -2362,14 +2409,6 @@ } } }, - "string_decoder": { - "version": "1.0.1", - "bundled": true, - "dev": true, - "requires": { - "safe-buffer": "5.0.1" - } - }, "string-width": { "version": "1.0.2", "bundled": true, @@ -2380,6 +2419,14 @@ "strip-ansi": "3.0.1" } }, + "string_decoder": { + "version": "1.0.1", + "bundled": true, + "dev": true, + "requires": { + "safe-buffer": "5.0.1" + } + }, "stringstream": { "version": "0.0.5", "bundled": true, @@ -3016,11 +3063,6 @@ "integrity": "sha1-Sr6/7tdUHywnrPspvbvRXI1bpPc=", "dev": true }, - "node-uuid": { - "version": "1.4.8", - "resolved": "https://registry.npmjs.org/node-uuid/-/node-uuid-1.4.8.tgz", - "integrity": "sha1-sEDrCSOWivq/jTL7HxfxFn/auQc=" - }, "normalize-path": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/normalize-path/-/normalize-path-2.1.1.tgz", @@ -5049,15 +5091,6 @@ "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "dev": true }, - "string_decoder": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", - "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", - "dev": true, - "requires": { - "safe-buffer": "5.1.1" - } - }, "string-width": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/string-width/-/string-width-2.1.1.tgz", @@ -5085,6 +5118,15 @@ } } }, + "string_decoder": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", + "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", + "dev": true, + "requires": { + "safe-buffer": "5.1.1" + } + }, "strip-ansi": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", @@ -5191,6 +5233,11 @@ "integrity": "sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=", "dev": true }, + "uuid": { + "version": "3.2.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.2.1.tgz", + "integrity": "sha512-jZnMwlb9Iku/O3smGWvZhauCf6cvvpKi4BKRiliS3cxnI+Gz9j5MEpTz2UFuXiKPJocb7gnsLHwiS05ige5BEA==" + }, "v8flags": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/v8flags/-/v8flags-2.1.1.tgz", diff --git a/package.json b/package.json index a73f995..4ba2c46 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "babel-preset-env": "^1.6.1", "babel-preset-flow": "^6.23.0", "eslint": "^4.17.0", + "flow-bin": "^0.66.0", "mocha": "^5.0.0", "nyc": "^11.4.1" } diff --git a/src/simperium/channel.js b/src/simperium/channel.js index 5cb5403..e4aa1e4 100644 --- a/src/simperium/channel.js +++ b/src/simperium/channel.js @@ -1,21 +1,160 @@ +// @flow /*eslint no-shadow: 0*/ import { format, inherits } from 'util' -import { EventEmitter } from 'events' +import events from 'events' import { parseMessage, parseVersionMessage, change as change_util } from './util' +import type { ObjectOperationSet } from './jsondiff'; +import type { BucketChangeType } from './util/change'; import JSONDiff from './jsondiff' -import { v4 as uuid } from 'uuid' +import uuid from 'uuid/v4' + +const { EventEmitter } = events; + +/** + * A ghost represents a version of a bucket object as known by Simperium + * + * Generally a client will keep the last known ghost stored locally for efficient + * diffing and patching of Simperium change operations. + * + * @typedef {Object} Ghost + * @property {Number} version - the ghost's version + * @property {String} key - the simperium bucket object id this ghost is for + * @property {Object} data - the data for the given ghost version + */ +type Ghost = { + key: string, + version: number, + data: {} +} + +/** + * Callback function used by the ghost store to iterate over existing ghosts + * + * @callback ghostIterator + * @param {Ghost} - the current ghost + */ + +/** + * A GhostStore provides the store mechanism for ghost data that the Channel + * uses to maintain syncing state and producing change operations for + * Bucket objects. + * + * @interface GhostStore + */ +interface GhostStore { + /** + * Retrieve a Ghost for the given bucket object id + * + * @function + * @name GhostStore#get + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + get( id: string ): Promise; + + /** + * Save a ghost in the store. + * + * @function + * @name GhostStore#put + * @param {String} id - bucket object id + * @param {Number} version - version of ghost data + * @param {Object} data - object literal to save as this ghost's data for this version + * @returns {Promise} - the ghost for this object + */ + put( id: string, version: number, data: {} ): Promise; + + /** + * Delete a Ghost from the store. + * + * @function + * @name GhostStore#remove + * @param {String} id - bucket object id + * @returns {Promise} - the ghost for this object + */ + remove( id: string ): Promise; + + /** + * Iterate over existing Ghost objects with the given callback. + * + * @function + * @name GhostStore#eachGhost + * @param {ghostIterator} - function to run against each ghost + */ + eachGhost( iterator: ( Ghost ) => void ): void; + + /** + * Get the current change version (cv) that this channel has synced. + * + * @function + * @name GhostStore#getChangeVersion + * @returns {Promise} - the current change version for the bucket + */ + getChangeVersion(): Promise; + + /** + * Set the current change version. + * + * @function + * @name GhostStore#setChangeVersion + * @param {string} changeVersion - new change version + * @returns {Promise} - resolves once the change version is saved + */ + setChangeVersion( changeVersion: string ): Promise; +} const jsondiff = new JSONDiff( {list_diff: false} ); +type LocalChange = { + id: string, + ccid: string +} + +type NetworkRemoveOperation = { + o: '-', + id: string, + ccids: string[], + cv: string +} + +type NetworkModifyOperation = { + o: 'M', + id: string, + ccids: string[], + cv: string, + ev: number, + sv: number, + v: ObjectOperationSet +} + +type NetworkChange = NetworkModifyOperation | NetworkRemoveOperation; + +type NetworkChangeErrorResponse = { + error: number, + id: string, + ccids: string[], + d: ?{}, + hasSentFullObject?: boolean +}; + +class ChangeError extends Error { + code: number; + changeError: NetworkChangeErrorResponse; + change: ?LocalChange; + + constructor( changeError: NetworkChangeErrorResponse, localChange: ?LocalChange ) { + super( `${changeError.error} - Could not apply change to: ${changeError.id}` ); + this.code = changeError.error; + this.changeError = changeError; + this.change = localChange; + } +} + const UNKNOWN_CV = '?'; const CODE_INVALID_VERSION = 405; const CODE_EMPTY_RESPONSE = 412; const CODE_INVALID_DIFF = 440; - -const operation = { - MODIFY: 'M', - REMOVE: '-' -}; +const CODE_DUPLICATE_CHANGE = 409; // internal methods used as instance methods on a Channel instance const internal = {}; @@ -26,8 +165,9 @@ const internal = {}; * @param {String} cv - the change version synced * @returns {Promise} the saved `cv` */ -internal.updateChangeVersion = function( cv ) { - return this.store.setChangeVersion( cv ).then( () => { +internal.updateChangeVersion = function( cv ): Promise { + const store: GhostStore = this.store; + return store.setChangeVersion( cv ).then( () => { // A unit test currently relies on this event, otherwise we can remove it this.emit( 'change-version', cv ); return cv; @@ -41,12 +181,16 @@ internal.updateChangeVersion = function( cv ) { * @param {String} id - id of the object changed * @param {Object} change - the change to apply to the object */ -internal.changeObject = function( id, change ) { - // pull out the object from the store and apply the change delta - var applyChange = internal.performChange.bind( this, change ); - - this.networkQueue.queueFor( id ).add( function( done ) { - return applyChange().then( done, done ); +internal.changeObject = function( id: string, change: NetworkChange ) { + // Add types for now until function is changed to accept a Channel + const channel: Channel = this; + const store: GhostStore = channel.store; + const queue: NetworkQueue = channel.networkQueue; + + queue.queueFor( id ).add( ( done ) => { + store.get( change.id ) + .then( ghost => internal.applyChange.call( channel, change, ghost ) ) + .then( done, done ); } ); }; @@ -60,12 +204,11 @@ internal.changeObject = function( id, change ) { * @param {Object} object - object literal of the data that the change should produce * @param {Object} ghost - the ghost version used to produce the change object */ -internal.buildModifyChange = function( id, object, ghost ) { - var payload = change_util.buildChange( change_util.type.MODIFY, id, object, ghost ), - empty = true, - key; +internal.buildModifyChange = function( id: string, object: {}, ghost: Ghost ) { + const payload = change_util.buildChange( change_util.type.MODIFY, id, object, ghost ); + let empty = true; - for ( key in payload.v ) { + for ( let key in payload.v ) { if ( key ) { empty = false; break; @@ -89,9 +232,10 @@ internal.buildModifyChange = function( id, object, ghost ) { * @param {String} id - object to remove * @param {Object} ghost - current ghost object for the given id */ -internal.buildRemoveChange = function( id, ghost ) { - var payload = change_util.buildChange( change_util.type.REMOVE, id, {}, ghost ); - this.localQueue.queue( payload ); +internal.buildRemoveChange = function( id: string, ghost: Ghost ) { + const payload = change_util.buildChange( '-', id, {}, ghost ); + const localQueue: LocalQueue = this.localQueue; + localQueue.queue( payload ); }; internal.diffAndSend = function( id, object ) { @@ -106,7 +250,7 @@ internal.removeAndSend = function( id ) { // We've receive a full object from the network. Update the local instance and // notify of the new object version -internal.updateObjectVersion = function( id, version, data, original, patch, acknowledged ) { +internal.updateObjectVersion = function( id: string, version: number, data: {}, original, patch, acknowledged ): Promise<*> { var notify, changes, change, @@ -142,7 +286,7 @@ internal.updateObjectVersion = function( id, version, data, original, patch, ack return this.store.put( id, version, data ).then( notify ); }; -internal.removeObject = function( id, acknowledged ) { +internal.removeObject = function( id, acknowledged ): Promise<*> { var notify; if ( !acknowledged ) { notify = this.emit.bind( this, 'remove', id ); @@ -150,10 +294,11 @@ internal.removeObject = function( id, acknowledged ) { notify = internal.updateAcknowledged.bind( this, acknowledged ); } - return this.store.remove( id ).then( notify ); + const store: GhostStore = this.store; + return store.remove( id ).then( notify ); }; -internal.updateAcknowledged = function( change ) { +internal.updateAcknowledged = function( change: LocalChange ) { var id = change.id; if ( this.localQueue.sent[id] === change ) { this.localQueue.acknowledge( change ); @@ -161,13 +306,8 @@ internal.updateAcknowledged = function( change ) { } }; -internal.performChange = function( change ) { - var success = internal.applyChange.bind( this, change ); - return this.store.get( change.id ).then( success ); -}; - -internal.findAcknowledgedChange = function( change ) { - var possibleChange = this.localQueue.sent[change.id]; +internal.findAcknowledgedChange = function( change: { id: string, ccids: string[] } ): ?LocalChange { + const possibleChange: ?LocalChange = this.localQueue.sent[change.id]; if ( possibleChange ) { if ( ( change.ccids || [] ).indexOf( possibleChange.ccid ) > -1 ) { return possibleChange; @@ -175,7 +315,7 @@ internal.findAcknowledgedChange = function( change ) { } }; -internal.requestObjectVersion = function( id, version ) { +internal.requestObjectVersion = function( id: string, version: number ) { return new Promise( resolve => { this.once( `version.${ id }.${ version }`, data => { resolve( data ); @@ -184,36 +324,38 @@ internal.requestObjectVersion = function( id, version ) { } ); }; -internal.applyChange = function( change, ghost ) { - const acknowledged = internal.findAcknowledgedChange.bind( this )( change ), +const applyChangeError = ( channel: Channel, changeError: NetworkChangeErrorResponse ) => { + // run on network queue for the relevant bucket object + const networkQueue: NetworkQueue = channel.networkQueue; + networkQueue.queueFor( changeError.id ).add( ( done ) => { + const localChange = internal.findAcknowledgedChange.call( channel, changeError ); + const error = new ChangeError( changeError, localChange ); + internal.handleChangeError.call( channel, error, changeError, localChange ); + done(); + } ) +}; + +internal.applyChange = function( change: NetworkChange, ghost: Ghost ): Promise { + const acknowledged = internal.findAcknowledgedChange.call( this, change ), updateChangeVersion = internal.updateChangeVersion.bind( this, change.cv ); - let error, - original, + let original, patch, modified; - // attempt to apply the change - // TODO: Handle errors as specified in - // 0:c:[{"ccids": ["0435edf4-3f07-4cc6-bf86-f68e6db8779c"], "id": "9e9a9616-8174-42 - // { ccids: [ '0435edf4-3f07-4cc6-bf86-f68e6db8779c' ], - // id: '9e9a9616-8174-425a-a1b0-9ed5410f1edc', - // clientid: 'node-b9776e96-c068-42ae-893a-03f50833bddb', - // error: 400 } - if ( change.error ) { - error = new Error( `${change.error} - Could not apply change to: ${ghost.key}` ); - error.code = change.error; - error.change = change; - error.ghost = ghost; - internal.handleChangeError.call( this, error, change, acknowledged ); - return; + + if ( change.o === '-' ) { + return internal.removeObject.call( this, change.id, acknowledged ).then( updateChangeVersion ); } - if ( change.o === operation.MODIFY ) { - if ( ghost && ( ghost.version !== change.sv ) ) { + if ( change.o === 'M' ) { + const modifyChange: NetworkModifyOperation = change; + const matchesStartingVersion = change.sv === ghost.version || + ( change.sv === 0 && ( ghost.version === null || ghost.version === undefined ) ); + if ( ! matchesStartingVersion ) { internal.requestObjectVersion.call( this, change.id, change.sv ).then( data => { - internal.applyChange.call( this, change, { version: change.sv, data } ) + internal.applyChange.call( this, change, { key: ghost.key, version: modifyChange.sv, data } ) } ); - return; + return Promise.resolve(); } original = ghost.data; @@ -221,13 +363,18 @@ internal.applyChange = function( change, ghost ) { modified = jsondiff.apply_object_diff( original, patch ); return internal.updateObjectVersion.call( this, change.id, change.ev, modified, original, patch, acknowledged ) .then( updateChangeVersion ); - } else if ( change.o === operation.REMOVE ) { - return internal.removeObject.bind( this )( change.id, acknowledged ).then( updateChangeVersion ); } + // Only changes of REMOVE and MODIFY are possible + // Should changes of ADD throw an error? + return Promise.resolve(); } -internal.handleChangeError = function( err, change, acknowledged ) { +internal.handleChangeError = function( err: ChangeError, change: NetworkChangeErrorResponse, acknowledged: ?LocalChange ) { switch ( err.code ) { + case CODE_DUPLICATE_CHANGE: + if ( ! acknowledged ) { + break; + } case CODE_INVALID_VERSION: case CODE_INVALID_DIFF: // Invalid version or diff, send full object back to server if ( ! change.hasSentFullObject ) { @@ -242,7 +389,9 @@ internal.handleChangeError = function( err, change, acknowledged ) { break; case CODE_EMPTY_RESPONSE: // Change causes no change, just acknowledge it - internal.updateAcknowledged.call( this, acknowledged ); + if ( acknowledged ) { + internal.updateAcknowledged.call( this, acknowledged ); + } break; default: this.emit( 'error', err, change ); @@ -265,87 +414,6 @@ internal.indexingComplete = function() { this.emit( 'ready' ) } -/** - * A ghost represents a version of a bucket object as known by Simperium - * - * Generally a client will keep the last known ghost stored locally for efficient - * diffing and patching of Simperium change operations. - * - * @typedef {Object} Ghost - * @property {Number} version - the ghost's version - * @property {String} key - the simperium bucket object id this ghost is for - * @property {Object} data - the data for the given ghost version - */ - -/** - * Callback function used by the ghost store to iterate over existing ghosts - * - * @callback ghostIterator - * @param {Ghost} - the current ghost - */ - -/** - * A GhostStore provides the store mechanism for ghost data that the Channel - * uses to maintain syncing state and producing change operations for - * Bucket objects. - * - * @interface GhostStore - */ - -/** - * Retrieve a Ghost for the given bucket object id - * - * @function - * @name GhostStore#get - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Save a ghost in the store. - * - * @function - * @name GhostStore#put - * @param {String} id - bucket object id - * @param {Number} version - version of ghost data - * @param {Object} data - object literal to save as this ghost's data for this version - * @returns {Promise} - the ghost for this object - */ - -/** - * Delete a Ghost from the store. - * - * @function - * @name GhostStore#remove - * @param {String} id - bucket object id - * @returns {Promise} - the ghost for this object - */ - -/** - * Iterate over existing Ghost objects with the given callback. - * - * @function - * @name GhostStore#eachGhost - * @param {ghostIterator} - function to run against each ghost - */ - -/** - * Get the current change version (cv) that this channel has synced. - * - * @function - * @name GhostStore#getChangeVersion - * @returns {Promise} - the current change version for the bucket - */ - -/** - * Set the current change version. - * - * @function - * @name GhostStore#setChangeVersion - * @returns {Promise} - resolves once the change version is saved - */ - - /** * Maintains syncing state for a Simperium bucket. * @@ -364,7 +432,7 @@ internal.indexingComplete = function() { * @param {GhostStore} store - data storage for ghost objects * @param {String} name - the name of the bucket on Simperium.com */ -export default function Channel( appid, access_token, store, name ) { +export default function Channel( appid: string, access_token: string, store: GhostStore, name: string ) { // Uses an event emitter to handle different Simperium commands const message = this.message = new EventEmitter(); @@ -634,12 +702,95 @@ Channel.prototype.sendChangeVersionRequest = function( cv ) { this.send( format( 'cv:%s', cv ) ); }; +type ChangeMessage = { + clientid?: string, + ccids?: string[], + id?: string, // Bucket object being changed + o?: BucketChangeType, + v?: {}, + cv?: string, + sv?: number, + ev?: number, + error?: number, + d?: {} +}; + +const requireProp = ( key: string, object: {} ): T => { + const value: T = object[ key ]; + if ( value ) { + return value; + } + throw new Error( `unexpected value for key ${ key } in ${ JSON.stringify( object ) }` ); +} + +const asNetworkErrorResponse = ( changeMessage: ChangeMessage ): NetworkChangeErrorResponse => { + return { + id: requireProp( 'id', changeMessage ), + d: changeMessage.d, + ccids: requireProp( 'ccids', changeMessage ), + error: requireProp( 'error', changeMessage ) + }; +} + +class ProtocolError extends Error { +} + +const asNetworkChange = ( changeMessage: ChangeMessage ): NetworkChange => { + const operation: ?BucketChangeType = changeMessage.o; + + if ( ! changeMessage.ccids ) { + throw new ProtocolError( 'nework change missing ccids' ); + } + + if ( ! changeMessage.cv ) { + throw new ProtocolError( 'netwock change missing change version (cv)' ); + } + + if ( ! changeMessage.id ) { + throw new ProtocolError( 'network change missing id' ); + } + + if ( operation === '-' ) { + return { + id: changeMessage.id, + cv: changeMessage.cv, + ccids: changeMessage.ccids, + o: '-', + sv: changeMessage.sv ? changeMessage.sv : 0, + } + } + if ( operation === 'M' ) { + if ( ! changeMessage.ev ) { + throw new ProtocolError( 'network modify change missing ev' ); + } + + if ( ! changeMessage.v ) { + throw new ProtocolError( 'network modify change missing v' ); + } + + return { + id: changeMessage.id, + cv: changeMessage.cv, + ccids: changeMessage.ccids, + o: 'M', + sv: changeMessage.sv ? changeMessage.sv : 0, + ev: changeMessage.ev, + v: changeMessage.v + } + } + throw new Error( `Invalid change type ${ operation ? operation : '' } in c:${ JSON.stringify( changeMessage )}` ); +} + Channel.prototype.onChanges = function( data ) { var changes = JSON.parse( data ), onChange = internal.changeObject.bind( this ); - changes.forEach( function( change ) { - onChange( change.id, change ); + changes.forEach( ( change ) => { + if ( change.error ) { + applyChangeError( this, asNetworkErrorResponse( change ) ); + } else { + onChange( change.id, asNetworkChange( change ) ); + } } ); // emit ready after all server changes have been applied this.emit( 'ready' ); @@ -692,7 +843,7 @@ function Queue() { inherits( Queue, EventEmitter ); // Add a function at the end of the queue -Queue.prototype.add = function( fn ) { +Queue.prototype.add = function( fn: ( () => void ) => void ): Queue { this.queue.push( fn ); this.start(); return this; @@ -719,7 +870,7 @@ Queue.prototype.run = function() { fn( this.run.bind( this ) ); } -function LocalQueue( store ) { +function LocalQueue( store: GhostStore ) { this.store = store; this.sent = {}; this.queues = {}; @@ -866,7 +1017,7 @@ LocalQueue.prototype.resendSentChanges = function() { * * @type {Map} stores specific revisions as a cache */ -export const revisionCache = new Map(); +export const revisionCache: Map = new Map(); /** * Attempts to fetch an entity's revisions @@ -966,8 +1117,9 @@ function collectionRevisions( channel, id, callback ) { requestedVersions.add( version ); // fetch from server or local cache - if ( revisionCache.has( `${ id }.${ version }` ) ) { - onVersion( id, version, revisionCache.get( `${ id }.${ version }` ) ); + const cached = revisionCache.get( `${ id }.${ version }` ); + if ( cached ) { + onVersion( id, version, cached ); } else { channel.send( `e:${ id }.${ version }` ); } diff --git a/src/simperium/jsondiff/diff_match_patch.js b/src/simperium/jsondiff/diff_match_patch.js index 6f97a7a..9bdccea 100644 --- a/src/simperium/jsondiff/diff_match_patch.js +++ b/src/simperium/jsondiff/diff_match_patch.js @@ -1,4 +1,3 @@ -module.exports = diff_match_patch; /** * Diff Match and Patch @@ -29,7 +28,7 @@ module.exports = diff_match_patch; * Class containing the diff, match and patch methods. * @constructor */ -function diff_match_patch() { +export default function diff_match_patch() { // Defaults. // Redefine these in your program to override the defaults. @@ -65,9 +64,9 @@ function diff_match_patch() { * [[DIFF_DELETE, 'Hello'], [DIFF_INSERT, 'Goodbye'], [DIFF_EQUAL, ' world.']] * which means: delete 'Hello', add 'Goodbye' and keep ' world.' */ -var DIFF_DELETE = -1; -var DIFF_INSERT = 1; -var DIFF_EQUAL = 0; +export const DIFF_DELETE = -1; +export const DIFF_INSERT = 1; +export const DIFF_EQUAL = 0; /** @typedef {{0: number, 1: string}} */ diff_match_patch.Diff; @@ -2183,13 +2182,3 @@ diff_match_patch.patch_obj.prototype.toString = function() { } return text.join('').replace(/%20/g, ' '); }; - - -// Export these global variables so that they survive Google's JS compiler. -// In a browser, 'this' will be 'window'. -// Users of node.js should 'require' the uncompressed version since Google's -// JS compiler may break the following exports for non-browser environments. -module.exports['diff_match_patch'] = diff_match_patch; -module.exports['DIFF_DELETE'] = DIFF_DELETE; -module.exports['DIFF_INSERT'] = DIFF_INSERT; -module.exports['DIFF_EQUAL'] = DIFF_EQUAL; \ No newline at end of file diff --git a/src/simperium/jsondiff/index.js b/src/simperium/jsondiff/index.js index 568cfa5..85344f8 100644 --- a/src/simperium/jsondiff/index.js +++ b/src/simperium/jsondiff/index.js @@ -1,8 +1,12 @@ +// @flow import JSONDiff from './jsondiff' import diff_match_patch from './diff_match_patch' +import type { ObjectOperationSet } from './jsondiff'; + +export type { ObjectOperationSet }; export { JSONDiff as jsondiff, diff_match_patch } -export default function init( options ) { +export default function init( options: ?{ list_diff: boolean } ): JSONDiff { return new JSONDiff( options ); } diff --git a/src/simperium/jsondiff/jsondiff.js b/src/simperium/jsondiff/jsondiff.js index 8ff9f16..579fd06 100644 --- a/src/simperium/jsondiff/jsondiff.js +++ b/src/simperium/jsondiff/jsondiff.js @@ -1,14 +1,46 @@ -var diff_match_patch = require('./diff_match_patch'); +// @flow +import diff_match_patch, { DIFF_EQUAL, DIFF_INSERT, DIFF_DELETE } from './diff_match_patch'; -// stolen from https://raw.github.com/Simperium/jsondiff/master/src/jsondiff.js -(function() { - var jsondiff, - __bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }, - __hasProp = Object.prototype.hasOwnProperty; +// These ar the types of changes that can be applied to a value +type PropertyChangeType + = '+' // add the value at `v` + | '-' // remove the key from the object + | 'r' // replace the key value of object with `v` + | 'I' // increment the value at key, requires the value to be numerical + | 'L' // apply a list diff operation to the property at v + | 'O' // apply an object diff operation to the property at v + | 'd' // apply a diff match patch operation to the property at v + +// Add a value, will exist within an object operation +type AddOperation = { o: '+', v: any } +type ReplaceOperation = { o: 'r', v: any } + +type RemoveOperation = { o: '-' }; +type IncrementOperation = { o: 'I', v: number } +type ListOperation = { o: 'L', v: { [key: number]: Operation } }; +// an object diff is indexed by the key name that the diff applies to + +export type ObjectOperationSet = { [key: string]: Operation }; +type ObjectOperation = { o: 'O', v: ObjectOperationSet }; - jsondiff = (function() { +type DiffMatchPathOperation = { o: 'd', v: string } - function jsondiff(options) { + +export type Operation + // adds a value to an object, is a child of another change + = AddOperation + | ReplaceOperation + | RemoveOperation + | IncrementOperation + | ListOperation + | ObjectOperation + | DiffMatchPathOperation + +// stolen from https://raw.github.com/Simperium/jsondiff/master/src/jsondiff.js + const __bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }; + const __hasProp = Object.prototype.hasOwnProperty; + + export default function jsondiff(options: ? { list_diff: boolean } ) { this.options = options || {list_diff: true}; this.patch_apply_with_offsets = __bind(this.patch_apply_with_offsets, this); this.transform_object_diff = __bind(this.transform_object_diff, this); @@ -168,7 +200,7 @@ var diff_match_patch = require('./diff_match_patch'); return diffs; }; - jsondiff.prototype.object_diff = function(a, b) { + jsondiff.prototype.object_diff = function(a: {}, b: {}): DiffSet { var diffs, key; diffs = {}; if (!(a != null) || !(b != null)) return {}; @@ -298,8 +330,8 @@ var diff_match_patch = require('./diff_match_patch'); return patched; }; - jsondiff.prototype.apply_object_diff = function(s, diffs) { - var dmp_diffs, dmp_patches, dmp_result, key, op, patched; + jsondiff.prototype.apply_object_diff = function(s: {}, diffs: ObjectOperationSet): {} { + var dmp_diffs, dmp_patches, dmp_result, key, op, patched: {}; patched = this.deepCopy(s); for (key in diffs) { if (!__hasProp.call(diffs, key)) continue; @@ -429,7 +461,7 @@ var diff_match_patch = require('./diff_match_patch'); return ad_new; }; - jsondiff.prototype.transform_object_diff = function(ad, bd, s) { + jsondiff.prototype.transform_object_diff = function(ad: ObjectOperationSet, bd: ObjectOperationSet, s: {}): ?ObjectOperationSet { var a_patches, ab_text, ad_new, aop, b_patches, b_text, bop, dmp_diffs, dmp_patches, dmp_result, key, sk, _ref; ad_new = this.deepCopy(ad); for (key in ad) { @@ -611,11 +643,3 @@ var diff_match_patch = require('./diff_match_patch'); text = text.substring(nullPadding.length, text.length - nullPadding.length); return text; }; - - return jsondiff; - - })(); - - module.exports = jsondiff; - -}).call(); \ No newline at end of file diff --git a/src/simperium/util/change.js b/src/simperium/util/change.js index 96c8a72..943c080 100644 --- a/src/simperium/util/change.js +++ b/src/simperium/util/change.js @@ -1,10 +1,13 @@ -import { v4 as uuid } from 'uuid' +// @flow +import uuid from 'uuid/v4' import jsondiff from '../jsondiff' +import type { ObjectOperationSet } from '../jsondiff'; -const changeTypes = { +export type BucketChangeType = 'M' | '-'; + +const changeTypes: { [name: string]: BucketChangeType } = { MODIFY: 'M', - REMOVE: '-', - ADD: '+' + REMOVE: '-' }; const { object_diff, transform_object_diff, apply_object_diff } = jsondiff( {list_diff: false} ) @@ -17,32 +20,55 @@ export { apply_diff as apply } -function modify( id, version, patch ) { - return { o: changeTypes.MODIFY, id: id, ccid: uuid.v4(), v: patch }; +function modify( id: string, version: number, patch: {} ) { + return { o: changeTypes.MODIFY, id: id, ccid: uuid(), v: patch }; } -function buildChange( type, id, object, ghost ) { +function buildChange( type: BucketChangeType, id: string, object: {}, ghost: {| version: number, data: {} |} ) { return buildChangeFromOrigin( type, id, ghost.version, object, ghost.data ); } -function buildChangeFromOrigin( type, id, version, target, origin ) { - var changeData = { - o: type, - id: id, - ccid: uuid.v4() - }; +export type ModifyChange = { + o: 'M', + id: string, + ccid: string, + v: ObjectOperationSet, + sv?: number +} - // Remove operations have no source version or diff - if ( type === changeTypes.REMOVE ) return changeData; +export type RemoveChange = { + o: '-', + id: string, + ccid: string +} - if ( version > 0 ) changeData.sv = version; +export type Change = ModifyChange | RemoveChange; - changeData.v = object_diff( origin, target ); +function buildChangeFromOrigin( type: BucketChangeType, id: string, version: number, target: {}, origin: {} ): Change { + + if ( type === changeTypes.REMOVE ) { + return { + o: '-', + id, + ccid: uuid() + }; + } - return changeData; + + const change: ModifyChange = { + o: 'M', + id, + ccid: uuid(), + v: object_diff( origin, target ) + } + + if ( version > 0 ) { + change.sv = version; + } + return change; } -function compressChanges( changes, origin ) { +function compressChanges( changes: Array, origin: {} ): ?ObjectOperationSet { var modified; if ( changes.length === 0 ) { @@ -50,13 +76,17 @@ function compressChanges( changes, origin ) { } if ( changes.length === 1 ) { - return changes[0].v; + const change: Change = changes[0]; + if ( change.o === 'M' ) { + return change.v; + } + return null; } modified = changes.reduce( function( from, change ) { // deletes when, any changes after a delete are ignored if ( from === null ) return null; - if ( from.o === changeTypes.REMOVE ) return null; + if ( change.o === '-' ) return null; return apply_object_diff( from, change.v ); }, origin ); @@ -65,10 +95,10 @@ function compressChanges( changes, origin ) { return object_diff( origin, modified ); } -function rebase( local_diff, remote_diff, origin ) { +function rebase( local_diff: ObjectOperationSet, remote_diff: ObjectOperationSet, origin: {} ) { return transform_object_diff( local_diff, remote_diff, origin ); } -function apply_diff( patch, object ) { +function apply_diff( patch: Change, object: {} ) { return apply_object_diff( object, patch ); } diff --git a/test/simperium/channel_test.js b/test/simperium/channel_test.js index dd57405..fe501a0 100644 --- a/test/simperium/channel_test.js +++ b/test/simperium/channel_test.js @@ -77,9 +77,9 @@ describe( 'Channel', function() { version1 = { content: 'step 1'}, version2 = { content: 'step 2'}, version3 = { content: 'step 3'}, - change1 = { o: 'M', ev: 1, cv: 'cv1', id: id, v: diff( {}, version1 )}, - change2 = { o: 'M', ev: 2, sv: 1, cv: 'cv2', id: id, v: diff( version1, version2 )}, - change3 = { o: 'M', ev: 3, sv: 2, cv: 'cv3', id: id, v: diff( version2, version3 )}, + change1 = { o: 'M', ccids: [], ev: 1, cv: 'cv1', id: id, v: diff( {}, version1 )}, + change2 = { o: 'M', ccids: [], ev: 2, sv: 1, cv: 'cv2', id: id, v: diff( version1, version2 )}, + change3 = { o: 'M', ccids: [], ev: 3, sv: 2, cv: 'cv3', id: id, v: diff( version2, version3 )}, check = fn.counts( 2, function( id, data ) { equal( data.content, 'step 3' ); done(); @@ -238,7 +238,7 @@ describe( 'Channel', function() { it( 'should notify bucket after receiving a network change', () => { const id = 'object', data = { content: 'step 1'}, - change = { o: 'M', ev: 1, cv: 'cv1', id: id, v: diff( {}, data )}; + change = { o: 'M', ccids: [], ev: 1, cv: 'cv1', id: id, v: diff( {}, data )}; return new Promise( ( resolve ) => { bucket.on( 'update', () => { @@ -268,7 +268,7 @@ describe( 'Channel', function() { bucket.update( key, {title: 'hello world'}, function() { channel.handleMessage( 'c:' + JSON.stringify( [{ - o: '-', ev: 1, cv: 'cv1', id: key + o: '-', ev: 1, cv: 'cv1', id: key, ccids: [] }] ) ); } ); } ) ); @@ -348,7 +348,7 @@ describe( 'Channel', function() { // We receive a remote change from "Hello world" to "Hello kansas" channel.handleMessage( 'c:' + JSON.stringify( [{ - o: 'M', ev: 2, sv: 1, cv: 'cv1', id: key, v: remoteDiff + o: 'M', ev: 2, sv: 1, cv: 'cv1', id: key, v: remoteDiff, ccids: [] }] ) ); // We're changing "Hello world" to "Goodbye world" @@ -529,7 +529,7 @@ describe( 'Channel', function() { } ); it( 'should request entire object when source version is out of date', ( done ) => { - var change = {o: 'M', id: 'thing', sv: 1, ev: 2, ccid: 'abc', cv: 'new-cv', v: diff( { hello: 'mundo'}, {hello: 'world'} ) }; + var change = {o: 'M', id: 'thing', sv: 1, ev: 2, ccids: ['abc'], cv: 'new-cv', v: diff( { hello: 'mundo'}, {hello: 'world'} ) }; channel.once( 'send', ( data ) => { equal( data, `e:${change.id}.${change.sv}` ); channel.once( 'change-version', ( cv ) => {