Skip to content

Adding subscription factory on Pub / Sub client. #3370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/pubsub_snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,28 @@ def do_something_with(sub): # pylint: disable=unused-argument
# [END client_list_subscriptions]


@snippet
def client_topic(client, to_delete): # pylint: disable=unused-argument
"""Topic factory."""
TOPIC_NAME = 'topic_factory-%d' % (_millis(),)

# [START client_topic]
topic = client.topic(TOPIC_NAME)
# [END client_topic]


@snippet
def client_subscription(client, to_delete): # pylint: disable=unused-argument
"""Subscription factory."""
SUBSCRIPTION_NAME = 'subscription_factory-%d' % (_millis(),)

# [START client_subscription]
subscription = client.subscription(
SUBSCRIPTION_NAME, ack_deadline=60,
retain_acked_messages=True)
# [END client_subscription]


@snippet
def topic_create(client, to_delete):
"""Create a topic."""
Expand Down
47 changes: 47 additions & 0 deletions pubsub/google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from google.cloud.pubsub._http import _PublisherAPI as JSONPublisherAPI
from google.cloud.pubsub._http import _SubscriberAPI as JSONSubscriberAPI
from google.cloud.pubsub._http import _IAMPolicyAPI
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

try:
Expand Down Expand Up @@ -225,6 +226,7 @@ def topic(self, name, timestamp_messages=False):
.. literalinclude:: pubsub_snippets.py
:start-after: [START client_topic]
:end-before: [END client_topic]
:dedent: 4

:type name: str
:param name: the name of the topic to be constructed.
Expand All @@ -236,3 +238,48 @@ def topic(self, name, timestamp_messages=False):
:returns: Topic created with the current client.
"""
return Topic(name, client=self, timestamp_messages=timestamp_messages)

def subscription(self, name, ack_deadline=None, push_endpoint=None,
retain_acked_messages=None,
message_retention_duration=None):
"""Creates a subscription bound to the current client.

Example:

.. literalinclude:: pubsub_snippets.py
:start-after: [START client_subscription]
:end-before: [END client_subscription]
:dedent: 4

:type name: str
:param name: the name of the subscription to be constructed.

:type ack_deadline: int
:param ack_deadline: (Optional) The deadline (in seconds) by which
messages pulledfrom the back-end must be
acknowledged.

:type push_endpoint: str
:param push_endpoint:
(Optional) URL to which messages will be pushed by the back-end.
If not set, the application must pull messages.

:type retain_acked_messages: bool
:param retain_acked_messages:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by ``message_retention_duration``.

:type message_retention_duration: :class:`datetime.timedelta`
:param message_retention_duration:
(Optional) Whether to retain acked messages. If set, acked messages
are retained in the subscription's backlog for a duration indicated
by ``message_retention_duration``. If unset, defaults to 7 days.

:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: Subscription created with the current client.
"""
return Subscription(
name, ack_deadline=ack_deadline, push_endpoint=push_endpoint,
retain_acked_messages=retain_acked_messages,
message_retention_duration=message_retention_duration, client=self)

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

50 changes: 39 additions & 11 deletions pubsub/tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import unittest

import mock
Expand Down Expand Up @@ -365,7 +366,17 @@ def test_list_subscriptions_w_missing_key(self):
self.assertEqual(api._listed_subscriptions,
(self.PROJECT, None, None))

def test_topic(self):
def test_list_snapshots(self):
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)
client._connection = object()
api = _FauxSubscriberAPI()
response = api._list_snapshots_response = object()
client._subscriber_api = api
self.assertEqual(client.list_snapshots(), response)
self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None))

def test_topic_factory(self):
PROJECT = 'PROJECT'
TOPIC_NAME = 'TOPIC_NAME'
creds = _make_credentials()
Expand All @@ -379,17 +390,33 @@ def test_topic(self):
'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME))
self.assertFalse(new_topic.timestamp_messages)

def test_list_snapshots(self):
def test_subscription_factory(self):
project = 'PROJECT'
creds = _make_credentials()
client = self._make_one(project=self.PROJECT, credentials=creds)
client._connection = object()
api = _FauxSubscriberAPI()
response = api._list_snapshots_response = object()
client._subscriber_api = api
self.assertEqual(client.list_snapshots(), response)
self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None))
client_obj = self._make_one(project=project, credentials=creds)

sub_name = 'hoot-n-holler'
ack_deadline = 60,
push_endpoint = 'https://api.example.com/push'
message_retention_duration = datetime.timedelta(3600)
new_subscription = client_obj.subscription(
sub_name, ack_deadline=ack_deadline,
push_endpoint=push_endpoint,
retain_acked_messages=True,
message_retention_duration=message_retention_duration)

self.assertEqual(new_subscription.name, sub_name)
self.assertIsNone(new_subscription.topic)
self.assertIs(new_subscription._client, client_obj)
self.assertEqual(new_subscription._project, project)
self.assertEqual(new_subscription.ack_deadline, ack_deadline)
self.assertEqual(new_subscription.push_endpoint, push_endpoint)
self.assertTrue(new_subscription.retain_acked_messages)
self.assertEqual(
new_subscription.message_retention_duration,
message_retention_duration)



class _Iterator(object):

def __init__(self, items, token):
Expand Down Expand Up @@ -419,7 +446,8 @@ def list_subscriptions(self, project, page_size, page_token):

def list_snapshots(self, project, page_size, page_token):
self._listed_snapshots = (project, page_size, page_token)
return self._list_snapshots_response
return self._list_snapshots_response


class _Connection(object):

Expand Down