- Revision
- 25161
- Author
- pablo.lagreca
- Date
- 2013-01-05 18:06:58 -0600 (Sat, 05 Jan 2013)
Log Message
MULE-6578 - Simplify MessageSource creation - Applying new design to http transport
Modified Paths
- branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java
- branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java
- branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java
- branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml
- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java
- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java
- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java
- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java
- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java
- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java
- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java
- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java
- branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java
Added Paths
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndPhaseTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhaseTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessContext.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessPhase.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessingManager.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/MuleMessageProcessingManager.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseExecutionEngine.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseResultNotifier.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/RequestResponseFlowProcessingPhaseTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java
- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java
- branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseExecutionEngineTestCase.java
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java
- branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java
- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageProcessTemplate.java
- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java
- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java
Removed Paths
Diff
Modified: branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java (25160 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/core/src/main/java/org/mule/api/config/MuleProperties.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -153,6 +153,7 @@ public static final String OBJECT_EXPRESSION_LANGUAGE = "_muleExpressionLanguage"; public static final String OBJECT_LOCK_MANAGER = "_muleLockManager"; public static final String OBJECT_LOCK_PROVIDER = "_muleLockProvider"; + public static final String OBJECT_DEFAULT_MESSAGE_PROCESSING_MANAGER = "_muleMessageProcessingManager"; // Not currently used as these need to be instance variables of the MuleContext. public static final String OBJECT_WORK_MANAGER = "_muleWorkManager";
Modified: branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java (25160 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/core/src/main/java/org/mule/config/builders/DefaultsConfigurationBuilder.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -23,6 +23,7 @@ import org.mule.config.bootstrap.SimpleRegistryBootstrap; import org.mule.el.mvel.MVELExpressionLanguage; import org.mule.endpoint.DefaultEndpointFactory; +import org.mule.message.processing.MuleMessageProcessingManager; import org.mule.model.seda.SedaModel; import org.mule.retry.policies.NoRetryPolicyTemplate; import org.mule.security.MuleSecurityManager; @@ -75,6 +76,7 @@ registry.registerObject(MuleProperties.QUEUE_STORE_DEFAULT_PERSISTENT_NAME, DefaultObjectStoreFactoryBean.createDefaultPersistentQueueStore()); registry.registerObject(MuleProperties.DEFAULT_USER_OBJECT_STORE_NAME, DefaultObjectStoreFactoryBean.createDefaultUserObjectStore()); registry.registerObject(MuleProperties.OBJECT_STORE_MANAGER, new MuleObjectStoreManager()); + registry.registerObject(MuleProperties.OBJECT_DEFAULT_MESSAGE_PROCESSING_MANAGER, new MuleMessageProcessingManager()); registry.registerObject(MuleProperties.OBJECT_MULE_ENDPOINT_FACTORY, new DefaultEndpointFactory()); registry.registerObject(MuleProperties.OBJECT_MULE_STREAM_CLOSER_SERVICE, new DefaultStreamCloserService());
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndPhaseTemplate.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

/**
 * Template for the phase where the {@link MessageProcessTemplate} is notified
 * that the message processing has ended.
 */
public interface EndPhaseTemplate extends MessageProcessTemplate
{

    /**
     * Template method called when the message processing ends.
     */
    void messageProcessingEnded();

}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java (rev 0) +++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/EndProcessPhase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -0,0 +1,33 @@ +/* + * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.message.processing; + +/** + * This phase notifies to the {@link MessageProcessTemplate} that the message processing + * has ended. + * + * To participate on this phase {@link MessageProcessTemplate} must implement {@link EndPhaseTemplate}. + */ +public class EndProcessPhase implements MessageProcessPhase<EndPhaseTemplate> +{ + + @Override + public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate) + { + return messageProcessTemplate instanceof EndPhaseTemplate; + } + + @Override + public void runPhase(EndPhaseTemplate messageProcessTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier) + { + messageProcessTemplate.messageProcessingEnded(); + } + +}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java (rev 0) +++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -0,0 +1,119 @@ +/* + * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.message.processing; + +import org.mule.api.MessagingException; +import org.mule.api.MuleContext; +import org.mule.api.MuleEvent; +import org.mule.api.context.MuleContextAware; +import org.mule.api.execution.ExecutionCallback; +import org.mule.execution.TransactionalErrorHandlingExecutionTemplate; +import org.mule.transaction.MuleTransactionConfig; + +import javax.resource.spi.work.Work; +import javax.resource.spi.work.WorkException; + +/** + * This phase routes the message through the flow. 
+ * + * To participate of this phase, {@link MessageProcessTemplate} must implement {@link FlowProcessingPhaseTemplate} + */ +public class FlowProcessingPhase implements MessageProcessPhase<FlowProcessingPhaseTemplate>, Comparable<MessageProcessPhase> +{ + + @Override + public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate) + { + return messageProcessTemplate instanceof FlowProcessingPhaseTemplate; + } + + @Override + public void runPhase(final FlowProcessingPhaseTemplate flowProcessingPhaseTemplate, final MessageProcessContext messageProcessContext, final PhaseResultNotifier phaseResultNotifier) + { + Work flowExecutionWork = new Work() + { + @Override + public void release() + { + } + + @Override + public void run() + { + try + { + try + { + TransactionalErrorHandlingExecutionTemplate transactionTemplate = TransactionalErrorHandlingExecutionTemplate. + createMainExecutionTemplate(messageProcessContext.getFlowConstruct().getMuleContext(), + (messageProcessContext.getTransactionConfig() == null ? 
new MuleTransactionConfig() : messageProcessContext.getTransactionConfig()), + messageProcessContext.getFlowConstruct().getExceptionListener()); + MuleEvent response = transactionTemplate.execute(new ExecutionCallback<MuleEvent>() + { + @Override + public MuleEvent process() throws Exception + { + Object message = flowProcessingPhaseTemplate.getOriginalMessage(); + if (message == null) + { + return null; + } + MuleEvent muleEvent = flowProcessingPhaseTemplate.getMuleEvent(); + muleEvent = flowProcessingPhaseTemplate.beforeRouteEvent(muleEvent); + muleEvent = flowProcessingPhaseTemplate.routeEvent(muleEvent); + muleEvent = flowProcessingPhaseTemplate.afterRouteEvent(muleEvent); + return muleEvent; + } + }); + if (flowProcessingPhaseTemplate instanceof RequestResponseFlowProcessingPhaseTemplate) + { + ((RequestResponseFlowProcessingPhaseTemplate)flowProcessingPhaseTemplate).sendResponseToClient(response); + } + flowProcessingPhaseTemplate.afterSuccessfulProcessingFlow(response); + } + catch (MessagingException e) + { + flowProcessingPhaseTemplate.afterFailureProcessingFlow(e); + } + phaseResultNotifier.phaseSuccessfully(); + } + catch (Exception e) + { + phaseResultNotifier.phaseFailure(e); + } + } + }; + if (messageProcessContext.supportsAsynchronousProcessing()) + { + try + { + messageProcessContext.getFlowExecutionWorkManager().scheduleWork(flowExecutionWork); + } + catch (WorkException e) + { + phaseResultNotifier.phaseFailure(e); + } + } + else + { + flowExecutionWork.run(); + } + } + + @Override + public int compareTo(MessageProcessPhase messageProcessPhase) + { + if (messageProcessPhase instanceof ValidationPhase) + { + return 1; + } + return 0; + } +}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/FlowProcessingPhaseTemplate.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.source.MessageSource;

/**
 * Template methods for {@link MessageSource} specific behavior during
 * flow execution.
 */
public interface FlowProcessingPhaseTemplate extends MessageProcessTemplate
{

    /**
     * @return a {@link org.mule.api.MuleEvent} created from the original message
     * @throws MuleException if the event cannot be created
     */
    MuleEvent getMuleEvent() throws MuleException;

    /**
     * @return the original message
     * @throws MuleException if the original message cannot be retrieved
     */
    Object getOriginalMessage() throws MuleException;

    /**
     * Pre-processing of the {@link MuleEvent} to route.
     *
     * @param muleEvent the event about to be routed
     * @return the event to route
     * @throws MuleException if pre-processing fails
     */
    MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException;

    /**
     * Routes the {@link MuleEvent} through the processors chain.
     *
     * @param muleEvent {@link MuleEvent} created from the raw message of this context
     * @return the response {@link MuleEvent}
     * @throws MuleException if routing fails
     */
    MuleEvent routeEvent(MuleEvent muleEvent) throws MuleException;

    /**
     * Post-processing of the routed {@link MuleEvent}.
     *
     * @param muleEvent the event returned by {@link #routeEvent(MuleEvent)}
     * @return the post-processed event
     * @throws MuleException if post-processing fails
     */
    MuleEvent afterRouteEvent(MuleEvent muleEvent) throws MuleException;

    /**
     * Called after successfully processing the message through the flow.
     *
     * @param muleEvent the event resulting from the flow execution
     * @throws MuleException if the callback fails
     */
    void afterSuccessfulProcessingFlow(MuleEvent muleEvent) throws MuleException;


    /**
     * Called when the processing of the message through the flow fails.
     *
     * @param messagingException the exception raised during flow processing
     * @throws MuleException if the callback fails
     */
    void afterFailureProcessingFlow(MessagingException messagingException) throws MuleException;

}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessContext.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

import org.mule.api.construct.FlowConstruct;
import org.mule.api.context.WorkManager;
import org.mule.api.source.MessageSource;
import org.mule.api.transaction.TransactionConfig;

/**
 * Context for processing one message from a {@link org.mule.api.source.MessageSource}.
 *
 * Mule {@link org.mule.api.source.MessageSource} implementations should create one instance of
 * MessageProcessContext per message that they generate.
 *
 * MessageProcessContext is responsible for:
 *  - Defining whether the incoming message can be processed in a separate thread
 *  - Providing access to the {@link MessageSource} of the message
 *  - Providing access to the {@link FlowConstruct} where the message is going to be executed
 *  - Providing access, if available, to the {@link WorkManager} to use for processing the message
 *  - Providing the {@link MessageSource} transaction configuration
 */
public interface MessageProcessContext
{

    /**
     * @return true if the message can be processed in a different thread than the one in which it was acquired, false otherwise
     */
    boolean supportsAsynchronousProcessing();

    /**
     * @return the {@link MessageSource} that retrieved the message. Can not be null
     */
    MessageSource getMessageSource();

    /**
     * @return the {@link FlowConstruct} where the incoming message is going to be executed. Can not be null
     */
    FlowConstruct getFlowConstruct();

    /**
     * @return the {@link WorkManager} where the incoming message must be processed.
     *         If null it will be executed in the same thread where the message was received
     */
    WorkManager getFlowExecutionWorkManager();

    /**
     * @return the {@link TransactionConfig} associated to the {@link MessageSource} that received the message.
     *         If null then no transaction config will be used.
     */
    TransactionConfig getTransactionConfig();

}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessPhase.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

/**
 *
 * Defines a phase that processes a message using a {@link MessageProcessTemplate}.
 *
 * The phase will be part of a chain of responsibility where the phase can define
 * the end of the execution of the set of phases by calling:
 *  - {@link org.mule.message.processing.PhaseResultNotifier#phaseConsumedMessage()} which indicates that the phase has consumed the message
 *  and it should no longer be processed
 *  - {@link org.mule.message.processing.PhaseResultNotifier#phaseFailure(Exception)} which indicates that there was a failure
 *  during message processing.
 *
 * Whenever a phase finishes execution it must call {@link org.mule.message.processing.PhaseResultNotifier#phaseSuccessfully()} which will cause
 * the next phase to be executed.
 *
 * Optionally a {@link MessageProcessPhase} can implement {@link Comparable} (of {@link MessageProcessPhase})
 * to define the order in which it must be positioned in the {@link MessageProcessPhase} chain.
 *
 * @param <Template> the kind of {@link MessageProcessTemplate} this phase operates on
 */
public interface MessageProcessPhase<Template extends MessageProcessTemplate>
{

    /**
     * Determines if a certain phase supports a given template.
     *
     * If the phase does not support the template instance then the phase will be skipped.
     *
     * @param messageProcessTemplate template to be processed
     * @return true if the phase supports this template, false otherwise
     */
    boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate);

    /**
     * Processes the template through the phase.
     *
     * @param messageProcessTemplate template containing message source specific behavior
     * @param messageProcessContext provides context information for executing the message
     * @param phaseResultNotifier notifier that must be informed of the outcome of the phase
     */
    void runPhase(Template messageProcessTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier);

}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessTemplate.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

/**
 * Marker interface for every template that can be used in a {@link MessageProcessPhase}.
 *
 * A {@link MessageProcessTemplate} must contain all the required methods that redefine behavior
 * inside a {@link MessageProcessPhase} and that are particular to the {@link org.mule.api.source.MessageSource}.
 *
 */
public interface MessageProcessTemplate
{
}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MessageProcessingManager.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

/**
 * In charge of processing messages through mule.
 */
public interface MessageProcessingManager
{

    /**
     * Processes a message using the given template and context.
     *
     * @param messageProcessTemplate template with the message source specific behavior
     * @param messageProcessContext context information for processing the message
     */
    void processMessage(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext);

}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/MuleMessageProcessingManager.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

import org.mule.api.MuleContext;
import org.mule.api.context.MuleContextAware;
import org.mule.api.lifecycle.Initialisable;
import org.mule.api.lifecycle.InitialisationException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
 * Default {@link MessageProcessingManager}: on initialisation it assembles the
 * ordered list of phases and hands every message to a {@link PhaseExecutionEngine}.
 */
public class MuleMessageProcessingManager implements MessageProcessingManager, MuleContextAware, Initialisable
{
    private final EndProcessPhase endProcessPhase = new EndProcessPhase();
    private MuleContext muleContext;
    private PhaseExecutionEngine phaseExecutionEngine;

