Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions docs/md/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,42 @@ websocket connection.
[operate remotely](/docs/md/js.html#remote-perspective-via-perspective-python-and-tornado)
using a websocket API.

### Async Mode

By default, `perspective` will run with a synchronous interface. Using the
`PerspectiveManager.set_loop_callback()` method, `perspective` can be configured
to defer the application of side-effectful calls like `update()` to an event
loop, such as `asyncio`. When running in Async mode, Perspective will release
the GIL for some operations, enabling better parallelism and overall better
server performance. There are a few important differences when running
`PerspectiveManager` in this mode:

* Calls to methods like `update()` will return immediately, and the reciprocal
`on_update()` callbacks will be invoked on an event later scheduled. Calls
to other methods which require an up-to-date object however will still
synchronously apply the pending update.
* Updates will be _conflated_ when multiple calls to `update()` occur before
the scheduled application. In such cases, you may receive a single
`on_update()` notification for multiple `update()` calls.
* Once `set_loop_callback()` has been called, you may not call Perspective
functions from any other thread.

For example, using Tornado `IOLoop` you can create a dedicated thread for a
`PerspectiveManager`:

```python
manager = perspective.PerspectiveManager()

def perspective_thread():
loop = tornado.ioloop.IOLoop()
manager.set_loop_callback(loop.add_callback)
loop.start()

thread = threading.Thread(target=perspective_thread)
thread.daemon = True
thread.start()
```

### Hosting `Table` and `View` instances

`PerspectiveManager` has the ability to "host" `perspective.Table` and
Expand Down
67 changes: 42 additions & 25 deletions examples/tornado-python/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,57 @@
import tornado.websocket
import tornado.web
import tornado.ioloop
import threading

from perspective import Table, PerspectiveManager, PerspectiveTornadoHandler


here = os.path.abspath(os.path.dirname(__file__))
file_path = os.path.join(
here, "..", "..", "node_modules", "superstore-arrow", "superstore.arrow")
here, "..", "..", "node_modules", "superstore-arrow", "superstore.arrow"
)


def perspective_thread(manager):
"""Perspective application thread starts its own tornado IOLoop, and
adds the table with the name "data_source_one", which will be used
in the front-end."""
psp_loop = tornado.ioloop.IOLoop()
manager.set_loop_callback(psp_loop.add_callback)
with open(file_path, mode="rb") as file:
table = Table(file.read(), index="Row ID")
manager.host_table("data_source_one", table)
manager.host_view("view_one", table.view())
psp_loop.start()


def make_app():
with open(file_path, mode='rb') as file:
# Create an instance of `PerspectiveManager` and a table.
MANAGER = PerspectiveManager()
TABLE = Table(file.read(), index="Row ID")

# Track the table with the name "data_source_one", which will be used
# in the front-end to access the Table.
MANAGER.host_table("data_source_one", TABLE)
MANAGER.host_view("view_one", TABLE.view())

return tornado.web.Application([
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {
"manager": MANAGER,
"check_origin": True
}),
(r"/node_modules/(.*)", tornado.web.StaticFileHandler, {
"path": "../../node_modules/@finos/"
}),
(r"/(.*)", tornado.web.StaticFileHandler, {
"path": "./",
"default_filename": "index.html"
})
], websocket_ping_interval=15)
manager = PerspectiveManager()

thread = threading.Thread(target=perspective_thread, args=(manager,))
thread.daemon = True
thread.start()

return tornado.web.Application(
[
(
r"/websocket",
PerspectiveTornadoHandler,
{"manager": manager, "check_origin": True},
),
(
r"/node_modules/(.*)",
tornado.web.StaticFileHandler,
{"path": "../../node_modules/@finos/"},
),
(
r"/(.*)",
tornado.web.StaticFileHandler,
{"path": "./", "default_filename": "index.html"},
),
],
websocket_ping_interval=15,
)


if __name__ == "__main__":
Expand Down
126 changes: 80 additions & 46 deletions examples/tornado-streaming-python/server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import random
import logging
import threading
import tornado.websocket
import tornado.web
import tornado.ioloop
Expand All @@ -11,67 +12,100 @@ def data_source():
rows = []
modifier = random.random() * random.randint(1, 50)
for i in range(5):
rows.append({
"name": SECURITIES[random.randint(0, len(SECURITIES) - 1)],
"client": CLIENTS[random.randint(0, len(CLIENTS) - 1)],
"open": (random.random() * 75 + random.randint(0, 9)) * modifier,
"high": (random.random() * 105 + random.randint(1, 3)) * modifier,
"low": (random.random() * 85 + random.randint(1, 3)) * modifier,
"close": (random.random() * 90 + random.randint(1, 3)) * modifier,
"lastUpdate": datetime.now(),
"date": date.today()
})
rows.append(
{
"name": SECURITIES[random.randint(0, len(SECURITIES) - 1)],
"client": CLIENTS[random.randint(0, len(CLIENTS) - 1)],
"open": (random.random() * 75 + random.randint(0, 9)) * modifier,
"high": (random.random() * 105 + random.randint(1, 3)) * modifier,
"low": (random.random() * 85 + random.randint(1, 3)) * modifier,
"close": (random.random() * 90 + random.randint(1, 3)) * modifier,
"lastUpdate": datetime.now(),
"date": date.today(),
}
)
return rows


SECURITIES = ["AAPL.N", "AMZN.N", "QQQ.N", "NVDA.N", "TSLA.N",
"FB.N", "MSFT.N", "TLT.N", "XIV.N", "YY.N",
"CSCO.N", "GOOGL.N", "PCLN.N"]
SECURITIES = [
"AAPL.N",
"AMZN.N",
"QQQ.N",
"NVDA.N",
"TSLA.N",
"FB.N",
"MSFT.N",
"TLT.N",
"XIV.N",
"YY.N",
"CSCO.N",
"GOOGL.N",
"PCLN.N",
]

CLIENTS = ["Homer", "Marge", "Bart", "Lisa", "Maggie",
"Moe", "Lenny", "Carl", "Krusty"]
CLIENTS = ["Homer", "Marge", "Bart", "Lisa", "Maggie", "Moe", "Lenny", "Carl", "Krusty"]


def make_app():
# Create an instance of `PerspectiveManager` and a table.
MANAGER = PerspectiveManager()
TABLE = Table({
"name": str,
"client": str,
"open": float,
"high": float,
"low": float,
"close": float,
"lastUpdate": datetime,
"date": date
}, limit=2500)
def perspective_thread(manager):
"""Perspective application thread starts its own tornado IOLoop, and
adds the table with the name "data_source_one", which will be used
in the front-end."""
psp_loop = tornado.ioloop.IOLoop()
manager.set_loop_callback(psp_loop.add_callback)
table = Table(
{
"name": str,
"client": str,
"open": float,
"high": float,
"low": float,
"close": float,
"lastUpdate": datetime,
"date": date,
},
limit=2500,
)

# Track the table with the name "data_source_one", which will be used in
# the front-end to access the Table.
MANAGER.host_table("data_source_one", TABLE)
manager.host_table("data_source_one", table)

# update with new data every 50ms
def updater():
TABLE.update(data_source())
table.update(data_source())

callback = tornado.ioloop.PeriodicCallback(
callback=updater, callback_time=50)
callback = tornado.ioloop.PeriodicCallback(callback=updater, callback_time=50)
callback.start()
psp_loop.start()


def make_app():
manager = PerspectiveManager()

thread = threading.Thread(target=perspective_thread, args=(manager,))
thread.daemon = True
thread.start()

return tornado.web.Application([
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {
"manager": MANAGER,
"check_origin": True
}),
(r"/node_modules/(.*)", tornado.web.StaticFileHandler, {
"path": "../../node_modules/@finos/"
}),
(r"/(.*)", tornado.web.StaticFileHandler, {
"path": "./",
"default_filename": "index.html"
})
])
return tornado.web.Application(
[
# create a websocket endpoint that the client Javascript can access
(
r"/websocket",
PerspectiveTornadoHandler,
{"manager": manager, "check_origin": True},
),
(
r"/node_modules/(.*)",
tornado.web.StaticFileHandler,
{"path": "../../node_modules/@finos/"},
),
(
r"/(.*)",
tornado.web.StaticFileHandler,
{"path": "./", "default_filename": "index.html"},
),
]
)


if __name__ == "__main__":
Expand Down
3 changes: 2 additions & 1 deletion packages/perspective/src/js/api/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ export class Client {
* Process an asynchronous message.
*/
post(msg, resolve, reject, keep_alive = false) {
++this._worker.msg_id;
if (resolve || reject) {
this._worker.handlers[++this._worker.msg_id] = {resolve, reject, keep_alive};
this._worker.handlers[this._worker.msg_id] = {resolve, reject, keep_alive};
}
msg.id = this._worker.msg_id;
if (this._worker.initialized.value) {
Expand Down
Loading