[mule-dev] Re: [mule-scm] [mule][24919] branches/mule-3.x: MULE-5751: Allow to programmatically distinguish internal MessageProcessor

24 views
Skip to first unread message

Pablo Kraan

unread,
Sep 27, 2012, 3:09:56 PM9/27/12
to d...@mule.codehaus.org, s...@mule.codehaus.org


A couple of comments:


Can you create some stills class to extract the static methods related to MP paths from AbstractPipeline?

In ChoiceMessagingExceptionStrategy#getMessageProcessorPaths seems to be some duplication of code that is also in the AbstractPipeline static methods.

Pablo


On Thu, Sep 27, 2012 at 3:17 PM, <sva...@codehaus.org> wrote:
Revision
24919
Author
svacas
Date
2012-09-27 13:17:37 -0500 (Thu, 27 Sep 2012)

Log Message

MULE-5751: Allow to programmatically distinguish internal MessageProcessor

delegate building of the path map to the message processor containers

Modified Paths

Diff

Modified: branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/api/construct/Pipeline.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -11,6 +11,7 @@
 package org.mule.api.construct;
 
 import org.mule.api.processor.MessageProcessor;
+import org.mule.api.processor.MessageProcessorContainer;
 import org.mule.api.processor.ProcessingStrategy;
 import org.mule.api.source.MessageSource;
 
@@ -20,7 +21,7 @@
  * A pipeline has an ordered list of {@link MessageProcessor}'s that are invoked in order to processor new
  * messages received from it's {@link MessageSource}
  */
-public interface Pipeline extends FlowConstruct
+public interface Pipeline extends FlowConstruct, MessageProcessorContainer
 {
 
     public void setMessageSource(MessageSource messageSource);
@@ -37,5 +38,4 @@
 
     public String getProcessorPath(MessageProcessor processor);
 
-    public String[] getProcessorPaths();
 }

Modified: branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/api/processor/MessageProcessorContainer.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -10,17 +10,18 @@
 
 package org.mule.api.processor;
 
-import java.util.List;
+import java.util.Map;
 
 /**
  *  Identifies Constructs that contain Message Processors configured by the user.
  */
 public interface MessageProcessorContainer
 {
-
     /**
-     * @return the list of Message Processors configured by the user.
-     * Internal Message Processors are omitted.
+     * Generates a map of the child message processors with the message processor
+     * instance as key and the identifier path as value.
+     *
+     * @return Map with the paths of the child message processors
      */
-    List<MessageProcessor> getMessageProcessors();
+    Map<MessageProcessor, String> getMessageProcessorPaths();
 }

Modified: branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/construct/AbstractPipeline.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -314,49 +314,74 @@
             logger.warn("flow map already populated");
             return;
         }
-        List<MessageProcessor> mps = getMessageProcessors();
-        createFlowMap(mps, "/" + getName() + "/processors/");
+        flowMap = getMessageProcessorPaths();
+    }
+
+    @Override
+    public Map<MessageProcessor, String> getMessageProcessorPaths()
+    {
+        Map<MessageProcessor, String> result = new LinkedHashMap<MessageProcessor, String>();
+        int index = 0;
+        String base = "/" + getName();
+        for (MessageProcessor mp : getMessageProcessors())
+        {
+            String prefix =  base + "/processors/" + index;
+            result.put(mp, prefix);
+            if (mp instanceof MessageProcessorContainer)
+            {
+                Map<MessageProcessor, String> children = ((MessageProcessorContainer) mp).getMessageProcessorPaths();
+                prefixMessageProcessorPaths(prefix, children);
+                result.putAll(children);
+            }
+            index++;
+        }
         if (exceptionListener instanceof MessageProcessorContainer)
         {
-            mps = ((MessageProcessorContainer) exceptionListener).getMessageProcessors();
-            createFlowMap(mps, "/" + getName() + "/es/");
+            Map<MessageProcessor, String> esPathMap = ((MessageProcessorContainer) exceptionListener).getMessageProcessorPaths();
+            prefixMessageProcessorPaths(base + "/es", esPathMap);
+            result.putAll(esPathMap);
         }
+        return result;
     }
 
-    private void createFlowMap(List<MessageProcessor> processors, String prefix)
+    public static Map<MessageProcessor, String> buildMessageProcessorPaths(List<MessageProcessor> processors)
     {
-        int idx = 0;
+        Map<MessageProcessor, String> result = new LinkedHashMap<MessageProcessor, String>();
+        int index = 0;
         for (MessageProcessor mp : processors)
         {
-            if (mp == null)
-            {
-                logger.warn("NULL mp!");
-                continue;
-            }
-            String currentPrefix = prefix + idx;
-            flowMap.put(mp, currentPrefix);
+            String prefix = "/" + index;
+            result.put(mp, prefix);
             if (mp instanceof MessageProcessorContainer)
             {
-                createFlowMap(((MessageProcessorContainer) mp).getMessageProcessors(), currentPrefix + "/");
+                Map<MessageProcessor, String> children = ((MessageProcessorContainer) mp).getMessageProcessorPaths();
+                prefixMessageProcessorPaths(prefix, children);
+                result.putAll(children);
             }
-            idx++;
+            index++;
         }
+        return result;
     }
 
-    @Override
-    public String getProcessorPath(MessageProcessor processor)
+    public static void prefixMessageProcessorPaths(String prefix, Map<MessageProcessor, String> pathMap)
     {
-        return flowMap.get(processor);
+        if (prefix.endsWith("/"))
+        {
+            prefix = prefix.substring(0, prefix.length() - 1);
+        }
+        for (Map.Entry entry : pathMap.entrySet())
+        {
+            entry.setValue(prefix + entry.getValue());
+        }
     }
 
     @Override
-    public String[] getProcessorPaths()
+    public String getProcessorPath(MessageProcessor processor)
     {
-        String[] paths = new String[flowMap.size()];
-        paths = flowMap.values().toArray(paths);
-        return paths;
+        return flowMap.get(processor);
     }
 
+
     public class ProcessIfPipelineStartedMessageProcessor extends AbstractFilteringMessageProcessor
     {
 

Modified: branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/exception/ChoiceMessagingExceptionStrategy.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -19,12 +19,14 @@
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.processor.MessageProcessorContainer;
 import org.mule.config.i18n.CoreMessages;
+import org.mule.construct.AbstractPipeline;
 import org.mule.message.DefaultExceptionPayload;
 import org.mule.processor.AbstractMuleObjectOwner;
 
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Selects which exception strategy to execute based on filtering.
@@ -131,16 +133,21 @@
     }
 
     @Override
-    public List<MessageProcessor> getMessageProcessors()
+    public Map<MessageProcessor, String> getMessageProcessorPaths()
     {
-        List<MessageProcessor> mps = new ArrayList<MessageProcessor>();
+        Map<MessageProcessor, String> mpPaths = new LinkedHashMap<MessageProcessor, String> ();
+        int idx = 0;
         for(MessagingExceptionHandlerAcceptor listener : exceptionListeners)
         {
+            String prefix = "/" + idx;
             if (listener instanceof MessageProcessorContainer)
             {
-                mps.addAll(((MessageProcessorContainer) listener).getMessageProcessors());
+                Map<MessageProcessor, String> children = ((MessageProcessorContainer) listener).getMessageProcessorPaths();
+                AbstractPipeline.prefixMessageProcessorPaths(prefix, children);
+                mpPaths.putAll(children);
             }
+            idx++;
         }
-        return mps;
+        return mpPaths;
     }
 }

Modified: branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/AbstractMessageProcessorOwner.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -9,19 +9,21 @@
  */
 package org.mule.processor;
 