    /**
     * Delegates to the engine built by {@link #initialise()}.
     */
    @Override
    public void processMessage(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext)
    {
        phaseExecutionEngine.process(messageProcessTemplate, messageProcessContext);
    }

    @Override
    public void setMuleContext(MuleContext context)
    {
        this.muleContext = context;
    }

    /**
     * Collects the phases found in the registry, appends the built-in validation
     * and flow-processing phases, sorts them and creates the execution engine.
     */
    @Override
    public void initialise() throws InitialisationException
    {
        List<MessageProcessPhase> phases = new ArrayList<MessageProcessPhase>();
        Collection<MessageProcessPhase> phasesFromRegistry = muleContext.getRegistry().lookupObjects(MessageProcessPhase.class);
        if (phasesFromRegistry != null)
        {
            phases.addAll(phasesFromRegistry);
        }
        phases.add(new ValidationPhase());
        phases.add(new FlowProcessingPhase());
        Collections.sort(phases, new PhaseOrderComparator());
        phaseExecutionEngine = new PhaseExecutionEngine(phases, muleContext.getExceptionListener(), endProcessPhase);
    }

    /**
     * Orders phases using their optional {@link Comparable} implementation. The
     * first phase is asked first; if it has no opinion (or is not comparable) the
     * second one is asked and its answer is inverted.
     */
    private static class PhaseOrderComparator implements Comparator<MessageProcessPhase>
    {
        @Override
        public int compare(MessageProcessPhase first, MessageProcessPhase second)
        {
            int order = 0;
            if (first instanceof Comparable)
            {
                order = ((Comparable) first).compareTo(second);
            }
            if (order != 0 || !(second instanceof Comparable))
            {
                return order;
            }
            return ((Comparable) second).compareTo(first) * -1;
        }
    }
}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseExecutionEngine.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

import org.mule.api.exception.SystemExceptionHandler;

import java.util.List;

/**
 * Runs a {@link MessageProcessTemplate} through an ordered list of
 * {@link MessageProcessPhase}s, followed by the {@link EndProcessPhase}.
 *
 * Each call to {@link #process(MessageProcessTemplate, MessageProcessContext)} creates its own
 * {@link InternalPhaseExecutionEngine}, which is the {@link PhaseResultNotifier} handed to the phases.
 */
public class PhaseExecutionEngine
{

    private final List<MessageProcessPhase> phaseList;
    private final SystemExceptionHandler exceptionHandler;
    private final EndProcessPhase endProcessPhase;

    /**
     * @param messageProcessPhaseList phases to execute, in execution order
     * @param exceptionHandler handler invoked when a phase reports a failure
     * @param endProcessPhase phase executed once the chain finishes, is consumed or fails
     */
    public PhaseExecutionEngine(List<MessageProcessPhase> messageProcessPhaseList, SystemExceptionHandler exceptionHandler, EndProcessPhase endProcessPhase)
    {
        this.phaseList = messageProcessPhaseList;
        this.exceptionHandler = exceptionHandler;
        this.endProcessPhase = endProcessPhase;
    }

    /**
     * Starts processing the template with the first phase that supports it.
     *
     * @param messageProcessTemplate template with the message source specific behavior
     * @param messageProcessContext context information for processing the message
     */
    public void process(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext)
    {
        InternalPhaseExecutionEngine internalPhaseExecutionEngine = new InternalPhaseExecutionEngine(messageProcessTemplate, messageProcessContext);
        internalPhaseExecutionEngine.process();
    }

    /**
     * Per-message state of the phase chain: tracks the current phase index and
     * whether the end phase has already been executed.
     */
    public class InternalPhaseExecutionEngine implements PhaseResultNotifier
    {
        private int currentPhase = 0;
        private final MessageProcessContext messageProcessContext;
        private final MessageProcessTemplate messageProcessTemplate;
        private boolean endPhaseProcessed;

        public InternalPhaseExecutionEngine(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext)
        {
            this.messageProcessTemplate = messageProcessTemplate;
            this.messageProcessContext = messageProcessContext;
        }

        /**
         * Advances to the next phase that supports the template and runs it; when no
         * such phase remains the end phase is executed.
         */
        @Override
        public void phaseSuccessfully()
        {
            currentPhase++;
            // Phases that do not support the template must be skipped, exactly as process() does
            // for the first phase; handing them an unsupported template would fail with a
            // ClassCastException inside runPhase.
            while (currentPhase < phaseList.size() && !phaseList.get(currentPhase).supportsTemplate(messageProcessTemplate))
            {
                currentPhase++;
            }
            if (currentPhase < phaseList.size())
            {
                phaseList.get(currentPhase).runPhase(messageProcessTemplate, messageProcessContext, this);
            }
            else
            {
                processEndPhase();
            }
        }

        /**
         * Stops the chain and goes straight to the end phase.
         */
        @Override
        public void phaseConsumedMessage()
        {
            processEndPhase();
        }

        /**
         * Hands the failure to the exception handler and then runs the end phase.
         *
         * @param reason exception that represents the failure in the phase
         */
        @Override
        public void phaseFailure(Exception reason)
        {
            exceptionHandler.handleException(reason);
            processEndPhase();
        }

        /**
         * Runs the end phase at most once per message, and only for templates it supports.
         */
        private void processEndPhase()
        {
            if (!endPhaseProcessed)
            {
                endPhaseProcessed = true;
                if (endProcessPhase.supportsTemplate(messageProcessTemplate))
                {
                    endProcessPhase.runPhase((EndPhaseTemplate) messageProcessTemplate, messageProcessContext, this);
                }
            }
        }

        /**
         * Runs the first phase that supports the template, leaving {@code currentPhase}
         * pointing at it. Does nothing when no phase supports the template.
         */
        public void process()
        {
            for (MessageProcessPhase phase : phaseList)
            {
                if (phase.supportsTemplate(messageProcessTemplate))
                {
                    phase.runPhase(messageProcessTemplate, messageProcessContext, this);
                    return;
                }
                currentPhase++;
            }
        }
    }
}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/PhaseResultNotifier.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

/**
 * Notifier used by {@link MessageProcessPhase} in order to
 * define the result of the phase execution.
 */
public interface PhaseResultNotifier
{

    /**
     * This method must be called when the phase completes successfully.
     */
    public void phaseSuccessfully();

    /**
     * This method must be called when the phase consumes the message and the chain should not continue processing the message.
     */
    public void phaseConsumedMessage();

    /**
     * This method must be called when a phase execution throws an exception.
     *
     * @param reason exception that represents the failure in the phase
     */
    public void phaseFailure(Exception reason);
}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/RequestResponseFlowProcessingPhaseTemplate.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

import org.mule.api.MuleEvent;
import org.mule.api.MuleException;

/**
 * Extension of {@link FlowProcessingPhaseTemplate} for those {@link org.mule.api.source.MessageSource}
 * that require sending a response for the processed message.
 */
public interface RequestResponseFlowProcessingPhaseTemplate extends FlowProcessingPhaseTemplate
{

