Missing Connection Retry after Connection Loss #35

Open
severindellsperger opened this issue Mar 12, 2021 · 5 comments

We have an HA RabbitMQ cluster, where we are testing the HA functionality.
Our application structure is as follows:

  • Several consumers, which consume messages from a defined exchange (we have to use a fanout strategy because there are several identical distributed consumers):
    Consumer Implementation
    async def handle_message(self, message: bytes) -> None:
        # Here I process the messages...
        await get_channel_layer().group_send(
            "websocket", {"type": "websocket.message", "message": json.loads(message)}
        )

    def _random_letters(self, n: int) -> str:
        return "".join(random.choice(string.ascii_letters) for i in range(n))

    async def process_messages(self):
        channel_layer = get_channel_layer()
        carehare_connection = await channel_layer.carehare_connection
        self.queue_name = f"changes_{self._random_letters(12)}"
        await carehare_connection.exchange_declare(
            exchange_name=self.exchange, exchange_type="fanout"
        )
        await carehare_connection.queue_declare(
            queue_name=self.queue_name,
            durable=True,
            arguments={"x-queue-type": "quorum", "x-expires": 5},
        )
        await carehare_connection.queue_bind(
            exchange_name=self.exchange, queue_name=self.queue_name
        )
        self.logger.info(f"Connected to queue {self.queue_name}: ")
        async with carehare_connection.acking_consumer(self.queue_name) as consumer:
            async for message in consumer:
                await self.handle_message(message)

    def handle(self, *args, **options):
        self.exchange = options.get("exchange") or "test"
        asyncio.run(self.process_messages())

  • WebSocket consumer, which receives messages from the consumers:
    Django Channels WebSocket Consumer Implementation
class UpdateTopologyConsumer(JsonWebsocketConsumer):
    def connect(self):
        async_to_sync(self.channel_layer.group_add)("websocket", self.channel_name)
        self.accept()

    def websocket_message(self, message):
        print(message, flush=True)
        self.send_json(message)

    def disconnect(self, close_code):
        self.close()

Now we face the problem that if one RabbitMQ node in the cluster goes down, the application breaks:
Consumer Error

backend> python manage.py listenonupdates changes -v 3
2021-03-12 12:31:19,713 | INFO | Connect to RabbitMQ and subscribe to exchange: changes
2021-03-12 12:31:19,888 | INFO | Connected to queue changes_PHnjRResdlDD: 
2021-03-12 12:33:08,772 | INFO | Message received: 
2021-03-12 12:33:08,772 | INFO | b'{"bla": "bla bla"}'
Disconnected from RabbitMQ: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode. Will reconnect.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/core.py", line 263, in _reconnect_forever
    await connection.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
Closing consumer
Traceback (most recent call last):
  File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 68, in process_messages
    async for message in consumer:
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
    message, self._yielded_delivery_tag = await _next_delivery(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 37, in consume_into_multi_queue_until_connection_close
    multi_queue.put_nowait(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
    await self.closed
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 32, in consume_into_multi_queue_until_connection_close
    body, delivery_tag = await consumer.next_delivery()
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 196, in next_delivery
    return await _next_delivery(self._queue, self.closed)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
carehare._exceptions.ConnectionClosed
Traceback (most recent call last):
  File "/usr/src/app/backend/manage.py", line 22, in <module>
    main()
  File "/usr/src/app/backend/manage.py", line 18, in main
    execute_from_command_line(sys.argv)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
    utility.execute()
  File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 395, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 330, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 371, in execute
    output = self.handle(*args, **options)
  File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 77, in handle
    asyncio.run(self.process_messages())
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 69, in process_messages
    await self.handle_message(message)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
    await self.closed
  File "/usr/src/app/backend/updatetopology/management/commands/listenonupdates.py", line 68, in process_messages
    async for message in consumer:
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
    message, self._yielded_delivery_tag = await _next_delivery(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 37, in consume_into_multi_queue_until_connection_close
    multi_queue.put_nowait(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
    await self.closed
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 32, in consume_into_multi_queue_until_connection_close
    body, delivery_tag = await consumer.next_delivery()
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 196, in next_delivery
    return await _next_delivery(self._queue, self.closed)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
carehare._exceptions.ConnectionClosed

In the Django Channels consumer, we most likely have the same problem:
Django Channels Consumer error

Disconnected from RabbitMQ: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode. Will reconnect.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/core.py", line 263, in _reconnect_forever
    await connection.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode

We would like to reconnect automatically after a connection loss. How do you solve this problem?

@severindellsperger
Author

Update: I saw that the Django Channels consumer recovers from the error.

Disconnected from RabbitMQ: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode. Will reconnect.
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/core.py", line 263, in _reconnect_forever
    await connection.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
{'type': 'websocket.message', 'message': {'bla': 'bla'''}}
{'type': 'websocket.message', 'message': {'bla': 'bla'''}}
{'type': 'websocket.message', 'message': {'bla': 'bla'''}}

So it seems the problem is only with our own hand-rolled carehare connection...

@adamhooper
Contributor

adamhooper commented Mar 12, 2021

Good question. Indeed, I got as frustrated as you about this very problem. That's why I wrote carehare in the first place.

The very-simplest logic is to ensure the whole app dies when the RabbitMQ connection fails. That way, in production, your service can spin up again and resume where it left off. At CJWorkbench, our background services do this.
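To illustrate, a bare-bones standalone consumer in that style might look like this (a sketch, not CJWorkbench's actual code; the URL and queue name are made up -- the point is only that nothing catches the connection error):

import asyncio

import carehare


async def consume_forever(url: str, queue_name: str) -> None:
    connection = carehare.Connection(url)
    await connection.connect()
    try:
        async with connection.acking_consumer(queue_name) as consumer:
            async for message in consumer:
                ...  # handle the message
    finally:
        await connection.close()


if __name__ == "__main__":
    # Any carehare.ConnectionClosed propagates, the process exits non-zero,
    # and the process manager (systemd, Kubernetes, a Docker restart policy,
    # ...) starts a fresh copy that reconnects on its own.
    asyncio.run(consume_forever("amqp://localhost/", "changes"))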

You're asking for a way to reconnect your RabbitMQ consumer's carehare connection without crashing your app. I can think of two solid approaches.

Approach 1: a separate carehare connection for the consumer. Some pieces:

import asyncio
import logging
from typing import Optional

import carehare
from django.conf import settings  # settings.RABBITMQ_HOST is used below

logger = logging.getLogger(__name__)


async def connect_with_retry(
    url: str,
    *,
    retries: int = 10,
    backoff_delay: float = 2.0,
    connect_timeout: float = 10.0,
    stop_retrying: Optional[asyncio.Future[None]] = None,  # cancellation logic
) -> carehare.Connection:
    """Connect to RabbitMQ, retrying if needed and failing in case of disaster.

    The caller should eventually await `retval.close()`.

    The caller can pass a `stop_retrying` Future. Then, if the caller calls its
    `.set_result(None)`, connection failures will be reported much more quickly.
    """
    next_delay = 0.0
    for retry in range(retries):
        try:
            connection = carehare.Connection(url, connect_timeout=connect_timeout)
            await connection.connect()
            return connection
        except (ConnectionError, asyncio.TimeoutError) as err:
            if retry >= retries - 1 or (
                stop_retrying is not None and stop_retrying.done()
            ):
                raise
            else:
                logger.warning(
                    "Failed to connect to RabbitMQ (%s); retrying in %fs",
                    str(err),
                    next_delay,
                )
                await asyncio.sleep(next_delay)  # TODO stop_retrying short-circuit
                next_delay += backoff_delay

async def process_messages_with_reconnect(stop: asyncio.Future[None]) -> None:
    while not stop.done():
        logger.info("Connecting to RabbitMQ")
        connection = await connect_with_retry(
            settings.RABBITMQ_HOST, stop_retrying=stop
        )  # raises ConnectionError, asyncio.TimeoutError; these should crash our program

        try:
            await process_messages(connection)  # TODO pass `stop` for a shutdown mechanism
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
        ) as err:
            logger.exception("Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err))
        finally:
            try:
                await connection.close()
            except Exception as err:
                logger.exception("Ignoring error cleaning up RabbitMQ connection: %s", str(err))

You can use your existing process_messages() here, except it must accept carehare_connection as an argument.
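For instance, your process_messages() could become something like this (a sketch using the same carehare calls as your snippet, just taking the connection as a parameter instead of reading it from the channel layer):

    async def process_messages(self, carehare_connection: carehare.Connection) -> None:
        # Same body as before; process_messages_with_reconnect() passes in a
        # fresh connection on every reconnect.
        self.queue_name = f"changes_{self._random_letters(12)}"
        await carehare_connection.exchange_declare(
            exchange_name=self.exchange, exchange_type="fanout"
        )
        await carehare_connection.queue_declare(
            queue_name=self.queue_name,
            durable=True,
            arguments={"x-queue-type": "quorum", "x-expires": 5},
        )
        await carehare_connection.queue_bind(
            exchange_name=self.exchange, queue_name=self.queue_name
        )
        async with carehare_connection.acking_consumer(self.queue_name) as consumer:
            async for message in consumer:
                await self.handle_message(message)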

With this strategy, process_messages_with_reconnect() will handle one RabbitMQ connection, and channels_rabbitmq will handle another. Each will reconnect when needed. They might even connect to different servers. This should be fine in many (or all?) use cases.

We use this on our production Django servers, in our queueing code. Job queues use a global-variable carehare connection, and Channels uses channels_rabbitmq.

Another idea -- untested -- is to piggy-back on channels_rabbitmq's retry mechanism. Something like this:

async def process_messages_with_reconnect() -> None:
    while True:
        connection = await get_channel_layer().carehare_connection  # the latest connection -- or raise
        happy_loop = asyncio.create_task(process_messages(connection))
        await asyncio.wait({connection.closed, happy_loop}, return_when=asyncio.ALL_COMPLETED)
        try:
            happy_loop.result()
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
        ) as err:
            logger.exception("Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err))
        # there might need to be a small pause here; not sure.

Here's the idea: channels_rabbitmq already runs a task akin to process_messages_with_reconnect() above. When that task detects a disconnect, it replaces the layer's .carehare_connection with a fresh Future. The idea is that await get_channel_layer().carehare_connection should never return a stale connection. (There may be a small race, though, in which case you'll want to await asyncio.sleep(0) or some-such before looping. I'm not sure: I haven't tested.)
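If that race does bite, the guard would be tiny -- the same loop as above with one extra line at the bottom (equally untested):

async def process_messages_with_reconnect() -> None:
    while True:
        connection = await get_channel_layer().carehare_connection  # the latest connection -- or raise
        happy_loop = asyncio.create_task(process_messages(connection))
        await asyncio.wait({connection.closed, happy_loop}, return_when=asyncio.ALL_COMPLETED)
        try:
            happy_loop.result()
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
        ) as err:
            logger.exception("Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err))
        # Yield to the event loop so channels_rabbitmq's reconnect task can
        # replace the layer's carehare_connection Future with a fresh one
        # before the next iteration asks for it.
        await asyncio.sleep(0)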

Neither solution brings me joy.

Django Channels is heavily influenced by its (flawed) Redis strategy. Its patterns are questionable. For us RabbitMQ users, the only value it provides is de-duplicating group_send() messages so RabbitMQ sends only one message per Django server instead of one message per WebSockets connection. I always feel like I'm fighting Channels every step of the way. (For instance, Daphne leaks group subscriptions by default when connection-close logic is slow.)

@severindellsperger
Author

We decided to use the first approach.
At first, I got the following error:

# Here I restarted the RabbitMQ master node
Closing consumer
Traceback (most recent call last):
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
    async for message in consumer:
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
    message, self._yielded_delivery_tag = await _next_delivery(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
carehare._exceptions.ConnectionClosed
2021-03-24 10:54:14,012 | ERROR | Ignoring error cleaning up RabbitMQ connection: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
Traceback (most recent call last):
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 101, in process_messages_with_reconnect
    await self.process_messages(carehare_connection)
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 54, in process_messages
    await self.handle_message(message)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
    await self.closed
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
    async for message in consumer:
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
    message, self._yielded_delivery_tag = await _next_delivery(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
carehare._exceptions.ConnectionClosed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 113, in process_messages_with_reconnect
    await carehare_connection.close()
  File "/usr/local/lib/python3.9/site-packages/carehare/_connection.py", line 89, in close
    await self.closed
carehare._exceptions.ConnectionClosedByServer: RabbitMQ closed the connection: 320 CONNECTION_FORCED - Node was put into maintenance mode
Closing consumer
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 101, in process_messages_with_reconnect
    await self.process_messages(carehare_connection)
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 54, in process_messages
    await self.handle_message(message)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
    await self.closed
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
    async for message in consumer:
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
    message, self._yielded_delivery_tag = await _next_delivery(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
carehare._exceptions.ConnectionClosed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/channels_rabbitmq/reader.py", line 32, in consume_into_multi_queue_until_connection_close
    body, delivery_tag = await consumer.next_delivery()
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 196, in next_delivery
    return await _next_delivery(self._queue, self.closed)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 39, in _next_delivery
    done, pending = await asyncio.wait(
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 413, in wait
    return await _wait(fs, timeout, return_when, loop)
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 529, in _wait
    await waiter
asyncio.exceptions.CancelledError
Traceback (most recent call last):
  File "/usr/src/app/manage.py", line 22, in <module>
    main()
  File "/usr/src/app/manage.py", line 18, in main
    execute_from_command_line(sys.argv)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 401, in execute_from_command_line
    utility.execute()
  File "/usr/local/lib/python3.9/site-packages/django/core/management/__init__.py", line 395, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 330, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/usr/local/lib/python3.9/site-packages/django/core/management/base.py", line 371, in execute
    output = self.handle(*args, **options)
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 122, in handle
    asyncio.run(self.process_messages_with_reconnect(asyncio.Future()))
  File "/usr/local/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 101, in process_messages_with_reconnect
    await self.process_messages(carehare_connection)
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 54, in process_messages
    await self.handle_message(message)
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 223, in __aexit__
    await self.closed
  File "/usr/src/app/updatehandler/management/commands/handle_polling_messages.py", line 53, in process_messages
    async for message in consumer:
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 74, in __anext__
    message, self._yielded_delivery_tag = await _next_delivery(
  File "/usr/local/lib/python3.9/site-packages/carehare/_consume_channel.py", line 50, in _next_delivery
    closed.result()  # raise exception if there is one
carehare._exceptions.ConnectionClosed

I simply added carehare.ConnectionClosed to the except statement:

async def process_messages_with_reconnect(self, stop: asyncio.Future[None]) -> None:
    while not stop.done():
        self.logger.info(
            f"Connect to RabbitMQ and subscribe to queue: {self.queue_name}"
        )
        carehare_connection = await self.connect_with_retry(stop_retrying=stop)
        # raises ConnectionError, asyncio.TimeoutError; these should crash our program
        try:
            await self.process_messages(carehare_connection)
        except (
            ConnectionError,
            carehare.ConnectionClosedByServer,
            carehare.ConnectionClosedByHeartbeatMonitor,
            carehare.ConnectionClosed,
        ) as err:
            self.logger.exception(
                "Abnormal disconnect from RabbitMQ: %s; reconnecting", str(err)
            )
        finally:
            try:
                await carehare_connection.close()
            except Exception as err:
                self.logger.exception(
                    "Ignoring error cleaning up RabbitMQ connection: %s", str(err)
                )

Afterward, it worked with automatic retries 👍
Thanks for your help.

@severindellsperger
Author

PS: One question - is it fine to call the method with a new asyncio.Future()?

asyncio.run(self.process_messages_with_reconnect(asyncio.Future()))

In which case does stop.done() ever become True?

@adamhooper
Contributor

adamhooper commented Mar 24, 2021

D'oh, you indeed made the right fix.

self.process_messages() actually won't raise ConnectionError, ConnectionClosedByServer or ConnectionClosedByHeartbeatMonitor. It will only raise ConnectionClosed (which you should catch! -- and you did, thus fixing your problem) and ChannelClosedByServer (which you probably shouldn't catch -- it usually indicates an error message from RabbitMQ that you wouldn't expect).

As for the "stop" future: you can indeed pass a new Future ... or you can nix stop out of all my code. stop isn't for the problem at hand; it solves another problem you'll run into someday. Sooner or later you'll probably want graceful shutdowns. The Channels-recommended server, Daphne, has no mechanism for it: killing the web server disconnects everybody. Uvicorn implements "ASGI lifespan". The gist: your lifespan shutdown handler can call stop.set_result(None) and then await the result of connect_with_retry(). It's the trickiest piece of the tricky problem of maintaining servers (and deploying) without breaking your users' experience.

But stop has nothing to do with reconnects. I wrote it because I'm a pedant: I wouldn't write a function that never returns :). If you don't want to implement shutdown, you don't need stop; it has no relation to this bug report.
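If you ever do want it in a management command rather than an ASGI server, one rough sketch (method name, signal choice and wiring are assumptions, not tested code) is to resolve stop from a signal handler:

    # at module top: import asyncio, signal

    async def run_until_stopped(self) -> None:
        # Hypothetical helper on the same management command class.
        stop: asyncio.Future[None] = asyncio.Future()
        loop = asyncio.get_running_loop()
        # On SIGTERM/SIGINT, resolve `stop`: connect_with_retry() stops retrying
        # and the while-loop in process_messages_with_reconnect() exits after its
        # current iteration. (process_messages() itself would also need to watch
        # `stop` -- the TODO in the sketch above -- to interrupt an open consumer.)
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig, lambda: None if stop.done() else stop.set_result(None)
            )
        await self.process_messages_with_reconnect(stop)

    def handle(self, *args, **options):
        asyncio.run(self.run_until_stopped())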

Thank you for all your enthusiasm. I'm glad the restarts are working for you now.

I'll close this issue after I copy/paste some of this code into the README.
