@@ -60,6 +60,51 @@ def create_topic(project_id: str, topic_id: str) -> None:
6060 # [END pubsub_create_topic]
6161
6262
63+ def create_topic_kinesis_ingestion (
64+ project_id : str ,
65+ topic_id : str ,
66+ stream_arn : str ,
67+ consumer_arn : str ,
68+ aws_role_arn : str ,
69+ gcp_service_account : str ,
70+ ) -> None :
71+ """Create a new Pub/Sub topic with AWS Kinesis Ingestion Settings."""
72+ # [START pubsub_quickstart_create_topic_kinesis_ingestion]
73+ # [START pubsub_create_topic_kinesis_ingestion]
74+ from google .cloud import pubsub_v1
75+ from google .pubsub_v1 .types import Topic
76+ from google .pubsub_v1 .types import IngestionDataSourceSettings
77+
78+ # TODO(developer)
79+ # project_id = "your-project-id"
80+ # topic_id = "your-topic-id"
81+ # stream_arn = "your-stream-arn"
82+ # consumer_arn = "your-consumer-arn"
83+ # aws_role_arn = "your-aws-role-arn"
84+ # gcp_service_account = "your-gcp-service-account"
85+
86+ publisher = pubsub_v1 .PublisherClient ()
87+ topic_path = publisher .topic_path (project_id , topic_id )
88+
89+ request = Topic (
90+ name = topic_path ,
91+ ingestion_data_source_settings = IngestionDataSourceSettings (
92+ aws_kinesis = IngestionDataSourceSettings .AwsKinesis (
93+ stream_arn = stream_arn ,
94+ consumer_arn = consumer_arn ,
95+ aws_role_arn = aws_role_arn ,
96+ gcp_service_account = gcp_service_account ,
97+ )
98+ ),
99+ )
100+
101+ topic = publisher .create_topic (request = request )
102+
103+ print (f"Created topic: { topic .name } with AWS Kinesis Ingestion Settings" )
104+ # [END pubsub_quickstart_create_topic_kinesis_ingestion]
105+ # [END pubsub_create_topic_kinesis_ingestion]
106+
107+
63108def delete_topic (project_id : str , topic_id : str ) -> None :
64109 """Deletes an existing Pub/Sub topic."""
65110 # [START pubsub_delete_topic]
@@ -430,6 +475,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
430475 create_parser = subparsers .add_parser ("create" , help = create_topic .__doc__ )
431476 create_parser .add_argument ("topic_id" )
432477
478+ create_topic_kinesis_ingestion_parser = subparsers .add_parser (
479+ "create_kinesis_ingestion" , help = create_topic_kinesis_ingestion .__doc__
480+ )
481+ create_topic_kinesis_ingestion_parser .add_argument ("topic_id" )
482+ create_topic_kinesis_ingestion_parser .add_argument ("stream_arn" )
483+ create_topic_kinesis_ingestion_parser .add_argument ("consumer_arn" )
484+ create_topic_kinesis_ingestion_parser .add_argument ("aws_role_arn" )
485+ create_topic_kinesis_ingestion_parser .add_argument ("gcp_service_account" )
486+
433487 delete_parser = subparsers .add_parser ("delete" , help = delete_topic .__doc__ )
434488 delete_parser .add_argument ("topic_id" )
435489
@@ -490,6 +544,15 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
490544 list_topics (args .project_id )
491545 elif args .command == "create" :
492546 create_topic (args .project_id , args .topic_id )
547+ elif args .command == "create_kinesis_ingestion" :
548+ create_topic_kinesis_ingestion (
549+ args .project_id ,
550+ args .topic_id ,
551+ args .stream_arn ,
552+ args .consumer_arn ,
553+ args .aws_role_arn ,
554+ args .gcp_service_account ,
555+ )
493556 elif args .command == "delete" :
494557 delete_topic (args .project_id , args .topic_id )
495558 elif args .command == "publish" :
0 commit comments