    /**
     * Template method to send a response after processing the message.
     *
     * This method is executed outside the flow. In case of failure the {@link org.mule.api.exception.SystemExceptionHandler}
     * will be executed.
     *
     * @param muleEvent the event with the content of the response to be sent.
     * @throws MuleException exception thrown while the response is being sent.
     */
    public void sendResponseToClient(MuleEvent muleEvent) throws MuleException;

}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/ThrottlingPhaseTemplate.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,28 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleException;
+
+/**
+ * Template that a {@link org.mule.api.source.MessageSource} must implement
+ * if it wants to participate in the throttling phase when processing a message.
+ */
+public interface ThrottlingPhaseTemplate extends FlowProcessingPhaseTemplate
+{
+
+    /**
+     * Discards the message because the ThrottlingPolicy configured for the {@link org.mule.api.source.MessageSource} has been exceeded.
+     *
+     * @throws MuleException declared so that implementations can signal a failure while discarding
+     */
+    void discardMessageOnThrottlingExceeded() throws MuleException;
+
+}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhase.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,73 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleContext;
+import org.mule.api.context.MuleContextAware;
+
+/**
+ * This phase validates the incoming message.
+ *
+ * To participate in this phase, a {@link MessageProcessTemplate} must implement
+ * {@link ValidationPhaseTemplate}.
+ */
+public class ValidationPhase implements MessageProcessPhase<ValidationPhaseTemplate>, Comparable<MessageProcessPhase>
+{
+
+    /**
+     * Accepts only templates that implement {@link ValidationPhaseTemplate}.
+     */
+    @Override
+    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+    {
+        return messageProcessTemplate instanceof ValidationPhaseTemplate;
+    }
+
+    /**
+     * Validates the message through the template and reports the outcome to the notifier:
+     * success when the message is valid, consumed when it is invalid (after asking the template
+     * to discard it), and failure when any of those calls throws an exception.
+     */
+    @Override
+    public void runPhase(ValidationPhaseTemplate validationPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier)
+    {
+        try
+        {
+            if (validationPhaseTemplate.validateMessage())
+            {
+                phaseResultNotifier.phaseSuccessfully();
+            }
+            else
+            {
+                validationPhaseTemplate.discardInvalidMessage();
+                phaseResultNotifier.phaseConsumedMessage();
+            }
+        }
+        catch (Exception e)
+        {
+            phaseResultNotifier.phaseFailure(e);
+        }
+    }
+
+    /**
+     * Orders this phase before {@link FlowProcessingPhase}; any other phase compares as equal (0).
+     */
+    @Override
+    public int compareTo(MessageProcessPhase messageProcessPhase)
+    {
+        // Compare against the phase class: the argument is a MessageProcessPhase, so testing it
+        // against the FlowProcessingPhaseTemplate interface (as before) is not expected to match.
+        if (messageProcessPhase instanceof FlowProcessingPhase)
+        {
+            return -1;
+        }
+        return 0;
+    }
+}
Added: branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/message/processing/ValidationPhaseTemplate.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,38 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import org.mule.api.MuleException;
+
+/**
+ * Template for the validation of the incoming message.
+ *
+ * This template allows validating a message and discarding it in case it is invalid.
+ */
+public interface ValidationPhaseTemplate extends MessageProcessTemplate
+{
+    /**
+     * Validates the message content.
+     *
+     * In case the message is not valid, {@link #discardInvalidMessage()} will be executed
+     * afterwards, so the implementation can save the reason why the message is invalid
+     * and report it when {@link #discardInvalidMessage()} is called.
+     *
+     * @return false if the message is invalid, true otherwise
+     */
+    boolean validateMessage();
+
+    /**
+     * Discards the message because the validation failed.
+     *
+     * @throws org.mule.api.MuleException declared so that implementations can signal a failure while discarding
+     */
+    void discardInvalidMessage() throws MuleException;
+}
Modified: branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java (25160 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractMessageReceiver.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -39,6 +39,9 @@ import org.mule.context.notification.EndpointMessageNotification; import org.mule.execution.TransactionalErrorHandlingExecutionTemplate; import org.mule.lifecycle.PrimaryNodeLifecycleNotificationListener; +import org.mule.message.processing.MessageProcessContext; +import org.mule.message.processing.MessageProcessTemplate; +import org.mule.message.processing.MessageProcessingManager; import org.mule.session.DefaultMuleSession; import org.mule.session.LegacySessionHandler; import org.mule.transaction.TransactionCoordination; @@ -88,6 +91,7 @@ protected ReplyToHandler replyToHandler; private PrimaryNodeLifecycleNotificationListener primaryNodeLifecycleNotificationListener; + private MessageProcessingManager messageProcessingManager; /** * Creates the Message Receiver @@ -166,6 +170,8 @@ primaryNodeLifecycleNotificationListener.register(); } + messageProcessingManager = getConnector().getMuleContext().getRegistry().get(MuleProperties.OBJECT_DEFAULT_MESSAGE_PROCESSING_MANAGER); + super.initialise(); } @@ -217,41 +223,7 @@ applyInboundTransformers(muleEvent); }
- MuleEvent resultEvent = listener.process(muleEvent); - if (resultEvent != null - && !VoidMuleEvent.getInstance().equals(resultEvent) - && resultEvent.getMessage() != null - && resultEvent.getMessage().getExceptionPayload() != null - && resultEvent.getMessage().getExceptionPayload().getException() instanceof FilterUnacceptedException) - { - handleUnacceptedFilter(muleEvent.getMessage()); - return muleEvent; - } - - if (endpoint.getExchangePattern().hasResponse() && resultEvent != null - && !VoidMuleEvent.getInstance().equals(resultEvent)) - { - // Do not propagate security context back to caller - MuleSession resultSession = new DefaultMuleSession(resultEvent.getSession()); - resultSession.setSecurityContext(null); - connector.getSessionHandler().storeSessionInfoToMessage(resultSession, resultEvent.getMessage()); - - if (resultEvent.getMessage() != null && !endpoint.isDisableTransportTransformer()) - { - applyResponseTransformers(resultEvent); - } - - if (connector.isEnableMessageEvents()) - { - connector.fireNotification(new EndpointMessageNotification(resultEvent.getMessage(), - endpoint, resultEvent.getFlowConstruct(), EndpointMessageNotification.MESSAGE_RESPONSE)); - } - return resultEvent; - } - else - { - return null; - }+ return routeEvent(muleEvent); } protected void propagateRootMessageIdProperty(MuleMessage message) @@ -491,4 +463,49 @@ } } } + + public MuleEvent routeEvent(MuleEvent muleEvent) throws MuleException + { + MuleEvent resultEvent = listener.process(muleEvent); + if (resultEvent != null + && !VoidMuleEvent.getInstance().equals(resultEvent) + && resultEvent.getMessage() != null + && resultEvent.getMessage().getExceptionPayload() != null + && resultEvent.getMessage().getExceptionPayload().getException() instanceof FilterUnacceptedException) + { + handleUnacceptedFilter(muleEvent.getMessage()); + return muleEvent; + } + + if (endpoint.getExchangePattern().hasResponse() && resultEvent != null + && !VoidMuleEvent.getInstance().equals(resultEvent)) 
+ { + // Do not propagate security context back to caller + MuleSession resultSession = new DefaultMuleSession(resultEvent.getSession()); + resultSession.setSecurityContext(null); + connector.getSessionHandler().storeSessionInfoToMessage(resultSession, resultEvent.getMessage()); + + if (resultEvent.getMessage() != null && !endpoint.isDisableTransportTransformer()) + { + applyResponseTransformers(resultEvent); + } + + if (connector.isEnableMessageEvents()) + { + connector.fireNotification(new EndpointMessageNotification(resultEvent.getMessage(), + endpoint, resultEvent.getFlowConstruct(), EndpointMessageNotification.MESSAGE_RESPONSE)); + } + return resultEvent; + } + else + { + return null; + } + } + + protected void processMessage(final MessageProcessTemplate messageProcessTemplate, final MessageProcessContext messageProcessContext) + { + messageProcessingManager.processMessage(messageProcessTemplate,messageProcessContext); + } + }Added: branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/transport/AbstractTransportMessageProcessTemplate.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,271 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport;
+
+import org.mule.MessageExchangePattern;
+import org.mule.api.MessagingException;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.MuleMessage;
+import org.mule.api.config.MuleProperties;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.context.WorkManager;
+import org.mule.api.endpoint.InboundEndpoint;
+import org.mule.api.source.MessageSource;
+import org.mule.api.transaction.TransactionConfig;
+import org.mule.api.transport.PropertyScope;
+import org.mule.message.processing.FlowProcessingPhaseTemplate;
+import org.mule.message.processing.MessageProcessContext;
+import org.mule.message.processing.ValidationPhaseTemplate;
+import org.mule.util.ObjectUtils;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Base template for transports that process messages through an {@link AbstractMessageReceiver}.
+ *
+ * It implements the validation and flow processing templates as well as the
+ * {@link MessageProcessContext}, delegating to the receiver, its endpoint and its connector.
+ * Subclasses provide the raw message through {@link #acquireMessage()}.
+ */
+public abstract class AbstractTransportMessageProcessTemplate<MessageReceiverType extends AbstractMessageReceiver, ConnectorType extends AbstractConnector> implements FlowProcessingPhaseTemplate, ValidationPhaseTemplate, MessageProcessContext
+{
+
+    protected transient Log logger = LogFactory.getLog(getClass());
+
+    private final MessageReceiverType messageReceiver;
+    private final WorkManager flowExecutionWorkManager;
+    // Result of acquireMessage(), cached by getOriginalMessage() once it is non-null.
+    private Object rawMessage;
+
+    public AbstractTransportMessageProcessTemplate(MessageReceiverType messageReceiver, WorkManager flowExecutionWorkManager)
+    {
+        this.messageReceiver = messageReceiver;
+        this.flowExecutionWorkManager = flowExecutionWorkManager;
+    }
+
+    /**
+     * Creates the event to be routed from the raw message of the source.
+     *
+     * The raw message is read through {@link #getOriginalMessage()} so that it is shared with
+     * other callers of that method instead of invoking {@link #acquireMessage()} again.
+     *
+     * @return the event created by the receiver, with the inbound transformers applied unless
+     *         the endpoint disables the transport transformer
+     * @throws MuleException if acquiring the message or creating the event fails
+     */
+    public MuleEvent getMuleEvent() throws MuleException
+    {
+        MuleMessage messageFromSource = createMessageFromSource(getOriginalMessage());
+        return createEventFromMuleMessage(messageFromSource);
+    }
+
+    @Override
+    public MessageSource getMessageSource()
+    {
+        return this.messageReceiver;
+    }
+
+    /**
+     * Returns the raw message, acquiring it on first use and reusing it afterwards.
+     */
+    @Override
+    public Object getOriginalMessage() throws MuleException
+    {
+        if (this.rawMessage == null)
+        {
+            this.rawMessage = acquireMessage();
+        }
+        return this.rawMessage;
+    }
+
+    /**
+     * Does nothing by default; subclasses may override it.
+     */
+    public void afterFailureProcessingFlow(MessagingException messagingException) throws MuleException
+    {
+    }
+
+    /**
+     * Routes the event by delegating to the message receiver.
+     */
+    public MuleEvent routeEvent(MuleEvent muleEvent) throws MuleException
+    {
+        return messageReceiver.routeEvent(muleEvent);
+    }
+
+    /**
+     * Does nothing by default; subclasses may override it to send the response.
+     */
+    protected void sendResponseMessage(MuleEvent responseMuleEvent) throws MessagingException
+    {
+    }
+
+    /**
+     * Calls {@link #sendResponseMessage(MuleEvent)} when the endpoint exchange pattern is
+     * request-response. A {@link MessagingException} thrown by that call is rethrown wrapped in a
+     * {@link ResponseDispatchMessagingException}.
+     */
+    public void afterSuccessfulProcessingFlow(MuleEvent response) throws MuleException
+    {
+        if (messageReceiver.getEndpoint().getExchangePattern().equals(MessageExchangePattern.REQUEST_RESPONSE))
+        {
+            try
+            {
+                sendResponseMessage(response);
+            }
+            catch (MessagingException e)
+            {
+                throw new ResponseDispatchMessagingException(response,e);
+            }
+        }
+    }
+
+    /**
+     * This method will only be called once for the {@link MessageProcessContext}.
+     *
+     * @return the raw message from the {@link MessageSource}
+     * @throws MuleException
+     */
+    public abstract Object acquireMessage() throws MuleException;
+
+    /**
+     * Copies the inbound root message id property, when present, to the message root id and
+     * removes the inbound property.
+     */
+    protected void propagateRootMessageIdProperty(MuleMessage message)
+    {
+        String rootId = message.getInboundProperty(MuleProperties.MULE_ROOT_MESSAGE_ID_PROPERTY);
+        if (rootId != null)
+        {
+            message.setMessageRootId(rootId);
+            message.removeProperty(MuleProperties.MULE_ROOT_MESSAGE_ID_PROPERTY, PropertyScope.INBOUND);
+        }
+    }
+
+    /**
+     * Accepts every message by default.
+     */
+    @Override
+    public boolean validateMessage()
+    {
+        return true;
+    }
+
+    /**
+     * Does nothing by default.
+     */
+    @Override
+    public void discardInvalidMessage() throws MuleException
+    {
+    }
+
+    /**
+     * Removes the inbound remote-sync property and logs a warning when it was true but the
+     * endpoint exchange pattern has no response.
+     */
+    protected void warnIfMuleClientSendUsed(MuleMessage message)
+    {
+        final Object remoteSyncProperty = message.removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY,
+                                                                 PropertyScope.INBOUND);
+        if (ObjectUtils.getBoolean(remoteSyncProperty, false) && !messageReceiver.getEndpoint().getExchangePattern().hasResponse())
+        {
+            logger.warn("MuleClient.send() was used but inbound endpoint "
+                        + messageReceiver.getEndpoint().getEndpointURI().getUri().toString()
+                        + " is not 'request-response'. No response will be returned.");
+        }
+
+        // NOTE(review): the same property and scope were already removed above, so this second
+        // removal looks redundant - confirm before dropping it.
+        message.removeProperty(MuleProperties.MULE_REMOTE_SYNC_PROPERTY, PropertyScope.INBOUND);
+    }
+
+    private MuleEvent createEventFromMuleMessage(MuleMessage muleMessage) throws MuleException
+    {
+        MuleEvent muleEvent = messageReceiver.createMuleEvent(muleMessage, null);
+        if (!messageReceiver.getEndpoint().isDisableTransportTransformer())
+        {
+            messageReceiver.applyInboundTransformers(muleEvent);
+        }
+        return muleEvent;
+    }
+
+    /**
+     * Creates the {@link MuleMessage} for the raw message using the endpoint encoding, then
+     * handles the remote-sync and root message id inbound properties.
+     */
+    protected MuleMessage createMessageFromSource(Object message) throws MuleException
+    {
+        MuleMessage muleMessage = messageReceiver.createMuleMessage(message, messageReceiver.getEndpoint().getEncoding());
+        warnIfMuleClientSendUsed(muleMessage);
+        propagateRootMessageIdProperty(muleMessage);
+        return muleMessage;
+    }
+
+    protected MessageReceiverType getMessageReceiver()
+    {
+        return this.messageReceiver;
+    }
+
+    protected InboundEndpoint getInboundEndpoint()
+    {
+        return this.messageReceiver.getEndpoint();
+    }
+
+    // The cast to the ConnectorType type parameter cannot be checked at runtime.
+    @SuppressWarnings("unchecked")
+    protected ConnectorType getConnector()
+    {
+        return (ConnectorType) this.messageReceiver.getConnector();
+    }
+
+    protected MuleContext getMuleContext()
+    {
+        return this.messageReceiver.getEndpoint().getMuleContext();
+    }
+
+    public FlowConstruct getFlowConstruct()
+    {
+        return this.messageReceiver.getFlowConstruct();
+    }
+
+    @Override
+    public boolean supportsAsynchronousProcessing()
+    {
+        return true;
+    }
+
+    @Override
+    public MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException
+    {
+        return muleEvent;
+    }
+
+    @Override
+    public MuleEvent afterRouteEvent(MuleEvent muleEvent) throws MuleException
+    {
+        return muleEvent;
+    }
+
+    @Override
+    public WorkManager getFlowExecutionWorkManager()
+    {
+        return flowExecutionWorkManager;
+    }
+
+    @Override
+    public TransactionConfig getTransactionConfig()
+    {
+        return getInboundEndpoint().getTransactionConfig();
+    }
+}
+
Added: branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java (0 => 25161)
--- branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java (rev 0)
+++ branches/mule-3.x/core/src/main/java/org/mule/transport/ResponseDispatchMessagingException.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,51 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.transport;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.config.i18n.Message;
+
+/**
+ * {@link MessagingException} used by AbstractTransportMessageProcessTemplate to wrap a failure raised while sending the response.
+ */
+class ResponseDispatchMessagingException extends MessagingException
+{
+    ResponseDispatchMessagingException(Message message, MuleEvent event)
+    {
+        super(message, event);
+    }
+
+    ResponseDispatchMessagingException(Message message, MuleEvent event, MessageProcessor failingMessageProcessor)
+    {
+        super(message, event, failingMessageProcessor);
+    }
+
+    ResponseDispatchMessagingException(Message message, MuleEvent event, Throwable cause)
+    {
+        super(message, event, cause);
+    }
+
+    ResponseDispatchMessagingException(Message message, MuleEvent event, Throwable cause, MessageProcessor failingMessageProcessor)
+    {
+        super(message, event, cause, failingMessageProcessor);
+    }
+
+    ResponseDispatchMessagingException(MuleEvent event, Throwable cause)
+    {
+        super(event, cause);
+    }
+
+    ResponseDispatchMessagingException(MuleEvent event, Throwable cause, MessageProcessor failingMessageProcessor)
+    {
+        super(event, cause, failingMessageProcessor);
+    }
+}
Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java (0 => 25161)
--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/EndProcessPhaseTestCase.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,68 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link EndProcessPhase}: which templates it supports and what it calls
+ * when the phase is run.
+ */
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class EndProcessPhaseTestCase extends AbstractMuleTestCase
+{
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessTemplate notSupportedTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private EndPhaseTemplate supportedTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private PhaseResultNotifier mockPhaseResultNotifier;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockMessageContext;
+    private EndProcessPhase endProcessPhase = new EndProcessPhase();
+
+    /**
+     * Delegates the supported/unsupported template checks to {@link PhaseSupportTestHelper}.
+     */
+    @Test
+    public void supportedTemplates()
+    {
+        new PhaseSupportTestHelper<EndPhaseTemplate>(EndPhaseTemplate.class).testSupportTemplates(endProcessPhase);
+    }
+
+    /**
+     * Running the phase must end the message processing on the template without reporting
+     * a consumed message or a failure to the notifier.
+     */
+    @Test
+    public void phaseExecution()
+    {
+        endProcessPhase.runPhase(supportedTemplate,mockMessageContext,mockPhaseResultNotifier);
+        verify(mockPhaseResultNotifier, times(0)).phaseConsumedMessage();
+        verify(mockPhaseResultNotifier, times(0)).phaseFailure(any(Exception.class));
+        // NOTE(review): this repeats the first verification; it was possibly meant to check
+        // phaseSuccessfully() - confirm against EndProcessPhase before changing it.
+        verify(mockPhaseResultNotifier, times(0)).phaseConsumedMessage();
+        verify(supportedTemplate, times(1)).messageProcessingEnded();
+    }
+
+}
Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java (0 => 25161)
--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/FlowProcessingPhaseTestCase.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,144 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.MessagingException;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+import org.mule.tck.size.SmallTest;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+
+/**
+ * Unit tests for {@link FlowProcessingPhase}: supported templates, ordering relative to
+ * {@link ValidationPhase}, and the notifications emitted for successful and failing runs.
+ */
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class FlowProcessingPhaseTestCase extends AbstractMuleTestCase
+{
+
+    private FlowProcessingPhase phase = new FlowProcessingPhase();
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private FlowProcessingPhaseTemplate mockTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private RequestResponseFlowProcessingPhaseTemplate mockRequestResponseTemplate;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessageProcessContext mockContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private PhaseResultNotifier mockNotifier;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MessagingException mockMessagingException;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MuleException mockException;
+
+    @Test
+    public void supportedTemplates()
+    {
+        new PhaseSupportTestHelper<FlowProcessingPhaseTemplate>(FlowProcessingPhaseTemplate.class).testSupportTemplates(phase);
+    }
+
+    @Test
+    public void order()
+    {
+        assertThat(phase.compareTo(new ValidationPhase()), is(1));
+        assertThat(phase.compareTo(Mockito.mock(MessageProcessPhase.class)), is(0));
+    }
+
+    /**
+     * An exception thrown by afterSuccessfulProcessingFlow is reported as a phase failure.
+     */
+    @Test
+    public void runPhaseWithExceptionThrown() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        doThrow(mockException).when(mockTemplate).afterSuccessfulProcessingFlow(any(MuleEvent.class));
+        phase.runPhase(mockTemplate, mockContext, mockNotifier);
+        verify(mockContext.getFlowConstruct(), times(1)).getExceptionListener();
+        assertOnlyFailureWasNotified(mockException);
+    }
+
+    /**
+     * A {@link MessagingException} thrown while routing still ends in a successful phase.
+     */
+    @Test
+    public void runPhaseWithMessagingExceptionThrown() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        when(mockTemplate.routeEvent(any(MuleEvent.class))).thenThrow(mockMessagingException);
+        phase.runPhase(mockTemplate, mockContext, mockNotifier);
+        verify(mockContext.getFlowConstruct(), times(1)).getExceptionListener();
+        assertOnlySuccessWasNotified();
+    }
+
+    @Test
+    public void callSendResponseForRequestResponseTemplate() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        phase.runPhase(mockRequestResponseTemplate, mockContext, mockNotifier);
+        assertOnlySuccessWasNotified();
+        verify(mockRequestResponseTemplate, times(1)).sendResponseToClient(any(MuleEvent.class));
+    }
+
+    @Test
+    public void successfulPhaseExecutionInOrder() throws Exception
+    {
+        when(mockContext.supportsAsynchronousProcessing()).thenReturn(false);
+        phase.runPhase(mockRequestResponseTemplate, mockContext, mockNotifier);
+        InOrder ordered = Mockito.inOrder(mockContext, mockContext.getFlowConstruct(), mockRequestResponseTemplate, mockNotifier);
+        ordered.verify(mockContext, atLeastOnce()).getTransactionConfig();
+        ordered.verify(mockContext.getFlowConstruct(), times(1)).getExceptionListener();
+        ordered.verify(mockRequestResponseTemplate, times(1)).getMuleEvent();
+        ordered.verify(mockRequestResponseTemplate, times(1)).beforeRouteEvent(any(MuleEvent.class));
+        ordered.verify(mockRequestResponseTemplate, times(1)).routeEvent(any(MuleEvent.class));
+        ordered.verify(mockRequestResponseTemplate, times(1)).afterRouteEvent(any(MuleEvent.class));
+        ordered.verify(mockRequestResponseTemplate, times(1)).afterSuccessfulProcessingFlow(any(MuleEvent.class));
+        ordered.verify(mockNotifier, times(1)).phaseSuccessfully();
+        assertOnlySuccessWasNotified();
+    }
+
+    /**
+     * Verifies that the notifier received exactly one success and nothing else.
+     */
+    private void assertOnlySuccessWasNotified()
+    {
+        verify(mockNotifier, times(0)).phaseFailure(any(Exception.class));
+        verify(mockNotifier, times(0)).phaseConsumedMessage();
+        verify(mockNotifier, times(1)).phaseSuccessfully();
+    }
+
+    /**
+     * Verifies that the notifier received exactly one failure with the given exception and nothing else.
+     */
+    private void assertOnlyFailureWasNotified(Exception e)
+    {
+        verify(mockNotifier, times(1)).phaseFailure(e);
+        verify(mockNotifier, times(0)).phaseConsumedMessage();
+        verify(mockNotifier, times(0)).phaseSuccessfully();
+    }
+
+}
Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java (0 => 25161)
--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java (rev 0)
+++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/MuleMessageProcessingManagerTestCase.java 2013-01-06 00:06:58 UTC (rev 25161)
@@ -0,0 +1,237 @@
+/*
+ * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+package org.mule.message.processing;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.mule.api.DefaultMuleException;
+import org.mule.api.MuleContext;
+import org.mule.api.MuleEvent;
+import org.mule.api.MuleException;
+import org.mule.api.exception.SystemExceptionHandler;
+import org.mule.api.lifecycle.InitialisationException;
+import org.mule.tck.size.SmallTest;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Answers;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Unit tests for {@link MuleMessageProcessingManager} driven by mocked phases, a mocked
+ * template/context and a mocked {@link MuleContext}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+@SmallTest
+public class MuleMessageProcessingManagerTestCase extends org.mule.tck.junit4.AbstractMuleTestCase
+{
+
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private MuleContext mockMuleContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private TestMessageProcessTemplateAndContext completeMessageProcessTemplateAndContext;
+    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
+    private SystemExceptionHandler mockExceptionListener;
+
+    @Test
+    public void nullMessageProcessPhaseInRegistry() throws Exception
+    {
+        processAndVerifyDefaultPhasesUsingRegistryPhases(null);
+    }
+
+    @Test
+    public void emptyMessageProcessPhaseInRegistry() throws Exception
+    {
+        processAndVerifyDefaultPhasesUsingRegistryPhases(Collections.<MessageProcessPhase>emptyList());
+    }
+
+    @Test
+    public void notSupportedMessageProcessPhaseInRegistry() throws Exception
+    {
+        MessageProcessPhase notSupportedPhase = createNotSupportedPhase();
+        processAndVerifyDefaultPhasesUsingRegistryPhases(Arrays.asList(notSupportedPhase));
+    }
+
+    /**
+     * A phase placed between validation and flow processing that reports the message as consumed
+     * must stop the flow phase: routeEvent is never called, validation and the end phase run once.
+     */
+    @Test
+    public void messageConsumedPreventsNextPhaseToBeExecuted() throws Exception
+    {
+        PhaseAfterValidationBeforeFlow messageProcessPhase = createPhaseAfterValidation();
+        when(completeMessageProcessTemplateAndContext.validateMessage()).thenReturn(true);
+        when(messageProcessPhase.compareTo(any(MessageProcessPhase.class))).thenCallRealMethod();
+        when(messageProcessPhase.supportsTemplate(any(MessageProcessTemplate.class))).thenCallRealMethod();
+        doCallRealMethod().when(messageProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+        MuleMessageProcessingManager manager = createManagerUsingPhasesInRegistry(Arrays.<MessageProcessPhase>asList(messageProcessPhase));
+        manager.processMessage(completeMessageProcessTemplateAndContext, completeMessageProcessTemplateAndContext);
+        verify(completeMessageProcessTemplateAndContext, times(0)).routeEvent(any(MuleEvent.class));
+        verify(completeMessageProcessTemplateAndContext, times(1)).validateMessage();
+        verify(completeMessageProcessTemplateAndContext, times(1)).messageProcessingEnded();
+    }
+
+    private PhaseAfterValidationBeforeFlow createPhaseAfterValidation()
+    {
+        return mock(PhaseAfterValidationBeforeFlow.class,Answers.RETURNS_DEEP_STUBS.get());
+    }
+
+    /**
+     * A phase reporting a failure must have the exception handled once by the context exception listener.
+     */
+    @Test
+    public void testExceptionHandlerIsCalledDuringPhaseFailure() throws Exception
+    {
+        MessageProcessPhase failureMessageProcessPhase = createFailureMessageProcessPhase();
+        when(mockMuleContext.getExceptionListener()).thenReturn(mockExceptionListener);
+        MuleMessageProcessingManager manager = createManagerUsingPhasesInRegistry(Arrays.asList(failureMessageProcessPhase));
+        manager.processMessage(completeMessageProcessTemplateAndContext, completeMessageProcessTemplateAndContext);
+        verify(mockExceptionListener, times(1)).handleException(any(MuleException.class));
+    }
+
+    /**
+     * Creates a mocked phase that supports any template and reports a failure when it is run.
+     */
+    private MessageProcessPhase createFailureMessageProcessPhase()
+    {
+        FailureMessageProcessPhase failureMessageProcessPhase = mock(FailureMessageProcessPhase.class, Answers.RETURNS_DEEP_STUBS.get());
+        when(failureMessageProcessPhase.supportsTemplate(any(MessageProcessTemplate.class))).thenCallRealMethod();
+        doAnswer(new Answer()
+        {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
+            {
+                PhaseResultNotifier phaseResultNotifier = (PhaseResultNotifier) invocationOnMock.getArguments()[2];
+                phaseResultNotifier.phaseFailure(new DefaultMuleException("error"));
+                return null;
+            }
+        }).when(failureMessageProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
+        return failureMessageProcessPhase;
+    }
+
+    private MessageProcessPhase createNotSupportedPhase()
+    {
+        MessageProcessPhase notSupportedPhase = mock(MessageProcessPhase.class, Answers.RETURNS_DEEP_STUBS.get());
+        when(notSupportedPhase.supportsTemplate(any(MessageProcessTemplate.class))).thenReturn(false);
+        return notSupportedPhase;
+    }
+
+    private void processAndVerifyDefaultPhasesUsingRegistryPhases(Collection<MessageProcessPhase> phasesInRegistry) throws Exception
+    {
+        MuleMessageProcessingManager manager = createManagerUsingPhasesInRegistry(phasesInRegistry);
+        processAndVerifyDefaultPhasesAreExecuted(manager);
+    }
+
+    /**
+     * Creates and initialises a manager whose registry lookup of {@link MessageProcessPhase}
+     * returns the given phases.
+     */
+    private MuleMessageProcessingManager createManagerUsingPhasesInRegistry(Collection<MessageProcessPhase> phasesInRegistry) throws InitialisationException
+    {
+        MuleMessageProcessingManager manager = new MuleMessageProcessingManager();
+        manager.setMuleContext(mockMuleContext);
+        when(mockMuleContext.getRegistry().lookupObjects(MessageProcessPhase.class)).thenReturn(phasesInRegistry);
+        manager.initialise();
+        return manager;
+    }
+
+    /**
+     * Processes a message that passes validation and verifies, in order, that validation,
+     * routing and the end of the processing were each executed once.
+     */
+    private void processAndVerifyDefaultPhasesAreExecuted(MuleMessageProcessingManager manager) throws Exception
+    {
+        when(completeMessageProcessTemplateAndContext.validateMessage()).thenReturn(true);
+
+        manager.processMessage(completeMessageProcessTemplateAndContext,completeMessageProcessTemplateAndContext);
+        InOrder verifyInOrder = Mockito.inOrder(completeMessageProcessTemplateAndContext);
+        verifyInOrder.verify(completeMessageProcessTemplateAndContext, times(1)).validateMessage();
+        verifyInOrder.verify(completeMessageProcessTemplateAndContext, times(1)).routeEvent(Mockito.any(MuleEvent.class));
+        verifyInOrder.verify(completeMessageProcessTemplateAndContext, times(1)).messageProcessingEnded();
+    }
+
+    /**
+     * Combines the validation, flow processing and end phase templates with the context in one mockable type.
+     */
+    public interface TestMessageProcessTemplateAndContext extends ValidationPhaseTemplate, FlowProcessingPhaseTemplate, EndPhaseTemplate, MessageProcessContext
+    {
+    }
+
+    /**
+     * Phase that supports any template, orders itself after {@link ValidationPhase} and before
+     * {@link FlowProcessingPhase}, and reports the message as consumed when run.
+     */
+    public abstract class PhaseAfterValidationBeforeFlow implements MessageProcessPhase, Comparable<MessageProcessPhase>
+    {
+
+        @Override
+        public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+        {
+            return true;
+        }
+
+        @Override
+        public int compareTo(MessageProcessPhase messageProcessPhase)
+        {
+            if (messageProcessPhase instanceof ValidationPhase)
+            {
+                return 1;
+            }
+            if (messageProcessPhase instanceof FlowProcessingPhase)
+            {
+                return -1;
+            }
+            return 0;
+        }
+
+        @Override
+        public void runPhase(MessageProcessTemplate messageProcessTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier)
+        {
+            phaseResultNotifier.phaseConsumedMessage();
+        }
+    }
+
+    /**
+     * Phase that supports any template and orders itself before every other phase; its runPhase
+     * is left abstract and stubbed by the test.
+     */
+    public abstract class FailureMessageProcessPhase implements MessageProcessPhase, Comparable<MessageProcessPhase>
+    {
+
+        @Override
+        public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate)
+        {
+            return true;
+        }
+
+        @Override
+        public int compareTo(MessageProcessPhase messageProcessPhase)
+        {
+            return -1;
+        }
+
+    }
+
+}
Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseExecutionEngineTestCase.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.message.processing;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.mule.api.DefaultMuleException;
import org.mule.api.exception.SystemExceptionHandler;
import org.mule.tck.size.SmallTest;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@link PhaseExecutionEngine}: which phases get executed for a
 * template, whether the end phase runs, and what happens when a phase reports
 * a failure through its {@link PhaseResultNotifier}.
 */
