Feat/add data publishing to stream processor #18
Conversation
pytrickle/client.py (Outdated)
while not self.stop_event.is_set():
    # Wait for 333ms or until stop event is set
    try:
        await asyncio.wait_for(self.stop_event.wait(), timeout=0.333)
The timeout value (naming?) should be a constant, but I think it might be beneficial as a configurable option.
I think configurable would be good as well. Added an option on StreamProcessor and TrickleClient that defaults to 333ms (3x as fast as a segment). a26d25c
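For reference, a rough sketch of how such an option could be threaded through (the parameter name data_publish_interval and this constructor shape are assumptions for illustration, not the actual pytrickle API):

import asyncio

# Hypothetical sketch: a configurable poll interval that defaults to 333 ms.
DEFAULT_DATA_PUBLISH_INTERVAL = 0.333  # seconds, roughly 3x faster than a segment

class TrickleClient:
    def __init__(self, data_publish_interval: float = DEFAULT_DATA_PUBLISH_INTERVAL):
        self.data_publish_interval = data_publish_interval
        self.stop_event = asyncio.Event()

    async def _publish_loop(self):
        while not self.stop_event.is_set():
            try:
                # Wake up when stopped or after the configured interval elapses.
                await asyncio.wait_for(self.stop_event.wait(), timeout=self.data_publish_interval)
            except asyncio.TimeoutError:
                pass  # interval elapsed; drain and publish any pending data here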
Note that go-livepeer does not deserialize it; it just shuffles it along with a scanner.Scan() that reads until a newline character is found.
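Since go-livepeer only scans up to the next newline, the publisher presumably has to keep each data message on a single newline-terminated line; a minimal sketch of that framing (the helper name and JSON serialization are assumptions, not what this PR implements):

import json

def frame_data_message(payload: dict) -> bytes:
    # go-livepeer's scanner.Scan() splits on '\n', so serialize the payload to a
    # single line; json.dumps escapes any embedded newlines automatically.
    return json.dumps(payload, separators=(",", ":")).encode("utf-8") + b"\n"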
This change LGTM. Only one small suggestion: can we log when data_queue is full?
except queue.Full:
    pass
Can we add one warning log line here for when the queue is full?
That is the output queue, right? Not the data queue. Happy to add that in a separate PR while also changing it to either an asyncio.Queue or a deque.
Ah, yes. It's a deque. Apparently this will not throw a QueueFull error, but it can be logged once the length hits maxlen: https://docs.python.org/3.10/library/collections.html?highlight=deque#collections.deque.
Sure, let's handle this in another PR.
I'm thinking we can add something like this, or go back to a regular queue if catching a full exception is more performant:
if len(self.data_queue) == self.data_queue.maxlen:
    print("Discarding message:", self.data_queue[0])
self.data_queue.append(new_item)
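For that follow-up PR, a sketch of the same idea with a warning log instead of print (the maxlen value and helper name here are illustrative only):

import logging
from collections import deque

logger = logging.getLogger(__name__)

data_queue: deque = deque(maxlen=100)  # illustrative capacity

def enqueue_data(new_item) -> None:
    # deque(maxlen=...) silently drops the oldest entry when full, so check the
    # length first and log a warning before the drop happens.
    if data_queue.maxlen is not None and len(data_queue) == data_queue.maxlen:
        logger.warning("data_queue full (maxlen=%d); discarding oldest message", data_queue.maxlen)
    data_queue.append(new_item)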
Add basic plumbing to send data back through the trickle protocol.
Dependent on go-livepeer PR livepeer/go-livepeer#3689.