[mule-dev] Re: [mule-scm] [mule][25161] branches/mule-3.x: MULE-6578 - Simplify MessageSource creation - Applying new design to http transport


Daniel Feist

Mar 12, 2013, 8:47:17 PM
to d...@mule.codehaus.org, s...@mule.codehaus.org
Hi,

Sorry I didn't review this earlier (I was on vacation at the time). A few specific things:

- The API should be separate and live in an org.mule.api package tree.
- "message.processing" doesn't seem like a very good package name; isn't there anything better?
- "org.mule.message.processing.PhaseResultNotifier.phaseSuccessfully()": shouldn't that be "phaseCompletedSuccessfully()" or "phaseSuccessful()"?
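To make the naming point concrete, here is a rough sketch of how the notifier might read with the first of the two suggested names. It only renames the success callback; the other two methods are copied from the commit. RecordingNotifier is a hypothetical helper that exists purely so the sketch can be exercised, and no package is declared because the exact location under org.mule.api has not been decided.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: PhaseResultNotifier with the success callback renamed.
// Per the first point above it would live somewhere under org.mule.api;
// the sub-package is an open question, so none is declared here.
interface PhaseResultNotifier
{
    /** Must be called when the phase completes successfully. */
    void phaseCompletedSuccessfully();

    /** Must be called when the phase consumes the message and the chain should stop. */
    void phaseConsumedMessage();

    /** Must be called when the phase execution throws an exception. */
    void phaseFailure(Exception reason);
}

// Hypothetical helper (not part of the commit): records which callbacks were invoked.
class RecordingNotifier implements PhaseResultNotifier
{
    final List<String> calls = new ArrayList<String>();

    @Override
    public void phaseCompletedSuccessfully()
    {
        calls.add("completed");
    }

    @Override
    public void phaseConsumedMessage()
    {
        calls.add("consumed");
    }

    @Override
    public void phaseFailure(Exception reason)
    {
        calls.add("failure: " + reason.getMessage());
    }
}
```

With this naming, a call site reads as a statement about the phase ("phase completed successfully") rather than a dangling adverb.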

My overall concerns with this, though, are:

- I can't find any discussion or design document for this, which I'd have expected for a fairly significant addition to mule-core. Is there anything?
- Mule already has a processing model (with its limitations, of course), and this package appears to introduce another one that takes a different approach for some specific cases. It also includes some interfaces that have significant similarities with existing Mule constructs, e.g. MessageProcessContext and MuleEvent. I'm not saying this wasn't required, just that it's not something we should do without careful consideration. Was it not possible to either i) achieve what was required with the existing processing model or ii) improve the existing model to cater for these needs?

Given this, and given that it's currently only used for HTTP, does it really make sense to introduce this into mule-core at this point?

Dan


On Sun, Jan 6, 2013 at 12:06 AM, <pablo....@codehaus.org> wrote:
Revision
25161
Author
pablo.lagreca
Date
2013-01-05 18:06:58 -0600 (Sat, 05 Jan 2013)

Log Message

MULE-6578 - Simplify MessageSource creation - Applying new design to http transport

Diff

Modified: branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java (25160 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -153,6 +153,7 @@
     public static final String OBJECT_EXPRESSION_LANGUAGE = "_muleExpressionLanguage";
     public static final String OBJECT_LOCK_MANAGER = "_muleLockManager";
     public static final String OBJECT_LOCK_PROVIDER = "_muleLockProvider";
+    public static final String OBJECT_DEFAULT_MESSAGE_PROCESSING_MANAGER = "_muleMessageProcessingManager";
 
     // Not currently used as these need to be instance variables of the MuleContext.
     public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";

Modified: branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java (25160 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -23,6 +23,7 @@
 import org.mule.config.bootstrap.SimpleRegistryBootstrap;
 import org.mule.el.mvel.MVELExpressionLanguage;
 import org.mule.endpoint.DefaultEndpointFactory;
+import org.mule.message.processing.MuleMessageProcessingManager;
 import org.mule.model.seda.SedaModel;
 import org.mule.retry.policies.NoRetryPolicyTemplate;
 import org.mule.security.MuleSecurityManager;
@@ -75,6 +76,7 @@
         registry.registerObject(MuleProperties.QUEUE_STORE_DEFAULT_PERSISTENT_NAME, DefaultObjectStoreFactoryBean.createDefaultPersistentQueueStore());
         registry.registerObject(MuleProperties.DEFAULT_USER_OBJECT_STORE_NAME, DefaultObjectStoreFactoryBean.createDefaultUserObjectStore());
         registry.registerObject(MuleProperties.OBJECT_STORE_MANAGER, new MuleObjectStoreManager());
+        registry.registerObject(MuleProperties.OBJECT_DEFAULT_MESSAGE_PROCESSING_MANAGER, new MuleMessageProcessingManager());
 
         registry.registerObject(MuleProperties.OBJECT_MULE_ENDPOINT_FACTORY, new DefaultEndpointFactory());
         registry.registerObject(MuleProperties.OBJECT_MULE_STREAM_CLOSER_SERVICE, new DefaultStreamCloserService());

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndPhaseTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndPhaseTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndPhaseTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,24 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+/**
+ *  Phase were the {@link MessageProcessTemplate} is notified that
+ *  the message processing has ended
+ */
+public interface EndPhaseTemplate extends MessageProcessTemplate
+{
+
+    /**
+     * template method call when the message processing ends
+     */
+    void messageProcessingEnded();
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,33 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+/**
+ * This phase notifies to the {@link MessageProcessTemplate} that the message processing
+ * has ended.
+ *
+ * To participate on this phase {@link MessageProcessTemplate} must implement {@link EndPhaseTemplate}.
+ */
+public class EndProcessPhase implements MessageProcessPhase<EndPhaseTemplate>
+{
+
+    @Override
+    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+    {
+        return messageProcessTemplate instanceof EndPhaseTemplate;
+    }
+
+    @Override
+    public void runPhase(EndPhaseTemplate messageProcessTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier)
+    {
+        messageProcessTemplate.messageProcessingEnded();
+    }
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,119 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.context.MuleContextAware;
+import org.mule.api.execution.ExecutionCallback;
+import org.mule.execution.TransactionalErrorHandlingExecutionTemplate;
+import org.mule.transaction.MuleTransactionConfig;
+
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkException;
+
+/**
+ * This phase routes the message through the flow.
+ *
+ * To participate of this phase, {@link MessageProcessTemplate} must implement {@link FlowProcessingPhaseTemplate}
+ */
+public class FlowProcessingPhase implements MessageProcessPhase<FlowProcessingPhaseTemplate>, Comparable<MessageProcessPhase>
+{
+
+    @Override
+    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+    {
+        return messageProcessTemplate instanceof FlowProcessingPhaseTemplate;
+    }
+
+    @Override
+    public void runPhase(final FlowProcessingPhaseTemplate flowProcessingPhaseTemplate, final MessageProcessContext messageProcessContext, final PhaseResultNotifier phaseResultNotifier)
+    {
+        Work flowExecutionWork = new Work()
+        {
+            @Override
+            public void release()
+            {
+            }
+
+            @Override
+            public void run()
+            {
+                try
+                {
+                    try
+                    {
+                        TransactionalErrorHandlingExecutionTemplate transactionTemplate = TransactionalErrorHandlingExecutionTemplate.
+                                createMainExecutionTemplate(messageProcessContext.getFlowConstruct().getMuleContext(),
+                                                            (messageProcessContext.getTransactionConfig() == null ? new MuleTransactionConfig() : messageProcessContext.getTransactionConfig()),
+                                                            messageProcessContext.getFlowConstruct().getExceptionListener());
+                        MuleEvent response = transactionTemplate.execute(new ExecutionCallback<MuleEvent>()
+                        {
+                            @Override
+                            public MuleEvent process() throws Exception
+                            {
+                                Object message = flowProcessingPhaseTemplate.getOriginalMessage();
+                                if (message == null)
+                                {
+                                    return null;
+                                }
+                                MuleEvent muleEvent = flowProcessingPhaseTemplate.getMuleEvent();
+                                muleEvent = flowProcessingPhaseTemplate.beforeRouteEvent(muleEvent);
+                                muleEvent = flowProcessingPhaseTemplate.routeEvent(muleEvent);
+                                muleEvent = flowProcessingPhaseTemplate.afterRouteEvent(muleEvent);
+                                return muleEvent;
+                            }
+                        });
+                        if (flowProcessingPhaseTemplate instanceof RequestResponseFlowProcessingPhaseTemplate)
+                        {
+                            ((RequestResponseFlowProcessingPhaseTemplate)flowProcessingPhaseTemplate).sendResponseToClient(response);
+                        }
+                        flowProcessingPhaseTemplate.afterSuccessfulProcessingFlow(response);
+                    }
+                    catch (MessagingException e)
+                    {
+                        flowProcessingPhaseTemplate.afterFailureProcessingFlow(e);
+                    }
+                    phaseResultNotifier.phaseSuccessfully();
+                }
+                catch (Exception e)
+                {
+                    phaseResultNotifier.phaseFailure(e);
+                }
+            }
+        };
+        if (messageProcessContext.supportsAsynchronousProcessing())
+        {
+            try
+            {
+                messageProcessContext.getFlowExecutionWorkManager().scheduleWork(flowExecutionWork);
+            }
+            catch (WorkException e)
+            {
+                phaseResultNotifier.phaseFailure(e);
+            }
+        }
+        else
+        {
+            flowExecutionWork.run();
+        }
+    }
+
+    @Override
+    public int compareTo(MessageProcessPhase messageProcessPhase)
+    {
+        if (messageProcessPhase instanceof ValidationPhase)
+        {
+            return 1;
+        }
+        return 0;
+    }
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhaseTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhaseTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhaseTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,72 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.source.MessageSource;
+
+/**
+ * Template methods for {@link MessageSource} specific behavior during
+ * flow execution.
+ */
+public interface FlowProcessingPhaseTemplate extends MessageProcessTemplate
+{
+
+    /**
+     * @return a {@link org.mule.api.MuleEvent} created from the original message
+     */
+    MuleEvent getMuleEvent() throws MuleException;
+
+    /**
+     * @return the original message
+     */
+    Object getOriginalMessage() throws MuleException;
+
+    /**
+     * Pre processing of the {@link MuleEvent} to route
+     *
+     * @param muleEvent
+     */
+    MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException;
+
+    /**
+     * Routes the {@link MuleEvent} through the processors chain
+     *
+     * @param muleEvent {@link MuleEvent} created from the raw message of this context
+     * @return the response {@link MuleEvent}
+     * @throws MuleException
+     */
+    MuleEvent routeEvent(MuleEvent muleEvent) throws MuleException;
+
+    /**
+     * Post processing of the routed {@link MuleEvent}
+     *
+     * @param muleEvent
+     */
+    MuleEvent afterRouteEvent(MuleEvent muleEvent) throws MuleException;
+
+    /**
+     * Call after successfully processing the message through the flow
+     *
+     * @param muleEvent
+     */
+    void afterSuccessfulProcessingFlow(MuleEvent muleEvent) throws MuleException;
+
+
+    /**
+     * Call when the processing of the message through the flow fails
+     *
+     * @param messagingException
+     */
+    void afterFailureProcessingFlow(MessagingException messagingException) throws MuleException;
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessContext.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessContext.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessContext.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,60 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.context.WorkManager;
+import org.mule.api.source.MessageSource;
+import org.mule.api.transaction.TransactionConfig;
+
+/**
+ * Context for processing one message from a {@link org.mule.api.source.MessageSource}.
+ *
+ * Mule {@link org.mule.api.source.MessageSource} implementations should create one instance of MessageProcessContext per message
+ * that generates.
+ *
+ * MessageProcessContext is responsible for
+ * - Define if the incoming message can be processed in a separate thread
+ * - Provide access to the {@link MessageSource} of the message
+ * - Provide access to the {@link FlowConstruct} were the message is going to be executed
+ * - Provide access, if available, to the {@link WorkManager} to use for processing the message
+ * - Provide the {@link MessageSource} transaction configuration
+ */
+public interface MessageProcessContext
+{
+
+    /**
+     * @return true if the message can be processed in a different thread than the one it was acquired, false otherwise
+     */
+    boolean supportsAsynchronousProcessing();
+
+    /**
+     * @return the {@link MessageSource} that retrieve the message. Can not be null
+     */
+    MessageSource getMessageSource();
+
+    /**
+     * @return the {@link FlowConstruct} were the incoming message is going to be executed. Can not be null
+     */
+    FlowConstruct getFlowConstruct();
+
+    /**
+     * @return the {@link WorkManager} were the incoming message must be processed.
+     * If null it will be executed in the same thread were the message was received
+     */
+    WorkManager getFlowExecutionWorkManager();
+
+    /**
+     * @return the {@link TransactionConfig} associated to the {@link MessageSource} that received the message.
+     * If null then no transaction config will be used.
+     */
+    TransactionConfig getTransactionConfig();
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessPhase.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessPhase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessPhase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,52 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+/**
+ *
+ * Defines a phase that process a message using a {@link MessageProcessTemplate}
+ *
+ * The phase will be part of a chain of responsibility were the phase can define
+ * the end of the execution of the set of phases by calling:
+ * - {@link org.mule.message.processing.PhaseResultNotifier#phaseConsumedMessage()} which indicates that the phase has consume the message
+ * and it should not be longer processed
+ * - {@link org.mule.message.processing.PhaseResultNotifier#phaseFailure(Exception)} which indicates that there was a failure
+ * during message processing.
+ *
+ * Whenever a phase finish execution it must call {@link org.mule.message.processing.PhaseResultNotifier#phaseSuccessfully()} which will cause
+ * the next phase to be executed.
+ *
+ * Optionally a {@link MessageProcessPhase} can implement {@link Comparable<MessageProcessPhase>}
+ * to define the order in which it must be positioned in the {@link MessageProcessPhase} chain
+ *
+ */
+public interface MessageProcessPhase<Template extends MessageProcessTemplate>
+{
+
+    /**
+     * Determines if a certain phase supports a given template.
+     *
+     * If phase does not supports the template instance then the phase will be skipped.
+     *
+     * @param messageProcessTemplate template to be processed
+     * @return true if the phase supports this template, false otherwise
+     */
+    boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate);
+
+    /**
+     * Process the template through the phase.
+     *
+     * @param messageProcessTemplate template containing message source specific behavior
+     * @param messageProcessContext provides context information for executing the message
+     * @param phaseResultNotifier notifier that must be advice under certain scenarios
+     */
+    void runPhase(Template messageProcessTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier);
+
+}
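
The contract described in the MessageProcessPhase Javadoc above (each phase reports success, consumption or failure through the notifier, and unsupported phases are skipped) can be illustrated with a small standalone sketch. This is not the engine from this commit: every type below is a simplified, hypothetical stand-in (the Sketch* names are invented, MessageProcessContext is left out, and the phase names are just labels), and skipping is applied on every advance because that is how the Javadoc reads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical stand-ins for MessageProcessTemplate,
// PhaseResultNotifier and MessageProcessPhase.
interface SketchTemplate
{
}

interface SketchNotifier
{
    void phaseSuccessfully();

    void phaseConsumedMessage();

    void phaseFailure(Exception reason);
}

interface SketchPhase
{
    boolean supportsTemplate(SketchTemplate template);

    void runPhase(SketchTemplate template, SketchNotifier notifier);
}

// Walks the phase list once; each phase decides, through the notifier,
// whether the chain continues or ends.
class SketchPhaseChain implements SketchNotifier
{
    private final List<SketchPhase> phases;
    private final SketchTemplate template;
    private final List<String> log;
    private int next = 0;
    private boolean ended = false;

    SketchPhaseChain(List<SketchPhase> phases, SketchTemplate template, List<String> log)
    {
        this.phases = phases;
        this.template = template;
        this.log = log;
    }

    void start()
    {
        runNextSupportedPhase();
    }

    @Override
    public void phaseSuccessfully()
    {
        runNextSupportedPhase();
    }

    @Override
    public void phaseConsumedMessage()
    {
        end("consumed");
    }

    @Override
    public void phaseFailure(Exception reason)
    {
        end("failed: " + reason.getMessage());
    }

    // Skips phases that do not support the template; ends the chain when none are left.
    private void runNextSupportedPhase()
    {
        while (next < phases.size())
        {
            SketchPhase phase = phases.get(next++);
            if (phase.supportsTemplate(template))
            {
                phase.runPhase(template, this);
                return;
            }
        }
        end("completed");
    }

    // The end of processing is reported at most once.
    private void end(String outcome)
    {
        if (!ended)
        {
            ended = true;
            log.add("end: " + outcome);
        }
    }
}

class SketchDemo
{
    // Builds a phase that logs its name and then reports the given result.
    static SketchPhase phase(final String name, final boolean supported, final String result, final List<String> log)
    {
        return new SketchPhase()
        {
            @Override
            public boolean supportsTemplate(SketchTemplate template)
            {
                return supported;
            }

            @Override
            public void runPhase(SketchTemplate template, SketchNotifier notifier)
            {
                log.add("run: " + name);
                if ("consume".equals(result))
                {
                    notifier.phaseConsumedMessage();
                }
                else
                {
                    notifier.phaseSuccessfully();
                }
            }
        };
    }

    static List<String> demo()
    {
        List<String> log = new ArrayList<String>();
        List<SketchPhase> phases = Arrays.asList(
            phase("validation", true, "success", log),
            phase("throttling", false, "success", log),
            phase("flow", true, "consume", log),
            phase("extra", true, "success", log));
        new SketchPhaseChain(phases, new SketchTemplate() {}, log).start();
        return log;
    }
}
```

Calling SketchDemo.demo() returns the log [run: validation, run: flow, end: consumed]: the unsupported phase is skipped, and the phase after the consuming one never runs.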

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,21 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+/**
+ * Maker interface for every template that can be used in a {@link MessageProcessPhase}
+ *
+ * A {@link MessageProcessTemplate} must contain all the required method that redefines behavior
+ * inside a {@link MessageProcessPhase} and it's particular from the {@link org.mule.api.source.MessageSource}
+ *
+ */
+public interface MessageProcessTemplate
+{
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessingManager.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessingManager.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessingManager.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,20 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+/**
+ * In charge of processing messages through mule.
+ */
+public interface MessageProcessingManager
+{
+
+    void processMessage(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext);
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MuleMessageProcessingManager.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MuleMessageProcessingManager.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/MuleMessageProcessingManager.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,74 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleContext;
+import org.mule.api.context.MuleContextAware;
+import org.mule.api.lifecycle.Initialisable;
+import org.mule.api.lifecycle.InitialisationException;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Default implementation for {@link MessageProcessingManager}.
+ */
+public class MuleMessageProcessingManager implements MessageProcessingManager, MuleContextAware, Initialisable
+{
+    private final EndProcessPhase endProcessPhase = new EndProcessPhase();
+    private MuleContext muleContext;
+    private PhaseExecutionEngine phaseExecutionEngine;
+
+    @Override
+    public void processMessage(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext)
+    {
+        phaseExecutionEngine.process(messageProcessTemplate, messageProcessContext);
+    }
+
+    @Override
+    public void setMuleContext(MuleContext context)
+    {
+        this.muleContext = context;
+    }
+
+    @Override
+    public void initialise() throws InitialisationException
+    {
+        Collection<MessageProcessPhase> registryMessageProcessPhases = muleContext.getRegistry().lookupObjects(MessageProcessPhase.class);
+        List<MessageProcessPhase> messageProcessPhaseList = new ArrayList<MessageProcessPhase>();
+        if (registryMessageProcessPhases != null)
+        {
+            messageProcessPhaseList.addAll(registryMessageProcessPhases);
+        }
+        messageProcessPhaseList.add(new ValidationPhase());
+        messageProcessPhaseList.add(new FlowProcessingPhase());
+        Collections.sort(messageProcessPhaseList, new Comparator<MessageProcessPhase>()
+        {
+            @Override
+            public int compare(MessageProcessPhase messageProcessPhase, MessageProcessPhase messageProcessPhase2)
+            {
+                int compareValue = 0;
+                if (messageProcessPhase instanceof Comparable)
+                {
+                    compareValue = ((Comparable) messageProcessPhase).compareTo(messageProcessPhase2);
+                }
+                if (compareValue == 0 && messageProcessPhase2 instanceof Comparable)
+                {
+                    compareValue = ((Comparable) messageProcessPhase2).compareTo(messageProcessPhase) * -1;
+                }
+                return compareValue;
+            }
+        });
+        phaseExecutionEngine = new PhaseExecutionEngine(messageProcessPhaseList, muleContext.getExceptionListener(), endProcessPhase);
+    }
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseExecutionEngine.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseExecutionEngine.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseExecutionEngine.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,101 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.exception.SystemExceptionHandler;
+
+import java.util.List;
+
+public class PhaseExecutionEngine
+{
+
+    private final List<MessageProcessPhase> phaseList;
+    private final SystemExceptionHandler exceptionHandler;
+    private final EndProcessPhase endProcessPhase;
+
+    public PhaseExecutionEngine(List<MessageProcessPhase> messageProcessPhaseList, SystemExceptionHandler exceptionHandler, EndProcessPhase endProcessPhase)
+    {
+        this.phaseList = messageProcessPhaseList;
+        this.exceptionHandler = exceptionHandler;
+        this.endProcessPhase = endProcessPhase;
+    }
+
+    public void process(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext)
+    {
+        InternalPhaseExecutionEngine internalPhaseExecutionEngine = new InternalPhaseExecutionEngine(messageProcessTemplate, messageProcessContext);
+        internalPhaseExecutionEngine.process();
+    }
+
+    public class InternalPhaseExecutionEngine implements PhaseResultNotifier
+    {
+        private int currentPhase = 0;
+        private final MessageProcessContext messageProcessContext;
+        private final MessageProcessTemplate messageProcessTemplate;
+        private boolean endPhaseProcessed;
+
+        public InternalPhaseExecutionEngine(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext)
+        {
+            this.messageProcessTemplate = messageProcessTemplate;
+            this.messageProcessContext = messageProcessContext;
+        }
+
+        @Override
+        public void phaseSuccessfully()
+        {
+            currentPhase++;
+            if (currentPhase < phaseList.size())
+            {
+                phaseList.get(currentPhase).runPhase(messageProcessTemplate,messageProcessContext,this);
+            }
+            else
+            {
+                processEndPhase();
+            }
+        }
+
+        @Override
+        public void phaseConsumedMessage()
+        {
+            processEndPhase();
+        }
+
+        @Override
+        public void phaseFailure(Exception reason)
+        {
+            exceptionHandler.handleException(reason);
+            processEndPhase();
+        }
+
+        private void processEndPhase()
+        {
+            if (!endPhaseProcessed)
+            {
+                endPhaseProcessed = true;
+                if (endProcessPhase.supportsTemplate(messageProcessTemplate))
+                {
+                    endProcessPhase.runPhase((EndPhaseTemplate) messageProcessTemplate, messageProcessContext,this);
+                }
+            }
+        }
+
+        public void process()
+        {
+            for (MessageProcessPhase phase : phaseList)
+            {
+                if (phase.supportsTemplate(messageProcessTemplate))
+                {
+                    phase.runPhase(messageProcessTemplate, messageProcessContext, this);
+                    return;
+                }
+                currentPhase++;
+            }
+        }
+    }
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseResultNotifier.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseResultNotifier.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseResultNotifier.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,35 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+/**
+ *  Notifier used by {@link MessageProcessPhase} in order to
+ *  define the result of the phase execution
+ */
+public interface PhaseResultNotifier
+{
+
+    /**
+     * This method must be called when the phase complete successfully
+     */
+    public void phaseSuccessfully();
+
+    /**
+     * This method must be called when the phase consume the message and the chain should not continue processing the message
+     */
+    public void phaseConsumedMessage();
+
+    /**
+     * This message must be called when a phase execution throw an exception
+     *
+     * @param reason exception that represents the failure in the phase
+     */
+    public void phaseFailure(Exception reason);
+}
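
The notifier contract above implies that a phase reports exactly one of the three outcomes per execution. A minimal sketch of that idea follows, assuming a local stand-in copy of the interface; `RecordingNotifier` and `SamplePhase` are hypothetical names used only for illustration and are not part of this commit.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the PhaseResultNotifier interface from the diff above.
interface PhaseResultNotifier
{
    void phaseSuccessfully();

    void phaseConsumedMessage();

    void phaseFailure(Exception reason);
}

// Hypothetical helper (not in the commit): records every outcome a phase reports.
class RecordingNotifier implements PhaseResultNotifier
{
    final List<String> outcomes = new ArrayList<String>();

    public void phaseSuccessfully()
    {
        outcomes.add("success");
    }

    public void phaseConsumedMessage()
    {
        outcomes.add("consumed");
    }

    public void phaseFailure(Exception reason)
    {
        outcomes.add("failure: " + reason.getMessage());
    }
}

// Hypothetical phase body: runs some work and reports exactly one outcome.
class SamplePhase
{
    static void run(Runnable work, boolean consumesMessage, PhaseResultNotifier notifier)
    {
        try
        {
            work.run();
        }
        catch (Exception e)
        {
            // A failure ends the phase; no other callback is invoked afterwards.
            notifier.phaseFailure(e);
            return;
        }
        if (consumesMessage)
        {
            notifier.phaseConsumedMessage();
        }
        else
        {
            notifier.phaseSuccessfully();
        }
    }
}
```

Keeping the success and consumed callbacks outside the try block is a design choice of this sketch: an exception thrown by the notifier itself is then not reported as a second outcome.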

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/RequestResponseFlowProcessingPhaseTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/RequestResponseFlowProcessingPhaseTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/RequestResponseFlowProcessingPhaseTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,33 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+
+/**
+ * Extension of {@link FlowProcessingPhaseTemplate} for those {@link org.mule.api.source.MessageSource}s
+ * that require sending a response for the processed message.
+ */
+public interface RequestResponseFlowProcessingPhaseTemplate extends FlowProcessingPhaseTemplate
+{
+
+    /**
+     * Template method to send a response after processing the message.
+     *
+     * This method is executed outside the flow. In case of failure the {@link org.mule.api.exception.SystemExceptionHandler}
+     * will be executed.
+     *
+     * @param muleEvent the event with the content of the response to be sent.
+     * @throws MuleException if an error occurs while the response is being sent.
+     */
+    public void sendResponseToClient(MuleEvent muleEvent) throws MuleException;
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,28 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleException;
+
+/**
+ * Template that a {@link org.mule.api.source.MessageSource} must implement
+ * if it wants to participate in the throttling phase when processing a message
+ */
+public interface ThrottlingPhaseTemplate extends FlowProcessingPhaseTemplate
+{
+
+    /**
+     * Discards the message because the ThrottlingPolicy configured for the {@link org.mule.api.source.MessageSource} has been exceeded
+     *
+     * @throws MuleException
+     */
+    void discardMessageOnThrottlingExceeded() throws MuleException;
+
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,60 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleContext;
+import org.mule.api.context.MuleContextAware;
+
+/**
+ * This phase validates the incoming message.
+ *
+ * To participate in this phase, a {@link MessageProcessTemplate} must implement
+ * {@link ValidationPhaseTemplate}.
+ */
+public class ValidationPhase implements MessageProcessPhase<ValidationPhaseTemplate>, Comparable<MessageProcessPhase>
+{
+
+    @Override
+    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+    {
+        return messageProcessTemplate instanceof ValidationPhaseTemplate;
+    }
+
+    @Override
+    public void runPhase(ValidationPhaseTemplate validationPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier)
+    {
+        try
+        {
+            if (!validationPhaseTemplate.validateMessage())
+            {
+                validationPhaseTemplate.discardInvalidMessage();
+                phaseResultNotifier.phaseConsumedMessage();
+            }
+            else
+            {
+                phaseResultNotifier.phaseSuccessfully();
+            }
+        }
+        catch (Exception e)
+        {
+            phaseResultNotifier.phaseFailure(e);
+        }
+    }
+
+    @Override
+    public int compareTo(MessageProcessPhase messageProcessPhase)
+    {
+        if (messageProcessPhase instanceof FlowProcessingPhase)
+        {
+            return -1;
+        }
+        return 0;
+    }
+}

Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,38 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleException;
+
+/**
+ * Template for the validation phase of the incoming message.
+ *
+ * This template allows validating a message and discarding it if it is invalid.
+ */
+public interface ValidationPhaseTemplate extends MessageProcessTemplate
+{
+    /**
+     * Validates the message content.
+     *
+     * If the message is not valid, {@link #discardInvalidMessage()} will be executed
+     * afterwards, so the implementation can save the reason why the message is invalid
+     * here and report it when {@link #discardInvalidMessage()} is called.
+     *
+     * @return false if the message is invalid, true otherwise
+     */
+    boolean validateMessage();
+
+    /**
+     * Discards the message because the validation failed
+     *
+     * @throws org.mule.api.MuleException
+     */
+    void discardInvalidMessage() throws MuleException;
+}
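
The Javadoc above describes a two-step contract: `validateMessage()` may store the reason for rejection, and `discardInvalidMessage()` reports it later. A minimal sketch of a template following that contract is shown below. It assumes a local stand-in interface (the `MuleException` in the real signature is omitted to keep the example self-contained); `NonEmptyPayloadTemplate` and `ValidationSketch` are hypothetical names, not part of this commit.

```java
// Local stand-in for the ValidationPhaseTemplate contract shown in the diff above.
// Assumption: the checked MuleException is dropped so the sketch compiles on its own.
interface ValidationPhaseTemplate
{
    boolean validateMessage();

    void discardInvalidMessage();
}

// Hypothetical template: rejects empty payloads and remembers why.
class NonEmptyPayloadTemplate implements ValidationPhaseTemplate
{
    private final String payload;
    private String rejectionReason;
    // What was reported on discard; a real transport might instead write an error response.
    String reportedReason;

    NonEmptyPayloadTemplate(String payload)
    {
        this.payload = payload;
    }

    public boolean validateMessage()
    {
        if (payload == null || payload.isEmpty())
        {
            // Save the reason now so discardInvalidMessage() can report it.
            rejectionReason = "empty payload";
            return false;
        }
        return true;
    }

    public void discardInvalidMessage()
    {
        reportedReason = rejectionReason;
    }
}

class ValidationSketch
{
    // Mirrors the control flow of ValidationPhase.runPhase without the notifier:
    // returns true if the message may continue to the next phase.
    static boolean validate(ValidationPhaseTemplate template)
    {
        if (!template.validateMessage())
        {
            template.discardInvalidMessage();
            return false;
        }
        return true;
    }
}
```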

Modified: branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java (25160 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -39,6 +39,9 @@
 import org.mule.context.notification.EndpointMessageNotification;
 import org.mule.execution.TransactionalErrorHandlingExecutionTemplate;
 import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener;
+import org.mule.message.processing.MessageProcessContext;
+import org.mule.message.processing.MessageProcessTemplate;
+import org.mule.message.processing.MessageProcessingManager;
 import org.mule.session.DefaultMuleSession;
 import org.mule.session.LegacySessionHandler;
 import org.mule.transaction.TransactionCoordination;
@@ -88,6 +91,7 @@
 
     protected ReplyToHandler replyToHandler;
     private PrimaryNodeLifecycleNotificationListener primaryNodeLifecycleNotificationListener;
+    private MessageProcessingManager messageProcessingManager;
 
     /**
      * Creates the Message Receiver
@@ -166,6 +170,8 @@
             primaryNodeLifecycleNotificationListener.register();
         }
 
+        messageProcessingManager = getConnector().getMuleContext().getRegistry().get(MuleProperties.OBJECT_DEFAULT_MESSAGE_PROCESSING_MANAGER);
+
         super.initialise();
     }
 
@@ -217,41 +223,7 @@
             applyInboundTransformers(muleEvent);
         }
 
-        MuleEvent resultEvent = listener.process(muleEvent);
-        if (resultEvent != null
-            && !VoidMuleEvent.getInstance().equals(resultEvent)
-            && resultEvent.getMessage() != null
-            && resultEvent.getMessage().getExceptionPayload() != null
-            && resultEvent.getMessage().getExceptionPayload().getException() instanceof FilterUnacceptedException)
-        {
-            handleUnacceptedFilter(muleEvent.getMessage());
-            return muleEvent;
-        }
-
-        if (endpoint.getExchangePattern().hasResponse() && resultEvent != null
-            && !VoidMuleEvent.getInstance().equals(resultEvent))
-        {
-            // Do not propagate security context back to caller
-            MuleSession resultSession = new DefaultMuleSession(resultEvent.getSession());
-            resultSession.setSecurityContext(null);
-            connector.getSessionHandler().storeSessionInfoToMessage(resultSession, resultEvent.getMessage());
-
-            if (resultEvent.getMessage() != null && !endpoint.isDisableTransportTransformer())
-            {
-                applyResponseTransformers(resultEvent);
-            }
-
-            if (connector.isEnableMessageEvents())
-            {
-                connector.fireNotification(new EndpointMessageNotification(resultEvent.getMessage(),
-                    endpoint, resultEvent.getFlowConstruct(), EndpointMessageNotification.MESSAGE_RESPONSE));
-            }
-            return resultEvent;
-        }
-        else
-        {
-            return null;
-        }
+        return routeEvent(muleEvent);
     }
 
     protected void propagateRootMessageIdProperty(MuleMessage message)
@@ -491,4 +463,49 @@
             }
         }
     }
+
+    public MuleEvent routeEvent(MuleEvent muleEvent) throws MuleException
+    {
+        MuleEvent resultEvent = listener.process(muleEvent);
+        if (resultEvent != null
+            && !VoidMuleEvent.getInstance().equals(resultEvent)
+            && resultEvent.getMessage() != null
+            && resultEvent.getMessage().getExceptionPayload() != null
+            && resultEvent.getMessage().getExceptionPayload().getException() instanceof FilterUnacceptedException)
+        {
+            handleUnacceptedFilter(muleEvent.getMessage());
+            return muleEvent;
+        }
+
+        if (endpoint.getExchangePattern().hasResponse() && resultEvent != null
+            && !VoidMuleEvent.getInstance().equals(resultEvent))
+        {
+            // Do not propagate security context back to caller
+            MuleSession resultSession = new DefaultMuleSession(resultEvent.getSession());
+            resultSession.setSecurityContext(null);
+            connector.getSessionHandler().storeSessionInfoToMessage(resultSession, resultEvent.getMessage());
+
+            if (resultEvent.getMessage() != null && !endpoint.isDisableTransportTransformer())
+            {
+                applyResponseTransformers(resultEvent);
+            }
+
+            if (connector.isEnableMessageEvents())
+            {
+                connector.fireNotification(new EndpointMessageNotification(resultEvent.getMessage(),
+                                                                           endpoint, resultEvent.getFlowConstruct(), EndpointMessageNotification.MESSAGE_RESPONSE));
+            }
+            return resultEvent;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    protected void processMessage(final MessageProcessTemplate messageProcessTemplate, final MessageProcessContext messageProcessContext)
+    {
+        messageProcessingManager.processMessage(messageProcessTemplate,messageProcessContext);
+    }
+
 }

Added: branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,216 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport;
+
+import org.mule.MessageExchangePattern;
+import org.mule.api.MessagingException;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
+import org.mule.api.config.MuleProperties;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.context.WorkManager;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.source.MessageSource;
+import org.mule.api.transaction.TransactionConfig;
+import org.mule.api.transport.PropertyScope;
+import org.mule.message.processing.FlowProcessingPhaseTemplate;
+import org.mule.message.processing.MessageProcessContext;
+import org.mule.message.processing.ValidationPhaseTemplate;
+import org.mule.util.ObjectUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class AbstractTransportMessageProcessTemplate<MessageReceiverType extends AbstractMessageReceiver, ConnectorType extends AbstractConnector> implements FlowProcessingPhaseTemplate, ValidationPhaseTemplate, MessageProcessContext
+{
+
+    protected transient Log logger = LogFactory.getLog(getClass());
+
+    private final MessageReceiverType messageReceiver;
+    private Object rawMessage;
+    private WorkManager flowExecutionWorkManager;
+
+    public AbstractTransportMessageProcessTemplate(MessageReceiverType messageReceiver, WorkManager flowExecutionWorkManager)
+    {
+        this.messageReceiver = messageReceiver;
+        this.flowExecutionWorkManager = flowExecutionWorkManager;
+    }
+
+    public MuleEvent getMuleEvent() throws MuleException
+    {
+        MuleMessage messageFromSource = createMessageFromSource(getOriginalMessage());
+        return createEventFromMuleMessage(messageFromSource);
+    }
+
+    @Override
+    public MessageSource getMessageSource()
+    {
+        return this.messageReceiver;
+    }
+
+
+    @Override
+    public Object getOriginalMessage() throws MuleException
+    {
+
+        if (this.rawMessage == null)
+        {
+            this.rawMessage = acquireMessage();
+        }
+        return this.rawMessage;
+    }
+
+    public void afterFailureProcessingFlow(MessagingException messagingException) throws MuleException
+    {
+    }
+
+    public MuleEvent routeEvent(MuleEvent muleEvent) throws MuleException
+    {
+        return messageReceiver.routeEvent(muleEvent);
+    }
+
+    protected void sendResponseMessage(MuleEvent responseMuleEvent) throws MessagingException
+    {
+    }
+
+    public void afterSuccessfulProcessingFlow(MuleEvent response) throws MuleException
+    {
+        if (messageReceiver.getEndpoint().getExchangePattern().equals(MessageExchangePattern.REQUEST_RESPONSE))
+        {
+            try
+            {
+                sendResponseMessage(response);
+            }
+            catch (MessagingException e)
+            {
+                throw new ResponseDispatchMessagingException(response,e);
+            }
+        }
+    }
+
+    /**
+     * This method will only be called once for the {@link MessageProcessContext}
+     *
+     * @return the raw message from the {@link MessageSource}
+     * @throws MuleException
+     */
+    public abstract Object acquireMessage() throws MuleException;
+
+    protected void propagateRootMessageIdProperty(MuleMessage message)
+    {
+        String rootId = message.getInboundProperty(MuleProperties.MULE_ROOT_MESSAGE_ID_PROPERTY);
+        if (rootId != null)
+        {
+            message.setMessageRootId(rootId);
+            message.removeProperty(MuleProperties.MULE_ROOT_MESSAGE_ID_PROPERTY, PropertyScope.INBOUND);
+        }
+    }
+
+    @Override
+    public boolean validateMessage()
+    {
+        return true;
+    }
+
+    @Override
+    public void discardInvalidMessage() throws MuleException
+    {
+    }
+
+    protected void warnIfMuleClientSendUsed(MuleMessage message)
+    {
+        final Object remoteSyncProperty = message.removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY,
+                                                                 PropertyScope.INBOUND);
+        if (ObjectUtils.getBoolean(remoteSyncProperty, false) && !messageReceiver.getEndpoint().getExchangePattern().hasResponse())
+        {
+            logger.warn("MuleClient.send() was used but inbound endpoint "
+                        + messageReceiver.getEndpoint().getEndpointURI().getUri().toString()
+                        + " is not 'request-response'.  No response will be returned.");
+        }
+
+        message.removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, PropertyScope.INBOUND);
+    }
+
+    private MuleEvent createEventFromMuleMessage(MuleMessage muleMessage) throws MuleException
+    {
+        MuleEvent muleEvent = messageReceiver.createMuleEvent(muleMessage, null);
+        if (!messageReceiver.getEndpoint().isDisableTransportTransformer())
+        {
+            messageReceiver.applyInboundTransformers(muleEvent);
+        }
+        return muleEvent;
+    }
+
+    protected MuleMessage createMessageFromSource(Object message) throws MuleException
+    {
+        MuleMessage muleMessage = messageReceiver.createMuleMessage(message, messageReceiver.getEndpoint().getEncoding());
+        warnIfMuleClientSendUsed(muleMessage);
+        propagateRootMessageIdProperty(muleMessage);
+        return muleMessage;
+    }
+
+    protected MessageReceiverType getMessageReceiver()
+    {
+        return this.messageReceiver;
+    }
+
+    protected InboundEndpoint getInboundEndpoint()
+    {
+        return this.messageReceiver.getEndpoint();
+    }
+
+    protected ConnectorType getConnector()
+    {
+        return (ConnectorType) this.messageReceiver.getConnector();
+    }
+
+    protected MuleContext getMuleContext()
+    {
+        return this.messageReceiver.getEndpoint().getMuleContext();
+    }
+
+    public FlowConstruct getFlowConstruct()
+    {
+        return this.messageReceiver.getFlowConstruct();
+    }
+
+    @Override
+    public boolean supportsAsynchronousProcessing()
+    {
+        return true;
+    }
+
+    @Override
+    public MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException
+    {
+        return muleEvent;
+    }
+
+    @Override
+    public MuleEvent afterRouteEvent(MuleEvent muleEvent) throws MuleException
+    {
+        return muleEvent;
+    }
+
+    @Override
+    public WorkManager getFlowExecutionWorkManager()
+    {
+        return flowExecutionWorkManager;
+    }
+
+    @Override
+    public TransactionConfig getTransactionConfig()
+    {
+        return getInboundEndpoint().getTransactionConfig();
+    }
+}
+

Added: branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java (0 => 25161)


--- branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java	                        (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,51 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.config.i18n.Message;
+
+/**
+ *
+ */
+class ResponseDispatchMessagingException extends MessagingException
+{
+    ResponseDispatchMessagingException(Message message, MuleEvent event)
+    {
+        super(message, event);
+    }
+
+    ResponseDispatchMessagingException(Message message, MuleEvent event, MessageProcessor failingMessageProcessor)
+    {
+        super(message, event, failingMessageProcessor);
+    }
+
+    ResponseDispatchMessagingException(Message message, MuleEvent event, Throwable cause)
+    {
+        super(message, event, cause);
+    }
+
+    ResponseDispatchMessagingException(Message message, MuleEvent event, Throwable cause, MessageProcessor failingMessageProcessor)
+    {
+        super(message, event, cause, failingMessageProcessor);
+    }
+
+    ResponseDispatchMessagingException(MuleEvent event, Throwable cause)
+    {
+        super(event, cause);
+    }
+
+    ResponseDispatchMessagingException(MuleEvent event, Throwable cause, MessageProcessor failingMessageProcessor)
+    {
+        super(event, cause, failingMessageProcessor);
+    }
+}

Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java (0 => 25161)


--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,55 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class EndProcessPhaseTestCase extends AbstractMuleTestCase
+{
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessTemplate notSupportedTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private EndPhaseTemplate supportedTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private PhaseResultNotifier mockPhaseResultNotifier;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockMessageContext;
+    private EndProcessPhase endProcessPhase = new EndProcessPhase();
+
+    @Test
+    public void supportedTemplates()
+    {
+        new PhaseSupportTestHelper<EndPhaseTemplate>(EndPhaseTemplate.class).testSupportTemplates(endProcessPhase);
+    }
+
+    @Test
+    public void phaseExecution()
+    {
+        endProcessPhase.runPhase(supportedTemplate,mockMessageContext,mockPhaseResultNotifier);
+        verify(mockPhaseResultNotifier, times(0)).phaseConsumedMessage();
+        verify(mockPhaseResultNotifier, times(0)).phaseFailure(any(Exception.class));
+        verify(mockPhaseResultNotifier, times(0)).phaseConsumedMessage();
+        verify(supportedTemplate, times(1)).messageProcessingEnded();
+    }
+
+}

Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java (0 => 25161)


--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,130 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class FlowProcessingPhaseTestCase extends AbstractMuleTestCase
+{
+
+    private FlowProcessingPhase phase = new FlowProcessingPhase();
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private FlowProcessingPhaseTemplate mockTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private RequestResponseFlowProcessingPhaseTemplate mockRequestResponseTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private PhaseResultNotifier mockNotifier;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessagingException mockMessagingException;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MuleException mockException;
+
+
+    @Test
+    public void supportedTemplates()
+    {
+        new PhaseSupportTestHelper<FlowProcessingPhaseTemplate>(FlowProcessingPhaseTemplate.class).testSupportTemplates(phase);
+    }
+
+    @Test
+    public void order()
+    {
+        assertThat(phase.compareTo(new ValidationPhase()), is(1));
+        assertThat(phase.compareTo(Mockito.mock(MessageProcessPhase.class)), is(0));
+    }
+
+    @Test
+    public void runPhaseWithExceptionThrown() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        doThrow(mockException).when(mockTemplate).afterSuccessfulProcessingFlow(any(MuleEvent.class));
+        phase.runPhase(mockTemplate, mockContext, mockNotifier);
+        verify(mockContext.getFlowConstruct(), Mockito.times(1)).getExceptionListener();
+        verifyOnlyFailureWasCalled(mockException);
+    }
+
+    @Test
+    public void runPhaseWithMessagingExceptionThrown() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        when(mockTemplate.routeEvent(Mockito.any(MuleEvent.class))).thenThrow(mockMessagingException);
+        phase.runPhase(mockTemplate, mockContext, mockNotifier);
+        verify(mockContext.getFlowConstruct(), Mockito.times(1)).getExceptionListener();
+        verifyOnlySuccessfulWasCalled();
+    }
+
+    @Test
+    public void callSendResponseForRequestResponseTemplate() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        phase.runPhase(mockRequestResponseTemplate, mockContext, mockNotifier);
+        verifyOnlySuccessfulWasCalled();
+        verify(mockRequestResponseTemplate, times(1)).sendResponseToClient(any(MuleEvent.class));
+    }
+
+    @Test
+    public void successfulPhaseExecutionInOrder() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        phase.runPhase(mockRequestResponseTemplate, mockContext, mockNotifier);
+        InOrder inOrderVerify = Mockito.inOrder(mockContext, mockContext.getFlowConstruct(), mockRequestResponseTemplate, mockNotifier);
+        inOrderVerify.verify(mockContext, atLeastOnce()).getTransactionConfig();
+        inOrderVerify.verify(mockContext.getFlowConstruct(), times(1)).getExceptionListener();
+        inOrderVerify.verify(mockRequestResponseTemplate, times(1)).getMuleEvent();
+        inOrderVerify.verify(mockRequestResponseTemplate, times(1)).beforeRouteEvent(any(MuleEvent.class));
+        inOrderVerify.verify(mockRequestResponseTemplate, times(1)).routeEvent(any(MuleEvent.class));
+        inOrderVerify.verify(mockRequestResponseTemplate, times(1)).afterRouteEvent(any(MuleEvent.class));
+        inOrderVerify.verify(mockRequestResponseTemplate, times(1)).afterSuccessfulProcessingFlow(any(MuleEvent.class));
+        inOrderVerify.verify(mockNotifier, times(1)).phaseSuccessfully();
+        verifyOnlySuccessfulWasCalled();
+    }
+
+    private void verifyOnlySuccessfulWasCalled()
+    {
+        verify(mockNotifier, Mockito.times(0)).phaseFailure(any(Exception.class));
+        verify(mockNotifier, Mockito.times(0)).phaseConsumedMessage();
+        verify(mockNotifier, Mockito.times(1)).phaseSuccessfully();
+    }
+
+    private void verifyOnlyFailureWasCalled(Exception e)
+    {
+        verify(mockNotifier, Mockito.times(1)).phaseFailure(e);
+        verify(mockNotifier, Mockito.times(0)).phaseConsumedMessage();
+        verify(mockNotifier, Mockito.times(0)).phaseSuccessfully();
+    }
+
+}

Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java (0 => 25161)


--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,204 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.DefaultMuleException;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.exception.SystemExceptionHandler;
+import org.mule.api.lifecycle.InitialisationException;
+import org.mule.tck.size.SmallTest;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class MuleMessageProcessingManagerTestCase extends org.mule.tck.junit4.AbstractMuleTestCase
+{
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MuleContext mockMuleContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private TestMessageProcessTemplateAndContext completeMessageProcessTemplateAndContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private SystemExceptionHandler mockExceptionListener;
+
+    @Test
+    public void nullMessageProcessPhaseInRegistry() throws Exception
+    {
+        processAndVerifyDefaultPhasesUsingRegistryPhases(null);
+    }
+
+    @Test
+    public void emptyMessageProcessPhaseInRegistry() throws Exception
+    {
+        processAndVerifyDefaultPhasesUsingRegistryPhases(Collections.<MessageProcessPhase>emptyList());
+    }
+
+    @Test
+    public void notSupportedMessageProcessPhaseInRegistry() throws Exception
+    {
+        MessageProcessPhase notSupportedPhase = createNotSupportedPhase();
+        processAndVerifyDefaultPhasesUsingRegistryPhases(Arrays.asList(notSupportedPhase));
+    }
+
+    @Test
+    public void messageConsumedPreventsNextPhaseToBeExecuted() throws Exception
+    {
+        PhaseAfterValidationBeforeFlow messageProcessPhase = createPhaseAfterValidation();
+        when(completeMessageProcessTemplateAndContext.validateMessage()).thenReturn(true);
+        when(messageProcessPhase.compareTo(any(MessageProcessPhase.class))).thenCallRealMethod();
+        when(messageProcessPhase.supportsTemplate(any(MessageProcessTemplate.class))).thenCallRealMethod();
+        doCallRealMethod().when(messageProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+        MuleMessageProcessingManager manager = createManagerUsingPhasesInRegistry(Arrays.<MessageProcessPhase>asList(messageProcessPhase));
+        manager.processMessage(completeMessageProcessTemplateAndContext, completeMessageProcessTemplateAndContext);
+        verify(completeMessageProcessTemplateAndContext, times(0)).routeEvent(any(MuleEvent.class));
+        verify(completeMessageProcessTemplateAndContext, times(1)).validateMessage();
+        verify(completeMessageProcessTemplateAndContext, times(1)).messageProcessingEnded();
+    }
+
+    private PhaseAfterValidationBeforeFlow createPhaseAfterValidation()
+    {
+        return mock(PhaseAfterValidationBeforeFlow.class, Answers.RETURNS_DEEP_STUBS.get());
+    }
+
+    @Test
+    public void testExceptionHandlerIsCalledDuringPhaseFailure() throws Exception
+    {
+        MessageProcessPhase failureMessageProcessPhase = createFailureMessageProcessPhase();
+        when(mockMuleContext.getExceptionListener()).thenReturn(mockExceptionListener);
+        MuleMessageProcessingManager manager = createManagerUsingPhasesInRegistry(Arrays.asList(failureMessageProcessPhase));
+        manager.processMessage(completeMessageProcessTemplateAndContext, completeMessageProcessTemplateAndContext);
+        verify(mockExceptionListener, times(1)).handleException(any(MuleException.class));
+    }
+
+    private MessageProcessPhase createFailureMessageProcessPhase()
+    {
+        FailureMessageProcessPhase failureMessageProcessPhase = mock(FailureMessageProcessPhase.class, Answers.RETURNS_DEEP_STUBS.get());
+        when(failureMessageProcessPhase.supportsTemplate(any(MessageProcessTemplate.class))).thenCallRealMethod();
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                PhaseResultNotifier phaseResultNotifier = (PhaseResultNotifier) invocationOnMock.getArguments()[2];
+                phaseResultNotifier.phaseFailure(new DefaultMuleException("error"));
+                return null;
+            }
+        }).when(failureMessageProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+        return failureMessageProcessPhase;
+    }
+
+    private MessageProcessPhase createNotSupportedPhase()
+    {
+        MessageProcessPhase notSupportedPhase = mock(MessageProcessPhase.class, Answers.RETURNS_DEEP_STUBS.get());
+        when(notSupportedPhase.supportsTemplate(any(MessageProcessTemplate.class))).thenReturn(false);
+        return notSupportedPhase;
+    }
+
+    private void processAndVerifyDefaultPhasesUsingRegistryPhases(Collection<MessageProcessPhase> phasesInRegistry) throws Exception
+    {
+        MuleMessageProcessingManager manager = createManagerUsingPhasesInRegistry(phasesInRegistry);
+        processAndVerifyDefaultPhasesAreExecuted(manager);
+    }
+
+    private MuleMessageProcessingManager createManagerUsingPhasesInRegistry(Collection<MessageProcessPhase> phasesInRegistry) throws InitialisationException
+    {
+        MuleMessageProcessingManager manager = new MuleMessageProcessingManager();
+        manager.setMuleContext(mockMuleContext);
+        when(mockMuleContext.getRegistry().lookupObjects(MessageProcessPhase.class)).thenReturn(phasesInRegistry);
+        manager.initialise();
+        return manager;
+    }
+
+    private void processAndVerifyDefaultPhasesAreExecuted(MuleMessageProcessingManager manager) throws Exception
+    {
+        when(completeMessageProcessTemplateAndContext.validateMessage()).thenReturn(true);
+
+        manager.processMessage(completeMessageProcessTemplateAndContext,completeMessageProcessTemplateAndContext);
+        InOrder verifyInOrder = Mockito.inOrder(completeMessageProcessTemplateAndContext);
+        verifyInOrder.verify(completeMessageProcessTemplateAndContext, times(1)).validateMessage();
+        verifyInOrder.verify(completeMessageProcessTemplateAndContext, times(1)).routeEvent(Mockito.any(MuleEvent.class));
+        verifyInOrder.verify(completeMessageProcessTemplateAndContext, times(1)).messageProcessingEnded();
+    }
+
+    public interface TestMessageProcessTemplateAndContext extends ValidationPhaseTemplate, FlowProcessingPhaseTemplate, EndPhaseTemplate, MessageProcessContext
+    {
+    }
+
+    public abstract class PhaseAfterValidationBeforeFlow implements MessageProcessPhase, Comparable<MessageProcessPhase>
+    {
+
+        @Override
+        public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+        {
+            return true;
+        }
+
+        @Override
+        public int compareTo(MessageProcessPhase messageProcessPhase)
+        {
+            if (messageProcessPhase instanceof ValidationPhase)
+            {
+                return 1;
+            }
+            if (messageProcessPhase instanceof FlowProcessingPhase)
+            {
+                return -1;
+            }
+            return 0;
+        }
+
+        @Override
+        public void runPhase(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier)
+        {
+            phaseResultNotifier.phaseConsumedMessage();
+        }
+    }
+
+    public abstract class FailureMessageProcessPhase implements MessageProcessPhase, Comparable<MessageProcessPhase>
+    {
+
+        @Override
+        public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+        {
+            return true;
+        }
+
+        @Override
+        public int compareTo(MessageProcessPhase messageProcessPhase)
+        {
+            return -1;
+        }
+
+    }
+
+}
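
Note on the ordering exercised by PhaseAfterValidationBeforeFlow above: its compareTo places a registry-supplied phase after validation and before flow processing. A minimal sketch of that idea follows. Every type in it is a hypothetical stand-in rather than the real org.mule.message.processing classes, and the comparator is an assumption about how a manager might combine Comparable phases with a built-in order, not the committed implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins only; not the real Mule phase classes.
final class PhaseOrderingSketch
{
    interface Phase
    {
        String name();
    }

    static final class Validation implements Phase
    {
        public String name() { return "validation"; }
    }

    static final class Flow implements Phase
    {
        public String name() { return "flow"; }
    }

    // Same shape as the compareTo in the test: after validation, before flow.
    static final class Between implements Phase, Comparable<Phase>
    {
        public String name() { return "between"; }

        public int compareTo(Phase other)
        {
            if (other instanceof Validation) { return 1; }
            if (other instanceof Flow) { return -1; }
            return 0;
        }
    }

    // Assumed comparator: ask the custom phase where it wants to sit,
    // otherwise fall back to the built-in order (validation first).
    static final Comparator<Phase> ORDER = new Comparator<Phase>()
    {
        public int compare(Phase a, Phase b)
        {
            if (a instanceof Between) { return ((Between) a).compareTo(b); }
            if (b instanceof Between) { return -((Between) b).compareTo(a); }
            return Integer.compare(rank(a), rank(b));
        }
    };

    private static int rank(Phase phase)
    {
        return phase instanceof Validation ? 0 : 1;
    }

    // Sorts a copy of the given phases and returns their names in execution order.
    static List<String> sortedNames(List<Phase> phases)
    {
        List<Phase> sorted = new ArrayList<Phase>(phases);
        Collections.sort(sorted, ORDER);
        List<String> names = new ArrayList<String>();
        for (Phase phase : sorted)
        {
            names.add(phase.name());
        }
        return names;
    }

    public static void main(String[] args)
    {
        List<Phase> registry = Arrays.<Phase>asList(new Flow(), new Between(), new Validation());
        System.out.println(sortedNames(registry));
    }
}
```

The sketch keeps the comparison symmetric on purpose: a compareTo that only one side implements is easy to get inconsistent, and sort results then depend on the input order.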

Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseExecutionEngineTestCase.java (0 => 25161)


--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseExecutionEngineTestCase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseExecutionEngineTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,115 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.exception.SystemExceptionHandler;
+import org.mule.tck.size.SmallTest;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class PhaseExecutionEngineTestCase
+{
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private SystemExceptionHandler mockExceptionHandler;
+    private List<MessageProcessPhase> phaseList = new ArrayList<MessageProcessPhase>();
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private EndProcessPhase mockEndPhase;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessPhase mockProcessPhase1;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessPhase mockProcessPhase2;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessPhase mockProcessPhase3;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessPhase mockFailingPhase;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private EndPhaseTemplate mockTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private PhaseResultNotifier mockNotifier;
+
+    @Test
+    public void allPhasesRun() throws Exception
+    {
+        when(mockEndPhase.supportsTemplate(mockTemplate)).thenReturn(true);
+        verifyAllPhasesAreRun();
+        verify(mockEndPhase, Mockito.times(1)).runPhase(any(EndPhaseTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+    }
+
+    @Test
+    public void endPhaseDoesNotRun() throws Exception
+    {
+        when(mockEndPhase.supportsTemplate(mockTemplate)).thenReturn(false);
+        verifyAllPhasesAreRun();
+        verify(mockEndPhase, Mockito.times(0)).runPhase(any(EndPhaseTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+    }
+
+    @Test
+    public void exceptionHandlerIsCalledOnFailure() throws Exception
+    {
+        addPhase(mockFailingPhase);
+        addPhase(mockProcessPhase1);
+        when(mockEndPhase.supportsTemplate(mockTemplate)).thenReturn(true);
+        PhaseExecutionEngine phaseExecutionEngine = new PhaseExecutionEngine(phaseList, mockExceptionHandler, mockEndPhase);
+        phaseExecutionEngine.process(mockTemplate,mockContext);
+        verify(mockEndPhase,times(1)).runPhase(any(EndPhaseTemplate.class),any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+    }
+
+    private void verifyAllPhasesAreRun()
+    {
+        PhaseExecutionEngine engine = new PhaseExecutionEngine(phaseList,mockExceptionHandler, mockEndPhase);
+        addAllPhases();
+        engine.process(mockTemplate, mockContext);
+        verify(mockProcessPhase1, Mockito.times(1)).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+        verify(mockProcessPhase2, Mockito.times(1)).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+        verify(mockProcessPhase3, Mockito.times(1)).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+    }
+
+    private void addAllPhases()
+    {
+        addPhase(mockProcessPhase1);
+        addPhase(mockProcessPhase2);
+        addPhase(mockProcessPhase3);
+    }
+
+    private void addPhase(MessageProcessPhase mockProcessPhase)
+    {
+        phaseList.add(mockProcessPhase);
+        when(mockProcessPhase.supportsTemplate(mockTemplate)).thenReturn(true);
+        Mockito.doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                ((PhaseResultNotifier)invocationOnMock.getArguments()[2]).phaseSuccessfully();
+                return null;
+            }
+        }).when(mockProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+    }
+}
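
Note on the control flow these engine and manager tests appear to pin down: phases run in order, a phase that consumes the message stops the chain, and the end phase still runs afterwards. A minimal sketch under stated assumptions follows. The types are hypothetical stand-ins, the PhaseResultNotifier callbacks are replaced by a returned enum to keep it short, and stopping the chain on failure is an assumption (the tests only verify that the end phase runs).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins only; the real engine reports results through
// PhaseResultNotifier callbacks rather than a return value.
final class PhaseEngineSketch
{
    enum Outcome { SUCCESS, CONSUMED, FAILURE }

    interface Phase
    {
        String name();

        Outcome run();
    }

    // Convenience factory for a phase with a fixed outcome.
    static Phase phase(final String name, final Outcome outcome)
    {
        return new Phase()
        {
            public String name() { return name; }

            public Outcome run() { return outcome; }
        };
    }

    // Runs phases in order, stops at the first one that does not succeed,
    // then always runs the end phase. Returns the names of the phases that ran.
    static List<String> process(List<Phase> phases, Phase endPhase)
    {
        List<String> executed = new ArrayList<String>();
        for (Phase phase : phases)
        {
            executed.add(phase.name());
            if (phase.run() != Outcome.SUCCESS)
            {
                break;
            }
        }
        executed.add(endPhase.name());
        endPhase.run();
        return executed;
    }

    public static void main(String[] args)
    {
        List<Phase> phases = Arrays.asList(
            phase("validation", Outcome.SUCCESS),
            phase("custom", Outcome.CONSUMED),
            phase("flow", Outcome.SUCCESS));
        System.out.println(process(phases, phase("end", Outcome.SUCCESS)));
    }
}
```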

Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java (0 => 25161)


--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java	                        (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,47 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+import org.mockito.Mockito;
+
+public class PhaseSupportTestHelper<T>
+{
+
+    private final Class<T> supportedTemplateClass;
+    private final T supportedTemplate;
+    private final MessageProcessTemplate notSupportedTemplate;
+
+    public PhaseSupportTestHelper(Class<T> supportedTemplate)
+    {
+        this.supportedTemplateClass = supportedTemplate;
+        this.supportedTemplate = Mockito.mock(this.supportedTemplateClass);
+        this.notSupportedTemplate = Mockito.mock(MessageProcessTemplate.class);
+    }
+
+    public void testSupportTemplates(MessageProcessPhase messageProcessPhase)
+    {
+        notSupportedTemplateTest(messageProcessPhase);
+        supportedTemplateTest(messageProcessPhase);
+    }
+
+    public void notSupportedTemplateTest(MessageProcessPhase messageProcessPhase)
+    {
+        assertThat(messageProcessPhase.supportsTemplate(notSupportedTemplate), is(false));
+    }
+
+    public void supportedTemplateTest(MessageProcessPhase messageProcessPhase)
+    {
+        assertThat(messageProcessPhase.supportsTemplate((MessageProcessTemplate) supportedTemplate), is(true));
+    }
+
+}

Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java (0 => 25161)


--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java	                        (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,73 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.MuleException;
+import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.tck.size.SmallTest;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class ValidationPhaseTestCase extends AbstractMuleContextTestCase
+{
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ValidationPhaseTemplate mockTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private PhaseResultNotifier mockPhaseResultNotifier;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MuleException mockMulException;
+
+    @Test
+    public void supportsTemplate()
+    {
+        new PhaseSupportTestHelper<ValidationPhaseTemplate>(ValidationPhaseTemplate.class).testSupportTemplates(new ValidationPhase());
+    }
+
+    @Test
+    public void valid()
+    {
+        when(mockTemplate.validateMessage()).thenReturn(true);
+        new ValidationPhase().runPhase(mockTemplate, mockContext, mockPhaseResultNotifier);
+        verify(mockPhaseResultNotifier, Mockito.times(1)).phaseSuccessfully();
+    }
+
+    @Test
+    public void invalid() throws Exception
+    {
+        when(mockTemplate.validateMessage()).thenReturn(false);
+        new ValidationPhase().runPhase(mockTemplate, mockContext, mockPhaseResultNotifier);
+        verify(mockTemplate,times(1)).discardInvalidMessage();
+        verify(mockPhaseResultNotifier, Mockito.times(1)).phaseConsumedMessage();
+    }
+
+    @Test
+    public void validationFails() throws Exception
+    {
+        when(mockTemplate.validateMessage()).thenReturn(false);
+        doThrow(mockMulException).when(mockTemplate).discardInvalidMessage();
+        new ValidationPhase().runPhase(mockTemplate, mockContext, mockPhaseResultNotifier);
+        verify(mockPhaseResultNotifier, Mockito.times(1)).phaseFailure(mockMulException);
+    }
+}
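
Note on the behaviour ValidationPhaseTestCase describes: a valid message reports success, an invalid one is discarded and reported as consumed, and an exception from the discard is reported as a failure. A minimal sketch of a phase satisfying those three cases follows. The Template and Notifier interfaces are cut-down hypothetical stand-ins for ValidationPhaseTemplate and PhaseResultNotifier, and the body is inferred from the tests, not copied from the committed ValidationPhase.

```java
// Hypothetical stand-ins only; inferred from the three test cases above.
final class ValidationPhaseSketch
{
    interface Template
    {
        boolean validateMessage();

        void discardInvalidMessage() throws Exception;
    }

    interface Notifier
    {
        void phaseSuccessfully();

        void phaseConsumedMessage();

        void phaseFailure(Exception reason);
    }

    // Exactly one notifier method is called per invocation.
    static void runPhase(Template template, Notifier notifier)
    {
        if (template.validateMessage())
        {
            notifier.phaseSuccessfully();
            return;
        }
        try
        {
            template.discardInvalidMessage();
        }
        catch (Exception e)
        {
            notifier.phaseFailure(e);
            return;
        }
        notifier.phaseConsumedMessage();
    }

    // Runs the phase against a canned template and reports which callback fired.
    static String outcomeOf(final boolean valid, final Exception discardError)
    {
        final StringBuilder result = new StringBuilder();
        Template template = new Template()
        {
            public boolean validateMessage() { return valid; }

            public void discardInvalidMessage() throws Exception
            {
                if (discardError != null)
                {
                    throw discardError;
                }
            }
        };
        Notifier notifier = new Notifier()
        {
            public void phaseSuccessfully() { result.append("success"); }

            public void phaseConsumedMessage() { result.append("consumed"); }

            public void phaseFailure(Exception reason) { result.append("failure"); }
        };
        runPhase(template, notifier);
        return result.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(outcomeOf(true, null));
        System.out.println(outcomeOf(false, null));
        System.out.println(outcomeOf(false, new Exception("discard failed")));
    }
}
```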

Modified: branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml (25160 => 25161)


--- branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml	2013-01-06 00:06:58 UTC (rev 25161)
@@ -60,6 +60,8 @@
 
     <bean name="_muleSecurityManager" class="org.mule.security.MuleSecurityManager"/>
 
+    <bean name="_muleMessageProcessingManager" class="org.mule.message.processing.MuleMessageProcessingManager"/>
+
     <bean name="_muleProperties" class="java.util.HashMap"/>
 
     <bean name="_muleEndpointFactory" class="org.mule.endpoint.DefaultEndpointFactory"/>

Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageProcessTemplate.java (0 => 25161)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageProcessTemplate.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageProcessTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,417 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import org.mule.DefaultMuleEvent;
+import org.mule.DefaultMuleMessage;
+import org.mule.OptimizedRequestContext;
+import org.mule.RequestContext;
+import org.mule.api.DefaultMuleException;
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
+import org.mule.api.config.MuleProperties;
+import org.mule.api.context.WorkManager;
+import org.mule.api.endpoint.ImmutableEndpoint;
+import org.mule.api.transaction.TransactionConfig;
+import org.mule.api.transport.PropertyScope;
+import org.mule.config.ExceptionHelper;
+import org.mule.message.processing.RequestResponseFlowProcessingPhaseTemplate;
+import org.mule.transport.AbstractTransportMessageProcessTemplate;
+import org.mule.message.processing.EndPhaseTemplate;
+import org.mule.message.processing.ThrottlingPhaseTemplate;
+import org.mule.transport.NullPayload;
+import org.mule.transport.http.i18n.HttpMessages;
+import org.mule.util.concurrent.Latch;
+
+import java.io.IOException;
+
+import org.apache.commons.httpclient.Header;
+import org.apache.commons.httpclient.HttpVersion;
+
+public class HttpMessageProcessTemplate extends AbstractTransportMessageProcessTemplate<HttpMessageReceiver, HttpConnector> implements ThrottlingPhaseTemplate, EndPhaseTemplate, RequestResponseFlowProcessingPhaseTemplate
+{
+
+    public static final int MESSAGE_DISCARD_STATUS_CODE = Integer.parseInt(System.getProperty("mule.transport.http.throttling.discardstatuscode","429"));
+
+    private final HttpServerConnection httpServerConnection;
+    private HttpRequest request;
+    private boolean badRequest;
+    private Latch messageProcessedLatch = new Latch();
+    private boolean failureSendingResponse;
+
+    public HttpMessageProcessTemplate(final HttpMessageReceiver messageReceiver, final HttpServerConnection httpServerConnection, final WorkManager flowExecutionWorkManager)
+    {
+        super(messageReceiver,flowExecutionWorkManager);
+        this.httpServerConnection = httpServerConnection;
+    }
+
+    @Override
+    public void afterFailureProcessingFlow(MessagingException messagingException) throws MuleException
+    {
+        if (!failureSendingResponse)
+        {
+            Exception e = messagingException;
+            MuleEvent response = messagingException.getEvent();
+            if (response != null &&
+                response.getMessage().getExceptionPayload() != null &&
+                response.getMessage().getExceptionPayload().getException() instanceof MessagingException)
+            {
+                e = (Exception) response.getMessage().getExceptionPayload().getException();
+            }
+
+            String temp = ExceptionHelper.getErrorMapping(getInboundEndpoint().getConnector().getProtocol(), messagingException.getClass(), getMuleContext());
+            int httpStatus = Integer.valueOf(temp);
+            try
+            {
+                if (response != null && e instanceof MessagingException)
+                {
+                    httpStatus = response.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY) != null ? Integer.valueOf(response.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY).toString()) : httpStatus;
+                    httpServerConnection.writeFailureResponse(httpStatus,e.getMessage());
+                }
+                else
+                {
+                    httpServerConnection.writeFailureResponse(httpStatus, e.getMessage());
+                }
+            }
+            catch (IOException ioException)
+            {
+                throw new DefaultMuleException(ioException);
+            }
+        }
+    }
+
+    @Override
+    public void sendResponseToClient(MuleEvent muleEvent) throws MuleException
+    {
+        sendHttpResponse(muleEvent);
+    }
+
+    @Override
+    public MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException
+    {
+        try
+        {
+            sendExpect100(request);
+            return muleEvent;
+        }
+        catch (IOException e)
+        {
+            throw new DefaultMuleException(e);
+        }
+    }
+
+    private void sendExpect100(HttpRequest request) throws MuleException, IOException
+    {
+        RequestLine requestLine = request.getRequestLine();
+
+        // Respond with status code 100 for the Expect handshake,
+        // as described in RFC 2616 (HTTP/1.1).
+        // Processing continues and the request is fully
+        // read immediately afterwards.
+        HttpVersion requestVersion = requestLine.getHttpVersion();
+        if (HttpVersion.HTTP_1_1.equals(requestVersion))
+        {
+            Header expectHeader = request.getFirstHeader(HttpConstants.HEADER_EXPECT);
+            if (expectHeader != null)
+            {
+                String expectHeaderValue = expectHeader.getValue();
+                if (HttpConstants.HEADER_EXPECT_CONTINUE_REQUEST_VALUE.equals(expectHeaderValue))
+                {
+                    HttpResponse expected = new HttpResponse();
+                    expected.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_CONTINUE);
+                    final DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(expected,
+                                                  getMuleContext()), getInboundEndpoint(), getFlowConstruct());
+                    RequestContext.setEvent(event);
+                    httpServerConnection.writeResponse(transformResponse(expected));
+                }
+            }
+        }
+    }
+
+    private void sendHttpResponse(MuleEvent responseMuleEvent) throws MessagingException
+    {
+        try
+        {
+            if (logger.isTraceEnabled())
+            {
+                logger.trace("Sending http response");
+            }
+            MuleMessage returnMessage = responseMuleEvent == null ? null : responseMuleEvent.getMessage();
+
+            Object tempResponse;
+            if (returnMessage != null)
+            {
+                tempResponse = returnMessage.getPayload();
+            }
+            else
+            {
+                tempResponse = NullPayload.getInstance();
+            }
+            // This removes the need for users to explicitly add the response transformer
+            // ObjectToHttpResponse to their config
+            HttpResponse response;
+            if (tempResponse instanceof HttpResponse)
+            {
+                response = (HttpResponse) tempResponse;
+            }
+            else
+            {
+                response = transformResponse(returnMessage);
+            }
+
+            response.setupKeepAliveFromRequestVersion(request.getRequestLine().getHttpVersion());
+            HttpConnector httpConnector = (HttpConnector) getMessageReceiver().getEndpoint().getConnector();
+            response.disableKeepAlive(!httpConnector.isKeepAlive());
+
+            Header connectionHeader = request.getFirstHeader("Connection");
+            if (connectionHeader != null)
+            {
+                String value = connectionHeader.getValue();
+                boolean endpointOverride = getEndpointKeepAliveValue(getMessageReceiver().getEndpoint());
+                if ("keep-alive".equalsIgnoreCase(value) && endpointOverride)
+                {
+                    response.setKeepAlive(true);
+
+                    if (response.getHttpVersion().equals(HttpVersion.HTTP_1_0))
+                    {
+                        connectionHeader = new Header(HttpConstants.HEADER_CONNECTION, "Keep-Alive");
+                        response.setHeader(connectionHeader);
+                    }
+                }
+                else if ("close".equalsIgnoreCase(value))
+                {
+                    response.setKeepAlive(false);
+                }
+            }
+            try
+            {
+                httpServerConnection.writeResponse(response);
+            }
+            catch (Exception e)
+            {
+                failureSendingResponse = true;
+            }
+            if (logger.isTraceEnabled())
+            {
+                logger.trace("HTTP response sent successfully");
+            }
+        }
+        catch (Exception e)
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug("Exception while sending http response");
+                logger.debug(e);
+            }
+            throw new MessagingException(responseMuleEvent,e);
+        }
+    }
+
+    /**
+     * Check if endpoint has a keep-alive property configured. Note the translation from
+     * keep-alive in the schema to keepAlive here.
+     */
+    private boolean getEndpointKeepAliveValue(ImmutableEndpoint ep)
+    {
+        String value = (String) ep.getProperty("keepAlive");
+        if (value != null)
+        {
+            return Boolean.parseBoolean(value);
+        }
+        return true;
+    }
+
+    protected HttpResponse transformResponse(Object response) throws MuleException
+    {
+        MuleMessage message;
+        if (response instanceof MuleMessage)
+        {
+            message = (MuleMessage) response;
+        }
+        else
+        {
+            message = new DefaultMuleMessage(response, getMessageReceiver().getEndpoint().getConnector().getMuleContext());
+        }
+        //TODO RM*: Maybe we can have a generic Transformer wrapper rather than using DefaultMuleMessage (or another static utility
+        //class)
+        message.applyTransformers(null, getMessageReceiver().getResponseTransportTransformers(), HttpResponse.class);
+        return (HttpResponse) message.getPayload();
+    }
+
+    protected MuleMessage createMessageFromSource(Object message) throws MuleException
+    {
+        MuleMessage muleMessage = super.createMessageFromSource(message);
+        String path = muleMessage.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY);
+        int i = path.indexOf('?');
+        if (i > -1)
+        {
+            path = path.substring(0, i);
+        }
+
+        muleMessage.setProperty(HttpConnector.HTTP_REQUEST_PATH_PROPERTY, path, PropertyScope.INBOUND);
+
+        if (logger.isDebugEnabled())
+        {
+            logger.debug(muleMessage.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY));
+        }
+
+        // determine if the request path on this request denotes a different receiver
+        //final MessageReceiver receiver = getTargetReceiver(message, endpoint);
+
+        // the response only needs to be transformed explicitly if
+        // A) the request was not served or B) a null result was returned
+        String contextPath = HttpConnector.normalizeUrl(getInboundEndpoint().getEndpointURI().getPath());
+        muleMessage.setProperty(HttpConnector.HTTP_CONTEXT_PATH_PROPERTY,
+                            contextPath,
+                            PropertyScope.INBOUND);
+
+        muleMessage.setProperty(HttpConnector.HTTP_CONTEXT_URI_PROPERTY,
+                                getInboundEndpoint().getEndpointURI().getAddress(),
+                            PropertyScope.INBOUND);
+
+        muleMessage.setProperty(HttpConnector.HTTP_RELATIVE_PATH_PROPERTY,
+                            processRelativePath(contextPath, path),
+                            PropertyScope.INBOUND);
+
+        muleMessage.setProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS, httpServerConnection.getRemoteClientAddress(), PropertyScope.INBOUND);
+        return muleMessage;
+    }
+
+    protected String processRelativePath(String contextPath, String path)
+    {
+        String relativePath = path.substring(contextPath.length());
+        if (relativePath.startsWith("/"))
+        {
+            return relativePath.substring(1);
+        }
+        return relativePath;
+    }
+
+    @Override
+    public Object acquireMessage() throws MuleException
+    {
+        final HttpRequest request;
+        try
+        {
+            request = httpServerConnection.readRequest();
+        }
+        catch (IOException e)
+        {
+            throw new DefaultMuleException(e);
+        }
+        if (request == null)
+        {
+            throw new HttpMessageReceiver.EmptyRequestException();
+        }
+        this.request = request;
+        return request;
+    }
+
+    public boolean validateMessage()
+    {
+        try
+        {
+            this.request = httpServerConnection.readRequest();
+            if (request == null)
+            {
+                return false;
+            }
+
+            RequestLine requestLine = request.getRequestLine();
+            String method = requestLine.getMethod();
+
+            if (!(method.equals(HttpConstants.METHOD_GET)
+                || method.equals(HttpConstants.METHOD_HEAD)
+                || method.equals(HttpConstants.METHOD_POST)
+                || method.equals(HttpConstants.METHOD_OPTIONS)
+                || method.equals(HttpConstants.METHOD_PUT)
+                || method.equals(HttpConstants.METHOD_DELETE)
+                || method.equals(HttpConstants.METHOD_TRACE)
+                || method.equals(HttpConstants.METHOD_CONNECT)
+                || method.equals(HttpConstants.METHOD_PATCH)))
+            {
+                badRequest = true;
+                return false;
+            }
+        }
+        catch (IOException e)
+        {
+            return false;
+        }
+        return true;
+    }
+
+
+    @Override
+    public void discardInvalidMessage() throws MuleException
+    {
+        if (badRequest)
+        {
+            try
+            {
+                httpServerConnection.writeResponse(doBad(request.getRequestLine()));
+            }
+            catch (IOException e)
+            {
+                throw new DefaultMuleException(e);
+            }
+        }
+    }
+
+    @Override
+    public boolean supportsAsynchronousProcessing()
+    {
+        return true;
+    }
+
+    protected HttpResponse doBad(RequestLine requestLine) throws MuleException
+    {
+        MuleMessage message = getMessageReceiver().createMuleMessage(null);
+        MuleEvent event = new DefaultMuleEvent(message, getInboundEndpoint(), getFlowConstruct());
+        OptimizedRequestContext.unsafeSetEvent(event);
+        HttpResponse response = new HttpResponse();
+        response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_BAD_REQUEST);
+        response.setBody(HttpMessages.malformedSyntax().toString() + HttpConstants.CRLF);
+        return transformResponse(response);
+    }
+
+    protected HttpServerConnection getHttpServerConnection()
+    {
+        return httpServerConnection;
+    }
+
+    public Latch getMessageProcessedLatch()
+    {
+        return messageProcessedLatch;
+    }
+
+    @Override
+    public void discardMessageOnThrottlingExceeded() throws MuleException
+    {
+        try
+        {
+            httpServerConnection.writeFailureResponse(MESSAGE_DISCARD_STATUS_CODE, "API calls exceeded");
+        }
+        catch (IOException e)
+        {
+            throw new DefaultMuleException(e);
+        }
+    }
+
+    @Override
+    public void messageProcessingEnded()
+    {
+        messageProcessedLatch.release();
+    }
+
+
+}

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -27,6 +27,7 @@
 import org.mule.api.execution.ExecutionTemplate;
 import org.mule.api.lifecycle.CreateException;
 import org.mule.api.lifecycle.InitialisationException;
+import org.mule.api.transformer.Transformer;
 import org.mule.api.transport.Connector;
 import org.mule.api.transport.PropertyScope;
 import org.mule.config.ExceptionHelper;
@@ -34,11 +35,13 @@
 import org.mule.config.i18n.MessageFactory;
 import org.mule.transport.AbstractMessageReceiver;
 import org.mule.transport.ConnectException;
+import org.mule.message.processing.MessageProcessContext;
 import org.mule.transport.NullPayload;
 import org.mule.transport.http.i18n.HttpMessages;
 import org.mule.util.MapUtils;
 
 import java.io.IOException;
+import java.util.List;
 
 import javax.resource.spi.work.Work;
 
@@ -70,345 +73,16 @@
         ((HttpConnector) connector).disconnect(endpoint.getEndpointURI());
     }
 
-    public Work createWork(HttpServerConnection httpServerConnection) throws IOException
+    MessageProcessContext createMessageContext(HttpServerConnection httpServerConnection)
     {
-        return new HttpWorker(httpServerConnection);
+        return new HttpMessageProcessTemplate(this,httpServerConnection,getWorkManager());
     }
 
-    @SuppressWarnings("synthetic-access")
-    protected class HttpWorker implements Work
+    void processRequest(HttpServerConnection httpServerConnection) throws InterruptedException, MuleException
     {
-
-        private HttpServerConnection conn;
-        private String remoteClientAddress;
-
-        public HttpWorker(HttpServerConnection httpServerConnection) throws IOException
-        {
-            String encoding = endpoint.getEncoding();
-            if (encoding == null)
-            {
-                encoding = connector.getMuleContext().getConfiguration().getDefaultEncoding();
-            }
-            conn = httpServerConnection;
-            remoteClientAddress = conn.getRemoteClientAddress();
-        }
-
-        protected HttpServerConnection getServerConnection()
-        {
-            return conn;
-        }
-
-        @Override
-        public void run()
-        {
-            try
-            {
-                final HttpRequest request = conn.readRequest();
-                if (request == null)
-                {
-                    throw new EmptyRequestException();
-                }
-
-                try
-                {
-                    HttpResponse httpResponse = processRequest(request);
-                    conn.writeResponse(httpResponse);
-                }
-                catch (Exception e)
-                {
-                    MuleEvent response = null;
-                    if (e instanceof MessagingException)
-                    {
-                        response = ((MessagingException) e).getEvent();
-                    }
-                    else
-                    {
-                        getConnector().getMuleContext().getExceptionListener().handleException(e);
-                    }
-
-                    if (response != null &&
-                        response.getMessage().getExceptionPayload() != null &&
-                        response.getMessage().getExceptionPayload().getException() instanceof MessagingException)
-                    {
-                        e = (Exception) response.getMessage().getExceptionPayload().getException();
-                    }
-                    //MULE-5656 There was custom code here for mapping status codes to exceptions
-                    //I have removed this code and now make an explicit call to the Exception helper,
-                    //but the real fix is to make sure Mule handles this automatically through the
-                    //InboundExceptionDetailsMessageProcessor
-
-                    //Response code mappings are loaded from META-INF/services/org/mule/config/http-exception-mappings.properties
-                    String temp = ExceptionHelper.getErrorMapping(connector.getProtocol(), e.getClass(), flowConstruct.getMuleContext());
-                    int httpStatus = Integer.valueOf(temp);
-
-                    if (e instanceof MessagingException)
-                    {
-                        MuleEvent event = ((MessagingException) e).getEvent();
-                        httpStatus = event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY) != null ? Integer.valueOf(event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY).toString()) : httpStatus;
-                        conn.writeResponse(buildFailureResponse(event, e.getMessage(), httpStatus));
-                    }
-                    else
-                    {
-                        conn.writeFailureResponse(httpStatus, e.getMessage());
-                    }
-                    throw new FailureProcessingRequestException();
-                }
-                finally
-                {
-                    if (request.getBody() != null)
-                    {
-                        request.getBody().close();
-                    }
-                }
-            }
-            catch (EmptyRequestException e)
-            {
-                throw e;
-            }
-            catch (FailureProcessingRequestException e)
-            {
-                throw e;
-            }
-            catch (Exception e)
-            {
-                getConnector().getMuleContext().getExceptionListener().handleException(e);
-            }
-        }
-
-
-        protected HttpResponse processRequest(HttpRequest request) throws MuleException, IOException
-        {
-            RequestLine requestLine = request.getRequestLine();
-            String method = requestLine.getMethod();
-
-            if (method.equals(HttpConstants.METHOD_GET)
-                || method.equals(HttpConstants.METHOD_HEAD)
-                || method.equals(HttpConstants.METHOD_POST)
-                || method.equals(HttpConstants.METHOD_OPTIONS)
-                || method.equals(HttpConstants.METHOD_PUT)
-                || method.equals(HttpConstants.METHOD_DELETE)
-                || method.equals(HttpConstants.METHOD_TRACE)
-                || method.equals(HttpConstants.METHOD_CONNECT)
-                || method.equals(HttpConstants.METHOD_PATCH))
-            {
-                return doRequest(request);
-            }
-            else
-            {
-                return doBad(requestLine);
-            }
-        }
-
-        protected HttpResponse doRequest(HttpRequest request) throws IOException, MuleException
-        {
-            sendExpect100(request);
-
-            final MuleMessage message = createMuleMessage(request);
-
-            String path = message.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY);
-            int i = path.indexOf('?');
-            if (i > -1)
-            {
-                path = path.substring(0, i);
-            }
-
-            message.setProperty(HttpConnector.HTTP_REQUEST_PATH_PROPERTY, path, PropertyScope.INBOUND);
-
-            if (logger.isDebugEnabled())
-            {
-                logger.debug(message.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY));
-            }
-
-            // determine if the request path on this request denotes a different receiver
-            //final MessageReceiver receiver = getTargetReceiver(message, endpoint);
-
-            HttpResponse response;
-            // the response only needs to be transformed explicitly if
-            // A) the request was not served or B) a null result was returned
-            String contextPath = HttpConnector.normalizeUrl(getEndpointURI().getPath());
-            message.setProperty(HttpConnector.HTTP_CONTEXT_PATH_PROPERTY,
-                                contextPath,
-                                PropertyScope.INBOUND);
-
-            message.setProperty(HttpConnector.HTTP_CONTEXT_URI_PROPERTY,
-                                getEndpointURI().getAddress(),
-                                PropertyScope.INBOUND);
-
-            message.setProperty(HttpConnector.HTTP_RELATIVE_PATH_PROPERTY,
-                                processRelativePath(contextPath, path),
-                                PropertyScope.INBOUND);
-
-            ExecutionTemplate<MuleEvent> executionTemplate = createExecutionTemplate();
-
-            MuleEvent returnEvent;
-            try
-            {
-                returnEvent = executionTemplate.execute(new ExecutionCallback<MuleEvent>()
-                {
-                    @Override
-                    public MuleEvent process() throws Exception
-                    {
-                        preRouteMessage(message);
-                        return routeMessage(message);
-                    }
-                });
-            }
-            catch (MuleException e)
-            {
-                throw e;
-            }
-            catch (IOException e)
-            {
-                throw e;
-            }
-            catch (Exception e)
-            {
-                throw new DefaultMuleException(e);
-            }
-
-            MuleMessage returnMessage = returnEvent == null ? null : returnEvent.getMessage();
-
-            Object tempResponse;
-            if (returnMessage != null)
-            {
-                tempResponse = returnMessage.getPayload();
-            }
-            else
-            {
-                tempResponse = NullPayload.getInstance();
-            }
-            // This removes the need for users to explicitly adding the response transformer
-            // ObjectToHttpResponse in their config
-            if (tempResponse instanceof HttpResponse)
-            {
-                response = (HttpResponse) tempResponse;
-            }
-            else
-            {
-                response = transformResponse(returnMessage);
-            }
-
-            response.setupKeepAliveFromRequestVersion(request.getRequestLine().getHttpVersion());
-            HttpConnector httpConnector = (HttpConnector) connector;
-            response.disableKeepAlive(!httpConnector.isKeepAlive());
-
-            Header connectionHeader = request.getFirstHeader("Connection");
-            if (connectionHeader != null)
-            {
-                String value = connectionHeader.getValue();
-                boolean endpointOverride = getEndpointKeepAliveValue(endpoint);
-                if ("keep-alive".equalsIgnoreCase(value) && endpointOverride)
-                {
-                    response.setKeepAlive(true);
-
-                    if (response.getHttpVersion().equals(HttpVersion.HTTP_1_0))
-                    {
-                        connectionHeader = new Header(HttpConstants.HEADER_CONNECTION, "Keep-Alive");
-                        response.setHeader(connectionHeader);
-                    }
-                }
-                else if ("close".equalsIgnoreCase(value))
-                {
-                    response.setKeepAlive(false);
-                }
-            }
-            return response;
-        }
-
-        /**
-         * Check if endpoint has a keep-alive property configured. Note the translation from
-         * keep-alive in the schema to keepAlive here.
-         */
-        private boolean getEndpointKeepAliveValue(ImmutableEndpoint ep)
-        {
-            String value = (String) ep.getProperty("keepAlive");
-            if (value != null)
-            {
-                return Boolean.parseBoolean(value);
-            }
-            return true;
-        }
-
-
-        protected HttpResponse doOtherValid(RequestLine requestLine, String method) throws MuleException
-        {
-            MuleMessage message = createMuleMessage(null);
-            MuleEvent event = new DefaultMuleEvent(message, (InboundEndpoint) endpoint, flowConstruct);
-            OptimizedRequestContext.unsafeSetEvent(event);
-            HttpResponse response = new HttpResponse();
-            response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_METHOD_NOT_ALLOWED);
-            response.setBody(HttpMessages.methodNotAllowed(method).toString() + HttpConstants.CRLF);
-            return transformResponse(response);
-        }
-
-        protected HttpResponse doBad(RequestLine requestLine) throws MuleException
-        {
-            MuleMessage message = createMuleMessage(null);
-            MuleEvent event = new DefaultMuleEvent(message, (InboundEndpoint) endpoint, flowConstruct);
-            OptimizedRequestContext.unsafeSetEvent(event);
-            HttpResponse response = new HttpResponse();
-            response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_BAD_REQUEST);
-            response.setBody(HttpMessages.malformedSyntax().toString() + HttpConstants.CRLF);
-            return transformResponse(response);
-        }
-
-        private void sendExpect100(HttpRequest request) throws MuleException, IOException
-        {
-            RequestLine requestLine = request.getRequestLine();
-
-            // respond with status code 100, for Expect handshake
-            // according to rfc 2616 and http 1.1
-            // the processing will continue and the request will be fully
-            // read immediately after
-            HttpVersion requestVersion = requestLine.getHttpVersion();
-            if (HttpVersion.HTTP_1_1.equals(requestVersion))
-            {
-                Header expectHeader = request.getFirstHeader(HttpConstants.HEADER_EXPECT);
-                if (expectHeader != null)
-                {
-                    String expectHeaderValue = expectHeader.getValue();
-                    if (HttpConstants.HEADER_EXPECT_CONTINUE_REQUEST_VALUE.equals(expectHeaderValue))
-                    {
-                        HttpResponse expected = new HttpResponse();
-                        expected.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_CONTINUE);
-                        final DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(expected,
-                                                                                                   connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct);
-                        RequestContext.setEvent(event);
-                        conn.writeResponse(transformResponse(expected));
-                    }
-                }
-            }
-        }
-
-        private HttpResponse buildFailureResponse(MuleEvent event, String description, int httpStatusCode) throws MuleException
-        {
-            event.getMessage().setOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY, httpStatusCode);
-            event.getMessage().setPayload(description);
-            return transformResponse(event.getMessage());
-        }
-
-        protected HttpResponse buildFailureResponse(HttpVersion version, int statusCode, String description) throws MuleException
-        {
-            HttpResponse response = new HttpResponse();
-            response.setStatusLine(version, statusCode);
-            response.setBody(description);
-            DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(response,
-                                                                                 connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct);
-            RequestContext.setEvent(event);
-            // The DefaultResponseTransformer will set the necessary headers
-            return transformResponse(response);
-        }
-
-        protected void preRouteMessage(MuleMessage message) throws MessagingException
-        {
-            message.setProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS, remoteClientAddress, PropertyScope.INBOUND);
-        }
-
-        @Override
-        public void release()
-        {
-            //Nothing to do.
-        }
+        HttpMessageProcessTemplate messageContext = (HttpMessageProcessTemplate) createMessageContext(httpServerConnection);
+        processMessage(messageContext,messageContext);
+        messageContext.getMessageProcessedLatch().await();
     }
 
     protected String processRelativePath(String contextPath, String path)
@@ -421,24 +95,6 @@
         return relativePath;
     }
 
-    protected HttpResponse transformResponse(Object response) throws MuleException
-    {
-        MuleMessage message;
-        if (response instanceof MuleMessage)
-        {
-            message = (MuleMessage) response;
-        }
-        else
-        {
-            message = new DefaultMuleMessage(response, connector.getMuleContext());
-        }
-        //TODO RM*: Maybe we can have a generic Transformer wrapper rather that using DefaultMuleMessage (or another static utility
-        //class
-        message.applyTransformers(null, defaultResponseTransformers, HttpResponse.class);
-        return (HttpResponse) message.getPayload();
-    }
-
-
     @Override
     protected void initializeMessageFactory() throws InitialisationException
     {
@@ -480,6 +136,11 @@
         return message;
     }
 
+    public List<Transformer> getResponseTransportTransformers()
+    {
+        return this.defaultResponseTransformers;
+    }
+
     public static class EmptyRequestException extends RuntimeException
     {
 

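Note on the hunk above: processRequest() now hands the request to processMessage() and then blocks on getMessageProcessedLatch() until messageProcessingEnded() releases it. The following is only a minimal sketch of that wait-for-completion pattern, not the Mule implementation. It assumes java.util.concurrent.CountDownLatch as a stand-in for Mule's Latch, and the Template class, the flowExecutor parameter and the "handled:" response are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchHandOffSketch
{
    /** Hypothetical stand-in for HttpMessageProcessTemplate: it owns the completion latch. */
    static class Template
    {
        private final CountDownLatch processed = new CountDownLatch(1);
        volatile String response;

        /** Mirrors messageProcessingEnded(): signals that processing has finished. */
        void messageProcessingEnded()
        {
            processed.countDown();
        }

        CountDownLatch getMessageProcessedLatch()
        {
            return processed;
        }
    }

    /** Hands the work to another thread, then blocks until the template reports completion. */
    static String processRequest(ExecutorService flowExecutor, final String request) throws InterruptedException
    {
        final Template template = new Template();
        flowExecutor.execute(new Runnable()
        {
            public void run()
            {
                try
                {
                    // Assumption: real code would route the message through the flow here.
                    template.response = "handled:" + request;
                }
                finally
                {
                    // Release in finally so the waiting thread is never left blocked on failure.
                    template.messageProcessingEnded();
                }
            }
        });
        template.getMessageProcessedLatch().await();
        return template.response;
    }

    public static void main(String[] args) throws Exception
    {
        ExecutorService flowExecutor = Executors.newSingleThreadExecutor();
        try
        {
            System.out.println(processRequest(flowExecutor, "GET /"));
        }
        finally
        {
            flowExecutor.shutdown();
        }
    }
}
```

The point of the sketch is that the connection thread stays parked while the flow runs elsewhere, so the latch has to be released on every exit path, including failures.
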
Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -9,16 +9,21 @@
  */
 package org.mule.transport.http;
 
+import org.mule.api.config.ThreadingProfile;
 import org.mule.api.context.WorkManager;
 import org.mule.api.retry.RetryCallback;
 import org.mule.api.retry.RetryContext;
 import org.mule.api.retry.RetryPolicyTemplate;
+import org.mule.config.ImmutableThreadingProfile;
+import org.mule.config.MutableThreadingProfile;
 import org.mule.transport.ConnectException;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.resource.spi.work.Work;
@@ -39,6 +44,7 @@
     private ServerSocket serverSocket;
     private HttpConnector httpConnector;
     private RetryPolicyTemplate retryTemplate;
+    protected ExecutorService requestHandOffExecutor;
     private WorkManager workManager;
     private final AtomicBoolean disconnect = new AtomicBoolean(false);
 
@@ -64,8 +70,18 @@
         this.retryTemplate = retryPolicyTemplate;
         this.serverSocket = serverSocket;
         this.workManager = workManager;
+        this.requestHandOffExecutor = createRequestDispatcherThreadPool(httpConnector);
     }
 
+    private ExecutorService createRequestDispatcherThreadPool(HttpConnector httpConnector)
+    {
+        ThreadingProfile receiverThreadingProfile = httpConnector.getReceiverThreadingProfile();
+        MutableThreadingProfile dispatcherThreadingProfile = new MutableThreadingProfile(receiverThreadingProfile);
+        dispatcherThreadingProfile.setMaxThreadsActive(dispatcherThreadingProfile.getMaxThreadsActive()*2);
+        ExecutorService executorService = dispatcherThreadingProfile.createPool("http-request-dispatch-" + serverSocket.getInetAddress());
+        return executorService;
+    }
+
     @Override
     public void run()
     {
@@ -94,8 +110,9 @@
 
                             if (socket != null)
                             {
-                                Work work = new HttpRequestDispatcherWork(httpConnector, socket);
-                                workManager.scheduleWork(work, javax.resource.spi.work.WorkManager.INDEFINITE, null, httpConnector);
+                                final Runnable httpRequestDispatcherWork = new HttpRequestDispatcherWork(httpConnector, socket);
+                                // Process each connection in a different thread so we can continue accepting connections right away.
+                                requestHandOffExecutor.execute(httpRequestDispatcherWork);
                             }
                         }
 

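Note on the hunk above: the accept loop no longer schedules Work on the WorkManager. Instead it hands each accepted socket to requestHandOffExecutor, a pool sized at twice the receiver threading profile's maxThreadsActive. A rough sketch of that hand-off follows, under stated assumptions: Executors.newFixedThreadPool stands in for ThreadingProfile.createPool (which may apply different queueing and rejection behaviour), and the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AcceptLoopHandOffSketch
{
    /** Assumption: a fixed pool of twice the receiver's max threads approximates the dispatcher pool. */
    static ExecutorService createHandOffPool(int receiverMaxThreads)
    {
        return Executors.newFixedThreadPool(receiverMaxThreads * 2);
    }

    /** Simulates an accept loop that hands every "connection" off and returns how many were handled. */
    static int dispatchAll(int connections, int receiverMaxThreads) throws InterruptedException
    {
        ExecutorService pool = createHandOffPool(receiverMaxThreads);
        final AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < connections; i++)
        {
            // The loop only enqueues the work, so it can go straight back to accepting.
            pool.execute(new Runnable()
            {
                public void run()
                {
                    handled.incrementAndGet();
                }
            });
        }
        // Stop taking new tasks and wait for the queued ones to finish.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return handled.get();
    }

    public static void main(String[] args) throws Exception
    {
        System.out.println(dispatchAll(10, 2));
    }
}
```

Whatever pool is used, it needs an explicit shutdown when the dispatcher disconnects; the sketch does this inline, while the hunk shown here does not show where the real executor is released.
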
Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -15,15 +15,13 @@
 import java.net.Socket;
 import java.util.concurrent.TimeUnit;
 
-import javax.resource.spi.work.Work;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * Dispatches HttpRequest to the appropriate MessageReceiver
  */
-public class HttpRequestDispatcherWork implements Work, Expirable
+public class HttpRequestDispatcherWork implements Runnable, Expirable
 {
 
     private static Log logger = LogFactory.getLog(HttpRequestDispatcherWork.class);
@@ -47,11 +45,6 @@
     }
 
     @Override
-    public void release()
-    {
-    }
-
-    @Override
     public void run()
     {
         try
@@ -77,8 +70,7 @@
                     HttpMessageReceiver httpMessageReceiver = httpConnector.lookupReceiver(socket, request);
                     if (httpMessageReceiver != null)
                     {
-                        Work work = httpMessageReceiver.createWork(httpServerConnection);
-                        work.run();
+                        httpMessageReceiver.processRequest(httpServerConnection);
                     }
                     else
                     {

Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java (0 => 25161)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,57 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport.http;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.context.WorkManager;
+import org.mule.transport.http.i18n.HttpMessages;
+
+import java.util.concurrent.TimeUnit;
+
+public class HttpsMessageProcessTemplate extends HttpMessageProcessTemplate
+{
+
+    public HttpsMessageProcessTemplate(final HttpMessageReceiver messageReceiver, final HttpServerConnection httpServerConnection, final WorkManager flowExecutionWorkManager)
+    {
+        super(messageReceiver,httpServerConnection,flowExecutionWorkManager);
+    }
+
+    @Override
+    public MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException
+    {
+        try
+        {
+            long timeout = ((HttpsConnector) getConnector()).getSslHandshakeTimeout();
+            boolean handshakeComplete = getHttpServerConnection().getSslSocketHandshakeCompleteLatch().await(timeout, TimeUnit.MILLISECONDS);
+            if (!handshakeComplete)
+            {
+                throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), muleEvent);
+            }
+        }
+        catch (InterruptedException e)
+        {
+            throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(),
+                                         muleEvent, e);
+        }
+        if (getHttpServerConnection().getPeerCertificateChain() != null)
+        {
+            muleEvent.getMessage().setOutboundProperty(HttpsConnector.PEER_CERTIFICATES, getHttpServerConnection().getPeerCertificateChain());
+        }
+        if (getHttpServerConnection().getLocalCertificateChain() != null)
+        {
+            muleEvent.getMessage().setOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES, getHttpServerConnection().getLocalCertificateChain());
+        }
+
+        super.beforeRouteEvent(muleEvent);
+        return muleEvent;
+    }
+}

Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -10,22 +10,15 @@
 
 package org.mule.transport.http;
 
-import org.mule.api.MessagingException;
-import org.mule.api.MuleMessage;
 import org.mule.api.construct.FlowConstruct;
 import org.mule.api.endpoint.InboundEndpoint;
 import org.mule.api.lifecycle.CreateException;
 import org.mule.api.transport.Connector;
 import org.mule.config.i18n.CoreMessages;
 import org.mule.transport.ConnectException;
-import org.mule.transport.http.i18n.HttpMessages;
+import org.mule.message.processing.MessageProcessContext;
 import org.mule.util.StringUtils;
 
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import javax.resource.spi.work.Work;
-
 public class HttpsMessageReceiver extends HttpMessageReceiver
 {
 
@@ -53,48 +46,9 @@
     }
 
     @Override
-    public Work createWork(HttpServerConnection httpServerConnection) throws IOException
+    HttpMessageProcessTemplate createMessageContext(HttpServerConnection httpServerConnection)
     {
-        return new HttpsWorker(httpServerConnection);
+        return new HttpsMessageProcessTemplate(this,httpServerConnection,getWorkManager());
     }
 
-    private class HttpsWorker extends HttpWorker
-    {
-
-        public HttpsWorker(HttpServerConnection httpServerConnection) throws IOException
-        {
-            super(httpServerConnection);
-        }
-
-        @Override
-        protected void preRouteMessage(MuleMessage message) throws MessagingException
-        {
-            try
-            {
-                long timeout = ((HttpsConnector) getConnector()).getSslHandshakeTimeout();
-                boolean handshakeComplete = getServerConnection().getSslSocketHandshakeCompleteLatch().await(timeout, TimeUnit.MILLISECONDS);
-                if (!handshakeComplete)
-                {
-                    throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), message);
-                }
-            }
-            catch (InterruptedException e)
-            {
-                throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(),
-                                             message, e);
-            }
-
-            super.preRouteMessage(message);
-
-            if (getServerConnection().getPeerCertificateChain() != null)
-            {
-                message.setOutboundProperty(HttpsConnector.PEER_CERTIFICATES, getServerConnection().getPeerCertificateChain());
-            }
-            if (getServerConnection().getLocalCertificateChain() != null)
-            {
-                message.setOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES, getServerConnection().getLocalCertificateChain());
-            }
-        }
-
-    }
 }

Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -29,6 +29,7 @@
 import javax.resource.spi.work.ExecutionContext;
 import javax.resource.spi.work.WorkListener;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;

Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -11,7 +11,6 @@
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -30,11 +29,9 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.ServerSocket;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import javax.resource.spi.work.ExecutionContext;
-import javax.resource.spi.work.WorkListener;
-
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Answers;
@@ -65,6 +62,8 @@
     private RetryContext mockRetryContext;
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private SystemExceptionHandler mockExceptionListener;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private ExecutorService mockExecutor;
 
 
     @Test(expected = IllegalArgumentException.class)
@@ -143,7 +142,7 @@
         try
         {
             dispatcherThread.start();
-            verify(mockWorkManager, times(0)).scheduleWork(any(HttpRequestDispatcherWork.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+            verify(mockRetryTemplate, times(0)).execute(any(RetryCallback.class),any(WorkManager.class));
         }
         finally
         {
@@ -156,6 +155,7 @@
     public void whenSocketAcceptedExecuteWork() throws Exception
     {
         final HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, mockWorkManager);
+        httpRequestDispatcher.requestHandOffExecutor = mockExecutor;
         final Latch acceptCalledLath = new Latch();
         sustituteLifecycleManager();
         when(mockConnectorLifecycleManager.getState().isStarted()).thenReturn(true);
@@ -176,7 +176,7 @@
                 acceptCalledLath.release();
                 return null;
             }
-        }).when(mockWorkManager).scheduleWork(any(HttpRequestDispatcherWork.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));
+        }).when(mockExecutor).execute(any(HttpRequestDispatcherWork.class));
         Thread dispatcherThread = createDispatcherThread(httpRequestDispatcher);
         dispatcherThread.start();
         try

Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -17,6 +17,7 @@
 import static org.mockito.Mockito.when;
 
 import org.mule.tck.size.SmallTest;
+import org.mule.message.processing.MessageProcessContext;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -48,6 +49,8 @@
     private HttpMessageReceiver mockHttpMessageReceiver;
     @Mock(answer = Answers.RETURNS_DEEP_STUBS)
     private Work mockWork;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockMessageContext;
 
     @Test(expected = IllegalArgumentException.class)
     public void createHttpRequestDispatcherWorkWithNullHttpConnector()
@@ -91,10 +94,9 @@
         HttpRequestDispatcherWork httpRequestDispatcherWork = new HttpRequestDispatcherWork(mockHttpConnector, mockSocket);
         when(mockHttpConnector.lookupReceiver(isA(Socket.class), isA(HttpRequest.class))).thenReturn(mockHttpMessageReceiver);
         setUpSocketMessage();
-        when(mockHttpMessageReceiver.createWork(isA(HttpServerConnection.class))).thenReturn(mockWork);
+        when(mockHttpMessageReceiver.createMessageContext(isA(HttpServerConnection.class))).thenReturn(mockMessageContext);
         httpRequestDispatcherWork.run();
-        verify(mockWork, times(1)).run();
-
+        verify(mockHttpMessageReceiver, times(1)).processRequest(isA(HttpServerConnection.class));
     }
 
     private void setUpSocketMessage() throws IOException

Copied: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java (from rev 25135, branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java) (0 => 25161)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java	                        (rev 0)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,116 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.transport.http;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.mule.DefaultMuleMessage;
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleMessage;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.lifecycle.CreateException;
+import org.mule.api.service.Service;
+import org.mule.api.transport.Connector;
+import org.mule.tck.junit4.AbstractMuleContextTestCase;
+import org.mule.transport.ssl.MockHandshakeCompletedEvent;
+import org.mule.transport.ssl.MockSslSocket;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.resource.spi.work.Work;
+
+import org.junit.Test;
+
+/**
+ * Test for SSL handshake timeouts. Unfortunately, there is no easy way to blackbox-test this
+ * as it would require a SSLSocket implementation that could actually add arbitrary delays to
+ * the SSL handshake.
+ * <p/>
+ * The approach chosen here is based on reflection and massive subclassing/stubbing to make things
+ * work. Yes, this is hacky and fragile but this seems to be the only reasonable alternative
+ * for now.
+ */
+public class HttpsHandshakeTimingTestCase extends AbstractMuleContextTestCase
+{
+
+    @Test(expected = MessagingException.class)
+    public void testHttpsHandshakeExceedsTimeout() throws Exception
+    {
+        MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver();
+
+        MockSslSocket socket = new MockSslSocket();
+        HttpMessageProcessTemplate messageProcessTemplate = messageReceiver.createMessageContext(new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector()));
+
+        MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext);
+        messageProcessTemplate.beforeRouteEvent(getTestEvent(message));
+    }
+
+    @Test
+    public void testHttpsHandshakeCompletesBeforeProcessingMessage() throws Exception
+    {
+        MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver();
+
+        MockSslSocket socket = new MockSslSocket();
+        socket.setInputStream(new ByteArrayInputStream("GET /path/to/file/index.html HTTP/1.0\n\n\n".getBytes()));
+        HttpServerConnection serverConnection = new HttpServerConnection(socket, "utf-8", (HttpConnector) messageReceiver.getConnector());
+        HttpMessageProcessTemplate messageContext = (HttpMessageProcessTemplate) messageReceiver.createMessageContext(serverConnection);
+
+        invokeHandshakeCompleted(serverConnection, socket);
+
+        MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext);
+        messageContext.acquireMessage();
+        serverConnection.readRequest();
+        MuleEvent muleEvent = messageContext.beforeRouteEvent(getTestEvent(message));
+        assertNotNull(muleEvent.getMessage().<Object>getOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES));
+        assertNotNull(muleEvent.getMessage().<Object>getOutboundProperty(HttpsConnector.PEER_CERTIFICATES));
+    }
+
+    private void invokeHandshakeCompleted(HttpServerConnection serverConnection, MockSslSocket socket) throws Exception
+    {
+        HandshakeCompletedEvent event = new MockHandshakeCompletedEvent(socket);
+        serverConnection.handshakeCompleted(event);
+    }
+
+    private MockHttpsMessageReceiver setupMockHttpsMessageReceiver() throws CreateException
+    {
+        HttpsConnector httpsConnector = new HttpsConnector(muleContext);
+        httpsConnector.setSslHandshakeTimeout(1000);
+
+        Map<String, Object> properties = Collections.emptyMap();
+
+        InboundEndpoint inboundEndpoint = mock(InboundEndpoint.class);
+        when(inboundEndpoint.getConnector()).thenReturn(httpsConnector);
+        when(inboundEndpoint.getProperties()).thenReturn(properties);
+
+        Service service = mock(Service.class);
+        return new MockHttpsMessageReceiver(httpsConnector, service, inboundEndpoint);
+    }
+
+    private static class MockHttpsMessageReceiver extends HttpsMessageReceiver
+    {
+
+        public MockHttpsMessageReceiver(Connector connector, FlowConstruct flowConstruct,
+                                        InboundEndpoint endpoint) throws CreateException
+        {
+            super(connector, flowConstruct, endpoint);
+        }
+    }
+}

Property changes: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java


Added: svn:keywords

Added: svn:eol-style

Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -105,7 +105,7 @@
         // message will be returned as the response. There is no exception payload.
         assertNotNull(message);
         assertNull(message.getExceptionPayload());
-        assertNotNull(getPayload(), message.getPayloadAsString());
+        assertNotNull(message.getPayloadAsString());
         assertTrue((afterCall.getTime() - beforeCall.getTime()) > DEFAULT_RESPONSE_TIMEOUT);
     }
 

Deleted: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java (25160 => 25161)


--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -1,147 +0,0 @@
-/*
- * $Id$
- * --------------------------------------------------------------------------------------
- * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
- *
- * The software in this package is published under the terms of the CPAL v1.0
- * license, a copy of which has been included with this distribution in the
- * LICENSE.txt file.
- */
-
-package org.mule.transport.http.functional;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import org.mule.DefaultMuleMessage;
-import org.mule.api.MessagingException;
-import org.mule.api.MuleMessage;
-import org.mule.api.config.MuleProperties;
-import org.mule.api.construct.FlowConstruct;
-import org.mule.api.endpoint.InboundEndpoint;
-import org.mule.api.lifecycle.CreateException;
-import org.mule.api.service.Service;
-import org.mule.api.transport.Connector;
-import org.mule.tck.junit4.AbstractMuleContextTestCase;
-import org.mule.transport.http.HttpConnector;
-import org.mule.transport.http.HttpServerConnection;
-import org.mule.transport.http.HttpsConnector;
-import org.mule.transport.http.HttpsMessageReceiver;
-import org.mule.transport.ssl.MockHandshakeCompletedEvent;
-import org.mule.transport.ssl.MockSslSocket;
-
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Collections;
-import java.util.Map;
-
-import javax.net.ssl.HandshakeCompletedEvent;
-import javax.resource.spi.work.Work;
-
-import org.junit.Test;
-
-/**
- * Test for SSL handshake timeouts. Unfortunately, there is no easy way to blackbox-test this
- * as it would require a SSLSocket implementation that could actually add arbitrary delays to
- * the SSL handshake.
- * <p/>
- * The approach chosen here is based on reflection and massive subclassing/stubbing to make things
- * work. Yes, this is hacky and fragile but this seems to be the only reasonable alternative
- * for now.
- */
-public class HttpsHandshakeTimingTestCase extends AbstractMuleContextTestCase
-{
-
-    @Test
-    public void testHttpsHandshakeExceedsTimeout() throws Exception
-    {
-        MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver();
-
-        MockSslSocket socket = new MockSslSocket();
-        Work work = messageReceiver.createWork(new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector()));
-        assertNotNull(work);
-
-        MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext);
-        try
-        {
-            // note how preRouteMessage is invoked here without a prior handshakeComplete
-            // which would count down the latch that's used in HttpsWorker
-            invokePreRouteMessage(work, message);
-            fail();
-        }
-        catch (InvocationTargetException ite)
-        {
-            assertTrue(ite.getCause() instanceof MessagingException);
-            assertTrue(ite.getCause().getMessage().contains("handshake"));
-        }
-    }
-
-    @Test
-    public void testHttpsHandshakeCompletesBeforeProcessingMessage() throws Exception
-    {
-        MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver();
-
-        MockSslSocket socket = new MockSslSocket();
-        HttpServerConnection serverConnection = new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector());
-        Work work = messageReceiver.createWork(serverConnection);
-        assertNotNull(work);
-
-        invokeHandshakeCompleted(serverConnection, socket);
-
-        MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext);
-        invokePreRouteMessage(work, message);
-        assertNotNull(message.<Object>getInboundProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS));
-    }
-
-    private void invokeHandshakeCompleted(HttpServerConnection serverConnection, MockSslSocket socket) throws Exception
-    {
-        HandshakeCompletedEvent event = new MockHandshakeCompletedEvent(socket);
-        serverConnection.handshakeCompleted(event);
-    }
-
-    private void invokePreRouteMessage(Work work, MuleMessage message) throws Exception
-    {
-        Method preRouteMessage = work.getClass().getDeclaredMethod("preRouteMessage", MuleMessage.class);
-        assertNotNull(preRouteMessage);
-        preRouteMessage.setAccessible(true);
-        preRouteMessage.invoke(work, new Object[] {message});
-    }
-
-    private MockHttpsMessageReceiver setupMockHttpsMessageReceiver() throws CreateException
-    {
-        HttpsConnector httpsConnector = new HttpsConnector(muleContext);
-        httpsConnector.setSslHandshakeTimeout(1000);
-
-        Map<String, Object> properties = Collections.emptyMap();
-
-        InboundEndpoint inboundEndpoint = mock(InboundEndpoint.class);
-        when(inboundEndpoint.getConnector()).thenReturn(httpsConnector);
-        when(inboundEndpoint.getProperties()).thenReturn(properties);
-
-        Service service = mock(Service.class);
-        return new MockHttpsMessageReceiver(httpsConnector, service, inboundEndpoint);
-    }
-
-    private static class MockHttpsMessageReceiver extends HttpsMessageReceiver
-    {
-
-        public MockHttpsMessageReceiver(Connector connector, FlowConstruct flowConstruct,
-                                        InboundEndpoint endpoint) throws CreateException
-        {
-            super(connector, flowConstruct, endpoint);
-        }
-
-        /**
-         * Open up access for unit test
-         */
-        @Override
-        public Work createWork(HttpServerConnection httpServerConnection) throws IOException
-        {
-            return super.createWork(httpServerConnection);
-        }
-    }
-}

Modified: branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java (25160 => 25161)


--- branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java	2013-01-04 17:39:12 UTC (rev 25160)
+++ branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java	2013-01-06 00:06:58 UTC (rev 25161)
@@ -25,7 +25,9 @@
  */
 public class MockSslSocket extends SSLSocket
 {
-    
+
+    private InputStream inputStream;
+
     public void addHandshakeCompletedListener(HandshakeCompletedListener listener)
     {
         // not needed
@@ -119,7 +121,7 @@
     @Override
     public InputStream getInputStream() throws IOException
     {
-        return null;
+        return inputStream;
     }
 
     @Override
@@ -134,5 +136,10 @@
         return new InetSocketAddress("localhost", 12345);
     }
 
+    public void setInputStream(InputStream inputStream)
+    {
+        this.inputStream = inputStream;
+    }
 }
 
+
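
For readers following the HTTPS hunks above: the removed HttpsWorker.preRouteMessage waited on an SSL handshake latch with a timeout before routing, and HttpsHandshakeTimingTestCase exercises that same wait through beforeRouteEvent. Below is a minimal, stand-alone sketch of that latch-with-timeout pattern. It is not Mule code; the class name HandshakeGate and its method names are hypothetical and were chosen only for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of Mule): gates processing on a one-shot
// "handshake completed" signal, giving up after a timeout.
final class HandshakeGate
{
    // Counted down exactly once, when the handshake listener fires.
    private final CountDownLatch handshakeComplete = new CountDownLatch(1);

    // Called from the handshake-completed callback.
    void handshakeCompleted()
    {
        handshakeComplete.countDown();
    }

    // Returns true if the handshake finished within the timeout, false otherwise.
    boolean awaitHandshake(long timeoutMillis) throws InterruptedException
    {
        return handshakeComplete.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

A caller would treat a false return as a failed handshake, much as the removed code threw a MessagingException when the latch did not open within getSslHandshakeTimeout().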

To unsubscribe from this list please visit:

http://xircles.codehaus.org/manage_email

