Managing SQS Consumers in a NodeJS Application

In my last post, I tried to summarize how Job Distribution can be used to offload work from the main application by queueing up less important processes.

Managing these offloaded tasks requires extra care. For example, if sending an email fails, it should be retried. With a REST endpoint, you immediately get a 2xx, 4xx, or 5xx response. Similarly, with a Queue implementation the outcome is either a failure or a success, and both should be handled smoothly.

A success is an Acknowledgement event, which lets the Queue Broker know that the message has been processed. A failure is a Negative Acknowledgement, which signifies that there was an error while processing the event and that the message should not be removed from the Queue. Different Queue implementations use different terminology; the terms ACK and NACK used here are taken from the STOMP protocol specification. In the case of SQS, the application needs to call `deleteMessage` with the reference string (the receipt handle) received when getting the message. Not calling this method is equivalent to a failure, and the message will probably be delivered again once its visibility timeout expires (keep reading to understand the use of the word ‘probably’ here).
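As a rough illustration of ACK and NACK on SQS, here is a minimal sketch. It assumes an aws-sdk v2 style client passed in as `sqs` (one that exposes `receiveMessage(...).promise()` and `deleteMessage(...).promise()`) and a JSON message body. `processOne` and `handler` are hypothetical names used only for this example.

```javascript
// Receive at most one message, process it, and acknowledge it only on success.
async function processOne(sqs, queueUrl, handler) {
  const res = await sqs
    .receiveMessage({ QueueUrl: queueUrl, MaxNumberOfMessages: 1, WaitTimeSeconds: 20 })
    .promise();
  const message = (res.Messages || [])[0];
  if (!message) return 'empty';

  try {
    // Assumption: the producer sends JSON bodies.
    await handler(JSON.parse(message.Body));
  } catch (err) {
    // "NACK": do nothing. The message is not deleted, so it becomes
    // visible to consumers again after its visibility timeout.
    return 'failed';
  }

  // "ACK": delete the message using the receipt handle from receiveMessage.
  await sqs
    .deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: message.ReceiptHandle })
    .promise();
  return 'deleted';
}
```

Passing the client in as a parameter keeps the function easy to exercise with a fake client in tests.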

So far, we’ve figured out how to handle failures for an event: we mark it as failed, and it will be retried. The next problem is an infinite loop of retries. Assume that you’ve recently pushed a commit which causes an issue under a very specific condition, where the event will never succeed. It will keep retrying itself. This can be handled by the Redrive Policy of SQS; most Queue Brokers have a similar configuration. Under the hood, the Message Broker maintains a counter of how many times a message has been received by any consumer. You can define a threshold value, after which the message is no longer returned to the Queue but moved to a Dead Queue instead (SQS calls this a dead-letter queue).

From the AWS SQS dashboard, you can define the number of times a message may be delivered from the original queue and the name of the target Dead Queue. If the next receive count exceeds the threshold, the message is moved to the Dead Queue. It’s good practice to set up a target Dead Queue, with a specific number of retries, whenever you create an SQS Queue. Now if a particular event is causing trouble, it will be safely moved somewhere else. We can analyze this event by checking the Dead Queue’s messages, and handle the scenario in our application accordingly (e.g. fix the code, revert it, add more validations).
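The same policy can also be set from code instead of the dashboard. The sketch below only builds the parameters for the SDK's `setQueueAttributes` call, where `RedrivePolicy` is a JSON string holding `deadLetterTargetArn` and `maxReceiveCount`. `redriveParams` is a hypothetical helper, and the URL and ARN in the usage comment are placeholders.

```javascript
// Build the setQueueAttributes parameters that attach a Dead Queue to a queue.
function redriveParams(queueUrl, deadLetterQueueArn, maxReceiveCount) {
  if (!Number.isInteger(maxReceiveCount) || maxReceiveCount < 1) {
    throw new RangeError('maxReceiveCount must be a positive integer');
  }
  return {
    QueueUrl: queueUrl,
    Attributes: {
      // SQS expects the policy as a JSON-encoded string, not as an object.
      RedrivePolicy: JSON.stringify({
        deadLetterTargetArn: deadLetterQueueArn,
        maxReceiveCount,
      }),
    },
  };
}

// Usage with an aws-sdk v2 client (placeholder values):
// await sqs.setQueueAttributes(
//   redriveParams('https://sqs.example/emails', 'arn:aws:sqs:region:account-id:emails-dead', 5)
// ).promise();
```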

Now comes the next problem: I have too many messages in the Dead Queue which should have succeeded (this is the original motive of this post). Having been in a similar situation early in my programming career, I’ve concluded that an Error Reporting tool is a must-have in any application. I’ve been using Bugsnag for a while, and it’s pretty straightforward in terms of setting up an account and projects. I’m biased here, since I’ve not used any other Error Monitoring tool (except checking errors in application logs).

With a queue consumer, what I wanted to achieve is:

  1. Any log statements generated while processing a single queue event should be easily filterable.
  2. Any error notification should be pushed to the Error Tool.
  3. I wanted to write one line of code to start watching the messages.

At the very first stage, when I implemented SQS for the first time, I was calling `receiveMessage` and `deleteMessage` directly.

This code was a living hell, and soon I realized that something was terribly wrong. There were multiple issues: `receiveMessage` was called again before the service had processed the current event, there was no proper error handling, and there were unhandled promise rejections.

Later, I stumbled upon the awesome sqs-consumer library. It is pretty simple to use, with all the nitty-gritty of `receiveMessage` and `deleteMessage` wrapped into a `handleMessage` handler and a `done` callback (see the library's Usage section).
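For readers who have not used the library, a minimal sketch of the callback-style API of sqs-consumer 3.x follows. `startEmailConsumer` and `sendEmail` are hypothetical names, the message body is assumed to be JSON, and the library export is passed in as `Consumer` (in a real application it would come from `require('sqs-consumer')`).

```javascript
// Wire a promise-returning task into sqs-consumer's (message, done) handler.
function startEmailConsumer(Consumer, queueUrl, sendEmail) {
  const app = Consumer.create({
    queueUrl,
    handleMessage: (message, done) => {
      Promise.resolve()
        // Parsing inside the chain turns a bad body into a rejection
        // instead of a synchronous throw.
        .then(() => sendEmail(JSON.parse(message.Body)))
        .then(
          () => done(),       // success: the library deletes the message
          (err) => done(err)  // failure: the message stays on the queue
        );
    },
  });

  // 'error' covers problems talking to SQS, 'processing_error' covers
  // errors passed to done().
  app.on('error', (err) => console.error('sqs error:', err.message));
  app.on('processing_error', (err) => console.error('processing error:', err.message));

  app.start();
  return app;
}
```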

The next thing I wanted to do was integrate Bugsnag with this and create a child logger for each event.

Soon I realized that, with multiple consumers in the application, the complete code was duplicated for every consumer. If I wanted to make a small change, I had to change it everywhere. And yes, changes were happening, and I often missed making them here or there.

Now coming to the actual point: I wanted the consumer code to be a one-liner, with everything else moved into a shared queue helper file.
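A minimal sketch of what such a helper could look like is given below; it is an illustration under stated assumptions, not the original code. The names `buildHandler`, `watch`, `parseAttributes`, and `deps` are hypothetical. `deps.logger` is assumed to be a bunyan or pino style logger with a `child()` method, `deps.notify` is assumed to wrap the error tool's notify call, and `deps.Consumer` is the sqs-consumer 3.x export.

```javascript
// queue-helper.js (hypothetical): everything a consumer needs besides its task.

// Flatten SQS MessageAttributes ({ name: { DataType, StringValue } }) to { name: value }.
// Only string values are handled in this sketch.
function parseAttributes(messageAttributes = {}) {
  const out = {};
  for (const [name, attr] of Object.entries(messageAttributes)) out[name] = attr.StringValue;
  return out;
}

function buildHandler(queueName, processFn, deps) {
  return (message, done) => {
    // One child logger per event, so its log lines can be filtered by messageId.
    const log = deps.logger.child({ queueName, messageId: message.MessageId });
    const startedAt = Date.now();
    Promise.resolve()
      .then(() => processFn(JSON.parse(message.Body), {
        log,
        attributes: parseAttributes(message.MessageAttributes),
      }))
      .then(
        () => {
          log.info({ processingTimeMs: Date.now() - startedAt }, 'message processed');
          done();
        },
        (err) => {
          log.error({ err }, 'message failed');
          deps.notify(err, { queueName, messageId: message.MessageId });
          done(err);
        }
      );
  };
}

function watch(queueName, queueUrl, processFn, deps) {
  const consumer = deps.Consumer.create({
    queueUrl,
    handleMessage: buildHandler(queueName, processFn, deps),
  });
  consumer.on('error', (err) => deps.notify(err, { queueName }));
  consumer.start();
  return consumer;
}

// consumer.js then reduces to a single call, for example:
// watch('emails', process.env.EMAIL_QUEUE_URL, sendEmail, deps);
```

Keeping the logger, notifier, and library in `deps` means a change to logging or error reporting happens in one place rather than in every consumer.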

consumer.js is almost a one-liner, with Error Notification, AWS Attributes parsing abstraction, Time Tracking, and the Log generator stuffed into the queue helper file. The further scope for improvement that I can see is:

  1. Push ProcessingTime metrics to CloudWatch against the queueName.
  2. SIGINT termination handling is attached for every consumer; there should be only one such handler, which drains all the consumers.
  3. If the error is caused by an invalid message body, the message shouldn’t be moved to a Dead Queue. In short, 5xx and 4xx errors should be differentiated: 4xx errors shouldn’t be retried.
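The third item could be approached roughly as follows. This is only a sketch of one possible design: `InvalidMessageError`, `withoutRetryOnInvalid`, and `onDropped` are hypothetical names, and the handler signature is the `(message, done)` style of sqs-consumer 3.x.

```javascript
// Marks a "4xx-like" failure: retrying the same message can never succeed.
class InvalidMessageError extends Error {
  constructor(message) {
    super(message);
    this.name = 'InvalidMessageError';
  }
}

// Wrap a promise-returning task so that invalid messages are acknowledged
// (and reported) instead of being retried until they reach the Dead Queue.
function withoutRetryOnInvalid(processFn, onDropped) {
  return (message, done) => {
    Promise.resolve()
      .then(() => processFn(message))
      .then(
        () => done(),
        (err) => {
          if (err instanceof InvalidMessageError) {
            onDropped(err, message); // e.g. log it or notify the error tool
            return done();           // acknowledge: do not redeliver
          }
          return done(err);          // "5xx-like": leave it on the queue for a retry
        }
      );
  };
}
```

The task itself decides what counts as invalid by throwing `InvalidMessageError`, for example after schema validation of the body fails.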

Note [May-21] — This article was written for sqs-consumer 3.8.0

Developer (Plus multiple other stuff) @ Masmic (Knowledge Sharing Incentivized) & KoinOK (Crypto Exchange) | Ex — LimeTray | TV Buff |