Skip to content

Commit aafcd2a

Browse files
committed
Merge pull request #859 from tseaver/825-pubsub-explicit_connection
#825: Allow passing explicit connection to pubsub API methods
2 parents 90b4f72 + ad82cf7 commit aafcd2a

File tree

7 files changed

+630
-206
lines changed

7 files changed

+630
-206
lines changed

gcloud/pubsub/_implicit_environ.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,24 @@ def get_default_connection():
3838
return _DEFAULTS.connection
3939

4040

41+
def _require_connection(connection=None):
42+
"""Infer a connection from the environment, if not passed explicitly.
43+
44+
:type connection: :class:`gcloud.pubsub.connection.Connection`
45+
:param connection: Optional.
46+
47+
:rtype: :class:`gcloud.pubsub.connection.Connection`
48+
:returns: A connection based on the current environment.
49+
:raises: :class:`EnvironmentError` if ``connection`` is ``None``, and
50+
cannot be inferred from the environment.
51+
"""
52+
if connection is None:
53+
connection = get_default_connection()
54+
55+
if connection is None:
56+
raise EnvironmentError('Connection could not be inferred.')
57+
58+
return connection
59+
60+
4161
_DEFAULTS = _DefaultsContainer()

gcloud/pubsub/api.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"""Define API functions (not bound to classes)."""
1616

1717
from gcloud._helpers import get_default_project
18-
from gcloud.pubsub._implicit_environ import get_default_connection
18+
from gcloud.pubsub._implicit_environ import _require_connection
1919
from gcloud.pubsub.subscription import Subscription
2020
from gcloud.pubsub.topic import Topic
2121

