Skip to content

Commit bb3e135

Browse files
anguillanneufplamut
authored andcommitted
Pub/Sub end-to-end sample [(#1800)](GoogleCloudPlatform/python-docs-samples#1800)
* Created new end-to-end sample, moved old sample * Add space around operator
1 parent d28750b commit bb3e135

File tree

3 files changed

+123
-28
lines changed

3 files changed

+123
-28
lines changed

samples/snippets/publisher.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def list_topics(project_id):
4141

4242
def create_topic(project_id, topic_name):
4343
"""Create a new Pub/Sub topic."""
44+
# [START pubsub_quickstart_create_topic]
4445
# [START pubsub_create_topic]
4546
from google.cloud import pubsub_v1
4647

@@ -53,6 +54,7 @@ def create_topic(project_id, topic_name):
5354
topic = publisher.create_topic(topic_path)
5455

5556
print('Topic created: {}'.format(topic))
57+
# [END pubsub_quickstart_create_topic]
5658
# [END pubsub_create_topic]
5759

5860

samples/snippets/quickstart.py

Lines changed: 80 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,94 @@
1515
# limitations under the License.
1616

1717

18-
def run_quickstart():
19-
# [START pubsub_quickstart_create_topic]
20-
# Imports the Google Cloud client library
18+
import argparse
19+
20+
21+
def end_to_end(project_id, topic_name, subscription_name, num_messages):
22+
# [START pubsub_end_to_end]
23+
import time
24+
2125
from google.cloud import pubsub_v1
2226

23-
# Instantiates a client
27+
# TODO project_id = "Your Google Cloud Project ID"
28+
# TODO topic_name = "Your Pub/Sub topic name"
29+
# TODO num_messages = number of messages to test end-to-end
30+
31+
# Instantiates a publisher and subscriber client
2432
publisher = pubsub_v1.PublisherClient()
33+
subscriber = pubsub_v1.SubscriberClient()
34+
35+
# The `topic_path` method creates a fully qualified identifier
36+
# in the form `projects/{project_id}/topics/{topic_name}`
37+
topic_path = subscriber.topic_path(project_id, topic_name)
2538

26-
# The resource path for the new topic contains the project ID
27-
# and the topic name.
28-
topic_path = publisher.topic_path(
29-
'my-project', 'my-new-topic')
39+
# The `subscription_path` method creates a fully qualified identifier
40+
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
41+
subscription_path = subscriber.subscription_path(
42+
project_id, subscription_name)
3043

3144
# Create the topic.
3245
topic = publisher.create_topic(topic_path)
46+
print('\nTopic created: {}'.format(topic.name))
47+
48+
# Create a subscription.
49+
subscription = subscriber.create_subscription(
50+
subscription_path, topic_path)
51+
print('\nSubscription created: {}\n'.format(subscription.name))
52+
53+
publish_begin = time.time()
54+
55+
# Publish messages.
56+
for n in range(num_messages):
57+
data = u'Message number {}'.format(n)
58+
# Data must be a bytestring
59+
data = data.encode('utf-8')
60+
# When you publish a message, the client returns a future.
61+
future = publisher.publish(topic_path, data=data)
62+
print('Published {} of message ID {}.'.format(data, future.result()))
63+
64+
publish_time = time.time() - publish_begin
3365

34-
print('Topic created: {}'.format(topic))
35-
# [END pubsub_quickstart_create_topic]
66+
messages = set()
67+
68+
def callback(message):
69+
print('Received message: {}'.format(message))
70+
# Unacknowledged messages will be sent again.
71+
message.ack()
72+
messages.add(message)
73+
74+
subscribe_begin = time.time()
75+
76+
# Receive messages. The subscriber is nonblocking.
77+
subscriber.subscribe(subscription_path, callback=callback)
78+
79+
print('\nListening for messages on {}...\n'.format(subscription_path))
80+
81+
while True:
82+
if len(messages) == num_messages:
83+
subscribe_time = time.time() - subscribe_begin
84+
print("\nReceived all messages.")
85+
print("Publish time lapsed: {:.2f}s.".format(publish_time))
86+
print("Subscribe time lapsed: {:.2f}s.".format(subscribe_time))
87+
break
88+
else:
89+
# Sleeps the thread at 50Hz to save on resources.
90+
time.sleep(1. / 50)
91+
# [END pubsub_end_to_end]
3692

3793

3894
if __name__ == '__main__':
39-
run_quickstart()
95+
96+
parser = argparse.ArgumentParser(
97+
description=__doc__,
98+
formatter_class=argparse.RawDescriptionHelpFormatter
99+
)
100+
parser.add_argument('project_id', help='Your Google Cloud project ID')
101+
parser.add_argument('topic_name', help='Your topic name')
102+
parser.add_argument('subscription_name', help='Your subscription name')
103+
parser.add_argument('num_msgs', type=int, help='Number of test messages')
104+
105+
args = parser.parse_args()
106+
107+
end_to_end(args.project_id, args.topic_name, args.subscription_name,
108+
args.num_msgs)

samples/snippets/quickstart_test.py

Lines changed: 41 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
# Copyright 2016 Google Inc. All Rights Reserved.
1+
#!/usr/bin/env python
2+
3+
# Copyright 2018 Google Inc. All Rights Reserved.
24
#
35
# Licensed under the Apache License, Version 2.0 (the "License");
46
# you may not use this file except in compliance with the License.
@@ -15,33 +17,55 @@
1517
import os
1618

1719
from google.cloud import pubsub_v1
18-
import mock
1920
import pytest
20-
2121
import quickstart
2222

2323
PROJECT = os.environ['GCLOUD_PROJECT']
24-
# Must match the dataset listed in quickstart.py
25-
TOPIC_NAME = 'my-new-topic'
26-
TOPIC_PATH = 'projects/{}/topics/{}'.format(PROJECT, TOPIC_NAME)
24+
TOPIC = 'end-to-end-test-topic'
25+
SUBSCRIPTION = 'end-to-end-test-topic-sub'
26+
N = 10
27+
28+
29+
@pytest.fixture(scope='module')
30+
def publisher_client():
31+
yield pubsub_v1.PublisherClient()
2732

2833

29-
@pytest.fixture
30-
def temporary_topic():
31-
"""Fixture that ensures the test topic does not exist before the test."""
32-
publisher = pubsub_v1.PublisherClient()
34+
@pytest.fixture(scope='module')
35+
def topic(publisher_client):
36+
topic_path = publisher_client.topic_path(PROJECT, TOPIC)
3337

3438
try:
35-
publisher.delete_topic(TOPIC_PATH)
39+
publisher_client.delete_topic(topic_path)
3640
except Exception:
3741
pass
3842

39-
yield
43+
yield TOPIC
4044

4145

42-
@mock.patch.object(
43-
pubsub_v1.PublisherClient, 'topic_path', return_value=TOPIC_PATH)
44-
def test_quickstart(unused_topic_path, temporary_topic, capsys):
45-
quickstart.run_quickstart()
46+
@pytest.fixture(scope='module')
47+
def subscriber_client():
48+
yield pubsub_v1.SubscriberClient()
49+
50+
51+
@pytest.fixture(scope='module')
52+
def subscription(subscriber_client, topic):
53+
subscription_path = subscriber_client.subscription_path(
54+
PROJECT, SUBSCRIPTION)
55+
56+
try:
57+
subscriber_client.delete_subscription(subscription_path)
58+
except Exception:
59+
pass
60+
61+
yield SUBSCRIPTION
62+
63+
64+
def test_end_to_end(topic, subscription, capsys):
65+
66+
quickstart.end_to_end(PROJECT, topic, subscription, N)
4667
out, _ = capsys.readouterr()
47-
assert TOPIC_NAME in out
68+
69+
assert "Received all messages" in out
70+
assert "Publish time lapsed" in out
71+
assert "Subscribe time lapsed" in out

0 commit comments

Comments
 (0)