@RunWith(MockitoJUnitRunner.class)
@SmallTest
public class PhaseExecutionEngineTestCase
{

    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private SystemExceptionHandler mockExceptionHandler;
    private List<MessageProcessPhase> phaseList = new ArrayList<MessageProcessPhase>();
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private EndProcessPhase mockEndPhase;
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private MessageProcessPhase mockProcessPhase1;
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private MessageProcessPhase mockProcessPhase2;
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private MessageProcessPhase mockProcessPhase3;
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private MessageProcessPhase mockFailingPhase;
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private EndPhaseTemplate mockTemplate;
    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private MessageProcessContext mockContext;

    /**
     * Every registered phase runs, and the end phase runs because it supports the template.
     */
    @Test
    public void allPhasesRun() throws Exception
    {
        when(mockEndPhase.supportsTemplate(mockTemplate)).thenReturn(true);
        verifyAllPhasesAreRun();
        verify(mockEndPhase, times(1)).runPhase(any(EndPhaseTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
    }

    /**
     * Every registered phase runs, but the end phase is skipped when it does not support the template.
     */
    @Test
    public void endPhaseDoesNotRun() throws Exception
    {
        when(mockEndPhase.supportsTemplate(mockTemplate)).thenReturn(false);
        verifyAllPhasesAreRun();
        verify(mockEndPhase, times(0)).runPhase(any(EndPhaseTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
    }

    /**
     * A phase that reports a failure must cause the engine's exception handler to be
     * invoked, and the end phase must still be executed.
     */
    @Test
    public void exceptionHandlerIsCalledOnFailure() throws Exception
    {
        // The first phase really fails; previously it was stubbed to succeed, so the
        // scenario named by this test was never exercised.
        addFailingPhase(mockFailingPhase);
        addPhase(mockProcessPhase1);
        when(mockEndPhase.supportsTemplate(mockTemplate)).thenReturn(true);
        PhaseExecutionEngine phaseExecutionEngine = new PhaseExecutionEngine(phaseList, mockExceptionHandler, mockEndPhase);
        phaseExecutionEngine.process(mockTemplate, mockContext);
        verify(mockExceptionHandler, times(1)).handleException(any(Exception.class));
        verify(mockEndPhase, times(1)).runPhase(any(EndPhaseTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
    }

    /**
     * Registers the three regular phases, runs the engine once and checks that each
     * of them was executed exactly once.
     */
    private void verifyAllPhasesAreRun()
    {
        // Populate the list before handing it to the engine so the test does not depend
        // on the engine keeping a live reference to the caller's list.
        addAllPhases();
        PhaseExecutionEngine engine = new PhaseExecutionEngine(phaseList, mockExceptionHandler, mockEndPhase);
        engine.process(mockTemplate, mockContext);
        verify(mockProcessPhase1, times(1)).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
        verify(mockProcessPhase2, times(1)).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
        verify(mockProcessPhase3, times(1)).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
    }

    private void addAllPhases()
    {
        addPhase(mockProcessPhase1);
        addPhase(mockProcessPhase2);
        addPhase(mockProcessPhase3);
    }

    /**
     * Registers a phase that supports the template and reports success when it is run.
     */
    private void addPhase(MessageProcessPhase mockProcessPhase)
    {
        phaseList.add(mockProcessPhase);
        when(mockProcessPhase.supportsTemplate(mockTemplate)).thenReturn(true);
        Mockito.doAnswer(new Answer<Object>()
        {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
            {
                notifierOf(invocationOnMock).phaseSuccessfully();
                return null;
            }
        }).when(mockProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
    }

    /**
     * Registers a phase that supports the template and reports a failure when it is run.
     */
    private void addFailingPhase(MessageProcessPhase mockProcessPhase)
    {
        phaseList.add(mockProcessPhase);
        when(mockProcessPhase.supportsTemplate(mockTemplate)).thenReturn(true);
        Mockito.doAnswer(new Answer<Object>()
        {
            @Override
            public Object answer(InvocationOnMock invocationOnMock) throws Throwable
            {
                notifierOf(invocationOnMock).phaseFailure(new DefaultMuleException("error"));
                return null;
            }
        }).when(mockProcessPhase).runPhase(any(MessageProcessTemplate.class), any(MessageProcessContext.class), any(PhaseResultNotifier.class));
    }

    /**
     * Extracts the notifier, which is the third argument of {@code runPhase}.
     */
    private static PhaseResultNotifier notifierOf(InvocationOnMock invocationOnMock)
    {
        return (PhaseResultNotifier) invocationOnMock.getArguments()[2];
    }
}
Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java (0 => 25161)
--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java (rev 0) +++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/PhaseSupportTestHelper.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -0,0 +1,47 @@ +/* + * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.message.processing; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import org.mockito.Mockito; + +public class PhaseSupportTestHelper<T> +{ + + private final Class<T> supportedTemplateClass; + private final T supportedTemplate; + private final MessageProcessTemplate notSupportedTemplate; + + public PhaseSupportTestHelper(Class<T> supportedTemplate) + { + this.supportedTemplateClass = supportedTemplate; + this.supportedTemplate = Mockito.mock(this.supportedTemplateClass); + this.notSupportedTemplate = Mockito.mock(MessageProcessTemplate.class); + } + + public void testSupportTemplates(MessageProcessPhase messageProcessPhase) + { + notSupportedTemplateTest(messageProcessPhase); + supportedTemplateTest(messageProcessPhase); + } + + public void notSupportedTemplateTest(MessageProcessPhase messageProcessPhase) + { + assertThat(messageProcessPhase.supportsTemplate(notSupportedTemplate), is(false)); + } + + public void supportedTemplateTest(MessageProcessPhase messageProcessPhase) + { + assertThat(messageProcessPhase.supportsTemplate((MessageProcessTemplate) supportedTemplate), is(true)); + } + +}
Added: branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java (0 => 25161)
--- branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java (rev 0) +++ branches/mule-3.x/core/src/test/java/org/mule/message/processing/ValidationPhaseTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -0,0 +1,73 @@ +/* + * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.message.processing; + +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.mule.api.MuleException; +import org.mule.tck.junit4.AbstractMuleContextTestCase; +import org.mule.tck.size.SmallTest; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +@SmallTest +public class ValidationPhaseTestCase extends AbstractMuleContextTestCase +{ + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ValidationPhaseTemplate mockTemplate; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MessageProcessContext mockContext; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private PhaseResultNotifier mockPhaseResultNotifier; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MuleException mockMulException; + + @Test + public void supportsTemplate() + { + new PhaseSupportTestHelper<ValidationPhaseTemplate>(ValidationPhaseTemplate.class).testSupportTemplates(new ValidationPhase()); + } + + @Test + public void valid() + { + 
when(mockTemplate.validateMessage()).thenReturn(true); + new ValidationPhase().runPhase(mockTemplate, mockContext, mockPhaseResultNotifier); + verify(mockPhaseResultNotifier, Mockito.times(1)).phaseSuccessfully(); + } + + @Test + public void invalid() throws Exception + { + when(mockTemplate.validateMessage()).thenReturn(false); + new ValidationPhase().runPhase(mockTemplate, mockContext, mockPhaseResultNotifier); + verify(mockTemplate,times(1)).discardInvalidMessage(); + verify(mockPhaseResultNotifier, Mockito.times(1)).phaseConsumedMessage(); + } + + @Test + public void validationFails() throws Exception + { + when(mockTemplate.validateMessage()).thenReturn(false); + doThrow(mockMulException).when(mockTemplate).discardInvalidMessage(); + new ValidationPhase().runPhase(mockTemplate, mockContext, mockPhaseResultNotifier); + verify(mockPhaseResultNotifier, Mockito.times(1)).phaseFailure(mockMulException); + } +}
Modified: branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml (25160 => 25161)
--- branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/modules/spring-config/src/main/resources/default-mule-config.xml 2013-01-06 00:06:58 UTC (rev 25161) @@ -60,6 +60,8 @@ <bean name="_muleSecurityManager" class="org.mule.security.MuleSecurityManager"/> + <bean name="_muleMessageProcessingManager" class="org.mule.message.processing.MuleMessageProcessingManager"/> + <bean name="_muleProperties" class="java.util.HashMap"/> <bean name="_muleEndpointFactory" class="org.mule.endpoint.DefaultEndpointFactory"/>
Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageProcessTemplate.java (0 => 25161)
/*
 * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $
 * --------------------------------------------------------------------------------------
 * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package org.mule.transport.http;

import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.OptimizedRequestContext;
import org.mule.RequestContext;
import org.mule.api.DefaultMuleException;
import org.mule.api.MessagingException;
import org.mule.api.MuleEvent;
import org.mule.api.MuleException;
import org.mule.api.MuleMessage;
import org.mule.api.config.MuleProperties;
import org.mule.api.context.WorkManager;
import org.mule.api.endpoint.ImmutableEndpoint;
import org.mule.api.transaction.TransactionConfig;
import org.mule.api.transport.PropertyScope;
import org.mule.config.ExceptionHelper;
import org.mule.message.processing.RequestResponseFlowProcessingPhaseTemplate;
import org.mule.transport.AbstractTransportMessageProcessTemplate;
import org.mule.message.processing.EndPhaseTemplate;
import org.mule.message.processing.ThrottlingPhaseTemplate;
import org.mule.transport.NullPayload;
import org.mule.transport.http.i18n.HttpMessages;
import org.mule.util.concurrent.Latch;

import java.io.IOException;

import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpVersion;

/**
 * Message process template for a single HTTP request read from an
 * {@link HttpServerConnection}. It reads and validates the request, builds the
 * {@link MuleMessage} with the HTTP specific inbound properties, writes the
 * response (or a failure / throttling response) back to the connection and
 * releases {@link #getMessageProcessedLatch()} once processing has ended.
 */
public class HttpMessageProcessTemplate extends AbstractTransportMessageProcessTemplate<HttpMessageReceiver, HttpConnector> implements ThrottlingPhaseTemplate, EndPhaseTemplate, RequestResponseFlowProcessingPhaseTemplate
{

    /**
     * Status code written when a request is discarded because of throttling. Read once from the
     * system property {@code mule.transport.http.throttling.discardstatuscode}, defaulting to 429.
     */
    public static final int MESSAGE_DISCARD_STATUS_CODE = Integer.parseInt(System.getProperty("mule.transport.http.throttling.discardstatuscode","429"));

    private final HttpServerConnection httpServerConnection;
    private HttpRequest request;
    private boolean badRequest;
    private final Latch messageProcessedLatch = new Latch();
    private boolean failureSendingResponse;

    public HttpMessageProcessTemplate(final HttpMessageReceiver messageReceiver, final HttpServerConnection httpServerConnection, final WorkManager flowExecutionWorkManager)
    {
        super(messageReceiver,flowExecutionWorkManager);
        this.httpServerConnection = httpServerConnection;
    }

    /**
     * Writes a failure response for an exception raised while processing the flow.
     * Nothing is written when the failure happened while sending the response itself.
     * <p>
     * The status code comes from the error mapping of the exception and is overridden by
     * the {@link HttpConnector#HTTP_STATUS_PROPERTY} outbound property of the exception's
     * event when that property is set. The exception may carry no event, in which case
     * only the mapped status code is used.
     *
     * @param messagingException the exception thrown by the flow
     * @throws MuleException if the failure response cannot be written
     */
    @Override
    public void afterFailureProcessingFlow(MessagingException messagingException) throws MuleException
    {
        if (failureSendingResponse)
        {
            return;
        }

        Exception e = messagingException;
        MuleEvent response = messagingException.getEvent();
        if (response != null &&
            response.getMessage().getExceptionPayload() != null &&
            response.getMessage().getExceptionPayload().getException() instanceof MessagingException)
        {
            e = (Exception) response.getMessage().getExceptionPayload().getException();
        }

        // NOTE(review): the mapping is looked up with the class of the original exception, not
        // of the unwrapped one used for the response body - confirm this is intended.
        String temp = ExceptionHelper.getErrorMapping(getInboundEndpoint().getConnector().getProtocol(), messagingException.getClass(), getMuleContext());
        int httpStatus = Integer.parseInt(temp);

        // The event is optional: only consult the status override when there is one.
        Object statusOverride = null;
        if (response != null)
        {
            statusOverride = response.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY);
        }
        if (statusOverride != null)
        {
            httpStatus = Integer.parseInt(statusOverride.toString());
        }

        try
        {
            httpServerConnection.writeFailureResponse(httpStatus, e.getMessage());
        }
        catch (IOException ioException)
        {
            throw new DefaultMuleException(ioException);
        }
    }

    @Override
    public void sendResponseToClient(MuleEvent muleEvent) throws MuleException
    {
        sendHttpResponse(muleEvent);
    }

    /**
     * Answers a pending {@code Expect: 100-continue} handshake before the event is routed.
     *
     * @return the same event that was received
     * @throws MuleException if the intermediate response cannot be written
     */
    @Override
    public MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException
    {
        try
        {
            sendExpect100(request);
            return muleEvent;
        }
        catch (IOException e)
        {
            throw new DefaultMuleException(e);
        }
    }

    private void sendExpect100(HttpRequest request) throws MuleException, IOException
    {
        RequestLine requestLine = request.getRequestLine();

        // respond with status code 100, for Expect handshake
        // according to rfc 2616 and http 1.1
        // the processing will continue and the request will be fully
        // read immediately after
        HttpVersion requestVersion = requestLine.getHttpVersion();
        if (HttpVersion.HTTP_1_1.equals(requestVersion))
        {
            Header expectHeader = request.getFirstHeader(HttpConstants.HEADER_EXPECT);
            if (expectHeader != null)
            {
                String expectHeaderValue = expectHeader.getValue();
                if (HttpConstants.HEADER_EXPECT_CONTINUE_REQUEST_VALUE.equals(expectHeaderValue))
                {
                    HttpResponse expected = new HttpResponse();
                    expected.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_CONTINUE);
                    final DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(expected,
                        getMuleContext()), getInboundEndpoint(), getFlowConstruct());
                    RequestContext.setEvent(event);
                    httpServerConnection.writeResponse(transformResponse(expected));
                }
            }
        }
    }

    /**
     * Builds the {@link HttpResponse} for the given event, applies the keep-alive
     * settings and writes it to the connection.
     * <p>
     * A failure while writing is not propagated: it is logged and remembered so that
     * {@link #afterFailureProcessingFlow(MessagingException)} does not attempt to write again.
     *
     * @param responseMuleEvent result of the flow, may be {@code null}
     * @throws MessagingException if the response cannot be built
     */
    private void sendHttpResponse(MuleEvent responseMuleEvent) throws MessagingException
    {
        try
        {
            if (logger.isTraceEnabled())
            {
                logger.trace("Sending http response");
            }
            MuleMessage returnMessage = responseMuleEvent == null ? null : responseMuleEvent.getMessage();

            Object tempResponse;
            if (returnMessage != null)
            {
                tempResponse = returnMessage.getPayload();
            }
            else
            {
                tempResponse = NullPayload.getInstance();
            }
            // This removes the need for users to explicitly adding the response transformer
            // ObjectToHttpResponse in their config
            HttpResponse response;
            if (tempResponse instanceof HttpResponse)
            {
                response = (HttpResponse) tempResponse;
            }
            else
            {
                response = transformResponse(returnMessage);
            }

            response.setupKeepAliveFromRequestVersion(request.getRequestLine().getHttpVersion());
            HttpConnector httpConnector = (HttpConnector) getMessageReceiver().getEndpoint().getConnector();
            response.disableKeepAlive(!httpConnector.isKeepAlive());

            Header connectionHeader = request.getFirstHeader("Connection");
            if (connectionHeader != null)
            {
                String value = connectionHeader.getValue();
                boolean endpointOverride = getEndpointKeepAliveValue(getMessageReceiver().getEndpoint());
                if ("keep-alive".equalsIgnoreCase(value) && endpointOverride)
                {
                    response.setKeepAlive(true);

                    if (response.getHttpVersion().equals(HttpVersion.HTTP_1_0))
                    {
                        connectionHeader = new Header(HttpConstants.HEADER_CONNECTION, "Keep-Alive");
                        response.setHeader(connectionHeader);
                    }
                }
                else if ("close".equalsIgnoreCase(value))
                {
                    response.setKeepAlive(false);
                }
            }
            try
            {
                httpServerConnection.writeResponse(response);
                // Only report success when the write actually completed.
                if (logger.isTraceEnabled())
                {
                    logger.trace("HTTP response sent successfully");
                }
            }
            catch (Exception e)
            {
                // Best effort: keep the failure from propagating, but leave a trace of it.
                failureSendingResponse = true;
                if (logger.isDebugEnabled())
                {
                    logger.debug("Exception while sending http response", e);
                }
            }
        }
        catch (Exception e)
        {
            if (logger.isDebugEnabled())
            {
                logger.debug("Exception while sending http response");
                logger.debug(e);
            }
            throw new MessagingException(responseMuleEvent,e);
        }
    }

    /**
     * Check if endpoint has a keep-alive property configured. Note the translation from
     * keep-alive in the schema to keepAlive here.
     */
    private boolean getEndpointKeepAliveValue(ImmutableEndpoint ep)
    {
        String value = (String) ep.getProperty("keepAlive");
        if (value != null)
        {
            return Boolean.parseBoolean(value);
        }
        return true;
    }

    /**
     * Converts a response object into an {@link HttpResponse} by applying the
     * receiver's response transport transformers.
     *
     * @param response a {@link MuleMessage}, or a payload that is wrapped into a new message
     */
    protected HttpResponse transformResponse(Object response) throws MuleException
    {
        MuleMessage message;
        if (response instanceof MuleMessage)
        {
            message = (MuleMessage) response;
        }
        else
        {
            message = new DefaultMuleMessage(response, getMessageReceiver().getEndpoint().getConnector().getMuleContext());
        }
        //TODO RM*: Maybe we can have a generic Transformer wrapper rather that using DefaultMuleMessage (or another static utility
        //class
        message.applyTransformers(null, getMessageReceiver().getResponseTransportTransformers(), HttpResponse.class);
        return (HttpResponse) message.getPayload();
    }

    /**
     * Creates the message through the superclass and adds the HTTP inbound properties:
     * request path (without query string), context path, context URI, relative path
     * and the remote client address.
     */
    @Override
    protected MuleMessage createMessageFromSource(Object message) throws MuleException
    {
        MuleMessage muleMessage = super.createMessageFromSource(message);
        String path = muleMessage.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY);
        int i = path.indexOf('?');
        if (i > -1)
        {
            path = path.substring(0, i);
        }

        muleMessage.setProperty(HttpConnector.HTTP_REQUEST_PATH_PROPERTY, path, PropertyScope.INBOUND);

        if (logger.isDebugEnabled())
        {
            logger.debug(muleMessage.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY));
        }

        String contextPath = HttpConnector.normalizeUrl(getInboundEndpoint().getEndpointURI().getPath());
        muleMessage.setProperty(HttpConnector.HTTP_CONTEXT_PATH_PROPERTY,
                                contextPath,
                                PropertyScope.INBOUND);

        muleMessage.setProperty(HttpConnector.HTTP_CONTEXT_URI_PROPERTY,
                                getInboundEndpoint().getEndpointURI().getAddress(),
                                PropertyScope.INBOUND);

        muleMessage.setProperty(HttpConnector.HTTP_RELATIVE_PATH_PROPERTY,
                                processRelativePath(contextPath, path),
                                PropertyScope.INBOUND);

        muleMessage.setProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS, httpServerConnection.getRemoteClientAddress(), PropertyScope.INBOUND);
        return muleMessage;
    }

    /**
     * Returns {@code path} with the leading {@code contextPath.length()} characters
     * removed, and without a leading slash if one remains.
     */
    protected String processRelativePath(String contextPath, String path)
    {
        String relativePath = path.substring(contextPath.length());
        if (relativePath.startsWith("/"))
        {
            return relativePath.substring(1);
        }
        return relativePath;
    }

    /**
     * Reads the request from the connection and remembers it.
     *
     * @throws MuleException if reading fails
     * @throws HttpMessageReceiver.EmptyRequestException if no request could be read
     */
    @Override
    public Object acquireMessage() throws MuleException
    {
        final HttpRequest request;
        try
        {
            request = httpServerConnection.readRequest();
        }
        catch (IOException e)
        {
            throw new DefaultMuleException(e);
        }
        if (request == null)
        {
            throw new HttpMessageReceiver.EmptyRequestException();
        }
        this.request = request;
        return request;
    }

    /**
     * Reads the request and checks that its method is one of the supported HTTP methods.
     *
     * @return {@code false} when no request could be read, when reading failed, or when
     *         the method is not supported (the request is then flagged as a bad request)
     */
    @Override
    public boolean validateMessage()
    {
        try
        {
            this.request = httpServerConnection.readRequest();
            if (request == null)
            {
                return false;
            }

            RequestLine requestLine = request.getRequestLine();
            String method = requestLine.getMethod();

            if (!(method.equals(HttpConstants.METHOD_GET)
                  || method.equals(HttpConstants.METHOD_HEAD)
                  || method.equals(HttpConstants.METHOD_POST)
                  || method.equals(HttpConstants.METHOD_OPTIONS)
                  || method.equals(HttpConstants.METHOD_PUT)
                  || method.equals(HttpConstants.METHOD_DELETE)
                  || method.equals(HttpConstants.METHOD_TRACE)
                  || method.equals(HttpConstants.METHOD_CONNECT)
                  || method.equals(HttpConstants.METHOD_PATCH)))
            {
                badRequest = true;
                return false;
            }
        }
        catch (IOException e)
        {
            // The request is treated as invalid; keep a trace of why it could not be read.
            if (logger.isDebugEnabled())
            {
                logger.debug("Exception while reading http request", e);
            }
            return false;
        }
        return true;
    }


    /**
     * Writes a "bad request" response when validation rejected the request because of
     * its method; does nothing for the other invalid cases.
     */
    @Override
    public void discardInvalidMessage() throws MuleException
    {
        if (badRequest)
        {
            try
            {
                httpServerConnection.writeResponse(doBad(request.getRequestLine()));
            }
            catch (IOException e)
            {
                throw new DefaultMuleException(e);
            }
        }
    }

    @Override
    public boolean supportsAsynchronousProcessing()
    {
        return true;
    }

    /**
     * Builds the response sent for a request with an unsupported method: status
     * {@link HttpConstants#SC_BAD_REQUEST} with the "malformed syntax" message as body.
     */
    protected HttpResponse doBad(RequestLine requestLine) throws MuleException
    {
        MuleMessage message = getMessageReceiver().createMuleMessage(null);
        MuleEvent event = new DefaultMuleEvent(message, getInboundEndpoint(), getFlowConstruct());
        OptimizedRequestContext.unsafeSetEvent(event);
        HttpResponse response = new HttpResponse();
        response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_BAD_REQUEST);
        response.setBody(HttpMessages.malformedSyntax().toString() + HttpConstants.CRLF);
        return transformResponse(response);
    }

