Skip to content

Commit 5426f61

Browse files
salmany, gemini-code-assist[bot], glasnt
authored
Add Managed Kafka Connect Connectors Examples (#13522)
* Add Managed Kafka Connect code samples for clusters * Adds code examples for creating, deleting, getting, listing and updating Managed Kafka Connect clusters * Update google-cloud-managedkafka version to 0.1.12 * Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/get_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/list_connect_clusters.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/update_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Add timeouts and improve error handling. * Addressed PR comments. * Add Managed Kafka Connect Connectors examples This commit adds examples for creating Managed Kafka Connect connectors: * MirrorMaker connector * BigQuery sink connector * Cloud Storage sink connector * Pub/Sub sink connector * Pub/Sub source connector * Fix import statements and add headers. * Fix sample configs and update MM2 connector. * Remove unwanted local test artifact. * Adds requirements.txt file for samples. 
As per the [Authoring Guide] (https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#dependencies), each sample is required to have a requirements.txt file that lists the dependencies needed to run the sample. * Remove timeout in update_connect_cluster.py Remove timeout to align with Managed Kafka Cluster update example: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/cdae4cacfe8f9612e554af11ef72bc8d34765ada/managedkafka/snippets/clusters/update_cluster.py#L60 * Fix CPU and memory values for Cluster creation. * Fixed comment for minimum vCpu. * Fix lint by adding space. * lint * lint * lint * lint * resolve conflicting lint reports #13522 (comment) * Minor changes to MM2 sample and test. * lint --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Katie McLaughlin <[email protected]>
1 parent 119c6f1 commit 5426f61

File tree

6 files changed

+691
-0
lines changed

6 files changed

+691
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import mock
16+
from unittest.mock import MagicMock
17+
18+
import create_bigquery_sink_connector
19+
import create_cloud_storage_sink_connector
20+
import create_mirrormaker2_source_connector
21+
import create_pubsub_sink_connector
22+
import create_pubsub_source_connector
23+
from google.api_core.operation import Operation
24+
from google.cloud import managedkafka_v1
25+
import pytest
26+
27+
28+
# Fixed identifiers shared by every test in this module. The Managed Kafka
# Connect client is mocked in each test, so these values never reach a real
# Google Cloud project.
PROJECT_ID = "test-project-id"
REGION = "us-central1"
CONNECT_CLUSTER_ID = "test-connect-cluster-id"
31+
32+
33+
@mock.patch(
    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
)
def test_create_mirrormaker2_source_connector(
    mock_method: MagicMock,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The MirrorMaker 2 source sample prints the created connector's name."""
    connector_id = "mm2-source-to-target-connector-id"

    # Stub the long-running operation so result() yields a finished Connector.
    fake_connector = managedkafka_v1.types.Connector()
    fake_connector.name = connector_id
    fake_operation = mock.MagicMock(spec=Operation)
    fake_operation.result = mock.MagicMock(return_value=fake_connector)
    mock_method.return_value = fake_operation

    create_mirrormaker2_source_connector.create_mirrormaker2_source_connector(
        PROJECT_ID,
        REGION,
        CONNECT_CLUSTER_ID,
        connector_id,
        "source_cluster_dns",
        "target_cluster_dns",
        "3",
        "source",
        "target",
        ".*",
        "mm2.*\\.internal,.*\\.replica,__.*",
    )

    captured_out, _ = capsys.readouterr()
    assert "Created Connector" in captured_out
    assert connector_id in captured_out
    mock_method.assert_called_once()
65+
66+
67+
@mock.patch(
    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
)
def test_create_pubsub_source_connector(
    mock_method: MagicMock,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The Pub/Sub source sample prints the created connector's name."""
    connector_id = "CPS_SOURCE_CONNECTOR_ID"

    # Stub the long-running operation so result() yields a finished Connector.
    fake_connector = managedkafka_v1.types.Connector()
    fake_connector.name = connector_id
    fake_operation = mock.MagicMock(spec=Operation)
    fake_operation.result = mock.MagicMock(return_value=fake_connector)
    mock_method.return_value = fake_operation

    create_pubsub_source_connector.create_pubsub_source_connector(
        PROJECT_ID,
        REGION,
        CONNECT_CLUSTER_ID,
        connector_id,
        "GMK_TOPIC_ID",
        "CPS_SUBSCRIPTION_ID",
        "GCP_PROJECT_ID",
        "3",
        "org.apache.kafka.connect.converters.ByteArrayConverter",
        "org.apache.kafka.connect.storage.StringConverter",
    )

    captured_out, _ = capsys.readouterr()
    assert "Created Connector" in captured_out
    assert connector_id in captured_out
    mock_method.assert_called_once()
98+
99+
100+
@mock.patch(
    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
)
def test_create_pubsub_sink_connector(
    mock_method: MagicMock,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The Pub/Sub sink sample prints the created connector's name."""
    connector_id = "CPS_SINK_CONNECTOR_ID"

    # Stub the long-running operation so result() yields a finished Connector.
    fake_connector = managedkafka_v1.types.Connector()
    fake_connector.name = connector_id
    fake_operation = mock.MagicMock(spec=Operation)
    fake_operation.result = mock.MagicMock(return_value=fake_connector)
    mock_method.return_value = fake_operation

    create_pubsub_sink_connector.create_pubsub_sink_connector(
        PROJECT_ID,
        REGION,
        CONNECT_CLUSTER_ID,
        connector_id,
        "GMK_TOPIC_ID",
        "org.apache.kafka.connect.storage.StringConverter",
        "org.apache.kafka.connect.storage.StringConverter",
        "CPS_TOPIC_ID",
        "GCP_PROJECT_ID",
        "3",
    )

    captured_out, _ = capsys.readouterr()
    assert "Created Connector" in captured_out
    assert connector_id in captured_out
    mock_method.assert_called_once()
131+
132+
133+
@mock.patch(
    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
)
def test_create_cloud_storage_sink_connector(
    mock_method: MagicMock,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The Cloud Storage sink sample prints the created connector's name."""
    connector_id = "GCS_SINK_CONNECTOR_ID"
    # Stub the long-running operation so result() returns a finished Connector.
    operation = mock.MagicMock(spec=Operation)
    connector = managedkafka_v1.types.Connector()
    connector.name = connector_id
    operation.result = mock.MagicMock(return_value=connector)
    mock_method.return_value = operation

    create_cloud_storage_sink_connector.create_cloud_storage_sink_connector(
        PROJECT_ID,
        REGION,
        CONNECT_CLUSTER_ID,
        connector_id,
        "GMK_TOPIC_ID",
        "GCS_BUCKET_NAME",
        "3",
        "json",
        "org.apache.kafka.connect.json.JsonConverter",
        "false",
        "org.apache.kafka.connect.storage.StringConverter",
    )

    out, _ = capsys.readouterr()
    assert "Created Connector" in out
    # Fix: was `assert connector_id` (a non-empty string, always truthy, so it
    # asserted nothing). Check the name actually appears in the output.
    assert connector_id in out
    # Fix: was missing; assert the API was invoked, matching the sibling tests.
    mock_method.assert_called_once()
164+
165+
166+
@mock.patch(
    "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector"
)
def test_create_bigquery_sink_connector(
    mock_method: MagicMock,
    capsys: pytest.CaptureFixture[str],
) -> None:
    """The BigQuery sink sample prints the created connector's name."""
    connector_id = "BQ_SINK_CONNECTOR_ID"

    # Stub the long-running operation so result() yields a finished Connector.
    fake_connector = managedkafka_v1.types.Connector()
    fake_connector.name = connector_id
    fake_operation = mock.MagicMock(spec=Operation)
    fake_operation.result = mock.MagicMock(return_value=fake_connector)
    mock_method.return_value = fake_operation

    create_bigquery_sink_connector.create_bigquery_sink_connector(
        PROJECT_ID,
        REGION,
        CONNECT_CLUSTER_ID,
        connector_id,
        "GMK_TOPIC_ID",
        "3",
        "org.apache.kafka.connect.storage.StringConverter",
        "org.apache.kafka.connect.json.JsonConverter",
        "false",
        "BQ_DATASET_ID",
    )

    captured_out, _ = capsys.readouterr()
    assert "Created Connector" in captured_out
    assert connector_id in captured_out
    mock_method.assert_called_once()
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
def create_bigquery_sink_connector(
    project_id: str,
    region: str,
    connect_cluster_id: str,
    connector_id: str,
    topics: str,
    tasks_max: str,
    key_converter: str,
    value_converter: str,
    value_converter_schemas_enable: str,
    default_dataset: str,
) -> None:
    """
    Create a BigQuery Sink connector on a Managed Kafka Connect cluster.

    Args:
        project_id: Google Cloud project ID.
        region: Cloud region.
        connect_cluster_id: ID of the Kafka Connect cluster.
        connector_id: Name of the connector.
        topics: Kafka topics to read from.
        tasks_max: Maximum number of tasks.
        key_converter: Key converter class.
        value_converter: Value converter class.
        value_converter_schemas_enable: Enable schemas for value converter.
        default_dataset: BigQuery dataset ID.

    Raises:
        This method will raise the GoogleAPICallError exception if the operation errors or
        the timeout before the operation completes is reached.
    """
    # TODO(developer): Update with your config values. Here is a sample configuration:
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "BQ_SINK_CONNECTOR_ID"
    # topics = "GMK_TOPIC_ID"
    # tasks_max = "3"
    # key_converter = "org.apache.kafka.connect.storage.StringConverter"
    # value_converter = "org.apache.kafka.connect.json.JsonConverter"
    # value_converter_schemas_enable = "false"
    # default_dataset = "BQ_DATASET_ID"

    # [START managedkafka_create_bigquery_sink_connector]
    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

    client = ManagedKafkaConnectClient()

    # Every key/value pair is passed through unchanged to the BigQuery sink
    # connector plugin running on the Connect cluster.
    connector_configs = {
        "name": connector_id,
        "project": project_id,
        "topics": topics,
        "tasks.max": tasks_max,
        "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
        "key.converter": key_converter,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "defaultDataset": default_dataset,
    }

    request = CreateConnectorRequest(
        parent=client.connect_cluster_path(project_id, region, connect_cluster_id),
        connector_id=connector_id,
        connector=Connector(name=connector_id, configs=connector_configs),
    )

    try:
        # create_connector returns a long-running operation; block on result().
        operation = client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        print("Created Connector:", operation.result())
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    # [END managedkafka_create_bigquery_sink_connector]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
def create_cloud_storage_sink_connector(
    project_id: str,
    region: str,
    connect_cluster_id: str,
    connector_id: str,
    topics: str,
    gcs_bucket_name: str,
    tasks_max: str,
    format_output_type: str,
    value_converter: str,
    value_converter_schemas_enable: str,
    key_converter: str,
) -> None:
    """
    Create a Cloud Storage Sink connector on a Managed Kafka Connect cluster.

    Args:
        project_id: Google Cloud project ID.
        region: Cloud region.
        connect_cluster_id: ID of the Kafka Connect cluster.
        connector_id: Name of the connector.
        topics: Kafka topics to read from.
        gcs_bucket_name: Google Cloud Storage bucket name.
        tasks_max: Maximum number of tasks.
        format_output_type: Output format type.
        value_converter: Value converter class.
        value_converter_schemas_enable: Enable schemas for value converter.
        key_converter: Key converter class.

    Raises:
        This method will raise the GoogleAPICallError exception if the operation errors or
        the timeout before the operation completes is reached.
    """
    # TODO(developer): Update with your config values. Here is a sample configuration:
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # connector_id = "GCS_SINK_CONNECTOR_ID"
    # topics = "GMK_TOPIC_ID"
    # gcs_bucket_name = "GCS_BUCKET_NAME"
    # tasks_max = "3"
    # format_output_type = "json"
    # value_converter = "org.apache.kafka.connect.json.JsonConverter"
    # value_converter_schemas_enable = "false"
    # key_converter = "org.apache.kafka.connect.storage.StringConverter"

    # [START managedkafka_create_cloud_storage_sink_connector]
    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

    client = ManagedKafkaConnectClient()

    # Every key/value pair is passed through unchanged to the Aiven GCS sink
    # connector plugin running on the Connect cluster.
    connector_configs = {
        "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector",
        "tasks.max": tasks_max,
        "topics": topics,
        "gcs.bucket.name": gcs_bucket_name,
        "gcs.credentials.default": "true",
        "format.output.type": format_output_type,
        "name": connector_id,
        "value.converter": value_converter,
        "value.converter.schemas.enable": value_converter_schemas_enable,
        "key.converter": key_converter,
    }

    request = CreateConnectorRequest(
        parent=client.connect_cluster_path(project_id, region, connect_cluster_id),
        connector_id=connector_id,
        connector=Connector(name=connector_id, configs=connector_configs),
    )

    try:
        # create_connector returns a long-running operation; block on result().
        operation = client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        print("Created Connector:", operation.result())
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    # [END managedkafka_create_cloud_storage_sink_connector]

0 commit comments

Comments
 (0)