Streaming elements in a list #49

Closed
jxnl opened this issue Jun 29, 2023 · 3 comments

jxnl commented Jun 29, 2023

Is there any way to stream the data within a list one by one?

# This is your existing generator that yields chunks of JSON string
def read_json_in_chunks(json_string):
    for i in range(0, len(json_string), 5):  # replace 5 with the chunk size you want
        ss = json_string[i : i + 5]
        yield ss
        print(f"generated {ss}")


import json_stream

json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'
data = json_stream.load(read_json_in_chunks(json_string), persistent=True)

print(data["tasks"][0])

Currently it streams the whole list from [ to ] before returning data["tasks"][0].

The same happens with:

for task in data["tasks"]:
    ...

It streams everything first.

daggaz (Owner) commented Jun 29, 2023

Hi @jxnl ,

The reason you're seeing this behaviour is that the data from the iterator is first being buffered (up to the system default buffer size, which for me is 8096).

If your data is longer than this buffer size, then you would see it being processed in chunks.

This is a side-effect of the iterable wrapping; I'm not really sure if there's a way around that, but I'm thinking about it.
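
To see where the read-ahead comes from (a minimal sketch, not json_stream's internals): io.BufferedReader fills its whole internal buffer on the first read, even when the caller asks for a single byte. LoggingRaw below is a purely hypothetical helper that only exists to report how much data each raw read requests.

import io


class LoggingRaw(io.RawIOBase):
    """Hypothetical raw stream that reports the size of each raw read request."""
    def __init__(self, data):
        self._inner = io.BytesIO(data)

    def readable(self):
        return True

    def readinto(self, b):
        n = self._inner.readinto(b)
        print(f"raw read asked for {len(b)} bytes, got {n}")
        return n


reader = io.BufferedReader(LoggingRaw(b"x" * 20000))
reader.read(1)  # a one-byte read still triggers a full buffer fill
print(io.DEFAULT_BUFFER_SIZE)  # the default read-ahead size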

If you use an unbuffered stream, for example a network socket stream with buffering=0 as below, then you will not see this behaviour:

import asyncio
import socket

import json_stream

json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'


async def handle_client(_, writer):
    for i in range(0, len(json_string), 15):
        message = json_string[i:i + 15]
        print(f"Sending: {message!r}")
        writer.write(message)
        await writer.drain()
        await asyncio.sleep(1)

    print("Closing connection")
    writer.close()
    await writer.wait_closed()


def test():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('127.0.0.1', 8888))
    f = sock.makefile(mode='b', buffering=0)
    data = json_stream.load(f)

    for task in data["tasks"]:
        print(task)


async def main():
    server = await asyncio.start_server(handle_client, '127.0.0.1', 8888)
    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')
    async with server:
        await asyncio.to_thread(test)


asyncio.run(main())

daggaz (Owner) commented Jun 29, 2023

Here is an example where all the data doesn't fit inside an 8096-byte buffer:

import json

import json_stream


def read_json_in_chunks(json_string, chunk_size=100):
    for i in range(0, len(json_string), chunk_size):
        ss = json_string[i:i+chunk_size]
        yield ss
        print('.', end='', flush=True)


data = {
    'tasks': [{
        'id': i,
        'title': f'task{i}'
    } for i in range(1000)],
}
data = json_stream.load(read_json_in_chunks(json.dumps(data).encode()))

for task in data["tasks"]:
    print(task)

daggaz (Owner) commented Jun 30, 2023

Ah ha!

I have found the issue. There was an unnecessary io.BufferedReader wrapping the IterableStream that wraps the iterable.

Removing this means that each iterator chunk is passed directly out to the tokenizer, unbuffered.
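
For context, here's a rough sketch (not the actual json_stream source) of what an unbuffered iterable wrapper can look like: a raw stream that serves at most one iterator chunk per read, so nothing accumulates in a read-ahead buffer. IterableRawSketch is a hypothetical stand-in for the real IterableStream.

import io


class IterableRawSketch(io.RawIOBase):
    """Hypothetical stand-in for IterableStream: serves at most one chunk per read."""
    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._pending = b""

    def readable(self):
        return True

    def readinto(self, b):
        if not self._pending:
            # an empty chunk (or an exhausted iterator) is treated as end-of-stream
            self._pending = next(self._iterator, b"")
        n = min(len(b), len(self._pending))
        b[:n] = self._pending[:n]
        self._pending = self._pending[n:]
        return n

Reading from a raw object like this, rather than through io.BufferedReader, means each chunk the generator yields becomes visible to the parser immediately.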

New version 2.3.2 is on its way soon.
