Skip to content

RxSwift support? #686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
cody1024d opened this issue Jul 13, 2017 · 21 comments
Closed

RxSwift support? #686

cody1024d opened this issue Jul 13, 2017 · 21 comments

Comments

@cody1024d
Copy link

Is there any push to create an RxSwift compatible library for this? It would be very helpful to be able to perform queries in a reactive manor

@jberkel
Copy link
Collaborator

jberkel commented Jul 13, 2017

I'd love to adopt reactive patterns, especially for error handling. Some of them were discussed in #569, and subsequently implemented in GRDB.swift, so maybe check there. @groue, how did it turn out in the end?

@cody1024d
Copy link
Author

cody1024d commented Jul 13, 2017

I know there is a RxGRDB repo: (https://github.com/RxSwiftCommunity/RxGRDB). I even tried to dig around in there to see how it was being done, to see if I could glean anything to layer over this repo. Obviously to get it done a PublishSubject would need to be added somewhere, and that's not too hard, but what I'm having trouble with, is how we'd know when the table is updated properly.

Worse comes to worse (and this is what my plan is for now), I can simply add the reactive code inside of a repository pattern that sits on top of SQLite.Swift, and would just have to hope future developers go through those repositories lol

@groue
Copy link

groue commented Jul 14, 2017

Hello @jberkel, @cody1024d

what I'm having trouble with, is how we'd know when the table is updated properly.

This does not happen in RxGRDB, which is just a thin layer. The meat belongs to GRDB which has a long history of database observation features.

When you mix SQLite Compile-Time Authorization Callbacks, Commit And Rollback Notification Callbacks and Data Change Notification Callbacks, you can:

  • track transactions and the savepoint stack
  • know which columns of which tables are modified by a statement
  • know about all row insertions, updates, and deletions

All those information together give you a precise enough idea of the changes eventually committed by a transaction. You can package this in the public TransactionObserver protocol, the mother of all database observations features.

Compile-Time Authorization Callbacks also tell which columns of which tables are read by a statement. You can thus write a TransactionObserver that fuels a reactive sequence when a transaction has changed the tracked tables and columns: that's the idea of RxGRDB.

SQLite.swift already has updateHook, commitHook and rollbackHook: that's the place to start.

@groue
Copy link

groue commented Jul 14, 2017

@jberkel, I think @cody1024d wasn't talking about using reactive patterns to consume the results of a single request, but to be notified of changes in the results in a request.

#569 was about the fact that Swift Sequence does not quite fit the consumption of the results of a request, and that a cursor whose next method can throw is more suitable. That's why GRDB generally provides three fetching methods, depending on the way you want to consume the results of a request:

let request = Player.all()
// Lazy, does not consume much memory
try request.fetchCursor(db)  // DatabaseCursor<Player>
// Contains copies of database values, may consume a lot of memory
try request.fetchAll(db)     // [Player]
// Consumes a single row
try request.fetchOne(db)     // Player?

RxGRDB does not bring any reactive APIs on top of cursors. It's just a couple lines of code away, though:

// Turns DatabaseCursor into RxSwift Observable
extension DatabaseCursor : ObservableType {
    public typealias E = Element
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        do {
            while let element = try next() {
                observer.onNext(element)
            }
            observer.onCompleted()
        } catch {
            observer.onError(error)
        }
        return Disposables.create()
    }
}

// Consume a cursor in a reactive way:
try dbQueue.inDatabase { db in
    let cursor = try Player.fetchCursor(db)
    cursor.subscribe(
        onNext: { player in print(player) },
        onError: { error in print(error) }
    )
}

What RxGRDB brings is request observation, that is to say observable sequences of changes in the results of a request:

// Observe a single request:
Player.all().rx
    .fetchAll(in: dbQueue)
    .subscribe(onNext: { players: [Player] in
        print("Players have changed: \(players)")
    })

// Observe some request(s) and fetch consistent results from other request(s):
dbQueue.rx
    .changeTokens(in: [Player.all()])
    .mapFetch { (db: Database) -> ([Player], Int) in
        let players = try Player
            .order(Player.Columns.score.desc)
            .limit(10)
            .fetchAll(db)
        let count = try Player.fetchCount(db)
        return (players, count)
    }
    .subscribe(onNext: { (players, count) in
        print("Best players out of \(count): \(players)")
    })

The ability to fetch "consistent" results in the second example above means that GRDB takes care of preventing concurrent database updates from messing with the fetched results. The mapFetch operator guarantees an immutable view of the database, even when one operates a database under the highly concurrent WAL mode. That's part of the general GRDB concurrency guarantees, that help you write code that is not polluted with concurrency management (a task that is very far from trivial, and better handled by the library than by the user).

I think this is what @cody1024d is after.

Best thing is that RxGRDB eats all GRDB requests, which means that you can observe records, raw database rows, or values fetched from the query builder or even raw SQL:

// Observe maximum player score:
Player
    .select(max(Player.Columns.score))
    .asRequest(of: Int.self)
    .rx
    .fetchOne(in: dbQueue)
    .subscribe(onNext: { maxScore: Int? in
        print("Maximum score has changed: \(maxScore ?? 0)")
    })