    protected HttpServerConnection getHttpServerConnection()
    {
        return httpServerConnection;
    }

    /**
     * @return the latch released by {@link #messageProcessingEnded()}
     */
    public Latch getMessageProcessedLatch()
    {
        return messageProcessedLatch;
    }

    /**
     * Writes a failure response with {@link #MESSAGE_DISCARD_STATUS_CODE} when the
     * request is discarded because of throttling.
     */
    @Override
    public void discardMessageOnThrottlingExceeded() throws MuleException
    {
        try
        {
            httpServerConnection.writeFailureResponse(MESSAGE_DISCARD_STATUS_CODE, "API calls exceeded");
        }
        catch (IOException e)
        {
            throw new DefaultMuleException(e);
        }
    }

    @Override
    public void messageProcessingEnded()
    {
        messageProcessedLatch.release();
    }


}
Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpMessageReceiver.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -27,6 +27,7 @@ import org.mule.api.execution.ExecutionTemplate; import org.mule.api.lifecycle.CreateException; import org.mule.api.lifecycle.InitialisationException; +import org.mule.api.transformer.Transformer; import org.mule.api.transport.Connector; import org.mule.api.transport.PropertyScope; import org.mule.config.ExceptionHelper; @@ -34,11 +35,13 @@ import org.mule.config.i18n.MessageFactory; import org.mule.transport.AbstractMessageReceiver; import org.mule.transport.ConnectException; +import org.mule.message.processing.MessageProcessContext; import org.mule.transport.NullPayload; import org.mule.transport.http.i18n.HttpMessages; import org.mule.util.MapUtils; import java.io.IOException; +import java.util.List; import javax.resource.spi.work.Work; @@ -70,345 +73,16 @@ ((HttpConnector) connector).disconnect(endpoint.getEndpointURI()); }
- public Work createWork(HttpServerConnection httpServerConnection) throws IOException+ MessageProcessContext createMessageContext(HttpServerConnection httpServerConnection) {- return new HttpWorker(httpServerConnection);+ return new HttpMessageProcessTemplate(this,httpServerConnection,getWorkManager()); }- @SuppressWarnings("synthetic-access") - protected class HttpWorker implements Work+ void processRequest(HttpServerConnection httpServerConnection) throws InterruptedException, MuleException {- - private HttpServerConnection conn; - private String remoteClientAddress; - - public HttpWorker(HttpServerConnection httpServerConnection) throws IOException - { - String encoding = endpoint.getEncoding(); - if (encoding == null) - { - encoding = connector.getMuleContext().getConfiguration().getDefaultEncoding(); - } - conn = httpServerConnection; - remoteClientAddress = conn.getRemoteClientAddress(); - } - - protected HttpServerConnection getServerConnection() - { - return conn; - } - - @Override - public void run() - { - try - { - final HttpRequest request = conn.readRequest(); - if (request == null) - { - throw new EmptyRequestException(); - } - - try - { - HttpResponse httpResponse = processRequest(request); - conn.writeResponse(httpResponse); - } - catch (Exception e) - { - MuleEvent response = null; - if (e instanceof MessagingException) - { - response = ((MessagingException) e).getEvent(); - } - else - { - getConnector().getMuleContext().getExceptionListener().handleException(e); - } - - if (response != null && - response.getMessage().getExceptionPayload() != null && - response.getMessage().getExceptionPayload().getException() instanceof MessagingException) - { - e = (Exception) response.getMessage().getExceptionPayload().getException(); - } - //MULE-5656 There was custom code here for mapping status codes to exceptions - //I have removed this code and now make an explicit call to the Exception helper, - //but the real fix is to make sure Mule handles this 
automatically through the - //InboundExceptionDetailsMessageProcessor - - //Response code mappings are loaded from META-INF/services/org/mule/config/http-exception-mappings.properties - String temp = ExceptionHelper.getErrorMapping(connector.getProtocol(), e.getClass(), flowConstruct.getMuleContext()); - int httpStatus = Integer.valueOf(temp); - - if (e instanceof MessagingException) - { - MuleEvent event = ((MessagingException) e).getEvent(); - httpStatus = event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY) != null ? Integer.valueOf(event.getMessage().getOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY).toString()) : httpStatus; - conn.writeResponse(buildFailureResponse(event, e.getMessage(), httpStatus)); - } - else - { - conn.writeFailureResponse(httpStatus, e.getMessage()); - } - throw new FailureProcessingRequestException(); - } - finally - { - if (request.getBody() != null) - { - request.getBody().close(); - } - } - } - catch (EmptyRequestException e) - { - throw e; - } - catch (FailureProcessingRequestException e) - { - throw e; - } - catch (Exception e) - { - getConnector().getMuleContext().getExceptionListener().handleException(e); - } - } - - - protected HttpResponse processRequest(HttpRequest request) throws MuleException, IOException - { - RequestLine requestLine = request.getRequestLine(); - String method = requestLine.getMethod(); - - if (method.equals(HttpConstants.METHOD_GET) - || method.equals(HttpConstants.METHOD_HEAD) - || method.equals(HttpConstants.METHOD_POST) - || method.equals(HttpConstants.METHOD_OPTIONS) - || method.equals(HttpConstants.METHOD_PUT) - || method.equals(HttpConstants.METHOD_DELETE) - || method.equals(HttpConstants.METHOD_TRACE) - || method.equals(HttpConstants.METHOD_CONNECT) - || method.equals(HttpConstants.METHOD_PATCH)) - { - return doRequest(request); - } - else - { - return doBad(requestLine); - } - } - - protected HttpResponse doRequest(HttpRequest request) throws IOException, MuleException - 
{ - sendExpect100(request); - - final MuleMessage message = createMuleMessage(request); - - String path = message.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY); - int i = path.indexOf('?'); - if (i > -1) - { - path = path.substring(0, i); - } - - message.setProperty(HttpConnector.HTTP_REQUEST_PATH_PROPERTY, path, PropertyScope.INBOUND); - - if (logger.isDebugEnabled()) - { - logger.debug(message.getInboundProperty(HttpConnector.HTTP_REQUEST_PROPERTY)); - } - - // determine if the request path on this request denotes a different receiver - //final MessageReceiver receiver = getTargetReceiver(message, endpoint); - - HttpResponse response; - // the response only needs to be transformed explicitly if - // A) the request was not served or B) a null result was returned - String contextPath = HttpConnector.normalizeUrl(getEndpointURI().getPath()); - message.setProperty(HttpConnector.HTTP_CONTEXT_PATH_PROPERTY, - contextPath, - PropertyScope.INBOUND); - - message.setProperty(HttpConnector.HTTP_CONTEXT_URI_PROPERTY, - getEndpointURI().getAddress(), - PropertyScope.INBOUND); - - message.setProperty(HttpConnector.HTTP_RELATIVE_PATH_PROPERTY, - processRelativePath(contextPath, path), - PropertyScope.INBOUND); - - ExecutionTemplate<MuleEvent> executionTemplate = createExecutionTemplate(); - - MuleEvent returnEvent; - try - { - returnEvent = executionTemplate.execute(new ExecutionCallback<MuleEvent>() - { - @Override - public MuleEvent process() throws Exception - { - preRouteMessage(message); - return routeMessage(message); - } - }); - } - catch (MuleException e) - { - throw e; - } - catch (IOException e) - { - throw e; - } - catch (Exception e) - { - throw new DefaultMuleException(e); - } - - MuleMessage returnMessage = returnEvent == null ? 
null : returnEvent.getMessage(); - - Object tempResponse; - if (returnMessage != null) - { - tempResponse = returnMessage.getPayload(); - } - else - { - tempResponse = NullPayload.getInstance(); - } - // This removes the need for users to explicitly adding the response transformer - // ObjectToHttpResponse in their config - if (tempResponse instanceof HttpResponse) - { - response = (HttpResponse) tempResponse; - } - else - { - response = transformResponse(returnMessage); - } - - response.setupKeepAliveFromRequestVersion(request.getRequestLine().getHttpVersion()); - HttpConnector httpConnector = (HttpConnector) connector; - response.disableKeepAlive(!httpConnector.isKeepAlive()); - - Header connectionHeader = request.getFirstHeader("Connection"); - if (connectionHeader != null) - { - String value = connectionHeader.getValue(); - boolean endpointOverride = getEndpointKeepAliveValue(endpoint); - if ("keep-alive".equalsIgnoreCase(value) && endpointOverride) - { - response.setKeepAlive(true); - - if (response.getHttpVersion().equals(HttpVersion.HTTP_1_0)) - { - connectionHeader = new Header(HttpConstants.HEADER_CONNECTION, "Keep-Alive"); - response.setHeader(connectionHeader); - } - } - else if ("close".equalsIgnoreCase(value)) - { - response.setKeepAlive(false); - } - } - return response; - } - - /** - * Check if endpoint has a keep-alive property configured. Note the translation from - * keep-alive in the schema to keepAlive here. 
- */ - private boolean getEndpointKeepAliveValue(ImmutableEndpoint ep) - { - String value = (String) ep.getProperty("keepAlive"); - if (value != null) - { - return Boolean.parseBoolean(value); - } - return true; - } - - - protected HttpResponse doOtherValid(RequestLine requestLine, String method) throws MuleException - { - MuleMessage message = createMuleMessage(null); - MuleEvent event = new DefaultMuleEvent(message, (InboundEndpoint) endpoint, flowConstruct); - OptimizedRequestContext.unsafeSetEvent(event); - HttpResponse response = new HttpResponse(); - response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_METHOD_NOT_ALLOWED); - response.setBody(HttpMessages.methodNotAllowed(method).toString() + HttpConstants.CRLF); - return transformResponse(response); - } - - protected HttpResponse doBad(RequestLine requestLine) throws MuleException - { - MuleMessage message = createMuleMessage(null); - MuleEvent event = new DefaultMuleEvent(message, (InboundEndpoint) endpoint, flowConstruct); - OptimizedRequestContext.unsafeSetEvent(event); - HttpResponse response = new HttpResponse(); - response.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_BAD_REQUEST); - response.setBody(HttpMessages.malformedSyntax().toString() + HttpConstants.CRLF); - return transformResponse(response); - } - - private void sendExpect100(HttpRequest request) throws MuleException, IOException - { - RequestLine requestLine = request.getRequestLine(); - - // respond with status code 100, for Expect handshake - // according to rfc 2616 and http 1.1 - // the processing will continue and the request will be fully - // read immediately after - HttpVersion requestVersion = requestLine.getHttpVersion(); - if (HttpVersion.HTTP_1_1.equals(requestVersion)) - { - Header expectHeader = request.getFirstHeader(HttpConstants.HEADER_EXPECT); - if (expectHeader != null) - { - String expectHeaderValue = expectHeader.getValue(); - if 
(HttpConstants.HEADER_EXPECT_CONTINUE_REQUEST_VALUE.equals(expectHeaderValue)) - { - HttpResponse expected = new HttpResponse(); - expected.setStatusLine(requestLine.getHttpVersion(), HttpConstants.SC_CONTINUE); - final DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(expected, - connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct); - RequestContext.setEvent(event); - conn.writeResponse(transformResponse(expected)); - } - } - } - } - - private HttpResponse buildFailureResponse(MuleEvent event, String description, int httpStatusCode) throws MuleException - { - event.getMessage().setOutboundProperty(HttpConnector.HTTP_STATUS_PROPERTY, httpStatusCode); - event.getMessage().setPayload(description); - return transformResponse(event.getMessage()); - } - - protected HttpResponse buildFailureResponse(HttpVersion version, int statusCode, String description) throws MuleException - { - HttpResponse response = new HttpResponse(); - response.setStatusLine(version, statusCode); - response.setBody(description); - DefaultMuleEvent event = new DefaultMuleEvent(new DefaultMuleMessage(response, - connector.getMuleContext()), (InboundEndpoint) endpoint, flowConstruct); - RequestContext.setEvent(event); - // The DefaultResponseTransformer will set the necessary headers - return transformResponse(response); - } - - protected void preRouteMessage(MuleMessage message) throws MessagingException - { - message.setProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS, remoteClientAddress, PropertyScope.INBOUND); - } - - @Override - public void release() - { - //Nothing to do. 
- }+ HttpMessageProcessTemplate messageContext = (HttpMessageProcessTemplate) createMessageContext(httpServerConnection); + processMessage(messageContext,messageContext); + messageContext.getMessageProcessedLatch().await(); } protected String processRelativePath(String contextPath, String path) @@ -421,24 +95,6 @@ return relativePath; }- protected HttpResponse transformResponse(Object response) throws MuleException - { - MuleMessage message; - if (response instanceof MuleMessage) - { - message = (MuleMessage) response; - } - else - { - message = new DefaultMuleMessage(response, connector.getMuleContext()); - } - //TODO RM*: Maybe we can have a generic Transformer wrapper rather that using DefaultMuleMessage (or another static utility - //class - message.applyTransformers(null, defaultResponseTransformers, HttpResponse.class); - return (HttpResponse) message.getPayload(); - } - -@Override protected void initializeMessageFactory() throws InitialisationException { @@ -480,6 +136,11 @@ return message; } + public List<Transformer> getResponseTransportTransformers() + { + return this.defaultResponseTransformers; + } + public static class EmptyRequestException extends RuntimeException {Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcher.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -9,16 +9,21 @@ */ package org.mule.transport.http; +import org.mule.api.config.ThreadingProfile; import org.mule.api.context.WorkManager; import org.mule.api.retry.RetryCallback; import org.mule.api.retry.RetryContext; import org.mule.api.retry.RetryPolicyTemplate; +import org.mule.config.ImmutableThreadingProfile; +import org.mule.config.MutableThreadingProfile; import org.mule.transport.ConnectException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.resource.spi.work.Work; @@ -39,6 +44,7 @@ private ServerSocket serverSocket; private HttpConnector httpConnector; private RetryPolicyTemplate retryTemplate; + protected ExecutorService requestHandOffExecutor; private WorkManager workManager; private final AtomicBoolean disconnect = new AtomicBoolean(false); @@ -64,8 +70,18 @@ this.retryTemplate = retryPolicyTemplate; this.serverSocket = serverSocket; this.workManager = workManager; + this.requestHandOffExecutor = createRequestDispatcherThreadPool(httpConnector); } + private ExecutorService createRequestDispatcherThreadPool(HttpConnector httpConnector) + { + ThreadingProfile receiverThreadingProfile = httpConnector.getReceiverThreadingProfile(); + MutableThreadingProfile dispatcherThreadingProfile = new MutableThreadingProfile(receiverThreadingProfile); + dispatcherThreadingProfile.setMaxThreadsActive(dispatcherThreadingProfile.getMaxThreadsActive()*2); + ExecutorService executorService = dispatcherThreadingProfile.createPool("http-request-dispatch-" + 
serverSocket.getInetAddress()); + return executorService; + } + @Override public void run() { @@ -94,8 +110,9 @@ if (socket != null) {
- Work work = new HttpRequestDispatcherWork(httpConnector, socket);
- workManager.scheduleWork(work, javax.resource.spi.work.WorkManager.INDEFINITE, null, httpConnector);
+ final Runnable httpRequestDispatcherWork = new HttpRequestDispatcherWork(httpConnector, socket);
+ // Process each connection in a different thread so we can continue accepting connection right away.
+ requestHandOffExecutor.execute(httpRequestDispatcherWork);
 }
 }
Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpRequestDispatcherWork.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -15,15 +15,13 @@ import java.net.Socket; import java.util.concurrent.TimeUnit;
-import javax.resource.spi.work.Work; -import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; /** * Dispatches HttpRequest to the appropriate MessageReceiver */-public class HttpRequestDispatcherWork implements Work, Expirable+public class HttpRequestDispatcherWork implements Runnable, Expirable { private static Log logger = LogFactory.getLog(HttpRequestDispatcherWork.class); @@ -47,11 +45,6 @@ } @Override- public void release() - { - } - - @Overridepublic void run() { try @@ -77,8 +70,7 @@ HttpMessageReceiver httpMessageReceiver = httpConnector.lookupReceiver(socket, request); if (httpMessageReceiver != null) {- Work work = httpMessageReceiver.createWork(httpServerConnection); - work.run();+ httpMessageReceiver.processRequest(httpServerConnection); } else {Added: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java (0 => 25161)
--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java (rev 0) +++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageProcessTemplate.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -0,0 +1,57 @@ +/* + * $Id: HttpsHandshakeTimingTestCase.java 25119 2012-12-10 21:20:57Z pablo.lagreca $ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ +package org.mule.transport.http; + +import org.mule.api.MessagingException; +import org.mule.api.MuleEvent; +import org.mule.api.MuleException; +import org.mule.api.context.WorkManager; +import org.mule.transport.http.i18n.HttpMessages; + +import java.util.concurrent.TimeUnit; + +public class HttpsMessageProcessTemplate extends HttpMessageProcessTemplate +{ + + public HttpsMessageProcessTemplate(final HttpMessageReceiver messageReceiver, final HttpServerConnection httpServerConnection, final WorkManager flowExecutionWorkManager) + { + super(messageReceiver,httpServerConnection,flowExecutionWorkManager); + } + + @Override + public MuleEvent beforeRouteEvent(MuleEvent muleEvent) throws MuleException + { + try + { + long timeout = ((HttpsConnector) getConnector()).getSslHandshakeTimeout(); + boolean handshakeComplete = getHttpServerConnection().getSslSocketHandshakeCompleteLatch().await(timeout, TimeUnit.MILLISECONDS); + if (!handshakeComplete) + { + throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), muleEvent); + } + } + catch (InterruptedException e) + { + throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), + muleEvent, e); + } + if (getHttpServerConnection().getPeerCertificateChain() != null) + { + 
muleEvent.getMessage().setOutboundProperty(HttpsConnector.PEER_CERTIFICATES, getHttpServerConnection().getPeerCertificateChain()); + } + if (getHttpServerConnection().getLocalCertificateChain() != null) + { + muleEvent.getMessage().setOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES, getHttpServerConnection().getLocalCertificateChain()); + } + + super.beforeRouteEvent(muleEvent); + return muleEvent; + } +}
Modified: branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/main/java/org/mule/transport/http/HttpsMessageReceiver.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -10,22 +10,15 @@ package org.mule.transport.http;
-import org.mule.api.MessagingException; -import org.mule.api.MuleMessage;import org.mule.api.construct.FlowConstruct; import org.mule.api.endpoint.InboundEndpoint; import org.mule.api.lifecycle.CreateException; import org.mule.api.transport.Connector; import org.mule.config.i18n.CoreMessages; import org.mule.transport.ConnectException;-import org.mule.transport.http.i18n.HttpMessages;+import org.mule.message.processing.MessageProcessContext; import org.mule.util.StringUtils;-import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import javax.resource.spi.work.Work; -public class HttpsMessageReceiver extends HttpMessageReceiver { @@ -53,48 +46,9 @@ } @Override- public Work createWork(HttpServerConnection httpServerConnection) throws IOException+ HttpMessageProcessTemplate createMessageContext(HttpServerConnection httpServerConnection) {- return new HttpsWorker(httpServerConnection);+ return new HttpsMessageProcessTemplate(this,httpServerConnection,getWorkManager()); }- private class HttpsWorker extends HttpWorker - { - - public HttpsWorker(HttpServerConnection httpServerConnection) throws IOException - { - super(httpServerConnection); - } - - @Override - protected void preRouteMessage(MuleMessage message) throws MessagingException - { - try - { - long timeout = ((HttpsConnector) getConnector()).getSslHandshakeTimeout(); - boolean handshakeComplete = getServerConnection().getSslSocketHandshakeCompleteLatch().await(timeout, TimeUnit.MILLISECONDS); - if (!handshakeComplete) - { - throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), message); - } - } - catch (InterruptedException e) - { - throw new MessagingException(HttpMessages.sslHandshakeDidNotComplete(), - message, e); - } - - super.preRouteMessage(message); - - if (getServerConnection().getPeerCertificateChain() != null) - { - message.setOutboundProperty(HttpsConnector.PEER_CERTIFICATES, getServerConnection().getPeerCertificateChain()); - } - if 
(getServerConnection().getLocalCertificateChain() != null) - { - message.setOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES, getServerConnection().getLocalCertificateChain()); - } - } - - }}Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpConnectionManagerTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -29,6 +29,7 @@ import javax.resource.spi.work.ExecutionContext; import javax.resource.spi.work.WorkListener; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers;
Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -11,7 +11,6 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyLong;import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,11 +29,9 @@ import java.io.IOException; import java.lang.reflect.Field; import java.net.ServerSocket; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit;-import javax.resource.spi.work.ExecutionContext; -import javax.resource.spi.work.WorkListener; -import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Answers; @@ -65,6 +62,8 @@ private RetryContext mockRetryContext; @Mock(answer = Answers.RETURNS_DEEP_STUBS) private SystemExceptionHandler mockExceptionListener; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ExecutorService mockExecutor; @Test(expected = IllegalArgumentException.class) @@ -143,7 +142,7 @@ try { dispatcherThread.start();- verify(mockWorkManager, times(0)).scheduleWork(any(HttpRequestDispatcherWork.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));+ verify(mockRetryTemplate, times(0)).execute(any(RetryCallback.class),any(WorkManager.class)); } finally { @@ -156,6 +155,7 @@ public void whenSocketAcceptedExecuteWork() throws Exception { final HttpRequestDispatcher httpRequestDispatcher = new HttpRequestDispatcher(mockHttpConnector, mockRetryTemplate, mockServerSocket, mockWorkManager); + httpRequestDispatcher.requestHandOffExecutor = mockExecutor; final Latch acceptCalledLath = new Latch(); sustituteLifecycleManager(); when(mockConnectorLifecycleManager.getState().isStarted()).thenReturn(true); @@ -176,7 +176,7 @@ acceptCalledLath.release(); return null; }- }).when(mockWorkManager).scheduleWork(any(HttpRequestDispatcherWork.class), anyLong(), any(ExecutionContext.class), any(WorkListener.class));+ }).when(mockExecutor).execute(any(HttpRequestDispatcherWork.class)); Thread dispatcherThread = createDispatcherThread(httpRequestDispatcher); dispatcherThread.start(); tryModified: 
branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpRequestDispatcherWorkTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -17,6 +17,7 @@ import static org.mockito.Mockito.when; import org.mule.tck.size.SmallTest; +import org.mule.message.processing.MessageProcessContext; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -48,6 +49,8 @@ private HttpMessageReceiver mockHttpMessageReceiver; @Mock(answer = Answers.RETURNS_DEEP_STUBS) private Work mockWork; + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private MessageProcessContext mockMessageContext; @Test(expected = IllegalArgumentException.class) public void createHttpRequestDispatcherWorkWithNullHttpConnector() @@ -91,10 +94,9 @@ HttpRequestDispatcherWork httpRequestDispatcherWork = new HttpRequestDispatcherWork(mockHttpConnector, mockSocket); when(mockHttpConnector.lookupReceiver(isA(Socket.class), isA(HttpRequest.class))).thenReturn(mockHttpMessageReceiver); setUpSocketMessage();
- when(mockHttpMessageReceiver.createWork(isA(HttpServerConnection.class))).thenReturn(mockWork);+ when(mockHttpMessageReceiver.createMessageContext(isA(HttpServerConnection.class))).thenReturn(mockMessageContext); httpRequestDispatcherWork.run();- verify(mockWork, times(1)).run(); -+ verify(mockHttpMessageReceiver, times(1)).processRequest(isA(HttpServerConnection.class)); } private void setUpSocketMessage() throws IOExceptionCopied: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java (from rev 25135, branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java) (0 => 25161)
--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java (rev 0) +++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -0,0 +1,116 @@ +/* + * $Id$ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ + +package org.mule.transport.http; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.mule.DefaultMuleMessage; +import org.mule.api.MessagingException; +import org.mule.api.MuleEvent; +import org.mule.api.MuleMessage; +import org.mule.api.construct.FlowConstruct; +import org.mule.api.endpoint.InboundEndpoint; +import org.mule.api.lifecycle.CreateException; +import org.mule.api.service.Service; +import org.mule.api.transport.Connector; +import org.mule.tck.junit4.AbstractMuleContextTestCase; +import org.mule.transport.ssl.MockHandshakeCompletedEvent; +import org.mule.transport.ssl.MockSslSocket; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import javax.net.ssl.HandshakeCompletedEvent; +import javax.resource.spi.work.Work; + +import org.junit.Test; + +/** + * Test for SSL handshake timeouts. Unfortunately, there is no easy way to blackbox-test this + * as it would require a SSLSocket implementation that could actually add arbitrary delays to + * the SSL handshake. 
+ * <p/> + * The approach chosen here is based on reflection and massive subclassing/stubbing to make things + * work. Yes, this is hacky and fragile but this seems to be the only reasonable alternative + * for now. + */ +public class HttpsHandshakeTimingTestCase extends AbstractMuleContextTestCase +{ + + @Test(expected = MessagingException.class) + public void testHttpsHandshakeExceedsTimeout() throws Exception + { + MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver(); + + MockSslSocket socket = new MockSslSocket(); + HttpMessageProcessTemplate messageProcessTemplate = messageReceiver.createMessageContext(new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector())); + + MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext); + messageProcessTemplate.beforeRouteEvent(getTestEvent(message)); + } + + @Test + public void testHttpsHandshakeCompletesBeforeProcessingMessage() throws Exception + { + MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver(); + + MockSslSocket socket = new MockSslSocket(); + socket.setInputStream(new ByteArrayInputStream("GET /path/to/file/index.html HTTP/1.0\n\n\n".getBytes())); + HttpServerConnection serverConnection = new HttpServerConnection(socket, "utf-8", (HttpConnector) messageReceiver.getConnector()); + HttpMessageProcessTemplate messageContext = (HttpMessageProcessTemplate) messageReceiver.createMessageContext(serverConnection); + + invokeHandshakeCompleted(serverConnection, socket); + + MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext); + messageContext.acquireMessage(); + serverConnection.readRequest(); + MuleEvent muleEvent = messageContext.beforeRouteEvent(getTestEvent(message)); + assertNotNull(muleEvent.getMessage().<Object>getOutboundProperty(HttpsConnector.LOCAL_CERTIFICATES)); + assertNotNull(muleEvent.getMessage().<Object>getOutboundProperty(HttpsConnector.PEER_CERTIFICATES)); + 
} + + private void invokeHandshakeCompleted(HttpServerConnection serverConnection, MockSslSocket socket) throws Exception + { + HandshakeCompletedEvent event = new MockHandshakeCompletedEvent(socket); + serverConnection.handshakeCompleted(event); + } + + private MockHttpsMessageReceiver setupMockHttpsMessageReceiver() throws CreateException + { + HttpsConnector httpsConnector = new HttpsConnector(muleContext); + httpsConnector.setSslHandshakeTimeout(1000); + + Map<String, Object> properties = Collections.emptyMap(); + + InboundEndpoint inboundEndpoint = mock(InboundEndpoint.class); + when(inboundEndpoint.getConnector()).thenReturn(httpsConnector); + when(inboundEndpoint.getProperties()).thenReturn(properties); + + Service service = mock(Service.class); + return new MockHttpsMessageReceiver(httpsConnector, service, inboundEndpoint); + } + + private static class MockHttpsMessageReceiver extends HttpsMessageReceiver + { + + public MockHttpsMessageReceiver(Connector connector, FlowConstruct flowConstruct, + InboundEndpoint endpoint) throws CreateException + { + super(connector, flowConstruct, endpoint); + } + } +}
Property changes: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/HttpsHandshakeTimingTestCase.java
Added: svn:keywords
Added: svn:eol-style
Modified: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpResponseTimeoutTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -105,7 +105,7 @@ // message will be returned as the response. There is no exception payload. assertNotNull(message); assertNull(message.getExceptionPayload());
- assertNotNull(getPayload(), message.getPayloadAsString());
+ assertNotNull(message.getPayloadAsString());
 assertTrue((afterCall.getTime() - beforeCall.getTime()) > DEFAULT_RESPONSE_TIMEOUT);
 }
