JSON Split Aggregate with WSO2 ESB

Introduction

Split-Aggregate (Scatter-Gather) is a common messaging pattern [1] used in the enterprise world. In this pattern, the client's request is sent to multiple endpoints simultaneously. The responses from those endpoints are aggregated and sent back to the client as a single response. You will come across many use cases for this scenario when you integrate enterprise systems.


WSO2 ESB (now a part of WSO2 EI) is a well-known middleware product used to integrate enterprise systems. It is also known for its comprehensive middleware stack, which provides the functionality you need for enterprise integration. WSO2 ESB provides a built-in set of mediators for implementing the commonly used Split-Aggregate pattern: the Iterate Mediator, the Clone Mediator and the Aggregate Mediator. You will find a sample use case of those mediators in this documentation [2].

Existing Problem

The existing mediators provide good support for Split-Aggregate scenarios when you are working with XML payloads. However, the current trend is towards using JSON payloads for message exchange. Although the existing mediators can still be used with JSON payloads, they are not convenient: you need to map your JSON payload to an XML payload. This conversion usually adds an extra burden to the mediation logic.
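To make the extra mapping concrete, here is a minimal sketch of how the built-in Iterate and Aggregate mediators might be applied to a JSON request. It assumes the ESB exposes the JSON payload to XPath under a jsonObject root element. The API name, the backend address and the RESPONSES property are hypothetical placeholders, and the exact shape of the aggregated output depends on how the ESB maps the result back to JSON.

```xml
<api xmlns="http://ws.apache.org/ns/synapse" name="xmlMappedApi" context="/xmlmapped">
  <resource methods="POST" uri-template="/*">
    <inSequence>
      <!-- Hypothetical wrapper element that will enclose the aggregated responses -->
      <property name="RESPONSES" scope="default">
        <responses xmlns=""/>
      </property>
      <!-- XPath over the XML view of the JSON body, not a JSONPath expression -->
      <iterate expression="//jsonObject/messages">
        <target>
          <sequence>
            <send>
              <endpoint>
                <!-- Placeholder backend; replace with a real service -->
                <address uri="http://localhost:9000/backend"/>
              </endpoint>
            </send>
          </sequence>
        </target>
      </iterate>
    </inSequence>
    <outSequence>
      <aggregate>
        <completeCondition>
          <messageCount min="-1" max="-1"/>
        </completeCondition>
        <!-- Again XPath: pick the XML-mapped body of each JSON response -->
        <onComplete expression="//jsonObject" enclosingElementProperty="RESPONSES">
          <send/>
        </onComplete>
      </aggregate>
    </outSequence>
  </resource>
</api>
```

Every expression here has to be written against the XML representation of the payload, which is the burden the JSON-native mediators aim to remove.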

In this post I discuss two mediators that are optimized for handling JSON payloads in Split-Aggregate scenarios: the Json Iterate Mediator and the Json Aggregate Mediator. These mediators handle JSON payloads natively and do not convert them to XML. Please note that these mediators do not come with WSO2 ESB out of the box. You can find the relevant documentation at this location [3].

Configuring Mediators

To use these mediators, you need to build the source code found here [3] and take the resulting jar files. Put json-iterate-mediator-1.0.0.jar and json-aggregate-mediator-1.0.0.jar into the ESB_HOME/repository/components/dropins folder. Along with those, add json-path-2.1.0.jar [4], json-smart-2.2.1.jar [5] and accessors-smart-1.1.jar [9] to the same location. Then start WSO2 ESB (sh bin/wso2server.sh).

Sample Scenario

Once you add those artifacts to the ESB, you can refer to the two new mediators just like the built-in mediators. The respective XML tags are <jsonIterate> and <jsonAggregate>. In this post I show a sample configuration that uses those mediators and describe it briefly. The same scenario is discussed in more detail here [3].

<api xmlns="http://ws.apache.org/ns/synapse" name="sampleApi" context="/sample">
<resource methods="POST" uri-template="/*">
<inSequence>
    <log level="full"/>
    <jsonIterate continueParent="false" preservePayload="true" expression="$.messages" attachPath="$.messages">
        <target>
            <sequence>
                <log level="full"/>
                <header name="To" value="http://www.mocky.io/v2/58d6459b100000e601949cb7"/>
                <call/>
            </sequence>
        </target>
    </jsonIterate>
    <log level="full"/>
    <jsonAggregate>
        <completeCondition>
            <messageCount min="-1" max="-1"/>
        </completeCondition>
        <onComplete expression="$.message" enclosingElementProperty="responses">
            <log level="full"/>
            <send/>
        </onComplete>
    </jsonAggregate>
</inSequence>
</resource>
</api>

 

The above example shows how to do split-aggregate on a message received by an API. You can send the following request payload to the API created by the ESB at http://localhost:8280/sample:

curl -X POST \
  http://localhost:8280/sample \
  -H 'content-type: application/json' \
  -d '{"originator":"my-company","messages":[{"country":"Sri Lanka","code":"94"},{"country":"America","code":"01"},{"country":"Australia","code":"61"}]}'

The expression of the jsonIterate mediator decides where to split the message payload. It should be written as a JSON Path [6] expression. Within the sequence inside the jsonIterate mediator you will find the split message payloads, which are sent to the backend URL given in the configuration. You can find additional configuration options for the JSON Iterate mediator here [7].

Each response comes into the jsonAggregate mediator. In the expression of the JSON Aggregate mediator, you specify which part of each response should be taken for aggregation. This expression is again a JSONPath expression. Once the completion condition is satisfied, the aggregated message enters the onComplete sequence, where you can do further processing on it. If you are interested, the documentation [8] gives a complete guide to configuring the JSON Aggregate mediator.

Conclusion

Split-Aggregate is a very common message exchange pattern in the enterprise world, and JSON is becoming an increasingly popular message format there too. However, WSO2 ESB lacks convenient support for JSON messages in split-aggregate scenarios. To cater for that requirement I have built two custom mediators, which can be configured to do split-aggregate with JSON payloads without converting them to XML (native JSON support).

References

[1] EIP patterns reference : http://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html

[2] WSO2 ESB Doc : https://docs.wso2.com/display/ESB500/Split-Aggregate+Pattern

[3] GitHub repository : https://github.com/Buddhima/Json-EIP-Mediators

[4] json-path-2.1.0 : https://mvnrepository.com/artifact/com.jayway.jsonpath/json-path/2.1.0

[5] json-smart-2.2.1 : https://mvnrepository.com/artifact/net.minidev/json-smart/2.2.1

[6] JSON Path documentation : https://github.com/jayway/JsonPath/blob/json-path-2.1.0/README.md

[7] JSON Iterate Mediator documentation : https://github.com/Buddhima/Json-EIP-Mediators/tree/master/JsonIterateMediator

[8] JSON Aggregate Mediator documentation : https://github.com/Buddhima/Json-EIP-Mediators/tree/master/JsonAggregateMediator

[9] accessors-smart-1.1.jar : https://mvnrepository.com/artifact/net.minidev/accessors-smart/1.1


10 thoughts on “JSON Split Aggregate with WSO2 ESB”

  1. Hey Buddhima,

    I am looking to use what you have done here which is great.

    Unfortunately I cannot get it to work as described above. I'm using WSO2 ESB 5.0.0.
    Your help and assistance will be greatly appreciated.

    I have copied the following jars to the dropins folder:
    * json-aggregate-mediator-1.0.0.jar
    * json-iterate-mediator-1.0.0.jar
    * json-path-2.1.0.jar
    * json-smart-2.2.1.jar

    When creating the API on the console, I get the following error:

    “net/minidev/json/writer/JsonReaderI”

    From the carbon logs it seems like an additional JAR is required.

    In the carbon logs:

    [2017-11-22 09:17:48,253] ERROR – RPCMessageReceiver net/minidev/json/writer/JsonReaderI
    java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.axis2.rpc.receivers.RPCUtil.invokeServiceClass(RPCUtil.java:212)
    at org.apache.axis2.rpc.receivers.RPCMessageReceiver.invokeBusinessLogic(RPCMessageReceiver.java:117)
    at org.apache.axis2.receivers.AbstractInOutMessageReceiver.invokeBusinessLogic(AbstractInOutMessageReceiver.java:40)
    at org.apache.axis2.receivers.AbstractMessageReceiver.receive(AbstractMessageReceiver.java:110)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:169)
    at org.apache.axis2.transport.local.LocalTransportReceiver.processMessage(LocalTransportReceiver.java:82)
    at org.wso2.carbon.core.transports.local.CarbonLocalTransportSender.finalizeSendWithToAddress(CarbonLocalTransportSender.java:45)
    at org.apache.axis2.transport.local.LocalTransportSender.invoke(LocalTransportSender.java:77)
    at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
    at org.apache.axis2.description.OutInAxisOperationClient.send(OutInAxisOperation.java:430)
    at org.apache.axis2.description.OutInAxisOperationClient.executeImpl(OutInAxisOperation.java:225)
    at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
    at org.wso2.carbon.rest.api.stub.RestApiAdminStub.addApiFromString(RestApiAdminStub.java:3195)
    at org.wso2.carbon.rest.api.ui.client.RestApiAdminClient.addApiFromString(RestApiAdminClient.java:220)
    at org.apache.jsp.api.savesource_002dajaxprocessor_jsp._jspService(savesource_002dajaxprocessor_jsp.java:202)
    at org.apache.jasper.runtime.HttpJspBase.service(HttpJspBase.java:70)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.apache.jasper.servlet.JspServletWrapper.service(JspServletWrapper.java:439)
    at org.apache.jasper.servlet.JspServlet.serviceJspFile(JspServlet.java:395)
    at org.apache.jasper.servlet.JspServlet.service(JspServlet.java:339)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.wso2.carbon.ui.JspServlet.service(JspServlet.java:155)
    at org.wso2.carbon.ui.TilesJspServlet.service(TilesJspServlet.java:80)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.eclipse.equinox.http.helper.ContextPathServletAdaptor.service(ContextPathServletAdaptor.java:37)
    at org.eclipse.equinox.http.servlet.internal.ServletRegistration.service(ServletRegistration.java:61)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.processAlias(ProxyServlet.java:128)
    at org.eclipse.equinox.http.servlet.internal.ProxyServlet.service(ProxyServlet.java:68)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.wso2.carbon.tomcat.ext.servlet.DelegationServlet.service(DelegationServlet.java:68)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.owasp.csrfguard.CsrfGuardFilter.doFilter(CsrfGuardFilter.java:88)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CSRFPreventionFilter.doFilter(CSRFPreventionFilter.java:88)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.ui.filters.CRLFPreventionFilter.doFilter(CRLFPreventionFilter.java:59)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.wso2.carbon.tomcat.ext.filter.CharacterSetFilter.doFilter(CharacterSetFilter.java:61)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.filters.HttpHeaderSecurityFilter.doFilter(HttpHeaderSecurityFilter.java:120)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.continueInvocation(CompositeValve.java:99)
    at org.wso2.carbon.tomcat.ext.valves.TomcatValveContainer.invokeValves(TomcatValveContainer.java:49)
    at org.wso2.carbon.tomcat.ext.valves.CompositeValve.invoke(CompositeValve.java:62)
    at org.wso2.carbon.tomcat.ext.valves.CarbonStuckThreadDetectionValve.invoke(CarbonStuckThreadDetectionValve.java:159)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
    at org.wso2.carbon.tomcat.ext.valves.CarbonContextCreatorValve.invoke(CarbonContextCreatorValve.java:57)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1749)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1708)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NoClassDefFoundError: net/minidev/json/writer/JsonReaderI
    at com.jayway.jsonpath.internal.DefaultsImpl.(DefaultsImpl.java:17)
    at com.jayway.jsonpath.internal.DefaultsImpl.(DefaultsImpl.java:15)
    at com.jayway.jsonpath.Configuration.getEffectiveDefaults(Configuration.java:48)
    at com.jayway.jsonpath.Configuration.defaultConfiguration(Configuration.java:173)
    at com.buddhima.JsonIterateMediator.(JsonIterateMediator.java:37)
    at com.buddhima.JsonIterateMediatorFactory.createSpecificMediator(JsonIterateMediatorFactory.java:25)
    at org.apache.synapse.config.xml.AbstractMediatorFactory.createMediator(AbstractMediatorFactory.java:94)
    at org.apache.synapse.config.xml.MediatorFactoryFinder.getMediator(MediatorFactoryFinder.java:224)
    at org.apache.synapse.config.xml.AbstractListMediatorFactory.addChildren(AbstractListMediatorFactory.java:46)
    at org.apache.synapse.config.xml.SequenceMediatorFactory.createAnonymousSequence(SequenceMediatorFactory.java:70)
    at org.apache.synapse.config.xml.rest.ResourceFactory.configureSequences(ResourceFactory.java:107)
    at org.apache.synapse.config.xml.rest.ResourceFactory.createResource(ResourceFactory.java:50)
    at org.apache.synapse.config.xml.rest.APIFactory.createAPI(APIFactory.java:88)
    at org.apache.synapse.config.xml.rest.APIFactory.createAPI(APIFactory.java:53)
    at org.wso2.carbon.rest.api.service.RestApiAdmin.addApi(RestApiAdmin.java:810)
    at org.wso2.carbon.rest.api.service.RestApiAdmin.addApiFromString(RestApiAdmin.java:78)
    … 76 more
    Caused by: java.lang.ClassNotFoundException: net.minidev.json.writer.JsonReaderI cannot be found by com.jayway.jsonpath.json-path_2.1.0

    1. Hi Markus,
      Thank you for trying this out.
      Yes, I missed one dependency in the list: accessors-smart-1.1.jar. Its absence caused the activation failure of the json-smart bundle.
      I have corrected it in the post and tested it successfully as well.
      Please let me know if it works for you too.
      Thanks,

      1. Hey Buddhima,

        Thank you so much for responding – I do appreciate it a lot.
        Adding that jar did the trick and resolved the issue... But...

        I was able to test the scenario you provided and it seems to be working fine.

        The problem now, however, is that when I deploy existing CAR files with APIs that use the WSO2 json-path expressions, it fails. I suspect this is because the json-path library added here clashes with the WSO2 library used for JSON Path.

        Regards,
        Markus

  2. Hello,

    When I installed json-path 2.1.0 in the dropins folder, I had problems when I tried to use JSONPath in other mediators.

    TID: [-1234] [] [2018-01-31 00:37:11,752] FATAL {org.wso2.carbon.mediation.initializer.ServiceBusInitializer} – Failed to initialize ESB due to a fatal error {org.wso2.carbon.mediation.initializer.ServiceBusInitializer}
    java.lang.NoSuchMethodError: com.jayway.jsonpath.JsonPath.compile(Ljava/lang/String;[Lcom/jayway/jsonpath/Filter;)Lcom/jayway/jsonpath/JsonPath;
    at org.apache.synapse.util.xpath.SynapseJsonPath.(SynapseJsonPath.java:50)

    It was because WSO2 ESB 5.0.0 relies on json-path-0.8.1.wso2v1, which is not compatible with 2.1.0.

    I managed to get it fully working with WSO2 ESB 5.0.0, without conflict, by adding json-path-2.1.0 in your jar, with maven shade, and using class relocation.
    In addition, you have to remove com.jayway.jsonpath from OSGI dependency.
    This way the new json-path is only used by json-iterate-mediator.

    1. Hi Adrien,
      Thank you for sharing your experience.
      I would appreciate it if you could share a sample synapse configuration where this error occurred and the changes you made in the pom file. I believe that information would be valuable for others as well.
      Regards,
      Buddhima

      1. Hello,

        The error occurred with a simple sequence where I put a log with the expression “json-eval($.)”.
        I think this error will occur wherever you use a JSONPath expression, as WSO2 will try to compile it with the new json-path lib.

        Here are the changes I made in POM :

        – Add in maven-bundle-plugin config

        org.apache.felix
        maven-bundle-plugin
        2.3.4
        true

        JsonIterateMediator
        JsonIterateMediator
        com.buddhima
        !com.jayway.jsonpath
        *
        synapse-core

        – Add maven-shade-plugin

        org.apache.maven.plugins
        maven-shade-plugin
        3.1.0

        package

        shade

        com.jayway.jsonpath:json-path

        com.jayway.jsonpath
        com.shaded.jayway.jsonpath

        This way the new json-iterate-mediator jar contains the json-path jar. You then have to copy the following into the dropins folder:
        * json-iterate-mediator-1.0.0.jar
        * json-smart-2.2.1.jar
        * accessors-smart-1.1.jar

        If you want, you can :
        – Add this dependency :

        net.minidev
        accessors-smart
        1.1

        – Add these 2 includes in maven-shade-plugin includes
        net.minidev:json-smart
        net.minidev:accessors-smart
        Then the new json-iterate-mediator jar will contain the 3 dependencies, and you only have to copy it into the dropins folder.
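
The POM values listed in the comment above appear without their XML tags. A hedged reconstruction of what those fragments might look like is sketched below. The values come from the comment, but the element names are assumptions. The shade plugin and dependency elements follow standard Maven conventions. The bundle plugin instruction names (Bundle-SymbolicName, Bundle-Name, Export-Package, Import-Package, DynamicImport-Package, Fragment-Host) are only a guess at which instruction each value belonged to.

```xml
<!-- Inside <dependencies>: optional, only if json-smart and accessors-smart are shaded too -->
<dependency>
  <groupId>net.minidev</groupId>
  <artifactId>accessors-smart</artifactId>
  <version>1.1</version>
</dependency>

<!-- Inside <build><plugins>: instruction names below are assumed, values are from the comment -->
<plugin>
  <groupId>org.apache.felix</groupId>
  <artifactId>maven-bundle-plugin</artifactId>
  <version>2.3.4</version>
  <extensions>true</extensions>
  <configuration>
    <instructions>
      <Bundle-SymbolicName>JsonIterateMediator</Bundle-SymbolicName>
      <Bundle-Name>JsonIterateMediator</Bundle-Name>
      <Export-Package>com.buddhima</Export-Package>
      <!-- Stop the bundle from importing the ESB's own json-path package -->
      <Import-Package>!com.jayway.jsonpath</Import-Package>
      <DynamicImport-Package>*</DynamicImport-Package>
      <Fragment-Host>synapse-core</Fragment-Host>
    </instructions>
  </configuration>
</plugin>

<!-- Embed json-path in the mediator jar and relocate its classes -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.0</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <includes>
            <include>com.jayway.jsonpath:json-path</include>
            <!-- Optional extra includes mentioned in the comment:
                 net.minidev:json-smart, net.minidev:accessors-smart -->
          </includes>
        </artifactSet>
        <relocations>
          <relocation>
            <pattern>com.jayway.jsonpath</pattern>
            <shadedPattern>com.shaded.jayway.jsonpath</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With relocation, the mediator's references to com.jayway.jsonpath are rewritten to the shaded package, so the json-path version bundled with the ESB stays untouched for other mediators.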

  3. Good Day Buddhima,

    It seems there is an issue with your solution when using WSO2 EI: the moment you add your libs, the server fails to start.

    Can we have an explanation of how you are generating your jars?

    Thank you