-import javax.xml.namespace.QName;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.mule.api.AnnotatedObject;
 import org.mule.api.construct.FlowConstructAware;
 import org.mule.api.context.MuleContextAware;
 import org.mule.api.lifecycle.Lifecycle;
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.processor.MessageProcessorContainer;
+import org.mule.construct.AbstractPipeline;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.xml.namespace.QName;
+
 /**
  * An object that owns message processors and delegates startup/shutdown events to them.
  */
@@ -53,9 +55,9 @@
     protected abstract List<MessageProcessor> getOwnedMessageProcessors();
 
     @Override
-    public List<MessageProcessor> getMessageProcessors()
+    public Map<MessageProcessor, String> getMessageProcessorPaths()
     {
-        return getOwnedMessageProcessors();
+        return AbstractPipeline.buildMessageProcessorPaths(getOwnedMessageProcessors());
     }
 }
 

Modified: branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/processor/chain/AbstractMessageProcessorChain.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -25,12 +25,14 @@
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.processor.MessageProcessorChain;
 import org.mule.api.processor.MessageProcessorContainer;
+import org.mule.construct.AbstractPipeline;
 import org.mule.endpoint.EndpointAware;
 import org.mule.processor.AbstractInterceptingMessageProcessor;
 import org.mule.util.StringUtils;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -182,4 +184,9 @@
         return processors;
     }
 
+    @Override
+    public Map<MessageProcessor, String> getMessageProcessorPaths()
+    {
+        return AbstractPipeline.buildMessageProcessorPaths(getMessageProcessors());
+    }
 }

Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/AbstractSelectiveRouter.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -32,6 +32,7 @@
 import org.mule.api.routing.SelectiveRouter;
 import org.mule.api.routing.filter.Filter;
 import org.mule.config.i18n.MessageFactory;
+import org.mule.construct.AbstractPipeline;
 import org.mule.management.stats.RouterStatistics;
 
 import java.util.ArrayList;
@@ -365,7 +366,7 @@
     }
 
     @Override
-    public List<MessageProcessor> getMessageProcessors()
+    public Map<MessageProcessor, String> getMessageProcessorPaths()
     {
         List<MessageProcessor> messageProcessors = new ArrayList<MessageProcessor>();
         for (MessageProcessorFilterPair cmp : conditionalMessageProcessors)
@@ -373,7 +374,7 @@
             messageProcessors.add(cmp.getMessageProcessor());
         }
         messageProcessors.add(defaultProcessor);
-        return messageProcessors;
+        return AbstractPipeline.buildMessageProcessorPaths(messageProcessors);
     }
 
     @Override

Modified: branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java (24918 => 24919)


--- branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/core/src/main/java/org/mule/routing/Foreach.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -20,6 +20,7 @@
 import org.mule.api.processor.MessageProcessor;
 import org.mule.api.transformer.DataType;
 import org.mule.api.transformer.TransformerException;
+import org.mule.construct.AbstractPipeline;
 import org.mule.expression.ExpressionConfig;
 import org.mule.processor.AbstractMessageProcessorOwner;
 import org.mule.processor.chain.DefaultMessageProcessorChainBuilder;
@@ -152,10 +153,11 @@
     }
 
     @Override
-    public List<MessageProcessor> getMessageProcessors()
+    public Map<MessageProcessor, String> getMessageProcessorPaths()
     {
         //skip the splitter that is added at the beginning
-        return getOwnedMessageProcessors().subList(1, getOwnedMessageProcessors().size());
+        List<MessageProcessor> mps = getOwnedMessageProcessors().subList(1, getOwnedMessageProcessors().size());
+        return AbstractPipeline.buildMessageProcessorPaths(mps);
     }
 
     @Override

Modified: branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java (24918 => 24919)


--- branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java	2012-09-26 20:05:02 UTC (rev 24918)
+++ branches/mule-3.x/tests/integration/src/test/java/org/mule/context/notification/MessageProcessorNotificationPathTestCase.java	2012-09-27 18:17:37 UTC (rev 24919)
@@ -11,9 +11,11 @@
 
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.construct.Pipeline;
+import org.mule.api.processor.MessageProcessor;
 import org.mule.tck.junit4.FunctionalTestCase;
 
 import java.util.LinkedHashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.junit.Assert;
@@ -35,22 +37,22 @@
     public void components() throws Exception
     {
         testFlowPaths("singleMP", "/0");
-        testFlowPaths("processorChain", "/0/0", "/0/1");
+        testFlowPaths("processorChain", "/0", "/0/0", "/0/1");
     }
 
     @Test
     public void routers() throws Exception
     {
-        testFlowPaths("choice", "/0/0/0", "/0/1/0", "/0/2/0");
-        testFlowPaths("all", "/0/0/0", "/0/1/0");
+        testFlowPaths("choice", "/0", "/0/0", "/0/0/0", "/0/1", "/0/1/0", "/0/2", "/0/2/0");
+        testFlowPaths("all", "/0", "/0/0", "/0/0/0", "/0/1", "/0/1/0");
     }
 
     @Test
     public void scopes() throws Exception
     {
-        testFlowPaths("foreach", "/0/0");
-        testFlowPaths("enricher", "/0/0", "/1/0/0", "/1/0/1");
-        testFlowPaths("until-successful", "/0/0/0", "/0/0/1");
+        testFlowPaths("foreach", "/0", "/0/0");
+        testFlowPaths("enricher", "/0", "/0/0", "/1", "/1/0", "/1/0/0", "/1/0/1");
+        testFlowPaths("until-successful", "/0", "/0/0", "/0/0/0", "/0/0/1");
         //testFlowPaths("async", "/0/0", "/0/1");
     }
 
@@ -64,34 +66,30 @@
     public void exceptionStrategies() throws Exception
     {
         testFlowPaths("catch-es", "/0", "es/0");
-        testFlowPaths("rollback-es", "/0", "es/0");
-        testFlowPaths("choice-es", "/0", "es/0", "es/1");
+        testFlowPaths("rollback-es", "/0", "es/0", "es/1");
+        testFlowPaths("choice-es", "/0", "es/0/0", "es/0/1", "es/1/0");
     }
 
-    private void testFlowPaths(String flowName, String... leaves) throws Exception
+    private void testFlowPaths(String flowName, String... nodes) throws Exception
     {
-        String[] expectedPaths = generatePathsFromLeaves(flowName, leaves);
+        String[] expectedPaths = generatePaths(flowName, nodes);
         FlowConstruct flow = getFlowConstruct(flowName);
-        String[] flowPaths = ((Pipeline) flow).getProcessorPaths();
+        Map<MessageProcessor,String> messageProcessorPaths = ((Pipeline) flow).getMessageProcessorPaths();
+        String[] flowPaths = messageProcessorPaths.values().toArray(new String[]{});
         Assert.assertArrayEquals(expectedPaths, flowPaths);
     }
 
-    private String[] generatePathsFromLeaves(String flowName, String[] leaves)
+    private String[] generatePaths(String flowName, String[] nodes)
     {
         Set<String> pathSet = new LinkedHashSet<String>();
         String base = "/" + flowName + "/processors";
-        for (String leaf : leaves)
+        for (String node : nodes)
         {
-            if (leaf.startsWith("es/"))
+            if (node.startsWith("es/"))
             {
-                base = "/" + flowName + "/es";
+                base = "/" + flowName + "/";
             }
-            String prefix = "/";
-            for (String part : leaf.substring(leaf.indexOf("/") + 1).split("/"))
-            {
-                pathSet.add(base + prefix + part);
-                prefix += part + "/";
-            }
+            pathSet.add(base + node);
         }
         return pathSet.toArray(new String[0]);
     }

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email


Reply all
Reply to author
Forward
0 new messages