Read multiple JSON objects in a stream #30

ygoe opened this issue Dec 5, 2022 · 9 comments

@ygoe

ygoe commented Dec 5, 2022

It's not mentioned in the documentation, so I'm wondering if this is already possible. I need a streaming JSON parser that can give me one JSON object after the other. There's no need to stream within each object, but I'm going to receive multiple JSON objects over a byte stream (TCP or similar). The problem is that these are byte streams, not message streams. I can parse a JSON object by whatever means, but I first need to separate the objects from the stream. And one object (or more) might be read completely while the next object has only been read partially. Can this library already do this?

The intended purpose is JSON-RPC over TCP. I couldn't find any Python library (with a free license) for that. JSON-RPC itself is actually very simple; it's the message splitting that's hard. It can be done with separate protocol overhead (as WebSocket does) or by reading complete JSON objects. (I could do that myself, too, but wanted to see if there's a ready solution.)
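
To make the framing problem concrete, here is a minimal sketch (the message contents and chunk boundaries are hypothetical; real TCP reads can split anywhere):

import json

# two JSON-RPC messages sent back to back; the receiver sees them as
# arbitrary byte chunks, not as one-chunk-per-message
chunks = [
    b'{"jsonrpc": "2.0", "method": "ping", "id": 1}{"jsonrpc": "2.',
    b'0", "method": "pong", "id": 2}',
]

# neither chunk is valid JSON on its own: the first holds one complete
# object plus the start of the next, the second holds only the remainder
for chunk in chunks:
    try:
        json.loads(chunk)
    except json.JSONDecodeError as e:
        print(e)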

@daggaz
Owner

daggaz commented Dec 5, 2022

I think this is possible:

from time import sleep

from json_stream.tokenizer import tokenize

import json_stream


def input_stream():
    # simulate JSON-RPC messages
    yield '{"bob": 1, "bobby": 4}'.encode()
    sleep(5)
    yield '{"bobo": 3}'.encode()


def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass


f = input_stream()
for document in json_documents(f):
    # once for each new JSON-RPC message
    print("got new message")
    for k, v in document.items():
        # process message
        print(f"{k} = {v}")
    print("end of message")

Note: due to an issue with the Rust tokenizer, this code uses the non-default pure-Python tokenizer implementation.

@jorektheglitch

jorektheglitch commented Mar 20, 2023

There is one tricky point: the JSON documents may be fragmented across chunks in different ways.

Example:

"{'alpha': 'bra"
"vo'}{'chadlie': 'delta'}"

Does the suggested method work in this case?
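
To make the case concrete, here is an illustrative test sketch using well-formed (double-quoted) JSON and the json_documents() helper from the comment above; whether json_stream copes with a chunk boundary falling inside a string value is exactly what is being asked:

from json_stream.tokenizer import tokenize

import json_stream


def fragmented_stream():
    # one document split mid-string, immediately followed by a second document
    yield b'{"alpha": "bra'
    yield b'vo"}{"charlie": "delta"}'


def json_documents(f):
    # same helper as in the comment above
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass


for document in json_documents(fragmented_stream()):
    print(dict(document.items()))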

@daggaz
Owner

daggaz commented Mar 23, 2023

@jorektheglitch I'm not 100% clear on what you're saying; that just looks like malformed JSON?

@jorektheglitch

jorektheglitch commented Mar 28, 2023

@daggaz, I meant the situation in which the JSON is read chunk by chunk. There is absolutely no guarantee that each JSON document will be read from start to end and nothing else. In the example I just showed two possible chunks read from the stream.

@ygoe
Author

ygoe commented Apr 2, 2023

I've since lost interest in this, but I did the same thing in C# as the application moved to that language.

  • What happens if you receive an incomplete chunk? Nothing, keep it for later.
  • What happens if you receive a continuation of that? Consider the previous chunks and this one.
  • What happens if a single chunk contains multiple objects? Return them all. And keep the leftovers for when more arrives.

I implemented this with a preparser. It cannot deserialise JSON objects into anything; it just tracks the syntax and tells me whether and where an object is complete, so I can extract that part of the data and pass it to a real deserialiser. It then continues with whatever comes afterwards.
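
A rough Python sketch of that preparser idea (my own illustration, not the actual C# code; no validation, and top-level scalars are ignored): track string and escape state plus brace/bracket depth, and report where each top-level object ends so that slice can go to a real deserialiser:

import json


def complete_objects(buffer):
    """Yield (start, end) index pairs of complete top-level JSON objects or
    arrays in `buffer`; anything after the last pair is leftover data."""
    depth = 0
    in_string = False
    escaped = False
    start = None
    for i, ch in enumerate(buffer):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch in "{[":
            if depth == 0:
                start = i
            depth += 1
        elif ch in "}]":
            depth -= 1
            if depth == 0:
                yield start, i + 1


# usage: accumulate incoming chunks and extract whatever is complete
buffer = '{"alpha": "bra'               # incomplete chunk: nothing to extract yet
buffer += 'vo"}{"charlie": "delta"}'    # continuation arrives
consumed = 0
for start, end in complete_objects(buffer):
    print(json.loads(buffer[start:end]))  # pass each complete slice to a real deserialiser
    consumed = end
buffer = buffer[consumed:]               # leftovers wait for more data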

@chrishas35

> I think this is possible:

This specific example is working for me, but I'm having a difficult time making it work for an httpx stream. I feel like I'm missing something obvious, but any pointers are appreciated. I swapped to the json_stream.httpx.load function.

def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.httpx.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

with self.client.stream("POST", url, json=data) as resp:
    for document in json_documents(resp):
        # once for each new JSON-RPC message
        print("got new message")
        for k, v in document.items():
            # process message
            print(f"{k} = {v}")
        print("end of message")

This gets me the first object returned, but the next one throws httpx.StreamConsumed.

@chrishas35

Classic situation of needing to ask the question to be able to figure out the answer (I think?)

def json_documents(f):
    # process each new JSON document in the stream and yield
    # a json_stream parser
    try:
        while True:
            yield json_stream.load(f, tokenizer=tokenize)
    except StopIteration:
        pass

with self.client.stream("POST", url, json=data) as resp:
    for document in json_documents(resp.iter_bytes()):
        # once for each new JSON-RPC message
        print("got new message")
        for k, v in document.items():
            # process message
            print(f"{k} = {v}")
        print("end of message")

@daggaz
Owner

daggaz commented Sep 28, 2023

@chrishas35 great that it works for you, but I think it's only by accident!

We will be merging big changes to the backend tokenizer API to allow this to work all the time.

@chrishas35

I may not be the best programmer, but I get lucky some times! 🤣
