Auto Error Retry, a.k.a. autoerroretry

One of the most common recurring patterns in data processing pipelines is the need to retry transient failures and to handle bad data gracefully. Because this pattern comes up so often, I recently authored an open-source library, autoerroretry (https://github.com/dipayan90/autoerroretry), that provides a neat solution to it.

The library provides a clean, intuitive interface for retrying business logic. The logic itself is fully customizable, and you decide how often retries happen. Moreover, the solution is completely asynchronous: the retry logic is handled by a background daemon.

Out of the box, the library supports Amazon SQS, Apache Kafka and Square Tape (a file-based queuing mechanism). It also provides simple interfaces that abstract the core problem, so you can implement custom solutions.

To start, add the library (a Java JAR) to your project with Maven:

<dependency>
    <groupId>com.nordstrom.ds</groupId>
    <artifactId>autoerroretry</artifactId>
    <version>1.3</version>
</dependency>

Ivy and Gradle work as well.

Now that you have the library, how do you use it? It's pretty simple.

All objects are serialized and deserialized as Strings. The library ships with converters for this: ObjectStringConverter converts any given Java object to a String.

Example:

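// Transaction is your own domain class; converter is an ObjectStringConverter
Transaction transaction = new Transaction();
transaction.setDate("12/27/1990");
transaction.setTransactionId(1);
transaction.setValue(100);

// Serialize the object to a String
String s = converter.toString(transaction);

// The publish methods take a list of serialized messages
List<String> messageList = Collections.singletonList(s);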
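The post does not say how ObjectStringConverter encodes objects internally. As a minimal sketch, assuming plain Java serialization plus Base64 (not necessarily what the library does), a String round trip can look like this; serialize, deserialize and the field-based Transaction stand-in are hypothetical names for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Base64;

public class StringRoundTrip {

    // Hypothetical encoder: Java-serialize the object, then Base64-encode the bytes
    static String serialize(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Closing the stream above flushed everything into bytes
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    // Reverses serialize(): Base64-decode, then read the object back
    static Object deserialize(String encoded) {
        byte[] data = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical stand-in for the post's Transaction class
    static class Transaction implements Serializable {
        String date;
        int transactionId;
        int value;
    }

    public static void main(String[] args) {
        Transaction transaction = new Transaction();
        transaction.date = "12/27/1990";
        transaction.transactionId = 1;
        transaction.value = 100;

        String s = serialize(transaction);
        Transaction restored = (Transaction) deserialize(s);
        System.out.println(restored.transactionId + " " + restored.value + " " + restored.date);
    }
}
```

Base64 keeps the payload safe for text-oriented brokers; a JSON mapper would be a reasonable alternative when consumers are not Java.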

Now that the object is a String, any bad data that needs a second look can be moved to a dead-letter queue (error queue) with:

LoaderHandlerClient client = new LoaderHandlerClient();
client.publishErrors(new PublishErrorMessageRequest.PublishErrorMessageRequestBuilder()
        .withMessages(messageList)
        .withMessageBroker(MessageBroker.SQS)
        .withSqsUrl(queueUrl)
        .build());

The mandatory parameters are the messages that have bad data and the type of backend message broker to use. Since this example uses SQS, we also have to provide the full SQS queue URL.

Retries use the same interface as errors: call publishRetries instead of publishErrors.

For a Kafka-based backend you would use:

client.publishRetries(new PublishErrorMessageRequest.PublishErrorMessageRequestBuilder()
        .withMessages(messageList)
        .withMessageBroker(MessageBroker.KAFKA)
        .withKafkaRetries(0)
        .withKafkaServers(Collections.singletonList("localhost:9092"))
        .withKafkaTopic("retry")
        .withOrderGuarentee(true)
        .build());

Similarly, for a simple file-based backend you can use:

client.publishRetries(new PublishErrorMessageRequest.PublishErrorMessageRequestBuilder()
        .withMessages(messageList)
        .withMessageBroker(MessageBroker.TAPE)
        .withTapeFileName("tape.txt")
        .build());

Good candidates for retries are transient database unavailability, I/O errors caused by network issues, failures of preceding steps in a chain, and so on. The best way to decide what to retry is to catch exceptions in your processing functions and call the retry routine there, or to have your processing functions return a list of failed messages and add those to the queue. Publishing retriable messages is non-blocking: it is handled by a background thread, so it won't stop your chain of execution.
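A minimal sketch of the catch-and-collect approach, written without the library so it runs on its own: every message goes through the processing step, transient failures are collected for publishRetries and bad data for publishErrors. TransientFailureException, Outcome and the use of IllegalArgumentException as the "bad data" signal are hypothetical choices for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FailureRouting {

    // Hypothetical marker for failures worth retrying (DB timeout, network I/O error, ...)
    static class TransientFailureException extends RuntimeException {
        TransientFailureException(String message) {
            super(message);
        }
    }

    // Messages split by outcome: retries -> publishRetries, errors -> publishErrors
    static final class Outcome {
        final List<String> retries = new ArrayList<>();
        final List<String> errors = new ArrayList<>();
    }

    // Applies the step to every message; one failure does not stop the batch
    static Outcome process(List<String> messages, Consumer<String> step) {
        Outcome outcome = new Outcome();
        for (String message : messages) {
            try {
                step.accept(message);
            } catch (TransientFailureException e) {
                outcome.retries.add(message); // may succeed on a later attempt
            } catch (IllegalArgumentException e) {
                outcome.errors.add(message);  // bad data: needs a second look
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Outcome outcome = process(List.of("ok", "timeout", "garbage"), message -> {
            if (message.equals("timeout")) throw new TransientFailureException("database unavailable");
            if (message.equals("garbage")) throw new IllegalArgumentException("unparseable record");
        });
        // With the library, each list would be passed to withMessages(...)
        System.out.println("retry: " + outcome.retries + ", error: " + outcome.errors);
    }
}
```

Separating the two exception types keeps bad data out of the retry loop, where it would only fail again.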

Handling retriable objects

Now that we know what to retry and how to publish retriable messages, let's see how to handle them. This is just as simple: all you have to do is pass in the business logic that should be retried on the objects.

A simple example would be:

client.recieveRetires(new ReceiveErrorMessageRequest.ReceiveErrorMessageRequestBuilder()
        .withSqsUrl(queueUrl)
        .withPingInterval(5)
        .build(), strings -> {
            // Business logic: convert each String back to its object and print it
            strings.forEach(e -> System.out.println(converter.fromString(e)));
            return null;
        });

Note that the function you pass in receives a list of Strings, which first need to be converted back to your native objects. ObjectStringConverter comes to the rescue again: its fromString utility method converts Strings back to objects. Whatever business logic you pass to the method is executed on your native objects. The example above passes a function that just prints them, but you could just as well save the objects to a database or send them to another stream-based system. This call is also non-blocking; the messages are processed by a background daemon. See the README for details on how other message brokers, such as Kafka and Tape, are supported.
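The post does not show how the background daemon works internally. The sketch below illustrates the general pattern it describes, under stated assumptions: an in-memory ConcurrentLinkedQueue stands in for the broker, and start and pingIntervalMillis are hypothetical names. The library's real implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class PollingDaemon {

    // Hypothetical: drain the queue every pingIntervalMillis on a daemon thread
    // and hand each non-empty batch to the caller's business logic.
    static ScheduledExecutorService start(ConcurrentLinkedQueue<String> queue,
                                          long pingIntervalMillis,
                                          Consumer<List<String>> businessLogic) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "retry-poller");
            thread.setDaemon(true); // does not keep the JVM alive on its own
            return thread;
        });
        scheduler.scheduleWithFixedDelay(() -> {
            List<String> batch = new ArrayList<>();
            String message;
            while ((message = queue.poll()) != null) {
                batch.add(message);
            }
            if (batch.isEmpty()) {
                return;
            }
            try {
                businessLogic.accept(batch);
            } catch (RuntimeException e) {
                // A real setup would republish the batch to the retry or error queue here
                System.err.println("batch failed: " + e.getMessage());
            }
        }, 0, pingIntervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("transaction-1");
        queue.add("transaction-2");
        ScheduledExecutorService scheduler =
                start(queue, 100, batch -> System.out.println("retrying " + batch));
        // The caller is not blocked; it keeps running while the daemon polls
        Thread.sleep(300);
        scheduler.shutdown();
    }
}
```

Catching exceptions inside the scheduled task matters: if a periodic task run by a ScheduledExecutorService throws, its subsequent executions are suppressed, so one bad batch would silently stop all polling.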

If the function fails on a retriable object, you can either discard the message by sending it to the error queue or push it back to be retried again.
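One way to choose between those two options is to cap the number of attempts. This minimal sketch assumes a hypothetical MAX_ATTEMPTS limit and an in-memory Deque in place of the broker; Attempt and drain are illustrative names, not library API. For brevity it retries immediately, whereas a real setup would space retries out, for example with the ping interval.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class BoundedRetry {

    // Hypothetical cap on total processing attempts per message
    static final int MAX_ATTEMPTS = 3;

    // A message plus how many attempts have already been made
    static final class Attempt {
        final String message;
        final int attempts;

        Attempt(String message, int attempts) {
            this.message = message;
            this.attempts = attempts;
        }
    }

    // Successes are dropped; failures are pushed back until MAX_ATTEMPTS,
    // then returned as the messages destined for the error queue.
    static List<String> drain(Deque<Attempt> retryQueue, Consumer<String> businessLogic) {
        List<String> errorQueue = new ArrayList<>();
        while (!retryQueue.isEmpty()) {
            Attempt current = retryQueue.pollFirst();
            try {
                businessLogic.accept(current.message);
                continue; // processed successfully
            } catch (RuntimeException e) {
                int attempts = current.attempts + 1;
                if (attempts < MAX_ATTEMPTS) {
                    retryQueue.addLast(new Attempt(current.message, attempts)); // retry again
                } else {
                    errorQueue.add(current.message); // give up: send to the error queue
                }
            }
        }
        return errorQueue;
    }

    public static void main(String[] args) {
        Deque<Attempt> queue = new ArrayDeque<>();
        queue.add(new Attempt("ok", 0));
        queue.add(new Attempt("bad", 0));
        List<String> errors = drain(queue, message -> {
            if (message.equals("bad")) throw new IllegalStateException("still failing");
        });
        System.out.println("sent to error queue: " + errors);
    }
}
```

Capping attempts keeps a permanently broken message from cycling through the retry queue forever.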

I hope this small, lightweight library helps solve some of your commonly recurring issues. Let me know what you think. Persistence is the key 🙂