Deleted: branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java (25160 => 25161)
--- branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/http/src/test/java/org/mule/transport/http/functional/HttpsHandshakeTimingTestCase.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -1,147 +0,0 @@
-/* - * $Id$ - * -------------------------------------------------------------------------------------- - * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com - * - * The software in this package is published under the terms of the CPAL v1.0 - * license, a copy of which has been included with this distribution in the - * LICENSE.txt file. - */ - -package org.mule.transport.http.functional; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.mule.DefaultMuleMessage; -import org.mule.api.MessagingException; -import org.mule.api.MuleMessage; -import org.mule.api.config.MuleProperties; -import org.mule.api.construct.FlowConstruct; -import org.mule.api.endpoint.InboundEndpoint; -import org.mule.api.lifecycle.CreateException; -import org.mule.api.service.Service; -import org.mule.api.transport.Connector; -import org.mule.tck.junit4.AbstractMuleContextTestCase; -import org.mule.transport.http.HttpConnector; -import org.mule.transport.http.HttpServerConnection; -import org.mule.transport.http.HttpsConnector; -import org.mule.transport.http.HttpsMessageReceiver; -import org.mule.transport.ssl.MockHandshakeCompletedEvent; -import org.mule.transport.ssl.MockSslSocket; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Collections; -import java.util.Map; - -import javax.net.ssl.HandshakeCompletedEvent; -import javax.resource.spi.work.Work; - -import org.junit.Test; - -/** - * Test for SSL handshake timeouts. Unfortunately, there is no easy way to blackbox-test this - * as it would require a SSLSocket implementation that could actually add arbitrary delays to - * the SSL handshake. - * <p/> - * The approach chosen here is based on reflection and massive subclassing/stubbing to make things - * work. 
Yes, this is hacky and fragile but this seems to be the only reasonable alternative - * for now. - */ -public class HttpsHandshakeTimingTestCase extends AbstractMuleContextTestCase -{ - - @Test - public void testHttpsHandshakeExceedsTimeout() throws Exception - { - MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver(); - - MockSslSocket socket = new MockSslSocket(); - Work work = messageReceiver.createWork(new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector())); - assertNotNull(work); - - MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext); - try - { - // note how preRouteMessage is invoked here without a prior handshakeComplete - // which would count down the latch that's used in HttpsWorker - invokePreRouteMessage(work, message); - fail(); - } - catch (InvocationTargetException ite) - { - assertTrue(ite.getCause() instanceof MessagingException); - assertTrue(ite.getCause().getMessage().contains("handshake")); - } - } - - @Test - public void testHttpsHandshakeCompletesBeforeProcessingMessage() throws Exception - { - MockHttpsMessageReceiver messageReceiver = setupMockHttpsMessageReceiver(); - - MockSslSocket socket = new MockSslSocket(); - HttpServerConnection serverConnection = new HttpServerConnection(socket, messageReceiver.getEndpoint().getEncoding(), (HttpConnector) messageReceiver.getConnector()); - Work work = messageReceiver.createWork(serverConnection); - assertNotNull(work); - - invokeHandshakeCompleted(serverConnection, socket); - - MuleMessage message = new DefaultMuleMessage(TEST_MESSAGE, muleContext); - invokePreRouteMessage(work, message); - assertNotNull(message.<Object>getInboundProperty(MuleProperties.MULE_REMOTE_CLIENT_ADDRESS)); - } - - private void invokeHandshakeCompleted(HttpServerConnection serverConnection, MockSslSocket socket) throws Exception - { - HandshakeCompletedEvent event = new MockHandshakeCompletedEvent(socket); - 
serverConnection.handshakeCompleted(event); - } - - private void invokePreRouteMessage(Work work, MuleMessage message) throws Exception - { - Method preRouteMessage = work.getClass().getDeclaredMethod("preRouteMessage", MuleMessage.class); - assertNotNull(preRouteMessage); - preRouteMessage.setAccessible(true); - preRouteMessage.invoke(work, new Object[] {message}); - } - - private MockHttpsMessageReceiver setupMockHttpsMessageReceiver() throws CreateException - { - HttpsConnector httpsConnector = new HttpsConnector(muleContext); - httpsConnector.setSslHandshakeTimeout(1000); - - Map<String, Object> properties = Collections.emptyMap(); - - InboundEndpoint inboundEndpoint = mock(InboundEndpoint.class); - when(inboundEndpoint.getConnector()).thenReturn(httpsConnector); - when(inboundEndpoint.getProperties()).thenReturn(properties); - - Service service = mock(Service.class); - return new MockHttpsMessageReceiver(httpsConnector, service, inboundEndpoint); - } - - private static class MockHttpsMessageReceiver extends HttpsMessageReceiver - { - - public MockHttpsMessageReceiver(Connector connector, FlowConstruct flowConstruct, - InboundEndpoint endpoint) throws CreateException - { - super(connector, flowConstruct, endpoint); - } - - /** - * Open up access for unit test - */ - @Override - public Work createWork(HttpServerConnection httpServerConnection) throws IOException - { - return super.createWork(httpServerConnection); - } - } -}Modified: branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java (25160 => 25161)
--- branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java 2013-01-04 17:39:12 UTC (rev 25160) +++ branches/mule-3.x/transports/ssl/src/test/java/org/mule/transport/ssl/MockSslSocket.java 2013-01-06 00:06:58 UTC (rev 25161) @@ -25,7 +25,9 @@ */ public class MockSslSocket extends SSLSocket {
-+ + private InputStream inputStream; + public void addHandshakeCompletedListener(HandshakeCompletedListener listener) { // not needed @@ -119,7 +121,7 @@ @Override public InputStream getInputStream() throws IOException {- return null;+ return inputStream; } @Override @@ -134,5 +136,10 @@ return new InetSocketAddress("localhost", 12345); } + public void setInputStream(InputStream inputStream) + { + this.inputStream = inputStream; + } } +
To unsubscribe from this list please visit: