Monday, November 21, 2016

Spring Integration MongoDB adapters with Java DSL

1 Introduction


This post explains how to save and retrieve entities from a MongoDB database using Spring Integration. To do that, we will configure inbound and outbound MongoDB channel adapters using the Java DSL configuration extension. As an example, we will build an application that writes orders to a MongoDB store and then retrieves them for processing.

The application flow can be split into two parts:

  • New orders are sent to the messaging system, where they are converted to products and then stored in MongoDB.
  • A separate component continuously polls the database and processes any new product it finds.

The source code can be found in my Spring Integration repository.


2 MessagingGateway - Entering the messaging system


Our application does not know anything about the messaging system. It just creates new orders and sends them to an interface (OrderService).

Taking an initial look at the configuration, we can see that the OrderService is actually a messaging gateway.

Any order passed to the order method enters the messaging system as a Message<Order> through the 'sendOrder.input' direct channel.
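To make the decoupling concrete, here is a minimal, dependency-free sketch of the idea. It is not the Spring configuration itself: ChannelBackedOrderService is a hypothetical hand-written stand-in for the proxy that the framework generates for a messaging gateway, a plain Consumer plays the role of the 'sendOrder.input' channel, and the Order fields are assumptions, since the entity is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class GatewayDemo {
    public static void main(String[] args) {
        // The list plays the role of whatever listens on 'sendOrder.input'.
        List<Order> received = new ArrayList<>();
        OrderService service = new ChannelBackedOrderService(received::add);

        // Application code only sees the OrderService interface.
        service.order(new Order("coffee beans"));
        System.out.println(received.size() + " order(s) entered the flow");
    }
}

// Hypothetical order entity; the real fields are an assumption.
final class Order {
    private final String description;

    Order(String description) {
        this.description = description;
    }

    String getDescription() {
        return description;
    }
}

// The only type the application depends on.
interface OrderService {
    void order(Order order);
}

// Stand-in for the generated gateway proxy: forwards each order to the channel.
final class ChannelBackedOrderService implements OrderService {
    private final Consumer<Order> requestChannel;

    ChannelBackedOrderService(Consumer<Order> requestChannel) {
        this.requestChannel = requestChannel;
    }

    @Override
    public void order(Order order) {
        requestChannel.accept(order);
    }
}
```

The caller never touches a channel or a message, which is the point of putting a gateway at the edge of the messaging system.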


3 First part - processing orders


The first part of the Spring Integration messaging flow is composed of two endpoints: a transformer and a MongoDB outbound channel adapter.



We use a lambda to create an IntegrationFlow definition, which registers a DirectChannel as its input channel. The name of the input channel is resolved as 'beanName + .input'. Since the flow bean is named 'sendOrder', the channel name matches the one we specified in the gateway: 'sendOrder.input'.

The first thing the flow does when it receives a new order is use a transformer to convert the order into a product. To register a transformer we can use the Transformers factory provided by the DSL API, which offers several options. I chose a PayloadTypeConvertingTransformer, which delegates the transformation of the payload to a converter.
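The conversion step itself is ordinary Java. The sketch below shows what such a converter might look like, under some assumptions: the Order and Product fields are hypothetical, and java.util.function.Function stands in for Spring's Converter interface so that the example runs without any dependencies.

```java
import java.util.function.Function;

class ConverterDemo {
    public static void main(String[] args) {
        Function<Order, Product> transformer = new OrderToProductConverter();
        Product product = transformer.apply(new Order("laptop", true));
        System.out.println(product.getName()
                + " premium=" + product.isPremium()
                + " processed=" + product.isProcessed());
    }
}

// Hypothetical entities; the field names are assumptions.
final class Order {
    private final String description;
    private final boolean premium;

    Order(String description, boolean premium) {
        this.description = description;
        this.premium = premium;
    }

    String getDescription() { return description; }
    boolean isPremium() { return premium; }
}

final class Product {
    private final String name;
    private final boolean premium;
    private final boolean processed;

    Product(String name, boolean premium, boolean processed) {
        this.name = name;
        this.premium = premium;
        this.processed = processed;
    }

    String getName() { return name; }
    boolean isPremium() { return premium; }
    boolean isProcessed() { return processed; }
}

// Function stands in for a Spring Converter<Order, Product> in this sketch.
final class OrderToProductConverter implements Function<Order, Product> {
    @Override
    public Product apply(Order order) {
        // A freshly converted product has not been processed yet.
        return new Product(order.getDescription(), order.isPremium(), false);
    }
}
```

Creating the product with processed set to false matters later on, because the second flow selects products by that flag.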

The next step in the orders flow is to store the newly created product in the database. For this, we use a MongoDB outbound channel adapter.

Internally, the adapter's message handler uses a mongoTemplate to save the entity.
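A save through MongoTemplate behaves as an upsert: the document is inserted if its id is new and replaced otherwise. The following sketch imitates that behaviour with an in-memory map so it can run without a database. InMemoryProductStore and the Product fields are hypothetical names used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StoreDemo {
    public static void main(String[] args) {
        InMemoryProductStore store = new InMemoryProductStore();
        Product product = new Product("p-1", "laptop");

        store.save(product);            // unknown id: inserts a new document
        product.setProcessed(true);
        store.save(product);            // known id: replaces the stored document

        System.out.println(store.count() + " document(s), processed="
                + store.findById("p-1").isProcessed());
    }
}

// Hypothetical entity; the field names are assumptions.
final class Product {
    private final String id;
    private final String name;
    private boolean processed;

    Product(String id, String name) {
        this.id = id;
        this.name = name;
    }

    String getId() { return id; }
    String getName() { return name; }
    boolean isProcessed() { return processed; }
    void setProcessed(boolean processed) { this.processed = processed; }

    Product copy() {
        Product copy = new Product(id, name);
        copy.setProcessed(processed);
        return copy;
    }
}

// In-memory stand-in for the 'product' collection, keyed by document id.
final class InMemoryProductStore {
    private final Map<String, Product> documents = new LinkedHashMap<>();

    // Stores a copy, so later changes to the caller's object are not
    // persisted until save is called again (as with a real database).
    void save(Product product) {
        documents.put(product.getId(), product.copy());
    }

    Product findById(String id) { return documents.get(id); }
    int count() { return documents.size(); }
}
```

Because saving twice with the same id leaves a single document, the same adapter can be reused later in the flow to update a product that already exists.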


4 Second part - processing products


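In this second part we have another integration flow for processing products. It is made up of a MongoDB inbound channel adapter, a router, a service activator, and the MongoDB outbound channel adapter from the first part.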



To retrieve previously created products, we define a MongoDB inbound channel adapter that continuously polls the database. We specify the query in its constructor. In this case, each poll retrieves one unprocessed product.

The router sends the product to a different service activator method depending on its 'premium' field.
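The routing decision boils down to a lookup keyed by the boolean value of the field. Here is a plain-Java sketch of that idea, not the DSL router itself. The names PremiumRouter, ProductProcessor, fastProcess and process are hypothetical, and the handlers return a string only so that the chosen route is visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class RouterDemo {
    public static void main(String[] args) {
        PremiumRouter router = new PremiumRouter(new ProductProcessor());
        System.out.println(router.route(new Product("laptop", true)));
        System.out.println(router.route(new Product("pen", false)));
    }
}

// Hypothetical entity; the field names are assumptions.
final class Product {
    private final String name;
    private final boolean premium;

    Product(String name, boolean premium) {
        this.name = name;
        this.premium = premium;
    }

    String getName() { return name; }
    boolean isPremium() { return premium; }
}

// Hypothetical service activator bean with one method per route.
final class ProductProcessor {
    String fastProcess(Product product) { return "fast:" + product.getName(); }
    String process(Product product) { return "standard:" + product.getName(); }
}

// Maps each value of the 'premium' field to the method that handles it.
final class PremiumRouter {
    private final Map<Boolean, Function<Product, String>> routes = new HashMap<>();

    PremiumRouter(ProductProcessor processor) {
        routes.put(true, processor::fastProcess);
        routes.put(false, processor::process);
    }

    String route(Product product) {
        return routes.get(product.isPremium()).apply(product);
    }
}
```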

As a service activator, we have a simple bean which logs a message and sets the product as processed. It then returns the product so that the next endpoint in the flow can handle it.

We set the product as processed because the next step updates its status in the database, so that it is not polled again. We save it by redirecting the flow to the MongoDB outbound channel adapter again.
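The interplay between polling, processing and saving is easier to see in a small simulation. The sketch below replaces MongoDB with an in-memory map; ProductCollection, findOneUnprocessed and markProcessed are hypothetical names. Product is immutable here, which is a choice made for this sketch so that a change only becomes visible to the next poll once it has been saved.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class PollCycleDemo {
    public static void main(String[] args) {
        ProductCollection collection = new ProductCollection();
        collection.save(new Product("p-1", false));
        collection.save(new Product("p-2", false));

        int productsHandled = 0;
        Optional<Product> next;
        // Each iteration is one poll: fetch one unprocessed product,
        // mark it as processed and write it back.
        while ((next = collection.findOneUnprocessed()).isPresent()) {
            collection.save(next.get().markProcessed());
            productsHandled++;
        }
        System.out.println("products handled: " + productsHandled);
    }
}

// Hypothetical entity; the field names are assumptions.
final class Product {
    private final String id;
    private final boolean processed;

    Product(String id, boolean processed) {
        this.id = id;
        this.processed = processed;
    }

    String getId() { return id; }
    boolean isProcessed() { return processed; }

    // Plays the role of the service activator's state change.
    Product markProcessed() { return new Product(id, true); }
}

// In-memory stand-in for the 'product' collection.
final class ProductCollection {
    private final Map<String, Product> documents = new LinkedHashMap<>();

    void save(Product product) {
        documents.put(product.getId(), product);
    }

    // Mirrors a query for a single document whose 'processed' flag is false.
    Optional<Product> findOneUnprocessed() {
        return documents.values().stream()
                .filter(product -> !product.isProcessed())
                .findFirst();
    }
}
```

If the save after processing were skipped, every poll would return the same product again, and the loop above would never terminate.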


5 Conclusion


You have seen which endpoints to use in order to interact with a MongoDB database using Spring Integration. The outbound channel adapter passively saves products to the database, while the inbound channel adapter actively polls the database to retrieve new products.

If you found this post useful, please share it or star my repository. I appreciate it :)
