One of the many (many many many) extension points inside Apache Camel is the org.apache.camel.processor.aggregate.AggregationStrategy. These are used in everything from Content Enrichers to Splitters to Aggregators and more. Since their use is so prevalent, I figured that I’d dedicate a whole blog post just for them. So here goes…
So what are AggregationStrategy’s anyway? Simple… they’re implementations of the org.apache.camel.processor.aggregate.AggregationStrategy that allow you to specify exactly how two exchanges will be merged. This specification can be as simple or as complex as you require for your use case. Maybe you just want to take the first response and ignore all others. Maybe you want to combine the XML bodies into a list and then merge a select few headers. The limit really is your imagination. But what do I mean by “merging exchanges”? Let’s take a look at a few concrete examples.
Out of the Box
For starters, there are several implementations that are included out of the box. You can use them “as-is” without writing any custom code at all. Let’s talk through a few of them with some potential use cases.
The first is the org.apache.camel.processor.aggregate.UseLatestAggregationStrategy implementation. It’s the default strategy for most Camel EIPs that accept aggregation strategies. So if you don’t specify any strategy, this is likely the one you’re using. Basically, it takes the last exchange it receives and just uses that (ignoring any others that may have been aggregated prior). One example use case for this would be when doing an Aggregator. Perhaps you’re receiving many messages as input, but you want to buffer them (giving the user time to send in corrections/updates), and then only send the latest message to the backend after some period of inactivity. That might look like below:
For the next use case, we’ll cover the (very similar) org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy implementation. As the name would suggest, it “merges” two exchanges together by completely ignoring the new exchange and just taking the original. One example of where this might be useful is when doing a Multicast. Lets say I wanted to send a copy of a message off to multiple recipients, but really don’t care about their response. After the multicast is completed, I want to perform some transformation on the original message, and then return the result. Instead of rolling my own implementation, I could simply use the one provided. Something like this:
The next set of implementations, I’ll cover as a group. They are the org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy, org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy, and org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy strategies. They will combine the exchanges into a list and then pass the list itself along to the next processor. They only differ by what they put in the list (ie, List<Exchange>, List<Message>, or List<Object>). So, for instance, if you wanted to split a message, process each individual part, and then combine the individual results back into a list, you could do so easily using a Splitter like below:
The final implementation that I’ll cover for this section is the org.apache.camel.util.toolbox.XsltAggregationStrategy. It allows you to provide an XSLT that will be used to merge the original and new exchanges together. A great use case for this is when you want to Enrich an XML request with some extra data retrieved from a backend.
Since this example is a little more complex, it requires more than just a code snippet to explain. So I’ve put together an example application and thrown it up on GitHub. Take a look… https://github.com/joshdreagan/camel-xslt-enricher
It’s amazing how many use cases these “canned” aggregation strategies cover. But what if I they’re not quite exactly what you need?
Semi-Custom
In this section, we’ll discuss what I call “semi-custom” strategies. Basically, they’re base/utility classes that make it easy for you to implement a custom strategy with very little Java code.
The first class we’ll talk about is the org.apache.camel.processor.aggregate.AbstractListAggregationStrategy. Similar to the grouping implementations mentioned above, the end result of this strategy is a list of items. The difference is that you have total control over what data gets placed in said list as well as where you pull it from. Here’s a very simple example implementation:
@Override public String getValue(Exchange exchange){ return exchange.getIn().getHeader("MyAwesomeHeader", String.class); } }
If you need even more control over the aggregation, you can use the org.apache.camel.util.toolbox.FlexibleAggregationStrategy. The FlexibleAggregationStrategy is a fluent strategy builder that lets you define fairly complex aggregation strategy implementations using a very concise syntax. If you’re using the Java DSL to define your Camel routes (or are using any Java based bean wiring mechanism), you can just use the fluent builder directly. However, if you’re using it from the Spring DSL (using Spring’s XML bean definitions) it might be easier to wrapper it in a simple Java implementation. See below for an example:
Pretty powerful stuff! But what if you’re feeling even more imaginative?
Custom
The last type of strategy that I’ll talk about is a “completely custom” implementation. This basically just means that you will implement the org.apache.camel.processor.aggregate.AggregationStrategy interface directly without using any helper base classes (which might restrict you in some ways). Because of this direct implementation, you are free to do literally anything you want.
One example that I whipped up for a customer a while back is what I called the “semi-streaming aggregation strategy”.
@Override publicvoidafterPropertiesSet()throws Exception { Objects.requireNonNull(aggregateProcessorId, "The aggregateProcessorId property must not be null."); Objects.requireNonNull(camelContext, "The camelContext property must not be null."); Objects.requireNonNull(sequenceIdHeaderName, "The sequenceIdHeaderName property must not be null."); }
@Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange){
SortedSet<Message> pendingMessages = aggregateExchange.getIn().getBody(SortedSet.class); Long lastProcessedIndex = aggregateExchange.getProperty(LAST_PROCESSED_INDEX, Long.class);
Message currentMessage; Long currentMessageIndex; SortedSet<Message> messagesToBeEmitted = new TreeSet<>(_messageComparator()); do { currentMessage = pendingMessages.first(); currentMessageIndex = currentMessage.getHeader(sequenceIdHeaderName, Long.class); if (currentMessageIndex == lastProcessedIndex + 1) { messagesToBeEmitted.add(currentMessage); pendingMessages.remove(currentMessage); lastProcessedIndex = currentMessageIndex; } else { break; } } while (!pendingMessages.isEmpty()); if (!messagesToBeEmitted.isEmpty()) { log.info(String.format("Messages to be emitted: [%s] messages", messagesToBeEmitted.size())); aggregateExchange.setProperty(LAST_PROCESSED_INDEX, lastProcessedIndex); aggregateExchange.getIn().setBody(pendingMessages); Exchange exchangeToBeEmitted = ExchangeHelper.copyExchangeAndSetCamelContext(aggregateExchange, camelContext); exchangeToBeEmitted.getIn().setBody(messagesToBeEmitted); try { for (Processor processor : _aggregateProcessor().next()) { processor.process(exchangeToBeEmitted); } } catch (Exception e) { thrownew RuntimeCamelException(e); } } } }
Here’s a link to the full source for your perusal: [https://github.com/joshdreagan/camel-streaming-aggregation]. In this implementation, I was asked to do ordering aggregation of incoming messages. But as the messages came in, if the next sequential block was completed, the customer wanted those messages to be emitted at that time instead of waiting for the entire batch to complete. So, for example, if I got messages [1,3,5], those messages would be aggregated and stored in the aggregation repository. But then, when message [2] came in, messages [1,2,3] would be emitted/processed (while message [5] would remain in the repository). Finally, when message [4] came in, messages [4,5] would be emitted/processed. That’s about as custom as they come!
Hopefully this helps highlight some of the power and flexibility of Camel. Like I said at the beginning of this post, your imagination is the limit (or rather your use case). Enjoy!