// Observe minimum and maximum player scores:
Player
    .select(
        min(Player.Columns.score).aliased("minScore"),
        max(Player.Columns.score).aliased("maxScore"))
    .asRequest(of: Row.self)
    .rx
    .fetchOne(in: dbQueue)
    .map { (row: Row?) -> (minScore: Int?, maxScore: Int?) in
        (minScore: row!.value(named: "minScore"),
         maxScore: row!.value(named: "maxScore"))
    }
    .subscribe(onNext: { (minScore, maxScore) in
        print("Score range has changed: \(minScore ?? 0)-\(maxScore ?? 0)")
    })

// Observe players fetched from the occasionally complex SQL query:
SQLRequest("SELECT * FROM players WHERE score = (SELECT MAX(score) FROM players)")
    .asRequest(of: Player.self)
    .rx
    .fetchAll(in: dbQueue)
    .subscribe(onNext: { (players: [Player]) in
        print("Best players have changed: \(players)")
    })

@cody1024d
Copy link
Author

Thank you @groue for the hardcore breakdown! You are right, that that is what I'm after; the ability to subscribe to, essentially a table, and be notified of updates to said table. I apologize that my initial comment was not well worded. Looking back on it now, even I was confused by what I was asking.

I think I probably could string something together leveraging the hooks built into SQLite.Swift, at least enough so that I could get the end result I wanted. As long as I have the table that was updated in the commitHook, I could hook up a PublishSubject to it correctly, and store the initial query when it's performed, and then in the commitHook just perform that same query again, and push it out over the PublishSubject.

Or, I guess, I could switch to GRDB; however my only concern with that is it leans more towards the Active Record approach, and I much prefer a DAO/Repository approach :/. I guess I could query with raw SQLite within the repositories, but would love to avoid that too. Lol, I want the best of both worlds I guess

@groue
Copy link

groue commented Jul 14, 2017

@cody1024d It's good for both you and SQLite.swift that you start experimenting on the subject.

GRDB is just mature enough now so that you can use it and focus on your application domain. I'm not sure one can say it leans towards Active Record, even though it does indeed makes the assumption that applications prefer dealing with models more than raw rows, and that forcing the user to explicitly turn raw rows into models is a burden that can be avoided. Player.fetchAll(db) is simply easier to the eye than db.prepare(...).map { ... } (and I don't talk about possible optimizations such as direct access to SQLite statements for better performance, or about error handling troubles behind #569). You'll maybe give GRDB a try in a future project ;-)

@cody1024d
Copy link
Author

@groue I'm going to start an issue over at GRDB about the syntax stuff; as I think it's an interesting subject, but don't want to clog this issue up any longer.

@jberkel I'm going to do some looking into getting RX up and running for SQLite.Swift; albeit I'm not sure how clean it will be; but it may be a jumping off point.

@jberkel
Copy link
Collaborator

jberkel commented Jul 15, 2017

@cody1024d ok, great! don't worry too much, we can clean it up, it will be good to have a starting point

@cody1024d
Copy link
Author

@jberkel

Here are my thoughts so far:

    -The RxConnection will wrap around the standard connection
    -It will register itself as the update hook to it's inner connection
    -It has a prepare method (and all the methods that the connection normally has)
        -This will allow me to store the query type, etc. to perform when a change is found

    The prepare method psuedo code:
        Save the passed in query, associated with a specific table (dictionary)
            -TableName to some object that holds a PublishSubject, and the passed in query.
