The goal of this post is to show you how errors are handled when messaging with Spring Integration. You will see that error handling differs between synchronous and asynchronous messaging. As usual, I'll skip the chat and proceed with some examples.
You can get the source code at GitHub.
2 The sample application
I will use a basic example, since I want to focus on exception handling. The application consists of an order service, which receives an order, processes it and returns a confirmation.
Below we can see how the messaging system is configured:
The gateway is the entry point of the messaging system. It will receive the order and send it to the direct channel "requestChannel" where a router will redirect it to the appropriate channel based on the order id:
- syncChannel: A direct channel that will send the order to an order processor subscribed to this channel.
- asyncChannel: A queue channel from which the order processor will actively retrieve the order.
Once the order is processed, an order confirmation will be sent back to the gateway. Here is a graphic representing this:
OK, let's start with the simplest case: synchronous sending using a Direct Channel.
3 Synchronous sending with Direct channel
The order processor is subscribed to the "syncChannel" Direct Channel. The "processOrder" method will be invoked in the sender's thread.
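To make the "same thread" point concrete, here is a minimal plain-Java sketch. It is an analogy, not Spring Integration's DirectChannel: the class DirectChannelSketch, its send method and the negative-id validation rule are all hypothetical names and assumptions of mine. It only illustrates that a direct hand-off is an ordinary method call, so an exception thrown by the subscriber travels back up the sender's stack.

```java
import java.util.function.Function;

/** Plain-Java analogy of a direct channel; NOT Spring Integration's DirectChannel. */
final class DirectChannelSketch {

    // Hypothetical stand-in for the order processor subscribed to "syncChannel".
    // Assumption: a negative id represents an invalid order.
    static String processOrder(int orderId) {
        if (orderId < 0) {
            throw new IllegalArgumentException("Invalid order id: " + orderId);
        }
        return "confirmed-" + orderId + " on " + Thread.currentThread().getName();
    }

    // "Sending" is just a method call: the subscriber runs on the caller's thread,
    // so anything it throws propagates straight back to the caller.
    static String send(int orderId, Function<Integer, String> subscriber) {
        return subscriber.apply(orderId);
    }

    public static void main(String[] args) {
        System.out.println(send(1, DirectChannelSketch::processOrder));
        try {
            send(-1, DirectChannelSketch::processOrder);
        } catch (IllegalArgumentException e) {
            System.out.println("Caller caught: " + e.getMessage());
        }
    }
}
```

The confirmation string carries the thread name so you can check that processing happened on the sender's own thread.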
Now, we will implement a test that will provoke an exception by sending an invalid order. This test will send an order to the gateway:
We run the test and see how an exception is raised in the order processor and reaches the test. That's fine; we wanted to validate that sending an invalid order raises an exception. This happened because the test sent the order and blocked, waiting for the order to be processed in the same thread. But what happens when we use an asynchronous channel? Let's continue to the next section.
4 Asynchronous sending with Queue Channel
This section's test sends an order that will be redirected by the router to the queue channel. The gateway is shown below:
Notice that this time the gateway returns a Future. If it returned the confirmation directly, the gateway would block the test thread until a reply arrived. By returning a Future, the gateway becomes asynchronous and doesn't block the sender's thread.
OK, so now we are going to launch the test and see the exception being raised...
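As a rough illustration of what a Future return type buys the caller, here is a plain-Java sketch built on an ExecutorService. It is not the Spring Integration gateway: the OrderGateway interface, the sendFutureOrder method and the gatewayOn helper are hypothetical names, and the worker thread merely stands in for whatever processes the order. It covers only the happy path, to show that the caller gets a handle back immediately while the work runs elsewhere.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Plain-Java analogy of an asynchronous gateway; NOT the Spring Integration gateway. */
final class FutureGatewaySketch {

    // Hypothetical gateway contract: the Future return type is what makes the call asynchronous.
    interface OrderGateway {
        Future<String> sendFutureOrder(int orderId);
    }

    // Builds a gateway that processes each order on the given executor
    // and hands the caller a Future straight away.
    static OrderGateway gatewayOn(ExecutorService executor) {
        return orderId -> executor.submit(
                () -> "confirmed-" + orderId + " by " + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> reply = gatewayOn(executor).sendFutureOrder(1);
            System.out.println("Caller " + Thread.currentThread().getName() + " was not blocked");
            // Block only when the result is actually needed, and never indefinitely.
            System.out.println(reply.get(2, TimeUnit.SECONDS));
        } finally {
            executor.shutdown();
        }
    }
}
```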
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException
Oops, the test failed because no exception reached the test! What happened? Well, the explanation is below:
Since we are using an asynchronous channel (a queue), the sender sends the order and moves on. Then, the receiver polls the order from a different thread. For this reason, the exception cannot be thrown back to the sender. So, should we act as if nothing happened? Well, you'd better not; there are other options.
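The thread hand-off described above can be reproduced with nothing but a BlockingQueue. The following is a plain-Java analogy, not Spring Integration's QueueChannel: QueueChannelSketch, sendAndPoll and the negative-id rule are hypothetical. The sender puts the order on the queue and returns normally; the failure surfaces only in the poller thread, where an uncaught-exception handler records it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Plain-Java analogy of a queue channel with a poller; NOT Spring Integration's QueueChannel. */
final class QueueChannelSketch {

    // Sends one order through a queue and returns whatever exception the POLLER thread saw
    // (null if processing succeeded). The sending thread itself never sees that exception.
    static Throwable sendAndPoll(int orderId) throws InterruptedException {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicReference<Throwable> seenByPoller = new AtomicReference<>();

        Thread poller = new Thread(() -> {
            try {
                int id = queue.take();
                // Assumption: a negative id represents an invalid order.
                if (id < 0) {
                    throw new IllegalArgumentException("Invalid order id: " + id);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "poller");
        // The failure can only be observed from inside the poller thread.
        poller.setUncaughtExceptionHandler((thread, error) -> seenByPoller.set(error));
        poller.start();

        queue.put(orderId);   // the sender hands the order over and moves on
        poller.join(2000);    // wait only so the sketch can report what happened
        return seenByPoller.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable error = sendAndPoll(-1);
        System.out.println("Sender returned normally; poller thread saw: " + error);
    }
}
```

The uncaught-exception handler plays the part that the error channel plays in Spring Integration: somewhere for the failure to go once the sender is no longer on the stack.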
5 Asynchronous error handling
When using asynchronous messaging, Spring Integration handles exceptions by publishing them to message channels. The thrown exception is wrapped in a MessagingException, which becomes the payload of the error message.
Which channel is the error message sent to? First, Spring Integration checks whether the request message contains a header called "errorChannel". If found, the error message will be sent there. Otherwise, the message will be sent to a so-called global error channel.
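The lookup order just described can be written down as a small function. This is a sketch under simplifying assumptions, not framework code: headers are modelled as a plain Map, channels are identified by name only, and ErrorChannelResolutionSketch and resolveErrorChannel are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;

/** Sketch of the error channel lookup order; NOT Spring Integration code. */
final class ErrorChannelResolutionSketch {

    // Header name and default channel name as given in this post.
    static final String ERROR_CHANNEL_HEADER = "errorChannel";
    static final String GLOBAL_ERROR_CHANNEL = "errorChannel";

    // Assumption: the header holds a channel name; the fallback is the global error channel.
    static String resolveErrorChannel(Map<String, Object> requestHeaders) {
        Object header = requestHeaders.get(ERROR_CHANNEL_HEADER);
        return header != null ? header.toString() : GLOBAL_ERROR_CHANNEL;
    }

    public static void main(String[] args) {
        Map<String, Object> withHeader =
                Collections.<String, Object>singletonMap(ERROR_CHANNEL_HEADER, "myErrorChannel");
        Map<String, Object> withoutHeader = Collections.<String, Object>emptyMap();

        System.out.println(resolveErrorChannel(withHeader));
        System.out.println(resolveErrorChannel(withoutHeader));
    }
}
```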
5.1 Global error channel
By default, a global error channel called "errorChannel" is created by Spring Integration. This channel is a publish-subscribe channel, which means we can subscribe several endpoints to it. In fact, there's already an endpoint subscribed to it: a logging handler. This handler will log the payload of messages arriving at the channel, though it can be configured to behave differently.
We will now subscribe a new handler to this global channel and test that it receives the exception message by storing it in a database.
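Publish-subscribe means that every subscriber gets its own copy of each error. The plain-Java sketch below is an analogy only, not Spring Integration's PublishSubscribeChannel: PublishSubscribeSketch is a hypothetical class, one subscriber stands in for the default logging handler, and an in-memory list stands in for a second, custom handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Plain-Java analogy of a publish-subscribe error channel; NOT Spring Integration code. */
final class PublishSubscribeSketch {

    private final List<Consumer<Throwable>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<Throwable> subscriber) {
        subscribers.add(subscriber);
    }

    // Every subscriber is handed the same error, in subscription order.
    void publish(Throwable error) {
        for (Consumer<Throwable> subscriber : subscribers) {
            subscriber.accept(error);
        }
    }

    public static void main(String[] args) {
        PublishSubscribeSketch errorChannel = new PublishSubscribeSketch();

        // Stand-in for the logging handler that is subscribed by default.
        errorChannel.subscribe(error -> System.out.println("LOG: " + error));

        // Stand-in for a second, custom handler; a list plays the role of its storage.
        List<String> stored = new ArrayList<>();
        errorChannel.subscribe(error -> stored.add(error.getMessage()));

        errorChannel.publish(new IllegalArgumentException("Invalid order id: -1"));
        System.out.println("Stored errors: " + stored.size());
    }
}
```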
First of all, we will need to change a few things in our configuration. I've created a new file so it doesn't interfere with our previous tests:
The gateway: I've added an error channel. If the invocation fails, the error message will be sent to this channel. Without an error channel, the gateway would try to propagate the exception to the caller, which cannot work here because this is an asynchronous gateway.
The error handler: I've defined a new endpoint that is subscribed to the global error channel. Now, any error message sent to the global error channel will be delivered to our handler.
I've also added a configuration file in order to configure the database. Our error handler will insert received errors into this database:
The error handler is pretty simple; it receives the error message and inserts its information into the database:
OK, now everything is set. Let's implement a new test:
This time the test is successful: the error message has been stored in the database.
5.2 Other mechanisms
Custom error channel: You can define your own error channel and declare it as a queue channel instead of the default publish-subscribe channel:
ErrorMessageExceptionTypeRouter: This Spring Integration specialized router will resolve the channel where the error message will be sent. It bases its decision on the most specific cause of the error:
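To show what a decision based on the most specific cause can look like, here is a plain-Java sketch. It is not the ErrorMessageExceptionTypeRouter itself, and I am assuming that "most specific" means the deepest exception in the cause chain that has a mapping. ExceptionTypeRouterSketch and the channel names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of routing on the most specific cause; NOT the real ErrorMessageExceptionTypeRouter. */
final class ExceptionTypeRouterSketch {

    private final Map<Class<? extends Throwable>, String> mappings = new LinkedHashMap<>();
    private final String defaultChannel;

    ExceptionTypeRouterSketch(String defaultChannel) {
        this.defaultChannel = defaultChannel;
    }

    ExceptionTypeRouterSketch map(Class<? extends Throwable> type, String channel) {
        mappings.put(type, channel);
        return this;
    }

    // Walks from the outermost exception down the cause chain. Each deeper match
    // overwrites the previous one, so the deepest mapped cause decides the channel.
    String route(Throwable error) {
        String resolved = defaultChannel;
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, String> mapping : mappings.entrySet()) {
                if (mapping.getKey().isInstance(cause)) {
                    resolved = mapping.getValue();
                }
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        ExceptionTypeRouterSketch router = new ExceptionTypeRouterSketch("defaultErrorChannel")
                .map(IllegalArgumentException.class, "invalidOrderChannel")
                .map(IllegalStateException.class, "stateErrorChannel");

        Throwable wrapped = new RuntimeException("handling failed",
                new IllegalStateException("outer", new IllegalArgumentException("inner")));
        System.out.println(router.route(wrapped));
        System.out.println(router.route(new RuntimeException("unmapped")));
    }
}
```

An exception with no mapped type anywhere in its chain falls through to the default channel.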
We have learned about the different mechanisms for error handling in Spring Integration. With this as a base, you can extend and configure your own error handling: implement transformers to extract information from the error message, use header enrichers to set the error channel, or implement your own router, among other things.