@@ -53,8 +53,7 @@ def list_topics(page_size=None, page_token=None,
5353
if project is None:
5454
project = get_default_project()
5555

56-
if connection is None:
57-
connection = get_default_connection()
56+
connection = _require_connection(connection)
5857

5958
params = {}
6059

@@ -66,8 +65,7 @@ def list_topics(page_size=None, page_token=None,
6665

6766
path = '/projects/%s/topics' % project
6867
resp = connection.api_request(method='GET', path=path, query_params=params)
69-
topics = [Topic.from_api_repr(resource, connection)
70-
for resource in resp['topics']]
68+
topics = [Topic.from_api_repr(resource) for resource in resp['topics']]
7169
return topics, resp.get('nextPageToken')
7270

7371

@@ -110,8 +108,7 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None,
110108
if project is None:
111109
project = get_default_project()
112110

113-
if connection is None:
114-
connection = get_default_connection()
111+
connection = _require_connection(connection)
115112

116113
params = {}
117114

@@ -128,8 +125,6 @@ def list_subscriptions(page_size=None, page_token=None, topic_name=None,
128125

129126
resp = connection.api_request(method='GET', path=path, query_params=params)
130127
topics = {}
131-
subscriptions = [Subscription.from_api_repr(resource,
132-
connection=connection,
133-
topics=topics)
128+
subscriptions = [Subscription.from_api_repr(resource, topics=topics)
134129
for resource in resp['subscriptions']]
135130
return subscriptions, resp.get('nextPageToken')

gcloud/pubsub/subscription.py

Lines changed: 67 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from gcloud.exceptions import NotFound
1818
from gcloud.pubsub.message import Message
1919
from gcloud.pubsub.topic import Topic
20+
from gcloud.pubsub._implicit_environ import _require_connection
2021

2122

2223
class Subscription(object):
@@ -46,17 +47,12 @@ def __init__(self, name, topic, ack_deadline=None, push_endpoint=None):
4647
self.push_endpoint = push_endpoint
4748

4849
@classmethod
49-
def from_api_repr(cls, resource, connection=None, topics=None):
50+
def from_api_repr(cls, resource, topics=None):
5051
"""Factory: construct a topic given its API representation
5152
5253
:type resource: dict
5354
:param resource: topic resource representation returned from the API
5455
55-
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
56-
:param connection: the connection to use. If not passed,
57-
falls back to the default inferred from the
58-
environment.
59-
6056
:type topics: dict or None
6157
:param topics: A mapping of topic names -> topics. If not passed,
6258
the subscription will have a newly-created topic.
@@ -68,8 +64,7 @@ def from_api_repr(cls, resource, connection=None, topics=None):
6864
t_name = resource['topic']
6965
topic = topics.get(t_name)
7066
if topic is None:
71-
topic = topics[t_name] = Topic.from_api_repr({'name': t_name},
72-
connection)
67+
topic = topics[t_name] = Topic.from_api_repr({'name': t_name})
7368
_, _, _, name = resource['name'].split('/')
7469
ack_deadline = resource.get('ackDeadlineSeconds')
7570
push_config = resource.get('pushConfig', {})
@@ -82,11 +77,15 @@ def path(self):
8277
project = self.topic.project
8378
return '/projects/%s/subscriptions/%s' % (project, self.name)
8479

85-
def create(self):
80+
def create(self, connection=None):
8681
"""API call: create the subscription via a PUT request
8782
8883
See:
8984
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/create
85+
86+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
87+
:param connection: the connection to use. If not passed,
88+
falls back to the topic's connection.
9089
"""
9190
data = {'topic': self.topic.full_name}
9291

@@ -96,36 +95,44 @@ def create(self):
9695
if self.push_endpoint is not None:
9796
data['pushConfig'] = {'pushEndpoint': self.push_endpoint}
9897

99-
conn = self.topic.connection
100-
conn.api_request(method='PUT', path=self.path, data=data)
98+
connection = _require_connection(connection)
99+
connection.api_request(method='PUT', path=self.path, data=data)
101100

102-
def exists(self):
101+
def exists(self, connection=None):
103102
"""API call: test existence of the subscription via a GET request
104103
105104
See
106105
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
106+
107+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
108+
:param connection: the connection to use. If not passed,
109+
falls back to the topic's connection.
107110
"""
108-
conn = self.topic.connection
111+
connection = _require_connection(connection)
109112
try:
110-
conn.api_request(method='GET', path=self.path)
113+
connection.api_request(method='GET', path=self.path)
111114
except NotFound:
112115
return False
113116
else:
114117
return True
115118

116-
def reload(self):
119+
def reload(self, connection=None):
117120
"""API call: sync local subscription configuration via a GET request
118121
119122
See
120123
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/get
124+
125+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
126+
:param connection: the connection to use. If not passed,
127+
falls back to the topic's connection.
121128
"""
122-
conn = self.topic.connection
123-
data = conn.api_request(method='GET', path=self.path)
129+
connection = _require_connection(connection)
130+
data = connection.api_request(method='GET', path=self.path)
124131
self.ack_deadline = data.get('ackDeadline')
125132
push_config = data.get('pushConfig', {})
126133
self.push_endpoint = push_config.get('pushEndpoint')
127134

128-
def modify_push_configuration(self, push_endpoint):
135+
def modify_push_configuration(self, push_endpoint, connection=None):
129136
"""API call: update the push endpoint for the subscription.
130137
131138
See:
@@ -135,18 +142,22 @@ def modify_push_configuration(self, push_endpoint):
135142
:param push_endpoint: URL to which messages will be pushed by the
136143
back-end. If None, the application must pull
137144
messages.
145+
146+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
147+
:param connection: the connection to use. If not passed,
148+
falls back to the topic's connection.
138149
"""
150+
connection = _require_connection(connection)
139151
data = {}
140152
config = data['pushConfig'] = {}
141153
if push_endpoint is not None:
142154
config['pushEndpoint'] = push_endpoint
143-
conn = self.topic.connection
144-
conn.api_request(method='POST',
145-
path='%s:modifyPushConfig' % self.path,
146-
data=data)
155+
connection.api_request(method='POST',
156+
path='%s:modifyPushConfig' % self.path,
157+
data=data)
147158
self.push_endpoint = push_endpoint
148159

149-
def pull(self, return_immediately=False, max_messages=1):
160+
def pull(self, return_immediately=False, max_messages=1, connection=None):
150161
"""API call: retrieve messages for the subscription.
151162
152163
See:
@@ -161,36 +172,44 @@ def pull(self, return_immediately=False, max_messages=1):
161172
:type max_messages: int
162173
:param max_messages: the maximum number of messages to return.
163174
175+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
176+
:param connection: the connection to use. If not passed,
177+
falls back to the topic's connection.
178+
164179
:rtype: list of (ack_id, message) tuples
165180
:returns: sequence of tuples: ``ack_id`` is the ID to be used in a
166181
subsequent call to :meth:`acknowledge`, and ``message``
167182
is an instance of :class:`gcloud.pubsub.message.Message`.
168183
"""
184+
connection = _require_connection(connection)
169185
data = {'returnImmediately': return_immediately,
170186
'maxMessages': max_messages}
171-
conn = self.topic.connection
172-
response = conn.api_request(method='POST',
173-
path='%s:pull' % self.path,
174-
data=data)
187+
response = connection.api_request(method='POST',
188+
path='%s:pull' % self.path,
189+
data=data)
175190
return [(info['ackId'], Message.from_api_repr(info['message']))
176191
for info in response['receivedMessages']]
177192

178-
def acknowledge(self, ack_ids):
193+
def acknowledge(self, ack_ids, connection=None):
179194
"""API call: acknowledge retrieved messages for the subscription.
180195
181196
See:
182197
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/acknowledge
183198
184199
:type ack_ids: list of string
185200
:param ack_ids: ack IDs of messages being acknowledged
201+
202+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
203+
:param connection: the connection to use. If not passed,
204+
falls back to the topic's connection.
186205
"""
206+
connection = _require_connection(connection)
187207
data = {'ackIds': ack_ids}
188-
conn = self.topic.connection
189-
conn.api_request(method='POST',
190-
path='%s:acknowledge' % self.path,
191-
data=data)
208+
connection.api_request(method='POST',
209+
path='%s:acknowledge' % self.path,
210+
data=data)
192211

193-
def modify_ack_deadline(self, ack_id, ack_deadline):
212+
def modify_ack_deadline(self, ack_id, ack_deadline, connection=None):
194213
"""API call: update acknowledgement deadline for a retrieved message.
195214
196215
See:
@@ -201,18 +220,26 @@ def modify_ack_deadline(self, ack_id, ack_deadline):
201220
202221
:type ack_deadline: int
203222
:param ack_deadline: new deadline for the message, in seconds
223+
224+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
225+
:param connection: the connection to use. If not passed,
226+
falls back to the topic's connection.
204227
"""
228+
connection = _require_connection(connection)
205229
data = {'ackId': ack_id, 'ackDeadlineSeconds': ack_deadline}
206-
conn = self.topic.connection
207-
conn.api_request(method='POST',
208-
path='%s:modifyAckDeadline' % self.path,
209-
data=data)
230+
connection.api_request(method='POST',
231+
path='%s:modifyAckDeadline' % self.path,
232+
data=data)
210233

211-
def delete(self):
234+
def delete(self, connection=None):
212235
"""API call: delete the subscription via a DELETE request.
213236
214237
See:
215238
https://cloud.google.com/pubsub/reference/rest/v1beta2/projects/subscriptions/delete
239+
240+
:type connection: :class:`gcloud.pubsub.connection.Connection` or None
241+
:param connection: the connection to use. If not passed,
242+
falls back to the topic's connection.
216243
"""
217-
conn = self.topic.connection
218-
conn.api_request(method='DELETE', path=self.path)
244+
connection = _require_connection(connection)
245+
connection.api_request(method='DELETE', path=self.path)

gcloud/pubsub/test__implicit_environ.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,35 @@ def _callFUT(self):
2323

2424
def test_wo_override(self):
2525
self.assertTrue(self._callFUT() is None)
26+
27+
28+
class Test__require_connection(unittest2.TestCase):
29+
30+
def _callFUT(self, connection=None):
31+
from gcloud.pubsub._implicit_environ import _require_connection
32+
return _require_connection(connection=connection)
33+
34+
def _monkey(self, connection):
35+
from gcloud.pubsub._testing import _monkey_defaults
36+
return _monkey_defaults(connection=connection)
37+
38+
def test_implicit_unset(self):
39+
with self._monkey(None):
40+
with self.assertRaises(EnvironmentError):
41+
self._callFUT()
42+
43+
def test_implicit_unset_passed_explicitly(self):
44+
CONNECTION = object()
45+
with self._monkey(None):
46+
self.assertTrue(self._callFUT(CONNECTION) is CONNECTION)
47+
48+
def test_implicit_set(self):
49+
IMPLICIT_CONNECTION = object()
50+
with self._monkey(IMPLICIT_CONNECTION):
51+
self.assertTrue(self._callFUT() is IMPLICIT_CONNECTION)
52+
53+
def test_implicit_set_passed_explicitly(self):
54+
IMPLICIT_CONNECTION = object()
55+
CONNECTION = object()
56+
with self._monkey(IMPLICIT_CONNECTION):
57+
self.assertTrue(self._callFUT(CONNECTION) is CONNECTION)

0 commit comments

Comments
 (0)