Keeping RabbitMQ connections alive in PHP

By Leo, published in Mollie
7 min read · Nov 16, 2018

For a few years we relied on Beanstalk for all of our queueing needs because it is simple and fast. However, as the traffic Mollie handles keeps increasing, we were recently forced to look into more comprehensive queueing technologies. Eventually we decided to replace Beanstalk with the more feature-complete and highly available RabbitMQ.

Illustration by Sven Franzen

The refactor from Beanstalk to RabbitMQ started out without major problems. Though we managed to successfully migrate most of our Beanstalk queues to RabbitMQ without any issues, things became problematic when we began converting our payment export daemon.

The payment export daemon

We offer our merchants exports of their transaction information of a given period for their accounting software, which can be downloaded via the Mollie dashboard. The dashboard requests the export via our internal API, which subsequently queues the request in a RabbitMQ queue.

Our PHP-based payment export daemon sleeps in the background, waiting to consume messages on the other end of the RabbitMQ queue. Once it receives a message, the daemon immediately picks up the download request and starts generating the export files.

A single export file can contain anywhere from a couple of payments to hundreds of thousands in the case of our bigger merchants. The export process is quite simple: look up the requested transactions and generate the files in the requested format.

The daemon uses the php-amqplib library to set up the connection. Heavily simplified, the PHP setup would look as follows:

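use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, ...);
$channel = $connection->channel();

// Arguments after $queue: passive, durable, exclusive, auto_delete.
$channel->queue_declare($queue, false, true, false, false);
// Register export_payments() as the callback for incoming messages.
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'export_payments');

function export_payments(AMQPMessage $message): void
{
    // Create the export.
}

// Block and dispatch messages for as long as a consumer is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}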

After setting up the connection and declaring the queue, the daemon waits for messages. As soon as a message is received, the library calls the function we have defined as the callback, in this case export_payments().

Seemingly random ‘invalid frame type’ exceptions

Soon after migrating the payment export daemon from Beanstalk to RabbitMQ, we noticed that the daemon started crashing intermittently. Each time, php-amqplib, the AMQP library we used to connect to RabbitMQ, threw a vague ‘invalid frame type’ exception.

Error: Uncaught PhpAmqpLib\Exception\AMQPRuntimeException: Invalid frame type 65 in /some/path/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php:528

After investigating further, we realized the exceptions only occurred after attempts to pick up the next job immediately after successfully running exports bigger than a certain size.

For exports with many transactions, the process would spend a significant amount of time waiting on a large database query, or on the PDF generation in case of PDF exports. Once the job was finished and the worker tried to pick up a new job from the RabbitMQ queue, the dreaded ‘invalid frame type’ exception was thrown.

After reading about similar issues faced by other developers and diving into the internals of the AMQP library, we found the cause: we had configured the connection to use heartbeats in accordance with RabbitMQ best practices, but simply were not sending enough of them.

RabbitMQ uses so-called ‘heartbeats’ as a keep-alive mechanism with which, in simple terms, the client is expected to send heartbeats to the server informing it that it is still alive and running. The server will drop the connection if the client does not show a sign of life for two consecutive heartbeats, as it will then assume the client is dead. The heartbeat interval can be configured per connection.

However, as PHP is a synchronous language, there is no way for the AMQP library to keep sending heartbeats in the background of a long-running task, let alone throw an exception if a heartbeat is skipped. Instead, the server will silently drop the connection after two missed beats, and the client will only find out once it tries to continue using the queue.

How php-amqplib handles heartbeats

So far our theory made sense, but we could not yet explain what the ‘invalid frame type’ error message actually meant. Intrigued, we went further down the Rabbit hole. (Warning: in-depth explanation follows.)

We found that the php-amqplib library handles heartbeats through an implementation of the method AbstractIO::check_heartbeat(). This IO method is called by the library every time you use the connection, for example in AMQPChannel::basic_consume(), AMQPChannel::queue_declare(), or AMQPChannel::basic_publish().

If heartbeats are configured, the check_heartbeat() method determines how much time has passed since the connection has last been used. It then either tries to reconnect automatically if two heartbeats have been skipped, or it will issue a heartbeat if half of the configured heartbeat time has elapsed.

As a result, we would have expected the library to automatically reconnect in our case.
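The rules described above can be sketched as a small pure function. This is only an illustration of the behaviour as we understood it, not the library's actual code; the function name heartbeat_action() and its return values are made up for this example.

```php
<?php
declare(strict_types=1);

/**
 * Decide what a heartbeat check should do, following the rules described above.
 * Hypothetical helper for illustration; not taken from the library.
 *
 * @param int   $heartbeat   configured heartbeat interval in seconds (0 = disabled)
 * @param float $secondsIdle time since the connection was last used
 */
function heartbeat_action(int $heartbeat, float $secondsIdle): string
{
    if ($heartbeat === 0) {
        return 'none';            // heartbeats are not configured
    }
    if ($secondsIdle > $heartbeat * 2) {
        return 'reconnect';       // two heartbeats were skipped
    }
    if ($secondsIdle > $heartbeat / 2) {
        return 'send_heartbeat';  // half of the interval has elapsed
    }
    return 'none';                // connection was used recently enough
}

echo heartbeat_action(60, 10.0), "\n";  // none
echo heartbeat_action(60, 45.0), "\n";  // send_heartbeat
echo heartbeat_action(60, 600.0), "\n"; // reconnect
```

With a 60-second heartbeat and a ten-minute export, the check lands in the reconnect branch, which is the situation our daemon ended up in.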

However, there’s a bug in the implementation of check_heartbeat(). If an automatic reconnect is triggered, the server will try to issue an AMQP header to start the usual handshake. The library, however, forgets to perform the handshake when reconnecting. Therefore, once we try to read from the stream again, the first byte we receive is the letter ‘A’ from the ‘AMQP’ header sent by the server. The wait() method expects a frame type byte though and considers the ‘A’ (ASCII value 65) an invalid frame type. Hence the exception: ‘invalid frame type 65’.
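To see where the number 65 comes from, the snippet below reads the first byte of the AMQP 0-9-1 protocol header as if it were a frame type. The helper first_byte_as_frame_type() is hypothetical and only mimics the idea; the library's real frame parsing is more involved.

```php
<?php
declare(strict_types=1);

// Frame types defined by AMQP 0-9-1: method, header, body, heartbeat.
const VALID_FRAME_TYPES = [1, 2, 3, 8];

/** Hypothetical helper: interpret the first incoming byte as a frame type. */
function first_byte_as_frame_type(string $bytes): int
{
    return ord($bytes[0]);
}

// The AMQP 0-9-1 protocol header: the literal "AMQP" followed by 0, 0, 9, 1.
$protocolHeader = "AMQP\x00\x00\x09\x01";

$type = first_byte_as_frame_type($protocolHeader);
echo $type, "\n"; // 65
echo in_array($type, VALID_FRAME_TYPES, true) ? 'valid' : 'invalid', "\n"; // invalid
```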

This finding confirmed our earlier suspicions. Phew! We shared our findings on GitHub.

Why wasn’t this an issue with Beanstalk?

To seasoned users of RabbitMQ, the above findings are completely obvious. Coming from Beanstalk, however, our notion of heartbeats was different.

Beanstalk relies on a similar but much simpler keep-alive mechanism: the TTR (time to run). Simply put, each job has a configurable timeout that determines how long Beanstalk waits for a ‘reserved’ job to be completed before requeueing it.

During execution of a Beanstalk job, a touch can be issued which resets the TTR. The touch represents an indication from the client that it is aware that part of the process might take longer than expected.
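As a toy model of that mechanism, the class below tracks the deadline of a reserved job and shows how a touch pushes it back. ReservedJob and its methods are hypothetical names for this illustration; the code does not talk to a Beanstalk server.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of a reserved Beanstalk job and its TTR deadline.
 * Hypothetical class for illustration only.
 */
final class ReservedJob
{
    /** @var int TTR in seconds */
    private $ttr;
    /** @var float moment at which the job would be requeued */
    private $deadline;

    public function __construct(int $ttr, float $reservedAt)
    {
        $this->ttr = $ttr;
        $this->deadline = $reservedAt + $ttr;
    }

    /** A touch restarts the countdown from the moment it is issued. */
    public function touch(float $now): void
    {
        $this->deadline = $now + $this->ttr;
    }

    /** Once the deadline has passed, the server would requeue the job. */
    public function wouldBeRequeued(float $now): bool
    {
        return $now >= $this->deadline;
    }
}

$job = new ReservedJob(60, 0.0);        // reserved at t=0 with a TTR of 60 seconds
var_dump($job->wouldBeRequeued(50.0));  // bool(false)
$job->touch(50.0);                      // the work is taking longer than expected
var_dump($job->wouldBeRequeued(90.0));  // bool(false): the deadline moved to t=110
var_dump($job->wouldBeRequeued(110.0)); // bool(true)
```

The important difference with RabbitMQ is that this timeout belongs to the job, whereas a heartbeat belongs to the connection.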

Quick solution: just drop the connection

The simplest solution to our RabbitMQ issue was obvious: drop the connection if we do not care for it, only to reopen it once we need it. We would simply call reconnect() on the connection and declare the queue once more before waiting for the next job.

There are caveats to this approach though — not only is it less resource-efficient, but it also only works if we have an ‘at most once delivery’ setup where we do not care if we lose messages.

Better solution: send manual heartbeats

We soon decided it was necessary to improve the reliability of the payment export process, changing it from an ‘at most once delivery’ system into an ‘at least once delivery’ system. We were aiming to leverage RabbitMQ’s delivery acknowledgment feature for this purpose.

Our quick fix would not work with this improved setup. If we wanted to acknowledge the delivery only after finishing the job, we needed to find a way to keep the connection alive and start triggering heartbeats manually during the work that takes place in export_payments().

Our callback function had to change to roughly the following set-up:

function export_payments(AMQPMessage $message): void
{
    try {
        do_something_that_may_take_a_while();
        send_heartbeat();
        do_something_else_that_may_take_a_while();
        ack_message($message);
    } catch (\Exception $e) {
        nack_message($message);
    }
}

The solution appears simple, but it proved difficult to find and test the right way to send a heartbeat. There was no documentation on triggering manual heartbeats, so we quickly found ourselves in undocumented territory.

Calling the aforementioned check_heartbeat() method directly was a good starting point, but the method has its own ruleset to determine whether a heartbeat should be sent. After understanding what this method does and testing it in our testing environment, we discovered heartbeats are only triggered when messages are exchanged through the socket connection with the RabbitMQ server, meaning writes or reads on the socket must occur in order for a check_heartbeat() to work successfully.

With this knowledge we then came up with the following simple, yet effective, solution:

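function send_heartbeat(): void
{
    // Named functions cannot capture variables with use, so import the connection explicitly.
    global $connection;

    // A zero-byte read consumes nothing, but makes the library run its heartbeat check.
    $connection->getIO()->read(0);
}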

By triggering a 0 byte read on the socket, in which nothing is actually consumed from the queue, the check_heartbeat() method is tricked into sending a heartbeat.
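In a long export loop it can be convenient to call the heartbeat after every batch without hitting the socket each time. The sketch below wraps any heartbeat callable so that it fires at most once per interval. The name throttled_heartbeat(), the injectable clock and the 15-second interval are assumptions made for this example; in real use the callable would be send_heartbeat() and the interval would be chosen well below the configured heartbeat.

```php
<?php
declare(strict_types=1);

/**
 * Wrap a heartbeat callable so that it fires at most once per $minInterval seconds.
 * Hypothetical helper; $sendHeartbeat stands in for send_heartbeat().
 * $clock is injectable so the behaviour can be checked without waiting.
 */
function throttled_heartbeat(callable $sendHeartbeat, float $minInterval, ?callable $clock = null): callable
{
    $clock = $clock ?? static function (): float {
        return microtime(true);
    };
    $last = $clock();

    return static function () use ($sendHeartbeat, $minInterval, $clock, &$last): void {
        $now = $clock();
        if ($now - $last >= $minInterval) {
            $sendHeartbeat();
            $last = $now;
        }
    };
}

// Usage with a fake clock and a counter instead of a real connection.
$time = 0.0;
$sent = 0;
$beat = throttled_heartbeat(
    function () use (&$sent): void { $sent++; },
    15.0,
    function () use (&$time): float { return $time; }
);

foreach (range(1, 10) as $batch) {
    $time += 5.0; // pretend each batch of the export takes five seconds
    $beat();      // cheap to call after every batch
}

echo $sent, "\n"; // 3
```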

Or, just don’t use PHP for asynchronous work

It was not a surprise to us that PHP was not the best language to handle asynchronous jobs with. However, given that this specific part of our application was already built in PHP, and given the maturity of the application’s infrastructure, we decided to take the risk.

Although we eventually solved the issue, we would still recommend keeping away from implementing PHP-based queue listeners. If you do decide to venture down that road though, then perhaps triggering manual heartbeats might be the stupidly simple solution you are looking for.

More recently, we discovered a CLI-based RabbitMQ consumer that would have prevented the issues described in this post. We are excited to try out that solution for the next queue we introduce.

Edit (7–11–2018): A small bug was introduced in v2.8.0 of the php-amqplib package that prevents the read(0) call from sending manual heartbeats. This was quickly noticed by the community, and the contributors of the package are in the process of reverting the changes for releases v2.8.1 and onwards.
