A couple of comments:
Can you create some stills class to extract the static methods related to MP paths from AbstractPipeline?
In ChoiceMessagingExceptionStrategy#getMessageProcessorPaths seems to be some duplication of code that is also in the AbstractPipeline static methods.
Pablo
- Revision
- 24919
- Author
- svacas
- Date
- 2012-09-27 13:17:37 -0500 (Thu, 27 Sep 2012)
Log Message
MULE-5751: Allow to programmatically distinguish internal MessageProcessor delegate building of the path map to the message processor containersModified Paths
- branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java
- branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java
- branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java
- branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java
- branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java
- branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java
- branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java
- branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java
- branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java
Diff
Modified: branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -11,6 +11,7 @@ package org.mule.api.construct; import org.mule.api.processor.MessageProcessor; +import org.mule.api.processor.MessageProcessorContainer; import org.mule.api.processor.ProcessingStrategy; import org.mule.api.source.MessageSource; @@ -20,7 +21,7 @@ * A pipeline has an ordered list of {@link MessageProcessor}'s that are invoked in order to processor new * messages received from it's {@link MessageSource} */
-public interface Pipeline extends FlowConstruct+public interface Pipeline extends FlowConstruct, MessageProcessorContainer { public void setMessageSource(MessageSource messageSource); @@ -37,5 +38,4 @@ public String getProcessorPath(MessageProcessor processor);- public String[] getProcessorPaths();}Modified: branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -10,17 +10,18 @@ package org.mule.api.processor;
-import java.util.List;+import java.util.Map; /** * Identifies Constructs that contain Message Processors configured by the user. */ public interface MessageProcessorContainer {-/**- * @return the list of Message Processors configured by the user. - * Internal Message Processors are omitted.+ * Generates a map of the child message processors with the message processor + * instance as key and the identifier path as value. + * + * @return Map with the paths of the child message processors */- List<MessageProcessor> getMessageProcessors();+ Map<MessageProcessor, String> getMessageProcessorPaths(); }Modified: branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -314,49 +314,74 @@ logger.warn("flow map already populated"); return; }
- List<MessageProcessor> mps = getMessageProcessors(); - createFlowMap(mps, "/" + getName() + "/processors/");+ flowMap = getMessageProcessorPaths(); + } + + @Override + public Map<MessageProcessor, String> getMessageProcessorPaths() + { + Map<MessageProcessor, String> result = new LinkedHashMap<MessageProcessor, String>(); + int index = 0; + String base = "/" + getName(); + for (MessageProcessor mp : getMessageProcessors()) + { + String prefix = base + "/processors/" + index; + result.put(mp, prefix); + if (mp instanceof MessageProcessorContainer) + { + Map<MessageProcessor, String> children = ((MessageProcessorContainer) mp).getMessageProcessorPaths(); + prefixMessageProcessorPaths(prefix, children); + result.putAll(children); + } + index++; + } if (exceptionListener instanceof MessageProcessorContainer) {- mps = ((MessageProcessorContainer) exceptionListener).getMessageProcessors(); - createFlowMap(mps, "/" + getName() + "/es/");+ Map<MessageProcessor, String> esPathMap = ((MessageProcessorContainer) exceptionListener).getMessageProcessorPaths(); + prefixMessageProcessorPaths(base + "/es", esPathMap); + result.putAll(esPathMap); } + return result; }- private void createFlowMap(List<MessageProcessor> processors, String prefix)+ public static Map<MessageProcessor, String> buildMessageProcessorPaths(List<MessageProcessor> processors) {- int idx = 0;+ Map<MessageProcessor, String> result = new LinkedHashMap<MessageProcessor, String>(); + int index = 0; for (MessageProcessor mp : processors) {- if (mp == null) - { - logger.warn("NULL mp!"); - continue; - } - String currentPrefix = prefix + idx; - flowMap.put(mp, currentPrefix);+ String prefix = "/" + index; + result.put(mp, prefix); if (mp instanceof MessageProcessorContainer) {- createFlowMap(((MessageProcessorContainer) mp).getMessageProcessors(), currentPrefix + "/");+ Map<MessageProcessor, String> children = ((MessageProcessorContainer) mp).getMessageProcessorPaths(); + prefixMessageProcessorPaths(prefix, children); + result.putAll(children); }- idx++;+ index++; } + return result; }- @Override - public String getProcessorPath(MessageProcessor processor)+ public static void prefixMessageProcessorPaths(String prefix, Map<MessageProcessor, String> pathMap) {- return flowMap.get(processor);+ if (prefix.endsWith("/")) + { + prefix = prefix.substring(0, prefix.length() - 1); + } + for (Map.Entry entry : pathMap.entrySet()) + { + entry.setValue(prefix + entry.getValue()); + } } @Override- public String[] getProcessorPaths()+ public String getProcessorPath(MessageProcessor processor) {- String[] paths = new String[flowMap.size()]; - paths = flowMap.values().toArray(paths); - return paths;+ return flowMap.get(processor); } + public class ProcessIfPipelineStartedMessageProcessor extends AbstractFilteringMessageProcessor {Modified: branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -19,12 +19,14 @@ import org.mule.api.processor.MessageProcessor; import org.mule.api.processor.MessageProcessorContainer; import org.mule.config.i18n.CoreMessages; +import org.mule.construct.AbstractPipeline; import org.mule.message.DefaultExceptionPayload; import org.mule.processor.AbstractMuleObjectOwner;
-import java.util.ArrayList;import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; /** * Selects which exception strategy to execute based on filtering. @@ -131,16 +133,21 @@ } @Override- public List<MessageProcessor> getMessageProcessors()+ public Map<MessageProcessor, String> getMessageProcessorPaths() {- List<MessageProcessor> mps = new ArrayList<MessageProcessor>();+ Map<MessageProcessor, String> mpPaths = new LinkedHashMap<MessageProcessor, String> (); + int idx = 0; for(MessagingExceptionHandlerAcceptor listener : exceptionListeners) { + String prefix = "/" + idx; if (listener instanceof MessageProcessorContainer) {- mps.addAll(((MessageProcessorContainer) listener).getMessageProcessors());+ Map<MessageProcessor, String> children = ((MessageProcessorContainer) listener).getMessageProcessorPaths(); + AbstractPipeline.prefixMessageProcessorPaths(prefix, children); + mpPaths.putAll(children); } + idx++; }- return mps;+ return mpPaths; } }Modified: branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -9,19 +9,21 @@ */ package org.mule.processor;
-import javax.xml.namespace.QName; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.mule.api.AnnotatedObject; import org.mule.api.construct.FlowConstructAware; import org.mule.api.context.MuleContextAware; import org.mule.api.lifecycle.Lifecycle; import org.mule.api.processor.MessageProcessor; import org.mule.api.processor.MessageProcessorContainer; +import org.mule.construct.AbstractPipeline; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.xml.namespace.QName; + /** * An object that owns message processors and delegates startup/shutdown events to them. */ @@ -53,9 +55,9 @@ protected abstract List<MessageProcessor> getOwnedMessageProcessors(); @Override- public List<MessageProcessor> getMessageProcessors()+ public Map<MessageProcessor, String> getMessageProcessorPaths() {- return getOwnedMessageProcessors();+ return AbstractPipeline.buildMessageProcessorPaths(getOwnedMessageProcessors()); } }Modified: branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -25,12 +25,14 @@ import org.mule.api.processor.MessageProcessor; import org.mule.api.processor.MessageProcessorChain; import org.mule.api.processor.MessageProcessorContainer; +import org.mule.construct.AbstractPipeline; import org.mule.endpoint.EndpointAware; import org.mule.processor.AbstractInterceptingMessageProcessor; import org.mule.util.StringUtils; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -182,4 +184,9 @@ return processors; } + @Override + public Map<MessageProcessor, String> getMessageProcessorPaths() + { + return AbstractPipeline.buildMessageProcessorPaths(getMessageProcessors()); + } }
Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -32,6 +32,7 @@ import org.mule.api.routing.SelectiveRouter; import org.mule.api.routing.filter.Filter; import org.mule.config.i18n.MessageFactory; +import org.mule.construct.AbstractPipeline; import org.mule.management.stats.RouterStatistics; import java.util.ArrayList; @@ -365,7 +366,7 @@ } @Override
- public List<MessageProcessor> getMessageProcessors()+ public Map<MessageProcessor, String> getMessageProcessorPaths() { List<MessageProcessor> messageProcessors = new ArrayList<MessageProcessor>(); for (MessageProcessorFilterPair cmp : conditionalMessageProcessors) @@ -373,7 +374,7 @@ messageProcessors.add(cmp.getMessageProcessor()); } messageProcessors.add(defaultProcessor);- return messageProcessors;+ return AbstractPipeline.buildMessageProcessorPaths(messageProcessors); } @OverrideModified: branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java (24918 => 24919)
--- branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -20,6 +20,7 @@ import org.mule.api.processor.MessageProcessor; import org.mule.api.transformer.DataType; import org.mule.api.transformer.TransformerException; +import org.mule.construct.AbstractPipeline; import org.mule.expression.ExpressionConfig; import org.mule.processor.AbstractMessageProcessorOwner; import org.mule.processor.chain.DefaultMessageProcessorChainBuilder; @@ -152,10 +153,11 @@ } @Override
- public List<MessageProcessor> getMessageProcessors()+ public Map<MessageProcessor, String> getMessageProcessorPaths() { //skip the splitter that is added at the beginning- return getOwnedMessageProcessors().subList(1, getOwnedMessageProcessors().size());+ List<MessageProcessor> mps = getOwnedMessageProcessors().subList(1, getOwnedMessageProcessors().size()); + return AbstractPipeline.buildMessageProcessorPaths(mps); } @OverrideModified: branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java (24918 => 24919)
--- branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java 2012-09-26 20:05:02 UTC (rev 24918) +++ branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java 2012-09-27 18:17:37 UTC (rev 24919) @@ -11,9 +11,11 @@ import org.mule.api.construct.FlowConstruct; import org.mule.api.construct.Pipeline; +import org.mule.api.processor.MessageProcessor; import org.mule.tck.junit4.FunctionalTestCase; import java.util.LinkedHashSet; +import java.util.Map; import java.util.Set; import org.junit.Assert; @@ -35,22 +37,22 @@ public void components() throws Exception { testFlowPaths("singleMP", "/0");
- testFlowPaths("processorChain", "/0/0", "/0/1");+ testFlowPaths("processorChain", "/0", "/0/0", "/0/1"); } @Test public void routers() throws Exception {- testFlowPaths("choice", "/0/0/0", "/0/1/0", "/0/2/0"); - testFlowPaths("all", "/0/0/0", "/0/1/0");+ testFlowPaths("choice", "/0", "/0/0", "/0/0/0", "/0/1", "/0/1/0", "/0/2", "/0/2/0"); + testFlowPaths("all", "/0", "/0/0", "/0/0/0", "/0/1", "/0/1/0"); } @Test public void scopes() throws Exception {- testFlowPaths("foreach", "/0/0"); - testFlowPaths("enricher", "/0/0", "/1/0/0", "/1/0/1"); - testFlowPaths("until-successful", "/0/0/0", "/0/0/1");+ testFlowPaths("foreach", "/0", "/0/0"); + testFlowPaths("enricher", "/0", "/0/0", "/1", "/1/0", "/1/0/0", "/1/0/1"); + testFlowPaths("until-successful", "/0", "/0/0", "/0/0/0", "/0/0/1"); //testFlowPaths("async", "/0/0", "/0/1"); } @@ -64,34 +66,30 @@ public void exceptionStrategies() throws Exception { testFlowPaths("catch-es", "/0", "es/0");- testFlowPaths("rollback-es", "/0", "es/0"); - testFlowPaths("choice-es", "/0", "es/0", "es/1");+ testFlowPaths("rollback-es", "/0", "es/0", "es/1"); + testFlowPaths("choice-es", "/0", "es/0/0", "es/0/1", "es/1/0"); }- private void testFlowPaths(String flowName, String... leaves) throws Exception+ private void testFlowPaths(String flowName, String... nodes) throws Exception {- String[] expectedPaths = generatePathsFromLeaves(flowName, leaves);+ String[] expectedPaths = generatePaths(flowName, nodes); FlowConstruct flow = getFlowConstruct(flowName);- String[] flowPaths = ((Pipeline) flow).getProcessorPaths();+ Map<MessageProcessor,String> messageProcessorPaths = ((Pipeline) flow).getMessageProcessorPaths(); + String[] flowPaths = messageProcessorPaths.values().toArray(new String[]{}); Assert.assertArrayEquals(expectedPaths, flowPaths); }- private String[] generatePathsFromLeaves(String flowName, String[] leaves)+ private String[] generatePaths(String flowName, String[] nodes) { Set<String> pathSet = new LinkedHashSet<String>(); String base = "/" + flowName + "/processors";- for (String leaf : leaves)+ for (String node : nodes) {- if (leaf.startsWith("es/"))+ if (node.startsWith("es/")) {- base = "/" + flowName + "/es";+ base = "/" + flowName + "/"; }- String prefix = "/"; - for (String part : leaf.substring(leaf.indexOf("/") + 1).split("/")) - { - pathSet.add(base + prefix + part); - prefix += part + "/"; - }+ pathSet.add(base + node); } return pathSet.toArray(new String[0]); }
To unsubscribe from this list please visit: