- Revision
- 24756
- Author
- pablo.lagreca
- Date
- 2012-08-16 20:34:27 -0500 (Thu, 16 Aug 2012)
Log Message
EE-2843 - changing redelivery policy to work in a clustered environmentModified Paths
Added Paths
Diff
Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24755 => 24756)
--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-16 21:58:19 UTC (rev 24755) +++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java 2012-08-17 01:34:27 UTC (rev 24756) @@ -11,24 +11,25 @@ import org.mule.api.MuleEvent; import org.mule.api.MuleException; +import org.mule.api.config.MuleProperties; import org.mule.api.lifecycle.Disposable; import org.mule.api.lifecycle.InitialisationException; import org.mule.api.lifecycle.Startable;
-import org.mule.api.store.ObjectAlreadyExistsException;+import org.mule.api.processor.MessageProcessor; +import org.mule.api.store.ObjectStore; import org.mule.api.store.ObjectStoreException; +import org.mule.api.store.ObjectStoreManager; +import org.mule.api.transformer.TransformerException; import org.mule.config.i18n.CoreMessages; import org.mule.transformer.simple.ByteArrayToHexString; import org.mule.transformer.simple.ObjectToByteArray;-import org.mule.transformer.simple.SerializableToByteArray; -import org.mule.util.store.AbstractMonitoredObjectStore; -import org.mule.util.store.InMemoryObjectStore;-import java.io.InputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.concurrent.atomic.AtomicInteger; +import org.mule.util.store.ObjectStorePartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ private boolean useSecureHash; private String messageDigestAlgorithm; private String idExpression;- private AbstractMonitoredObjectStore<AtomicInteger> store;+ private ObjectStore<AtomicInteger> store; @Override public void initialise() throws InitialisationException @@ -56,10 +57,11 @@ super.initialise(); if (useSecureHash && idExpression != null) {- throw new InitialisationException( - CoreMessages.initialisationFailure(String.format( - "The Id expression'%s' was specified when a secure hash will be used", - idExpression)), this);+ useSecureHash = false; + if (logger.isWarnEnabled()) + { + logger.warn("Disabling useSecureHash in idempotent-redelivery-policy since an idExpression has been configured"); + } } if (!useSecureHash && messageDigestAlgorithm != null) { @@ -96,15 +98,11 @@ store = createStore(); }- private AbstractMonitoredObjectStore<AtomicInteger> createStore() throws InitialisationException+ private ObjectStore<AtomicInteger> createStore() throws InitialisationException {- AbstractMonitoredObjectStore s = new InMemoryObjectStore<AtomicInteger>(); - s.setName(flowConstruct.getName() + "." + getClass().getName()); - s.setMaxEntries(-1); - s.setEntryTTL(60 * 5 * 1000); - s.setExpirationInterval(6000); - s.initialise(); - return s;+ ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get( + MuleProperties.OBJECT_STORE_MANAGER); + return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 ); } @@ -115,7 +113,17 @@ if (store != null) {- store.dispose();+ if (store instanceof ObjectStorePartition) + { + try + { + ((ObjectStorePartition)store).close(); + } + catch (ObjectStoreException e) + { + logger.warn("error closing object store: " + e.getMessage(), e); + } + } store = null; } @@ -147,6 +155,11 @@ { messageId = getIdForEvent(event); } + catch (TransformerException e) + { + logger.warn("The message cannot be processed because the digest could not be generated. Either make the payload serializable or use an expression."); + return null; + } catch (Exception ex) { exceptionSeen = true; @@ -154,7 +167,7 @@ if (!exceptionSeen) {- counter = getCounter(messageId, null, false);+ counter = findCounter(messageId); tooMany = counter != null && counter.get() > maxRedeliveryCount; } @@ -174,7 +187,7 @@ try { MuleEvent returnEvent = processNext(event);- counter = getCounter(messageId, counter, false);+ counter = findCounter(messageId); if (counter != null) { counter.set(0); @@ -183,51 +196,42 @@ } catch (MuleException ex) {- incrementCounter(messageId, counter);+ incrementCounter(messageId); throw ex; } catch (RuntimeException ex) {- incrementCounter(messageId, counter);+ incrementCounter(messageId); throw ex; } }- - - private AtomicInteger incrementCounter(String messageId, AtomicInteger counter) throws ObjectStoreException+ + public AtomicInteger findCounter(String messageId) throws ObjectStoreException {- counter = getCounter(messageId, counter, true); - counter.incrementAndGet(); - return counter;+ boolean counterExists = store.contains(messageId); + if (counterExists) + { + return store.retrieve(messageId); + } + return null; }- private AtomicInteger getCounter(String messageId, AtomicInteger counter, boolean create) throws ObjectStoreException+ private AtomicInteger incrementCounter(String messageId) throws ObjectStoreException {- if (counter != null)+ AtomicInteger counter = findCounter(messageId); + if (counter == null) {- return counter;+ counter = new AtomicInteger(); }- boolean counterExists = store.contains(messageId); - if (counterExists)+ else {- return store.retrieve(messageId);+ store.remove(messageId); }- if (create) - { - try - { - counter = new AtomicInteger(); - store.store(messageId, counter); - } - catch (ObjectAlreadyExistsException e) - { - counter = store.retrieve(messageId); - } - }+ counter.incrementAndGet(); + store.store(messageId,counter); return counter; }-private String getIdForEvent(MuleEvent event) throws Exception { if (useSecureHash) @@ -278,4 +282,10 @@ { this.idExpression = idExpression; } + + public void setMessageProcessor(MessageProcessor processor) + { + this.deadLetterQueue = processor; + } } +Added: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (0 => 24756)
--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (rev 0) +++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java 2012-08-17 01:34:27 UTC (rev 24756) @@ -0,0 +1,219 @@ +/* + * $Id$ + * -------------------------------------------------------------------------------------- + * Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com + * + * The software in this package is published under the terms of the CPAL v1.0 + * license, a copy of which has been included with this distribution in the + * LICENSE.txt file. + */ + +package org.mule.processor; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.mockito.Answers; +import org.mockito.Mockito; +import org.mockito.internal.verification.VerificationModeFactory; +import org.mule.api.*; +import org.mule.api.config.MuleProperties; +import org.mule.api.construct.FlowConstruct; +import org.mule.api.processor.MessageProcessor; +import org.mule.api.store.ObjectStore; +import org.mule.api.store.ObjectStoreException; +import org.mule.api.store.ObjectStoreManager; +import org.mule.routing.MessageProcessorFilterPair; +import org.mule.tck.junit4.AbstractMuleTestCase; + +import org.junit.Test; + +import junit.framework.Assert; +import org.mule.util.SerializationUtils; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase +{ + + public static final String STRING_MESSAGE = "message"; + public static final int MAX_REDELIVERY_COUNT = 1; + private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get()); + private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get()); + private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get()); + private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get()); + private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get()); + private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get()); + + @Before + public void setUpTest() throws MuleException + { + when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing")); + System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8"); + } + + @Test + public void messageDigestFailure() throws Exception + { + Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager); + Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore()); + + IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy(); + irp.setUseSecureHash(true); + irp.setMaxRedeliveryCount(1); + irp.setFlowConstruct(mock(FlowConstruct.class)); + irp.setMuleContext(mockMuleContext); + irp.initialise(); + + + when(message.getPayload()).thenReturn(new Object()); + + when(event.getMessage()).thenReturn(message); + MuleEvent process = irp.process(event); + Assert.assertNull(process); + } + + @Test + public void testMessageRedeliveryUsingMemory() throws Exception + { + Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager); + Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore()); + + IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy(); + irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT); + irp.setUseSecureHash(true); + irp.setFlowConstruct(mock(FlowConstruct.class)); + irp.setMuleContext(mockMuleContext); + irp.setListener(mockFailingMessageProcessor); + irp.setDeadLetterQueue(mockDlqMessageProcessor); + irp.initialise(); + + when(message.getPayload()).thenReturn(STRING_MESSAGE); + when(event.getMessage()).thenReturn(message); + + for (int i = 0; i < MAX_REDELIVERY_COUNT; i++) + { + try + { + irp.process(event); + } + catch (Exception e) + { + } + } + verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1)); + } + + @Test + public void testMessageRedeliveryUsingSerializationStore() throws Exception + { + Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager); + Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore()); + + IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy(); + irp.setUseSecureHash(true); + irp.setMaxRedeliveryCount(1); + irp.setFlowConstruct(mock(FlowConstruct.class)); + irp.setMuleContext(mockMuleContext); + irp.setListener(mockFailingMessageProcessor); + irp.setDeadLetterQueue(mockDlqMessageProcessor); + irp.initialise(); + + when(message.getPayload()).thenReturn(STRING_MESSAGE); + when(event.getMessage()).thenReturn(message); + + for (int i = 0; i < MAX_REDELIVERY_COUNT; i++) + { + try + { + irp.process(event); + } + catch (Exception e) + { + } + } + verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1)); + } + + public static class SerializationObjectStore implements ObjectStore<AtomicInteger> + { + + private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>(); + + @Override + public boolean contains(Serializable key) throws ObjectStoreException + { + return store.containsKey(key); + } + + @Override + public void store(Serializable key, AtomicInteger value) throws ObjectStoreException + { + store.put(key, SerializationUtils.serialize(value)); + } + + @Override + public AtomicInteger retrieve(Serializable key) throws ObjectStoreException + { + Serializable serializable = store.get(key); + return (AtomicInteger) SerializationUtils.deserialize((byte[]) serializable); + } + + @Override + public AtomicInteger remove(Serializable key) throws ObjectStoreException + { + Serializable serializable = store.remove(key); + return (AtomicInteger) SerializationUtils.deserialize((byte[]) serializable); + } + + @Override + public boolean isPersistent() + { + return false; + } + } + + public static class InMemoryObjectStore implements ObjectStore<AtomicInteger> + { + private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>(); + + @Override + public boolean contains(Serializable key) throws ObjectStoreException + { + return store.containsKey(key); + } + + @Override + public void store(Serializable key, AtomicInteger value) throws ObjectStoreException + { + store.put(key,value); + } + + @Override + public AtomicInteger retrieve(Serializable key) throws ObjectStoreException + { + return store.get(key); + } + + @Override + public AtomicInteger remove(Serializable key) throws ObjectStoreException + { + return store.remove(key); + } + + @Override + public boolean isPersistent() + { + return false; + } + } + +} + + Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java ___________________________________________________________________
Added: svn:keywords
Added: svn:eol-style
To unsubscribe from this list please visit:
Some comments:Too many magic numbers: return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );
Dont use imports with "*"
Replace System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8"); with a SystemProperty rule
Refactor to remove duplicate code on testMessageRedeliveryUsingMemory and testMessageRedeliveryUsingSerializationStore tests.
I think you can replace the InMemoryObjectStore define in the test class with SimpleMemoryObjectStore
Comments below:On Fri, Aug 17, 2012 at 1:36 PM, Pablo Kraan <pablo...@mulesoft.com> wrote:
Some comments:Too many magic numbers: return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );Not part of my changes