Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions doc/source/data/loading-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,77 @@ run against the collection.
collection="my_collection",
)

Reading from Kafka
======================

Ray Data reads from message queues like Kafka.

.. _reading_kafka:

To read data from Kafka topics, call :func:`~ray.data.read_kafka` and specify
the topic names and broker addresses. Ray Data performs bounded reads between
a start and end offset.

First, install the required dependencies:

.. code-block:: console

pip install kafka-python

Then, specify your Kafka configuration and read from topics.

.. testcode::
:skipif: True

import ray

# Read from a single topic with offset range
ds = ray.data.read_kafka(
topics="my-topic",
bootstrap_servers="localhost:9092",
start_offset=0,
end_offset=1000,
)

# Read from multiple topics
ds = ray.data.read_kafka(
topics=["topic1", "topic2"],
bootstrap_servers="localhost:9092",
start_offset="earliest",
end_offset="latest",
)

# Read with authentication
from ray.data import KafkaAuthConfig

auth_config = KafkaAuthConfig(
security_protocol="SASL_SSL",
sasl_mechanism="PLAIN",
sasl_plain_username="your-username",
sasl_plain_password="your-password",
)

ds = ray.data.read_kafka(
topics="secure-topic",
bootstrap_servers="localhost:9092",
kafka_auth_config=auth_config,
)

print(ds.schema())

.. testoutput::

Column Type
------ ----
offset int64
key binary
value binary
topic string
partition int32
timestamp int64
timestamp_type int32
headers map<string, binary>

Creating synthetic data
=======================

Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from ray.data.iterator import DataIterator, DatasetIterator
from ray.data.preprocessor import Preprocessor
from ray.data.read_api import ( # noqa: F401
KafkaAuthConfig,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation for removing these?

I don't think we require importing from ray.data.read_api anywhere else

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added back, I thought we shouldn't add Kafka-specific stuff in ray.data

KafkaAuthConfig, # noqa: F401
from_arrow,
from_arrow_refs,
from_blocks,
Expand Down