@@ -1009,6 +1009,80 @@ run against the collection.
10091009 collection="my_collection",
10101010 )
10111011
1012+ Reading message queues
1013+ ======================
1014+
1015+ Ray Data reads from message queues like Kafka.
1016+
1017+ .. _reading_kafka:
1018+
1019+ Reading Kafka
1020+ ~~~~~~~~~~~~~
1021+
1022+ To read data from Kafka topics, call :func:`~ray.data.read_kafka` and specify
1023+ the topic names and broker addresses. Ray Data performs bounded reads between
1024+ a start and end offset.
1025+
1026+ First, install the required dependencies:
1027+
1028+ .. code-block:: console
1029+
1030+ pip install kafka-python
1031+
1032+ Then, specify your Kafka configuration and read from topics.
1033+
1034+ .. testcode::
1035+ :skipif: True
1036+
1037+ import ray
1038+
1039+ # Read from a single topic with offset range
1040+ ds = ray.data.read_kafka(
1041+ topics="my-topic",
1042+ bootstrap_servers="localhost:9092",
1043+ start_offset=0,
1044+ end_offset=1000,
1045+ )
1046+
1047+ # Read from multiple topics
1048+ ds = ray.data.read_kafka(
1049+ topics=["topic1", "topic2"],
1050+ bootstrap_servers="localhost:9092",
1051+ start_offset="earliest",
1052+ end_offset="latest",
1053+ )
1054+
1055+ # Read with authentication
1056+ from ray.data.read_api import KafkaAuthConfig
1057+
1058+ auth_config = KafkaAuthConfig(
1059+ security_protocol="SASL_SSL",
1060+ sasl_mechanism="PLAIN",
1061+ sasl_plain_username="your-username",
1062+ sasl_plain_password="your-password",
1063+ )
1064+
1065+ ds = ray.data.read_kafka(
1066+ topics="secure-topic",
1067+ bootstrap_servers="localhost:9092",
1068+ kafka_auth_config=auth_config,
1069+ )
1070+
1071+ print(ds.schema())
1072+
1073+ .. testoutput::
1074+
1075+ Column Type
1076+ ------ ----
1077+ offset int64
1078+ key binary
1079+ value binary
1080+ topic string
1081+ partition int32
1082+ timestamp int64
1083+ timestamp_type int32
1084+ headers map<string, binary>
1085+
10121086Creating synthetic data
10131087=======================
10141088
0 commit comments