-[String: TableListener] ? (TableListener contains a PublishSubject and the query)
        Execute the query and pass it back through the publish subject that was created before
        Return the publish subject that was just created

    On the Update Hook:

        Look up in the dictionary to see if there is an ObservableListener
        If there is, execute the stored query (that's bound to that ObservableListener), and publish it out through the Publish Subject

I haven't implemented any of this yet, but that's kind of my thinking on how I will go about it. Thoughts?

@cody1024d
Copy link
Author

cody1024d commented Jul 27, 2017

Ok so I am finally getting around to implementing this.

So the above is trivial for insert/delete (atleast in the simplest cases). The interesting part is knowing what columns have changed, given only the ability to use the Connection API (as it stands now). Right now I'm thinking I may have to essentially "rip" the column names out of the Update struct that's passed into the Connection, before it's run.

@jberkel Any insight into a better way of potentially capturing what columns are going to be updated in the context of this library? --This would allow me to give Rx-based observations at the column granularity, as opposed to just at the table; which would be inefficient for updates. For now, my code will return the query results; but for every action performed on the entirety of the table, even if the query and the update are on unrelated columns.

Also, to do this more cleanly, I believe we will need to expose several methods within Sqlite.Swift. Specifically surrounding the QueryType tableName() method, and the queryType.clauses.select variable, which would allow me to get the tablename, and columns from a query, respectively.

@jberkel
Copy link
Collaborator

jberkel commented Jul 28, 2017

@cody1024d great, i'll have a closer look at this soon, travelling right now with limited internet

@cody1024d
Copy link
Author

@jberkel No problem; here is the code I'm using right now. It is working for Inserts/Deletes based on table; although not at the column granularity I wanted. Also you'll notice the hack to get the table name out of the query that I very much don't like :P

It could a bit more encapsulation work, to make the implementation a bit cleaner, that's my goal later today, just wanted to get a working concept together first.

class RxConnection {
    let rawConnection: Connection
    //For every table (and column set eventually) we will have a map of query to subject.  This way, we only perform the query once, even if there are multiple observers for the same query
    //One thing to think about is how do we handle someone make changes when they see a change?  This would end in an endless loop I think
    private var watchedQueries = [ChangesDefinition: [WatchableQuery: [PublishSubject<AnySequence<Row>>]]]()

    init(db: Connection) {
        self.rawConnection = db
        db.updateHook { (operation, db, tableName, rowId) in
            let changesDefinition = ChangesDefinition(tableName: tableName)
            if let queryWatchers = self.watchedQueries[changesDefinition] {
                queryWatchers.forEach { (watchableQuery, subjects) in
                    do {
                        let result = try self.rawConnection.prepare(watchableQuery.query)
                        subjects.forEach { $0.onNext(result) }
                    } catch {
                        subjects.forEach { $0.onError(error) }
                    }

                }
            }
            /**
                TODO--When an update has been done successfully
                If Update:
                    -Need to know which column was to be updated?  --> Need to store which column via the update query?
                    -How do we handle concurrency, two threads call update on this instance,
            **/
        }
    }

    public func scalar<V : Value>(_ query: ScalarQuery<V>) throws -> V {
        return try rawConnection.scalar(query)
    }

    @discardableResult public func run(_ query: Insert) throws -> Int64 {
        return try rawConnection.run(query)
    }

    @discardableResult public func run(_ query: Delete) throws -> Int {
        return try rawConnection.run(query)
    }

    @discardableResult public func run(_ query: Update) throws -> Int {
        return try rawConnection.run(query)
    }
    
    //TODO--Implement all methods on the Connection, and pass through to it

    public func prepare(_ query: QueryType) -> Observable<AnySequence<Row>> {
        //Does not currently handle the granularity of columns, as the update hook does not know which columns were updated
        let tableName: String = query.expression.template.slice(from: "FROM \"", to: "\"")!
        let changesDefinition = ChangesDefinition(tableName: tableName)
        if !watchedQueries.keys.contains(changesDefinition) {
            watchedQueries[changesDefinition] = [:]
        }
        let watchableQuery = WatchableQuery(query: query)
        if !(watchedQueries[changesDefinition]?.keys.contains(watchableQuery) ?? false) {
            watchedQueries[changesDefinition]?[watchableQuery] = []
        }
        let subject = PublishSubject<AnySequence<Row>>()
        watchedQueries[changesDefinition]?[watchableQuery]?.append(subject)

        let originalObs = Observable<AnySequence<Row>>.create { observer in
            do {
                observer.onNext(try self.rawConnection.prepare(query))
            } catch {
                observer.onError(error)
            }
            return Disposables.create {
                self.rawConnection.interrupt()
            }
        }

        return Observable.merge(subject, originalObs)
    }
}



struct WatchableQuery: Hashable {
    let query: QueryType

    public var hashValue: Int {
        return query.asSQL().hashValue
    }

    public static func ==(lhs: WatchableQuery, rhs: WatchableQuery) -> Bool {
        return lhs.query.asSQL() == rhs.query.asSQL()
    }
}

struct ChangesDefinition: Hashable {
    let tableName: String

    public var hashValue: Int {
        return tableName.hashValue
    }

    public static func ==(lhs: ChangesDefinition, rhs: ChangesDefinition) -> Bool {
        return lhs.tableName == rhs.tableName
    }

}

@cody1024d
Copy link
Author

@jberkel Any input on this? I'm using it and it has worked well for me. Definitely can be improved, but I'd like to contribute if I can

@jberkel
Copy link
Collaborator

jberkel commented Sep 16, 2017

@cody1024d how would this be integrated? as a subspec with RxSwift dependency?

@cody1024d
Copy link
Author

Yeah, I would say so. This way users can choose if they want the RX support or not. We could either add it as a subspec to this library, or create another github project for it.

I think the only decision there would be or you (we) would want to support it; if we'd want issues to be lumped together and all, or not.

@jberkel
Copy link
Collaborator

jberkel commented Sep 16, 2017

I'd say lets keep it in one repo, since everything is already set up there

@cody1024d
Copy link
Author

Ok cool. I'm coming down to the wire on the project that it's used in. But I should have time to clean it up, and do a formal PR on it soon(ish). I may have time after my 9-5 to do it as well, but we'll see. We can use a formal PR to code review, etc. it, I think would be best? I'll close this for the time being, and open the PR when I can :)

@robert-cronin
Copy link

Has there been any progress on adding reactive functionality? This would just be the icing on the cake for SQLite.swift

@kwstasna
Copy link

bump

1 similar comment
@norbdev
Copy link

norbdev commented Jul 30, 2020

bump

@kwstasna
Copy link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants