Does CStream need to have item type Option? #101

@nastynaz

First off, thank you guys for this repo. I've learned a lot from it.

While implementing my own variant of CStream, I noticed that the item type doesn't need to be an Option:

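// Imports the snippet relies on (crate paths assumed: rdkafka, crossbeam, futures, pin-project-lite).
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;

use crossbeam::channel::{Receiver, Sender};
use futures::Stream;
use pin_project_lite::pin_project;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::message::OwnedMessage;

impl KConsumer {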
    pub fn new<T: AsRef<str>, V: AsRef<str>>(
        config: &Config,
        topic_name: V,
        consumer_group_id: T,
    ) -> Self {
        let consumer_config = config.build_consumer_config(consumer_group_id);
        let consumer: BaseConsumer<_> = consumer_config.create().expect("Consumer creation error");
        consumer
            .subscribe(&[topic_name.as_ref()])
            .expect("Can't subscribe to specified topic");

        Self {
            consumer: Arc::new(consumer),
        }
    }

    pub fn stream(&self) -> KStream {
        let (sender, receiver) = crossbeam::channel::unbounded();
        let consumer = self.consumer.clone();
        Self::gen_stream(sender, receiver, consumer)
    }

    fn gen_stream(
        sender: Sender<Option<OwnedMessage>>,
        receiver: Receiver<Option<OwnedMessage>>,
        consumer: Arc<BaseConsumer>,
    ) -> KStream {
        let _handle = thread::Builder::new()
            .name("kstream-gen".into())
            .spawn(move || {
                for m in consumer.iter() {
                    let msg = match m {
                        Ok(bm) => Some(bm.detach()),
                        Err(e) => {
                            tracing::error!("{}", e);
                            None
                        }
                    };

                    let _ = sender.send(msg);
                }
            });

        KStream { receiver }
    }
}

pin_project! {
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct KStream {
        #[pin]
        receiver: Receiver<Option<OwnedMessage>>,
    }
}

impl Stream for KStream {
    type Item = OwnedMessage; // this is no longer `Option<OwnedMessage>`

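    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // Note: `recv` blocks the calling thread until a message arrives.
        match this.receiver.recv() {
            // `inner` is already an `Option<OwnedMessage>`, so it is forwarded as-is.
            Ok(inner) => Poll::Ready(inner),
            // All senders are gone: end the stream. Returning `Poll::Pending`
            // here without registering a waker would leave the task asleep forever.
            Err(_) => Poll::Ready(None),
        }
    }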
}

The stream no longer returns a doubly-nested Option. This allows for slightly more ergonomic iteration:

while let Some(msg) = stream.next().await {
    // msg is no longer an Option here
}
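
One thing worth checking with this variant: because the channel still carries Option values and poll_next forwards them unchanged, a None sent after a consumer error looks the same to the loop as end-of-stream. The sketch below is only an illustration under assumptions: it uses std::sync::mpsc in place of crossbeam, u32 in place of OwnedMessage, and the hypothetical helpers poll_forwarding, drain and messages_seen_by_loop in place of a real Stream and executor.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::task::Poll;

/// Forwards the channel payload as-is, like the `Ok` branch of `poll_next`.
/// (Hypothetical stand-in; `u32` plays the role of `OwnedMessage`.)
fn poll_forwarding(receiver: &Receiver<Option<u32>>) -> Poll<Option<u32>> {
    match receiver.recv() {
        Ok(inner) => Poll::Ready(inner),
        Err(_) => Poll::Ready(None), // every sender was dropped
    }
}

/// Collects items until the first `Ready(None)`, which is when
/// `while let Some(msg) = stream.next().await` would stop.
fn drain(receiver: &Receiver<Option<u32>>) -> Vec<u32> {
    let mut seen = Vec::new();
    while let Poll::Ready(Some(value)) = poll_forwarding(receiver) {
        seen.push(value);
    }
    seen
}

/// Sends a message, an "error" (`None`), then another message.
fn messages_seen_by_loop() -> Vec<u32> {
    let (sender, receiver) = channel();
    sender.send(Some(1)).unwrap();
    sender.send(None).unwrap(); // stands in for a consumer error
    sender.send(Some(2)).unwrap();
    drop(sender);
    drain(&receiver)
}
```

Here messages_seen_by_loop() yields only the first message: the None produced for the error ends the loop, and the message sent after it is never observed.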

Was there a reason you chose to yield Options rather than the message itself? (Note that the stream still returns an Option either way, since poll_next returns Poll<Option<Self::Item>>.)
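
If the goal is an item type without the extra Option, one possible approach is to drop the Option from the channel as well: the producer thread logs errors and sends only successful messages, so None is reserved for "all senders are gone". This is a minimal sketch under assumptions, not the repo's implementation: std::sync::mpsc replaces crossbeam, String replaces OwnedMessage, and spawn_producer, poll_once and collect_all are hypothetical names. The blocking recv is kept only for brevity; in a real poll_next it would block the executor thread.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::task::Poll;
use std::thread;

/// Spawns a producer that forwards only successful results.
/// Errors are logged and skipped instead of being sent as `None`.
fn spawn_producer(results: Vec<Result<String, String>>) -> Receiver<String> {
    let (sender, receiver) = channel();
    thread::spawn(move || {
        for result in results {
            match result {
                Ok(msg) => {
                    // Stop producing once the receiving side is gone.
                    if sender.send(msg).is_err() {
                        break;
                    }
                }
                Err(e) => eprintln!("consumer error: {}", e),
            }
        }
        // `sender` is dropped here, which disconnects the channel.
    });
    receiver
}

/// `None` now means only one thing: the channel is disconnected.
fn poll_once(receiver: &Receiver<String>) -> Poll<Option<String>> {
    match receiver.recv() {
        Ok(msg) => Poll::Ready(Some(msg)),
        Err(_) => Poll::Ready(None),
    }
}

/// Drains the channel the way a `while let Some(..)` loop would.
fn collect_all(results: Vec<Result<String, String>>) -> Vec<String> {
    let receiver = spawn_producer(results);
    let mut seen = Vec::new();
    while let Poll::Ready(Some(msg)) = poll_once(&receiver) {
        seen.push(msg);
    }
    seen
}
```

With this shape an error in the middle of the input no longer ends the loop early. The trade-off is that the consumer of the stream never sees errors at all; an item type of Result would keep them visible.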
