Apache Synapse is a lightweight, high performing Enterprise Service Bus (ESB) that provide an exceptional support for XML, Web services and REST with the help of fast and asynchronous mediation engine. With this improvement Apache Synapse will get the capability of re-sequencing the messages which goes through it according to a given sequence number. In this article I’m going to explain on how to configure the reseqencer and how it behave.
With in Synapse the messages may take different routes due to routers, filters etc. Due to the characteristics of each route the time to process the message will vary. Thus the end result will not the the same as the beginning. Some messages may arrive earlier than others. So this can be disadvantageous in a situation that order of delivering message matters. In order to overcome this issue, resequencing EAI pattern is introduced with a stateful processor called Resequencing Processor.
Implementing Resequencer can be done using by adding a message store and a resequencing message processor. the following diagram shows high-level view of it.
This resequencing processor will take the help of message store and message processor capability of the Synapse which are already presence. A message store is used to keep the messages that should be sent by the Resequencer in an ordered manner. The Resequencing Processor should be pointed to that store in order to retrieve and look the messages available in the store. All the messages that should be sent in a sequence must have a sequence number in a certain location of a message which can be accessible.
This Resequencing is completely responsible for re-ordering messages in a given message store. It can recognize the initial sequence number from the messages already in the store and inject messages to a given sequence in a given rate.
Implementation of the Resequencing Processor is done using the Scheduled Message Processor which is an abstract and gives the capability to implement a processor with scheduled tasks. So inherently the Resequencing Processor uses the Quartz Scheduler as an infrastructure. Other than the facilities inherited from Scheduling Message Processor, Resequencing Processor possess capabilities of deciding initial sequence number according to given configuration, store the state of the processor and so on.
The benefits of the design will serve the Synapse as follows.
- A separate storage is not needed. The resequencing processor can be attached to an in-memory store which is already added to Synapse. In addition to that Resequencer can work with persistence message stores too (which are not currently available in Synapse but can be added).
- The message which is chosen by the Resequencer will be sent to a sequence. Therefore user will have more opportunity to process the message further.
- The Resequencing Processor comes with a special algorithm to identify the starting sequence number. Therefore it can be plugged in to a message store which already have messages and continue re-sequencing.
After selecting an initial sequence number Resequencing Processor checks the given to extract the message which contains required sequence number. Once the message containing required sequence number is found, message is directed to the given sequence for further mediation and required sequence number get increased by one. For searching required message Resequencing Processor currently uses linear search through the messages in the message store. When a message is selected as the required message it should be removed from the message store, therefore it is important to use a message store which supports random removal of messages.
If the user wants to connect to external message store, you just need to configure Store mediator according to that. Resequencing Processor does not required to change. It is very important to select the right type of message store for the purpose, since the Resequencing Processor will not remove messages from the store until it finds the message with required sequence number.
Selecting initial sequence number
Resequencing Processor is designed to decide the starting sequence number automatically. Following logic is used to decide the initial sequence number.
- When starting Synapse Resequencing Processor checks the attached message store for any messages. If any messages found, select the minimum sequence number as the initial sequence number. Else continue without selecting initial sequence number.
- If initial sequence number is selected at the start up, continue sending messages to the given sequence. Otherwise waits for required number of messages to come with in a certain timeout.
- If the required number of messages are received, select initial sequence number from those and do further resequencing. If required number of messages are not received with in timeout, select the initial sequence number from available messages in the store.
Logical flow of selecting initial sequence number is also can be shown using a flow chart as follows.
In the diagram N notates the number of messages that the Resequencing Processor expects after starting without having initial sequence number. P notates the number of attempts that processor should made to check whether that required number of required messages are available in the store. The value of the Delay remains constant since user can configure P value as they want.
Resequencing Processor Configuration
Configuration of the Resequencing Message Processor can be shown as follows.
<messageProcessor class="org.apache.synapse.message.processors.resequence.ResequencingProcessor" name="ResequencingProcessor" messageStore="MyQueue"> <parameter name="interval">10000</parameter> <parameter name="seqNumXpath" xmlns:bank="<a href="http://bank.resequence.synapse.org/">http://bank.resequence.synapse.org</a>" expression="//bank:display/bank:seqNo"/> <parameter name="nextEsbSequence">next_seq</parameter> <parameter name="requiredInitMessages">15</parameter> <parameter name="requiredInitMessagesDelay">2</parameter> </messageProcessor>
In the attribute section of the message processor user should point to the class which the Resequencing Processor is implemented, name of the processor, the message store to use.
Parameter interval is to set the invoke interval in milliseconds (default value is 1000ms). “nextEsbSequence” is to point the next sequence that the selected message should be forwarded next and it is essentially needed. “seqNumXpath” parameter is also an essential parameter and tells the processor about the path to extract sequence number from a message. Expression attribute of “seqNumXpath” parameter says the place to extract. It is important to say that the Message Processor Serializer and Message Processor Factory should be modified in order to use “seqNumXpath”, hence Resequencing Processor too.
“requiredInitMessages” and “requiredInitMessagesDelay” are for the N and P values in flow chart diagram respectively. They have default values 4 and 5 but can be configured as given.
In addition to the given parameters, all the parameters inherited from Scheduled Message Processor can be used here.
Additional Modifications Made
Resequencing Processor needs to use Xpath expressions to extract the sequence number from message context. For that purpose it is required to use a different type of parameter. Currently Synapse supports name-value type parameters only. So I changed the Message Processor Factory and Message Processor Serializer in order to take name-expression parameters and create SynapseXpath objects during runtime. Because of that following type of configuration is also allowed in Message Processors.
<messageProcessor class="org.apache.synapse.message.processors.resequence.ResequencingProcessor" name="Resequencing Processor Name" messageStore="Message Store Name"> <parameter name="param1">value1</parameter> <parameter name="param2">value2</parameter> <parameter name="param3" xmlns:ns="http://url.org" expression="xpath expression"/> </messageProcessor>
This modification will not affect any other configurations and can be used only by parameters inside Message Processor.
Implementing a Message Resequencing Scenario with Apache Synapse
When exchanging messages with enterprise applications, in special cases the order of receiving messages matters critically. For such cases user can use a Resequencing Processor to send messages that has already in a message store to a given sequence in an ordered manner. In this section I’m going to explain a resequencing scenario using an in-memory message store and a message processor.
For the implementation I’m going to use the following flow.
1. An ESB Proxy accepts the message.
2. Message is stored in an in-memory message store.
3. Resequencing Processor consumes the messages in the store and send them to a given sequence in order.
4. That sequence will sends the message to a back-end service.
Required configurations are as follows.
This proxy will take the messages that receives to it and direct them to the insequence. Message mediation insequence terminated after message is caught by the store mediator. Thereafter message get stored inside MyStore message store.
<proxy name="resequencingProxy" transports=”http,https” startOnLoad=”true”> <target> <inSequence> <log level="full"/> <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/> <property name="OUT_ONLY" value="true"/> <store messageStore="MyStore"/> </inSequence> <outSequence> <send /> </outSequence> </target> </proxy>
Message Store and Resequencing Processor Configuration
According to the configuration Resequencing Processor is attached to the MyStore, an in-memory message store. Currently Synapse supports in-memory message stores only, but can support to other types of message stores which support random removal of messages.
Expression of the Resequencing Message Processor used to extract the sequence number from the messages inside the MyStore message store. The given expression takes the number from the by discarding the characters before hyphen. So according to this expression the symbol must consists with an hyphen and the sequence number after that. (eg: WSO2-5)
<store messageStore="MyStore"/> <messageProcessor name="ResequencingProcessor" messageStore="MyStore"> <parameter name="interval">10000</parameter> <parameter name="seqNumXpath" xmlns:m0="http://services.samples" expression="substring-after(//m0:placeOrder/m0:order/m0:symbol,'-')"/> <parameter name="nextEsbSequence">next_seq</parameter> </messageProcessor>
This sequence is to send messages to endpoint which connects to SimpleStockQuoteService. This service is currently shipped with Synapse.
<sequence name="next_seq"> <send> <endpoint> <address uri="http://localhost:9000/services/SimpleStockQuoteService"> <suspendOnFailure> <errorCodes>-1</errorCodes> <progressionFactor>1.0</progressionFactor> </suspendOnFailure> </address> </endpoint> </send> </sequence>
Executing the sample scenario
Synapse ships with sample client and services. First you need to build the SimpleStockQuoteService using Ant and run the axis2 server. All required files and executables can be found in side samples/axis2Server directory. Thereafter you have to build the client reside inside samples/axis2Client and have the experience of resequencer by sending placeorder requests. It is important to remember that the initial sequence number is get selected according to the timeouts set. Therefore to experience the resequencing process you need to send required messages within the timeout.
After sending requests with various sequence number you can see the requests received in order, not in the order you sent.
ant stockquote -Dtrpurl=http://localhost:8280/services/resequencingProxy -Dmode=placeorder -Dsymbol=WSO2-4
ant stockquote -Dtrpurl=http://localhost:8280/services/resequencingProxy -Dmode=placeorder -Dsymbol=WSO2-1
ant stockquote -Dtrpurl=http://localhost:8280/services/resequencingProxy -Dmode=placeorder -Dsymbol=WSO2-5
ant stockquote -Dtrpurl=http://localhost:8280/services/resequencingProxy -Dmode=placeorder -Dsymbol=WSO2-2
ant stockquote -Dtrpurl=http://localhost:8280/services/resequencingProxy -Dmode=placeorder -Dsymbol=WSO2-3
Received mesages on SimpleStockQouteService side:
Sat Sep 22 08:55:11 IST 2012 samples.services.
SimpleStockQuoteService :: Accepted order #16 for : 15860 stocks of WSO2-1 at $ 183.71120268664384
Sat Sep 22 08:55:28 IST 2012 samples.services.SimpleStockQuoteService :: Accepted order #17 for : 5165 stocks of WSO2-2 at $ 173.03463846494202
Sat Sep 22 08:55:28 IST 2012 samples.services.SimpleStockQuoteService :: Accepted order #18 for : 19535 stocks of WSO2-3 at $ 145.35557152135468
Sat Sep 22 08:55:28 IST 2012 samples.services.SimpleStockQuoteService :: Accepted order #19 for : 17457 stocks of WSO2-4 at $ 145.05789309716536
Sat Sep 22 08:55:28 IST 2012 samples.services.SimpleStockQuoteService :: Accepted order #20 for : 6260 stocks of WSO2-5 at $ 56.49392913339839
Currently the implementation is in Apache Synapse JIRA as an improvement. Link to JIRA : https://issues.apache.org/jira/browse/SYNAPSE-893
Recently this patch has applied in to Synapse trunk, you can use sample703 to see how this works!
 Apache Synapse project – http://synapse.apache.org/
 Implementing store and forward messaging patterns with WSO2 ESB – http://wso2.org/library/articles/2011/10/implementing-store-forward-messaging-patterns-wso2esb-part-1
 Synapse introduction to message stores – http://synapse.apache.org/userguide/samples/sample700.html
 Synapse message sampling processor – http://synapse.apache.org/userguide/samples/sample701.html
 Synapse trunk – http://svn.apache.org/repos/asf/synapse/trunk/