-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Description
Pub/Sub client library stops sending Ack or modifyAckDeadline requests entirely when flow control settings are defined. See code below: when ran as it is against a subscription with a large backlog, it shows that the client thinks it's processing ~700 messages/second, while there is no outgoing traffic from the machine and the monitoring metrics show no ack | modAckDeadline traffic.
If we comment out the flow control argument, the overall processing rate is the same, acks and modifyAckDeadlines are regularly and successfully received by the server.
Expected behavior: flow control settings do not prevent the client library from acknowledging the message.
- OS type and version
Mac Os Sierra - Python version and virtual environment information
python --version
2.7 - google-cloud-python version
pip show google-cloud,pip show google-<service>orpip freeze
pip show google-cloud-pubsub
Name: google-cloud-pubsub
Version: 0.28.4 - Stacktrace if available
- Steps to reproduce
- Run without flow control setting against a subscription with a large (10K messages) backlog
- Run without flow control setting against a subscription with a large (10K messages) backlog
- Code example
import datetime
import sys
import time
import threading
from google.cloud import pubsub_v1 as pubsub
subscription_name = "projects/%s/subscriptions/%s"%(sys.argv[1], sys.argv[2])
sleep_time_ms = 0
try:
sleep_time_ms = int(sys.argv[3])
except Exception:
print "Could not parse custom sleep time."
print "Using sleep time %g ms"%sleep_time_ms
start_time = -1
messages_processed = 0.0
thread_lock = threading.Lock()
def callback(message):
global start_time, thread_lock,messages_processed
if start_time < 0 :
with thread_lock: start_time = time.time()
time.sleep(float(sleep_time_ms)/1000)
message.ack()
with thread_lock:
messages_processed += 1
subscription = pubsub.SubscriberClient().subscribe(subscription_name, flow_control = pubsub.types.FlowControl(max_messages=100)).open(callback=callback)
while True:
time.sleep(5)
print "Acking messages at rate %.5f / second"%( messages_processed / (time.time() - start_time))Using GitHub flavored markdown can help make your request clearer.
See: https://guides.github.com/features/mastering-markdown/