
pubsub - modify_ack_deadline does not work, message gets redelivered after 10 minutes always #4648


Closed
sachin-shetty opened this issue Dec 21, 2017 · 27 comments
Labels: api: pubsub; priority: p1; triaged for GA; type: bug

Comments

@sachin-shetty

sachin-shetty commented Dec 21, 2017

  1. Specify the API at the beginning of the title (for example, "BigQuery: ...")
    General, Core, and Other are also allowed as types
    google-cloud-pubsub - 0.29.2
  2. OS type and version
    Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
  3. Python version and virtual environment information python --version
    Python 2.7.12
  4. google-cloud-python version pip show google-cloud, pip show google-<service> or pip freeze
    google-cloud: 0.30.0
    google-cloud-pubsub - 0.29.2
  5. Stacktrace if available
  6. Steps to reproduce
    I need very long ack deadlines, since our consumer process can run for a long time per message. No matter what I set as ack_deadline_seconds in the subscription definition, and even if I call subscriber.modify_ack_deadline every minute, the message gets redelivered after exactly ten minutes.
  7. Code example

I have a test case with a simple subscriber:

import logging
import time

import google.cloud.pubsub_v1 as pubsub

subscriber = pubsub.SubscriberClient()
sname = 'projects/<project-name>/subscriptions/ack-test-topic-subscribe'
tname = 'projects/<project-name>/topics/ack-test-topic'

def now():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

def callback(pubsub_message):
    logging.warning("Received message: %s %s at %s", pubsub_message.message_id, pubsub_message.data, now())
    # Simulate up to 8 hours of work, renewing the 120 s deadline every minute.
    for i in range(1, 481):
        time.sleep(60)
        subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120)
        logging.warning("Renewed: %s %s %s at %s", i, pubsub_message.message_id, pubsub_message.data, now())

    logging.warning("Ack: %s %s at %s", pubsub_message.message_id, pubsub_message.data, now())
    pubsub_message.ack()

# The callback must be defined before it is passed to open().
subscriber.create_subscription(sname, tname, ack_deadline_seconds=120)
subscription = subscriber.subscribe(sname)
future = subscription.open(callback)

With DEBUG logging enabled, I see lease-renewal requests every few seconds:
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:The current p99 value is 10 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Renewing lease for 5 ack IDs.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Snoozing lease management for 7.362112 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber._consumer:Sending request:

but the message still gets redelivered after 10 minutes.
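The DEBUG lines report "The current p99 value is 10 seconds". As a rough illustration of how a lease worker that derives its deadline from observed ack latencies could land on 10 seconds for a message that has not been acked yet, here is a minimal sketch. It assumes latencies are clamped to a 10 to 600 second range and that the 99th percentile is used as the renewal deadline; `AckLatencyHistogram` is a hypothetical name, not the library's actual class.

```python
import math

MIN_DEADLINE = 10   # seconds; assumed floor for the computed deadline
MAX_DEADLINE = 600  # seconds; the Pub/Sub maximum ack deadline


class AckLatencyHistogram(object):
    """Hypothetical record of how long past messages took to be acked."""

    def __init__(self):
        self._samples = []

    def add(self, seconds):
        # Round up and clamp each sample into [MIN_DEADLINE, MAX_DEADLINE].
        rounded = int(math.ceil(seconds))
        self._samples.append(min(max(rounded, MIN_DEADLINE), MAX_DEADLINE))

    def percentile(self, percent):
        # With no completed acks yet, fall back to the floor.
        if not self._samples:
            return MIN_DEADLINE
        ordered = sorted(self._samples)
        # Nearest-rank method: smallest sample covering `percent` of the data.
        rank = int(math.ceil(percent / 100.0 * len(ordered)))
        return ordered[max(rank, 1) - 1]


if __name__ == "__main__":
    histogram = AckLatencyHistogram()
    print(histogram.percentile(99))  # 10: nothing has been acked yet
```

In a model like this, a callback that is still running contributes no sample, so the estimate stays at the 10-second floor regardless of the `ack_deadline_seconds` the subscription was created with.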


@dhermes
Contributor

dhermes commented Dec 21, 2017

@sachin-shetty Could you try with google-cloud-python==0.30.0? There were a few bugs in flow control fixed between 0.29.2 and 0.29.4 (though it doesn't seem like they would be causing the issue you describe).

@dhermes
Contributor

dhermes commented Dec 21, 2017

Also, I'm not 100% sure how the line

subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120)

will interact with the lease management thread (i.e. the one that is logging "Renewing lease for ...") but that seems to be the problem.

The "expected" usage pattern would be that your callback only needs the state of the Message it receives:

pubsub_message.modify_ack_deadline(120.0)

@sachin-shetty
Author

Hi @dhermes

I tried the new version as well as pubsub_message.modify_ack_deadline, with the same result: the message gets redelivered after exactly 10 minutes, even though I call pubsub_message.modify_ack_deadline(120) every minute.

And even though pubsub_message.modify_ack_deadline(120) is called, the debug logs still show 10 seconds:

DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:The current p99 value is 10 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Renewing lease for 4 ack IDs.
DEBUG:google.cloud.pubsub_v1.subscriber.policy.base:Snoozing lease management for 8.668086 seconds.
DEBUG:google.cloud.pubsub_v1.subscriber._consumer:Sending request:
modify_deadline_seconds: 10
modify_deadline_seconds: 10
modify_deadline_seconds: 10
modify_deadline_seconds: 10
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGmJoXFx1B1ALGXV6MCFrU0cACERZfndrOTNpWF9xAFQEHnR7YHRjWjvkncSLwfJoZh89WxJLLD4"
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGmJoXFx1B1ALGXV6MCFrU0cACEVZfndrOTNpWF9xDlAEGnV9aXxpUjvkncSLwfJoZh89WxJLLD4"
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGGJoXFx1B1ALGXV6MHdjDhUBCERZfndrOTNpWF9wA1UKHXt8Yn1tXzvkncSLwfJoZh89WxJLLD4"
modify_deadline_ack_ids: "fjY5RUFeQBJMPQxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIAIG8QLUJaGGJoXFx1B1ALGXV6MHdjDhUBCEVZfndrOTNpWF9xAVELGXB1Z3RvUjvkncSLwfJoZh89WxJLLD4"

@sachin-shetty
Author

I tried commenting out _start_lease_worker() in subscriber/policy/thread.py, which stopped the lease worker from forcing a 10s ack deadline.

However, the request sent to the server by subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120) still shows 10s in the debug logs. I see that 10s is hardcoded in base.py.

Is there a version I can downgrade to that makes this work? I need a longer ack time per message.

@dhermes dhermes added api: pubsub and priority: p2 labels Dec 21, 2017
@dhermes
Contributor

dhermes commented Dec 21, 2017

I'm fairly certain this is a bug. The lease maintenance thread tries to make sure the deadline gets extended for every message the subscriber is still responsible for, but in doing so it actually reduces the lease time for your message (which, in your opinion and mine, is a mistake).

There is no equivalent high-throughput / highly concurrent implementation you can downgrade to. Here is every Pub / Sub release. Version 0.28.0 was when the rewrite happened, so if you downgrade to 0.27.0 or earlier you won't be subject to this bug, but the library was totally different at that point. (We don't have any published docs for that version of the library, but I'd be happy to work with you to show you how to build the docs locally.)
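To make the failure mode described above concrete, here is a toy model (not the library's code) of a single lease. It assumes that every modifyAckDeadline request sets the deadline to "now + seconds", so whichever request arrives last wins. `Lease`, `renew_with_p99`, and `renew_without_shortening` are hypothetical names; the last one shows one possible rule (never shorten an existing lease), not the fix that was actually shipped.

```python
class Lease(object):
    """Toy model of the server's ack deadline for one message (times in seconds)."""

    def __init__(self, now, deadline_seconds):
        self.expires_at = now + deadline_seconds

    def modify(self, now, seconds):
        # A modifyAckDeadline request *sets* the deadline relative to now.
        self.expires_at = now + seconds


def renew_with_p99(lease, now, p99):
    # Renewing with the p99 estimate alone can shorten a longer lease.
    lease.modify(now, p99)


def renew_without_shortening(lease, now, p99):
    # Keep whichever is longer: the time already granted or the p99 estimate.
    remaining = max(lease.expires_at - now, 0)
    lease.modify(now, max(remaining, p99))


if __name__ == "__main__":
    lease = Lease(now=0, deadline_seconds=120)
    lease.modify(now=60, seconds=120)      # user renews: expires at 180
    renew_with_p99(lease, now=65, p99=10)  # lease thread: expires at 75
    print(lease.expires_at)                # prints 75
```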

@sachin-shetty
Author

Thank you @dhermes. Do you think a fix will be available anytime soon? I am trying to figure out whether I should build application-level logic to handle this or wait for a fix.

@MaxDesiatov

Actually, the 0.27.0 docs are still available at http://temp.theadora.io/google-cloud-python-pubsub-0-27/pubsub/usage.html

@dhermes
Contributor

dhermes commented Jan 2, 2018

@explicitcall Thanks for the link, but be aware that the temp is not a typo in that domain (it is a staging domain used by @jonparrott).

@MaxDesiatov

@dhermes are there any plans to host documentation for multiple versions on the official docs site? That would be greatly appreciated.

@dhermes
Contributor

dhermes commented Jan 2, 2018

@explicitcall No concrete plans, but I have ideas in my head once we can split google-cloud-python into N repositories (one for each subpackage). It's a difficult task right now that would require a HUGE investment of time making modifications to the RTD Sphinx theme.

@chemelnucfin chemelnucfin added the type: bug label Jan 9, 2018
@danoscarmike
Contributor

Blocked by #4325?

@danoscarmike danoscarmike added release blocking and triaged for GA labels Jan 19, 2018
@kir-titievsky

@sachin-shetty This is a known server-side bug. Two fixes have rolled out server-side this week. You should see a lot less of this issue, but it might recur to an extent. There is a relatively straightforward workaround; let me know if things are not any better for you (literally) starting today. Thanks for your patience!

@kir-titievsky

@dhermes Let's keep this one open since if we don't solve this completely server-side, we'll need to address the use case some other way -- e.g. through documentation on explicit limits and alternatives.

Also: the receipt acks bug/FR will make this worse. So we may need to fix that to call this fixed.

@SeanMaday

I am still not able to extend the acknowledgement deadline using the google-cloud-pubsub==0.30.1 library. Can you please recommend a workaround?

@sachin-shetty
Author

sachin-shetty commented Jan 21, 2018

@kir-titievsky I tried 0.30.1 and still cannot get the message to be renewed past 10 minutes, no matter how many times I call modify_ack_deadline.

My test script:

import google.cloud.pubsub_v1 as pubsub
from google.cloud import exceptions
import time
import logging

subscriber = pubsub.SubscriberClient()
sname = 'projects/<project-name>/subscriptions/ack-test-topic-subscribe'
#logging.getLogger("google.cloud").setLevel(logging.DEBUG)

def create_topic(project_id, topic_name):
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)

    try:
        topic = publisher.create_topic(topic_path)
        logging.warn("Topic created: %s", topic)
    except exceptions.Conflict as e:
        logging.warn("Topic %s exists: %s", topic_path, repr(e))
    publisher.publish(topic_path, 'Published at: %s ' % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

def callback(pubsub_message):
    logging.warn("Received message: %s %s at %s", pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    i = 0
    #pubsub_message.ack()
    while i < 480:
        i = i + 1
        time.sleep(60)
        subscriber.modify_ack_deadline(sname, [pubsub_message._ack_id], 120)
        pubsub_message.modify_ack_deadline(120)
        logging.warn("Renewed: %s %s %s at %s", i, pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
        
    logging.warn("Ack: %s %s at %s", pubsub_message.message_id, pubsub_message.data, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
    pubsub_message.ack()

if __name__ == '__main__':
    tname = 'projects/<project-name>/topics/ack-test-topic'
    create_topic('<project-name>', 'ack-test-topic')
    try:
        subscriber.create_subscription(sname, tname, ack_deadline_seconds=120)
    except exceptions.Conflict as e:
        logging.warn("Subscriber %s exists: %s", sname, repr(e))

    flow_control = pubsub.types.FlowControl(max_messages=10)
    global subscription
    subscription = subscriber.subscribe(
        sname #, flow_control=flow_control
    )
    future = subscription.open(callback)
    while True:
        try:
            logging.warn("Waiting for messages on %s", sname)
            future.result()
            time.sleep(10)
        except Exception as ex:
            logging.exception("Error in blocked result")
            subscription.close()
            future = subscription.open(callback)
            logging.warn("Opened again: %s %s", tname, sname)

Output:

WARNING:root:Received message: 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:46:36
WARNING:root:Renewed: 1 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:47:36
WARNING:root:Renewed: 2 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:48:36
WARNING:root:Renewed: 3 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:49:37
WARNING:root:Renewed: 4 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:50:37
WARNING:root:Renewed: 5 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:51:37
WARNING:root:Renewed: 6 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:52:37
WARNING:root:Renewed: 7 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:53:38
WARNING:root:Renewed: 8 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:54:38
WARNING:root:Renewed: 9 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:55:39
WARNING:root:Received message: 26060171091203 Published at: 2018-01-21 23:46:32  at 2018-01-21 23:56:35

The message gets redelivered at the 10th minute.

Please let me know of any workaround, this is a blocker for us.

@sachin-shetty
Author

Hi @kir-titievsky, can you please let me know the workaround?

@kir-titievsky

Sachin, the code below worked for me with no duplicates. I have a vague suspicion that your error handling at the very bottom might be to blame for your results: the underlying code occasionally surfaces exceptions that come from closed connections, and it rebuilds the connection automatically. You, however, close the subscription and reopen it, which stops all renewals. Maybe try catching only specific exception types?

import google.cloud.pubsub_v1 as pubsub
from google.cloud import exceptions
import time
import logging,logging.handlers
from datetime import datetime
import sys,os

subscriber = pubsub.SubscriberClient()
sname = 'projects/google.com:kir-learns-cloud/subscriptions/sachin'

def timestamp():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

def create_topic(project_id, topic_name):
    publisher = pubsub.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    publish_future = publisher.publish(topic_path, 'Published at: %s ' % timestamp())
    # wait for the publish to succeed
    logging.warn("PUB : %s at %s", publish_future.result(), timestamp())

def callback(pubsub_message):
    logging.warn("PULL: %s %s at %s", pubsub_message.message_id, pubsub_message.data, timestamp())
    time.sleep(60*12)
    logging.warn("ACK : %s %s at %s", pubsub_message.message_id, pubsub_message.data, timestamp())
    pubsub_message.ack()

if __name__ == '__main__':
    project = 'google.com:kir-learns-cloud'
    create_topic(project, 'sachin')
    subscriber = pubsub.SubscriberClient()
    subscriber.subscribe("projects/%s/subscriptions/%s"%(project, "sachin"), callback=callback)
    while True:
        time.sleep(10)                      

@sachin-shetty
Author

@kir-titievsky Sorry, no luck. With your script, I still get duplicates at 10 minutes:

WARNING:root:PUB : 27133513131810 at 2018-01-24 11:06:50
WARNING:root:PULL: 27133513131810 Published at: 2018-01-24 11:06:50 at 2018-01-24 11:06:51
WARNING:root:PULL: 27133513131810 Published at: 2018-01-24 11:06:50 at 2018-01-24 11:16:49
WARNING:root:ACK : 27133513131810 Published at: 2018-01-24 11:06:50 at 2018-01-24 11:18:51

I only changed the project and topic in your script.

< sname = 'projects/google.com:kir-learns-cloud/subscriptions/sachin'
---
> #sname = 'projects/google.com:kir-learns-cloud/subscriptions/sachin'
> sname = 'projects/audit-bq/subscriptions/ack-test-topic-subscribe'
> tname = 'projects/audit-bq/topics/ack-test-topic'
28,29c30,31
<     project = 'google.com:kir-learns-cloud'
<     create_topic(project, 'sachin')
---
>     project = 'audit-bq'
>     create_topic(project, 'ack-test-topic')
31c33
<     subscriber.subscribe("projects/%s/subscriptions/%s"%(project, "sachin"), callback=callback)
---
>     subscriber.subscribe(sname, callback=callback)

I also tried this from a US server to see if that makes any difference, no luck.

@SeanMaday - can you please try with the script provided by @kir-titievsky since you are also hitting the same issue.

@kir-titievsky

kir-titievsky commented Jan 24, 2018 via email

@sachin-shetty
Author

Thank you @kir-titievsky. I tested 0.27 and it looks better: I am able to hold on to a message for longer, as long as I keep renewing it.

BTW, even nack does not work in 0.31. I fetched a message and nacked it at the 8th minute; the message was immediately redelivered, which is good, but it was redelivered again at the 10th minute, so something is really funky about 10 minutes in 0.31.

@kir-titievsky

@sachin-shetty Sachin, 10 minutes is a magical number in that it's the maximum deadline you can set with a single modifyAckDeadline call. To keep messages longer, you have to call modifyAckDeadline repeatedly. Were you doing that?

@dhermes Is there something that the client library does at 10 min intervals?
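One way to follow the advice to call modifyAckDeadline repeatedly is to re-send the deadline from a background thread while the callback works. The sketch below assumes only that you pass in a callable taking a number of seconds (for example, `pubsub_message.modify_ack_deadline` as used earlier in this thread); `LeaseKeeper` and its parameters are hypothetical. It clamps the deadline to 600 seconds and renews at least every half-deadline so each request lands before the previous one lapses.

```python
import threading

MAX_ACK_DEADLINE = 600  # seconds; the maximum for a single modifyAckDeadline


class LeaseKeeper(object):
    """Hypothetical helper: re-sends an ack deadline until the block exits."""

    def __init__(self, extend, deadline, interval):
        # `extend` is any callable that accepts a number of seconds.
        self._extend = extend
        self._deadline = min(deadline, MAX_ACK_DEADLINE)
        # Renew well before the current deadline would lapse.
        self._interval = min(interval, self._deadline / 2.0)
        self._stopped = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def _run(self):
        # Event.wait returns True once the event is set, ending the loop.
        while not self._stopped.wait(self._interval):
            self._extend(self._deadline)

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, *exc_info):
        self._stopped.set()
        self._thread.join()
        return False  # do not swallow exceptions from the with-block
```

Inside a callback this might look like `with LeaseKeeper(pubsub_message.modify_ack_deadline, 120, 60): process(pubsub_message)` followed by `pubsub_message.ack()`, where `process` stands for your own long-running work. In the versions discussed in this thread, the library's lease thread could still overwrite these requests with its 10-second value, so this pattern alone does not avoid the bug.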

@sachin-shetty
Author

Yes, I tried modifyAckDeadline on both the message and the subscriber client.

I have switched to 0.27 which is working well for me.

@opyate
Contributor

opyate commented Feb 1, 2018

@kir-titievsky the pydoc says "This is not an extension", which is quite misleading if this function can in fact be called to "extend" the deadline.
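The docstring's point is that modifyAckDeadline sets the deadline to "now + seconds" rather than adding to the time already remaining; calling it repeatedly is how you extend. The hypothetical helper below replays a list of modifyAckDeadline requests against one lease under that assumption and reports when the lease would lapse. A nack can be modeled as a request with 0 seconds.

```python
def replay_lease(receive_time, initial_deadline, calls):
    """Replay hypothetical modifyAckDeadline requests against one lease.

    `calls` is a time-ordered list of (timestamp, seconds) pairs. Returns
    (lapsed_at, expires_at): `lapsed_at` is when the lease ran out before a
    later request arrived (None if it never did), and `expires_at` is the
    final expiry time.
    """
    expires_at = receive_time + initial_deadline
    for when, seconds in calls:
        if when >= expires_at:
            # Too late: the message would already have been redelivered.
            return expires_at, expires_at
        # Each request replaces the deadline; it does not add to it.
        expires_at = when + seconds
    return None, expires_at


# Renewing a 120 s deadline every 60 s keeps the lease alive.
print(replay_lease(0, 120, [(60 * i, 120) for i in range(1, 11)]))  # (None, 720)
# A 10 s request arriving after a 120 s one shortens the lease.
print(replay_lease(0, 120, [(60, 120), (65, 10)]))  # (None, 75)
```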

@kir-titievsky

kir-titievsky commented Feb 1, 2018 via email

@lukesneeringer
Contributor

lukesneeringer commented Feb 8, 2018

@opyate How would you prefer that to be written? (Disclaimer: I wrote the original statement, and am happy to modify to make it clearer, but I could not think of a clearer-cut way to write it.)

Never mind. You sent a PR already.

@danoscarmike danoscarmike added priority: p1 and removed priority: p2 and release blocking labels Feb 8, 2018
@tseaver
Contributor

tseaver commented Feb 20, 2018

Given that #4822 is merged, what work remains for this issue?

@theacodes
Contributor

I think this is fine to close. We can always re-open or file a new issue.
