22
22
"""
23
23
24
24
import argparse
25
- import time
26
25
27
- from google .cloud import pubsub_v1
28
26
29
-
30
- def list_topics (project ):
27
+ def list_topics (project_id ):
31
28
"""Lists all Pub/Sub topics in the given project."""
32
29
# [START pubsub_list_topics]
30
+ from google .cloud import pubsub_v1
31
+
32
+ # TODO project_id = "Your Google Cloud Project ID"
33
+
33
34
publisher = pubsub_v1 .PublisherClient ()
34
- project_path = publisher .project_path (project )
35
+ project_path = publisher .project_path (project_id )
35
36
36
37
for topic in publisher .list_topics (project_path ):
37
38
print (topic )
38
39
# [END pubsub_list_topics]
39
40
40
41
41
- def create_topic (project , topic_name ):
42
+ def create_topic (project_id , topic_name ):
42
43
"""Create a new Pub/Sub topic."""
43
44
# [START pubsub_create_topic]
45
+ from google .cloud import pubsub_v1
46
+
47
+ # TODO project_id = "Your Google Cloud Project ID"
48
+ # TODO topic_name = "Your Pub/Sub topic name"
49
+
44
50
publisher = pubsub_v1 .PublisherClient ()
45
- topic_path = publisher .topic_path (project , topic_name )
51
+ topic_path = publisher .topic_path (project_id , topic_name )
46
52
47
53
topic = publisher .create_topic (topic_path )
48
54
49
55
print ('Topic created: {}' .format (topic ))
50
56
# [END pubsub_create_topic]
51
57
52
58
53
- def delete_topic (project , topic_name ):
59
+ def delete_topic (project_id , topic_name ):
54
60
"""Deletes an existing Pub/Sub topic."""
55
61
# [START pubsub_delete_topic]
62
+ from google .cloud import pubsub_v1
63
+
64
+ # TODO project_id = "Your Google Cloud Project ID"
65
+ # TODO topic_name = "Your Pub/Sub topic name"
66
+
56
67
publisher = pubsub_v1 .PublisherClient ()
57
- topic_path = publisher .topic_path (project , topic_name )
68
+ topic_path = publisher .topic_path (project_id , topic_name )
58
69
59
70
publisher .delete_topic (topic_path )
60
71
61
72
print ('Topic deleted: {}' .format (topic_path ))
62
73
# [END pubsub_delete_topic]
63
74
64
75
65
- def publish_messages (project , topic_name ):
76
+ def publish_messages (project_id , topic_name ):
66
77
"""Publishes multiple messages to a Pub/Sub topic."""
67
78
# [START pubsub_quickstart_publisher]
68
79
# [START pubsub_publish]
80
+ from google .cloud import pubsub_v1
81
+
82
+ # TODO project_id = "Your Google Cloud Project ID"
83
+ # TODO topic_name = "Your Pub/Sub topic name"
84
+
69
85
publisher = pubsub_v1 .PublisherClient ()
70
- topic_path = publisher .topic_path (project , topic_name )
86
+ # The `topic_path` method creates a fully qualified identifier
87
+ # in the form `projects/{project_id}/topics/{topic_name}`
88
+ topic_path = publisher .topic_path (project_id , topic_name )
71
89
72
90
for n in range (1 , 10 ):
73
91
data = u'Message number {}' .format (n )
74
92
# Data must be a bytestring
75
93
data = data .encode ('utf-8' )
76
- publisher .publish (topic_path , data = data )
94
+ # When you publish a message, the client returns a future.
95
+ future = publisher .publish (topic_path , data = data )
96
+ print ('Published {} of message ID {}.' .format (data , future .result ()))
77
97
78
98
print ('Published messages.' )
79
99
# [END pubsub_quickstart_publisher]
80
100
# [END pubsub_publish]
81
101
82
102
83
- def publish_messages_with_custom_attributes (project , topic_name ):
103
+ def publish_messages_with_custom_attributes (project_id , topic_name ):
84
104
"""Publishes multiple messages with custom attributes
85
105
to a Pub/Sub topic."""
86
106
# [START pubsub_publish_custom_attributes]
107
+ from google .cloud import pubsub_v1
108
+
109
+ # TODO project_id = "Your Google Cloud Project ID"
110
+ # TODO topic_name = "Your Pub/Sub topic name"
111
+
87
112
publisher = pubsub_v1 .PublisherClient ()
88
- topic_path = publisher .topic_path (project , topic_name )
113
+ topic_path = publisher .topic_path (project_id , topic_name )
89
114
90
115
for n in range (1 , 10 ):
91
116
data = u'Message number {}' .format (n )
@@ -99,12 +124,17 @@ def publish_messages_with_custom_attributes(project, topic_name):
99
124
# [END pubsub_publish_custom_attributes]
100
125
101
126
102
- def publish_messages_with_futures (project , topic_name ):
127
+ def publish_messages_with_futures (project_id , topic_name ):
103
128
"""Publishes multiple messages to a Pub/Sub topic and prints their
104
129
message IDs."""
105
130
# [START pubsub_publisher_concurrency_control]
131
+ from google .cloud import pubsub_v1
132
+
133
+ # TODO project_id = "Your Google Cloud Project ID"
134
+ # TODO topic_name = "Your Pub/Sub topic name"
135
+
106
136
publisher = pubsub_v1 .PublisherClient ()
107
- topic_path = publisher .topic_path (project , topic_name )
137
+ topic_path = publisher .topic_path (project_id , topic_name )
108
138
109
139
# When you publish a message, the client returns a Future. This Future
110
140
# can be used to track when the message is published.
@@ -124,11 +154,18 @@ def publish_messages_with_futures(project, topic_name):
124
154
# [END pubsub_publisher_concurrency_control]
125
155
126
156
127
- def publish_messages_with_error_handler (project , topic_name ):
157
+ def publish_messages_with_error_handler (project_id , topic_name ):
128
158
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
129
159
# [START pubsub_publish_messages_error_handler]
160
+ import time
161
+
162
+ from google .cloud import pubsub_v1
163
+
164
+ # TODO project_id = "Your Google Cloud Project ID"
165
+ # TODO topic_name = "Your Pub/Sub topic name"
166
+
130
167
publisher = pubsub_v1 .PublisherClient ()
131
- topic_path = publisher .topic_path (project , topic_name )
168
+ topic_path = publisher .topic_path (project_id , topic_name )
132
169
133
170
def callback (message_future ):
134
171
# When timeout is unspecified, the exception method waits indefinitely.
@@ -155,17 +192,22 @@ def callback(message_future):
155
192
# [END pubsub_publish_messages_error_handler]
156
193
157
194
158
- def publish_messages_with_batch_settings (project , topic_name ):
195
+ def publish_messages_with_batch_settings (project_id , topic_name ):
159
196
"""Publishes multiple messages to a Pub/Sub topic with batch settings."""
160
197
# [START pubsub_publisher_batch_settings]
161
- # Configure the batch to publish once there is one kilobyte of data or
162
- # 1 second has passed.
198
+ from google .cloud import pubsub_v1
199
+
200
+ # TODO project_id = "Your Google Cloud Project ID"
201
+ # TODO topic_name = "Your Pub/Sub topic name"
202
+
203
+ # Configure the batch to publish as soon as there is one kilobyte
204
+ # of data or one second has passed.
163
205
batch_settings = pubsub_v1 .types .BatchSettings (
164
206
max_bytes = 1024 , # One kilobyte
165
207
max_latency = 1 , # One second
166
208
)
167
209
publisher = pubsub_v1 .PublisherClient (batch_settings )
168
- topic_path = publisher .topic_path (project , topic_name )
210
+ topic_path = publisher .topic_path (project_id , topic_name )
169
211
170
212
for n in range (1 , 10 ):
171
213
data = u'Message number {}' .format (n )
@@ -182,7 +224,7 @@ def publish_messages_with_batch_settings(project, topic_name):
182
224
description = __doc__ ,
183
225
formatter_class = argparse .RawDescriptionHelpFormatter
184
226
)
185
- parser .add_argument ('project ' , help = 'Your Google Cloud project ID' )
227
+ parser .add_argument ('project_id ' , help = 'Your Google Cloud project ID' )
186
228
187
229
subparsers = parser .add_subparsers (dest = 'command' )
188
230
subparsers .add_parser ('list' , help = list_topics .__doc__ )
@@ -220,18 +262,19 @@ def publish_messages_with_batch_settings(project, topic_name):
220
262
args = parser .parse_args ()
221
263
222
264
if args .command == 'list' :
223
- list_topics (args .project )
265
+ list_topics (args .project_id )
224
266
elif args .command == 'create' :
225
- create_topic (args .project , args .topic_name )
267
+ create_topic (args .project_id , args .topic_name )
226
268
elif args .command == 'delete' :
227
- delete_topic (args .project , args .topic_name )
269
+ delete_topic (args .project_id , args .topic_name )
228
270
elif args .command == 'publish' :
229
- publish_messages (args .project , args .topic_name )
271
+ publish_messages (args .project_id , args .topic_name )
230
272
elif args .command == 'publish-with-custom-attributes' :
231
- publish_messages_with_custom_attributes (args .project , args .topic_name )
273
+ publish_messages_with_custom_attributes (
274
+ args .project_id , args .topic_name )
232
275
elif args .command == 'publish-with-futures' :
233
- publish_messages_with_futures (args .project , args .topic_name )
276
+ publish_messages_with_futures (args .project_id , args .topic_name )
234
277
elif args .command == 'publish-with-error-handler' :
235
- publish_messages_with_error_handler (args .project , args .topic_name )
278
+ publish_messages_with_error_handler (args .project_id , args .topic_name )
236
279
elif args .command == 'publish-with-batch-settings' :
237
- publish_messages_with_batch_settings (args .project , args .topic_name )
280
+ publish_messages_with_batch_settings (args .project_id , args .topic_name )
0 commit comments