[mule-dev] Re: [mule-scm] [mule][24756] branches/mule-3.2.x/core/src: EE-2843 - changing redelivery policy to work in a clustered environment


Pablo Kraan

Aug 17, 2012, 12:36:09 PM
to d...@mule.codehaus.org, s...@mule.codehaus.org
Some comments:

Too many magic numbers: return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );

Don't use imports with "*"

Replace System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8"); with a SystemProperty rule

Refactor to remove duplicate code on testMessageRedeliveryUsingMemory and testMessageRedeliveryUsingSerializationStore tests.

I think you can replace the InMemoryObjectStore defined in the test class with SimpleMemoryObjectStore
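
To illustrate the magic-numbers comment, here is a minimal sketch of how the literals in the getObjectStore(...) call could be given names. The class and constant names are hypothetical. Only the values (false, -1, 60 * 5 * 1000, 6000) come from the revision, and the meaning given to each argument is inferred from the setMaxEntries, setEntryTTL and setExpirationInterval calls that the diff removes.

```java
// Hypothetical holder for the literals passed to getObjectStore(...) in the revision.
// The names are illustrative; only the values are taken from the diff.
final class RedeliveryStoreSettings
{
    // false in the revision; assumed here to be the persistence flag.
    static final boolean PERSISTENT = false;

    // -1 in the revision (previously setMaxEntries(-1)); assumed to mean "no limit".
    static final int MAX_ENTRIES_UNBOUNDED = -1;

    // 60 * 5 * 1000 in the revision (previously setEntryTTL): five minutes in milliseconds.
    static final int ENTRY_TTL_MILLIS = 5 * 60 * 1000;

    // 6000 in the revision (previously setExpirationInterval): six seconds in milliseconds.
    static final int EXPIRATION_INTERVAL_MILLIS = 6 * 1000;

    private RedeliveryStoreSettings()
    {
        // constants only, not meant to be instantiated
    }
}
```

With something like this in place, the call would read roughly getObjectStore(name, PERSISTENT, MAX_ENTRIES_UNBOUNDED, ENTRY_TTL_MILLIS, EXPIRATION_INTERVAL_MILLIS), which documents the units at the call site.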

Pablo
 
On Thu, Aug 16, 2012 at 10:34 PM, <pablo....@codehaus.org> wrote:
Revision
24756
Author
pablo.lagreca
Date
2012-08-16 20:34:27 -0500 (Thu, 16 Aug 2012)

Log Message

EE-2843 - changing redelivery policy to work in a clustered environment

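Modified Paths

branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java

Added Paths

branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java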

Diff

Modified: branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java (24755 => 24756)


--- branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-16 21:58:19 UTC (rev 24755)
+++ branches/mule-3.2.x/core/src/main/java/org/mule/processor/IdempotentRedeliveryPolicy.java	2012-08-17 01:34:27 UTC (rev 24756)
@@ -11,24 +11,25 @@
 
 import org.mule.api.MuleEvent;
 import org.mule.api.MuleException;
+import org.mule.api.config.MuleProperties;
 import org.mule.api.lifecycle.Disposable;
 import org.mule.api.lifecycle.InitialisationException;
 import org.mule.api.lifecycle.Startable;
-import org.mule.api.store.ObjectAlreadyExistsException;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.ObjectStore;
 import org.mule.api.store.ObjectStoreException;
+import org.mule.api.store.ObjectStoreManager;
+import org.mule.api.transformer.TransformerException;
 import org.mule.config.i18n.CoreMessages;
 import org.mule.transformer.simple.ByteArrayToHexString;
 import org.mule.transformer.simple.ObjectToByteArray;
-import org.mule.transformer.simple.SerializableToByteArray;
-import org.mule.util.store.AbstractMonitoredObjectStore;
-import org.mule.util.store.InMemoryObjectStore;
 
-
 import java.io.InputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.mule.util.store.ObjectStorePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,7 +49,7 @@
     private boolean useSecureHash;
     private String messageDigestAlgorithm;
     private String idExpression;
-    private AbstractMonitoredObjectStore<AtomicInteger> store;
+    private ObjectStore<AtomicInteger> store;
 
     @Override
     public void initialise() throws InitialisationException
@@ -56,10 +57,11 @@
         super.initialise();
         if (useSecureHash && idExpression != null)
         {
-            throw new InitialisationException(
-                CoreMessages.initialisationFailure(String.format(
-                    "The Id expression'%s' was specified when a secure hash will be used",
-                    idExpression)), this);
+            useSecureHash = false;
+            if (logger.isWarnEnabled())
+            {
+                logger.warn("Disabling useSecureHash in idempotent-redelivery-policy since an idExpression has been configured");
+            }
         }
         if (!useSecureHash && messageDigestAlgorithm != null)
         {
@@ -96,15 +98,11 @@
         store = createStore();
     }
 
-    private AbstractMonitoredObjectStore<AtomicInteger> createStore() throws InitialisationException
+    private ObjectStore<AtomicInteger> createStore() throws InitialisationException
     {
-        AbstractMonitoredObjectStore s = new InMemoryObjectStore<AtomicInteger>();
-        s.setName(flowConstruct.getName() + "." + getClass().getName());
-        s.setMaxEntries(-1);
-        s.setEntryTTL(60 * 5 * 1000);
-        s.setExpirationInterval(6000);
-        s.initialise();
-        return s;
+        ObjectStoreManager objectStoreManager = (ObjectStoreManager) muleContext.getRegistry().get(
+                                MuleProperties.OBJECT_STORE_MANAGER);
+        return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1,  60 * 5 * 1000, 6000 );
     }
 
 
@@ -115,7 +113,17 @@
 
         if (store != null)
         {
-            store.dispose();
+            if (store instanceof ObjectStorePartition)
+            {
+                try
+                {
+                    ((ObjectStorePartition)store).close();
+                }
+                catch (ObjectStoreException e)
+                {
+                    logger.warn("error closing object store: " + e.getMessage(), e);
+                }
+            }
             store = null;
         }
 
@@ -147,6 +155,11 @@
         {
             messageId = getIdForEvent(event);
         }
+        catch (TransformerException e)
+        {
+            logger.warn("The message cannot be processed because the digest could not be generated. Either make the payload serializable or use an expression.");
+            return null;
+        }
         catch (Exception ex)
         {
             exceptionSeen = true;
@@ -154,7 +167,7 @@
 
         if (!exceptionSeen)
         {
-            counter = getCounter(messageId, null, false);
+            counter = findCounter(messageId);
             tooMany = counter != null && counter.get() > maxRedeliveryCount;
         }
 
@@ -174,7 +187,7 @@
         try
         {
             MuleEvent returnEvent = processNext(event);
-            counter = getCounter(messageId, counter, false);
+            counter = findCounter(messageId);
             if (counter != null)
             {
                 counter.set(0);
@@ -183,51 +196,42 @@
         }
         catch (MuleException ex)
         {
-            incrementCounter(messageId, counter);
+            incrementCounter(messageId);
             throw ex;
         }
         catch (RuntimeException ex)
         {
-            incrementCounter(messageId, counter);
+            incrementCounter(messageId);
             throw ex;
         }
     }
-
-
-    private AtomicInteger incrementCounter(String messageId, AtomicInteger counter) throws ObjectStoreException
+    
+    public AtomicInteger findCounter(String messageId) throws ObjectStoreException
     {
-        counter = getCounter(messageId,  counter, true);
-        counter.incrementAndGet();
-        return counter;
+        boolean counterExists = store.contains(messageId);
+        if (counterExists)
+        {
+            return store.retrieve(messageId);
+        }
+        return null;
     }
 
-    private AtomicInteger getCounter(String messageId, AtomicInteger counter, boolean create) throws ObjectStoreException
+    private AtomicInteger incrementCounter(String messageId) throws ObjectStoreException
     {
-        if (counter != null)
+        AtomicInteger counter = findCounter(messageId);
+        if (counter == null)
         {
-            return counter;
+            counter = new AtomicInteger();
         }
-        boolean counterExists = store.contains(messageId);
-        if (counterExists)
+        else
         {
-            return store.retrieve(messageId);
+            store.remove(messageId);
         }
-        if (create)
-        {
-            try
-            {
-                counter = new AtomicInteger();
-                store.store(messageId, counter);
-            }
-            catch (ObjectAlreadyExistsException e)
-            {
-                counter = store.retrieve(messageId);
-            }
-        }
+        counter.incrementAndGet();
+        store.store(messageId,counter);
         return counter;
     }
 
-
     private String getIdForEvent(MuleEvent event) throws Exception
     {
         if (useSecureHash)
@@ -278,4 +282,10 @@
     {
         this.idExpression = idExpression;
     }
+    
+    public void setMessageProcessor(MessageProcessor processor)
+    {
+        this.deadLetterQueue = processor;
+    }
 }
+

Added: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java (0 => 24756)


--- branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	                        (rev 0)
+++ branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java	2012-08-17 01:34:27 UTC (rev 24756)
@@ -0,0 +1,219 @@
+/*
+ * $Id$
+ * --------------------------------------------------------------------------------------
+ * Copyright (c) MuleSoft, Inc.  All rights reserved.  http://www.mulesoft.com
+ *
+ * The software in this package is published under the terms of the CPAL v1.0
+ * license, a copy of which has been included with this distribution in the
+ * LICENSE.txt file.
+ */
+
+package org.mule.processor;
+
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.mockito.Answers;
+import org.mockito.Mockito;
+import org.mockito.internal.verification.VerificationModeFactory;
+import org.mule.api.*;
+import org.mule.api.config.MuleProperties;
+import org.mule.api.construct.FlowConstruct;
+import org.mule.api.processor.MessageProcessor;
+import org.mule.api.store.ObjectStore;
+import org.mule.api.store.ObjectStoreException;
+import org.mule.api.store.ObjectStoreManager;
+import org.mule.routing.MessageProcessorFilterPair;
+import org.mule.tck.junit4.AbstractMuleTestCase;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.mule.util.SerializationUtils;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleTestCase
+{
+
+    public static final String STRING_MESSAGE = "message";
+    public static final int MAX_REDELIVERY_COUNT = 1;
+    private MuleContext mockMuleContext = mock(MuleContext.class, Answers.RETURNS_DEEP_STUBS.get());
+    private ObjectStoreManager mockObjectStoreManager = mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessor mockFailingMessageProcessor = mock(MessageProcessor.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MessageProcessorFilterPair mockDlqMessageProcessor = mock(MessageProcessorFilterPair.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MuleMessage message = mock(MuleMessage.class, Answers.RETURNS_DEEP_STUBS.get());
+    private MuleEvent event = mock(MuleEvent.class, Answers.RETURNS_DEEP_STUBS.get());
+
+    @Before
+    public void setUpTest() throws MuleException
+    {
+        when(mockFailingMessageProcessor.process(any(MuleEvent.class))).thenThrow(new RuntimeException("failing"));
+        System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8");
+    }
+
+    @Test
+    public void messageDigestFailure() throws Exception
+    {
+        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(), anyBoolean(), anyInt(), anyInt(), anyInt())).thenReturn(new InMemoryObjectStore());
+
+        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+        irp.setUseSecureHash(true);
+        irp.setMaxRedeliveryCount(1);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.initialise();
+
+
+        when(message.getPayload()).thenReturn(new Object());
+
+        when(event.getMessage()).thenReturn(message);
+        MuleEvent process = irp.process(event);
+        Assert.assertNull(process);
+    }
+
+    @Test
+    public void testMessageRedeliveryUsingMemory() throws Exception
+    {
+        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new InMemoryObjectStore());
+
+        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+        irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
+        irp.setUseSecureHash(true);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+        irp.initialise();
+
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        when(event.getMessage()).thenReturn(message);
+
+        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        {
+            try
+            {
+                irp.process(event);
+            }
+            catch (Exception e)
+            {
+            }
+        }
+        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
+    }
+
+    @Test
+    public void testMessageRedeliveryUsingSerializationStore() throws Exception
+    {
+        Mockito.when(mockMuleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).thenReturn(mockObjectStoreManager);
+        Mockito.when(mockObjectStoreManager.getObjectStore(anyString(),anyBoolean(),anyInt(),anyInt(),anyInt())).thenReturn(new SerializationObjectStore());
+
+        IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
+        irp.setUseSecureHash(true);
+        irp.setMaxRedeliveryCount(1);
+        irp.setFlowConstruct(mock(FlowConstruct.class));
+        irp.setMuleContext(mockMuleContext);
+        irp.setListener(mockFailingMessageProcessor);
+        irp.setDeadLetterQueue(mockDlqMessageProcessor);
+        irp.initialise();
+
+        when(message.getPayload()).thenReturn(STRING_MESSAGE);
+        when(event.getMessage()).thenReturn(message);
+
+        for (int i = 0; i < MAX_REDELIVERY_COUNT; i++)
+        {
+            try
+            {
+                irp.process(event);
+            }
+            catch (Exception e)
+            {
+            }
+        }
+        verify(mockDlqMessageProcessor.getMessageProcessor().process(event), VerificationModeFactory.times(1));
+    }
+
+    public static class SerializationObjectStore implements ObjectStore<AtomicInteger>
+    {
+
+        private Map<Serializable,Serializable> store = new HashMap<Serializable,Serializable>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, AtomicInteger value) throws ObjectStoreException
+        {
+            store.put(key, SerializationUtils.serialize(value));
+        }
+
+        @Override
+        public AtomicInteger retrieve(Serializable key) throws ObjectStoreException
+        {
+            Serializable serializable = store.get(key);
+            return (AtomicInteger) SerializationUtils.deserialize((byte[]) serializable);
+        }
+
+        @Override
+        public AtomicInteger remove(Serializable key) throws ObjectStoreException
+        {
+            Serializable serializable = store.remove(key);
+            return (AtomicInteger) SerializationUtils.deserialize((byte[]) serializable);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+    public static class InMemoryObjectStore implements ObjectStore<AtomicInteger>
+    {
+        private Map<Serializable,AtomicInteger> store = new HashMap<Serializable,AtomicInteger>();
+
+        @Override
+        public boolean contains(Serializable key) throws ObjectStoreException
+        {
+            return store.containsKey(key);
+        }
+
+        @Override
+        public void store(Serializable key, AtomicInteger value) throws ObjectStoreException
+        {
+            store.put(key,value);
+        }
+
+        @Override
+        public AtomicInteger retrieve(Serializable key) throws ObjectStoreException
+        {
+            return store.get(key);
+        }
+
+        @Override
+        public AtomicInteger remove(Serializable key) throws ObjectStoreException
+        {
+            return store.remove(key);
+        }
+
+        @Override
+        public boolean isPersistent()
+        {
+            return false;
+        }
+    }
+
+}
+
+
Property changes on: branches/mule-3.2.x/core/src/test/java/org/mule/processor/IdempotentRedeliveryPolicyTestCase.java
___________________________________________________________________

Added: svn:keywords

Added: svn:eol-style
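
For readers of the diff above, a minimal sketch of why the new incrementCounter stores the counter again after incrementing it. It assumes, as the SerializationObjectStore in the test suggests, that a clustered store hands back deserialized copies rather than the original object. CopyingCounterStore and CounterDemo are hypothetical names and use only the JDK, not Mule's ObjectStore API. The sketch says nothing about concurrent access from several nodes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a store that keeps serialized copies of its values.
final class CopyingCounterStore
{
    private final Map<String, byte[]> entries = new HashMap<String, byte[]>();

    void store(String key, AtomicInteger value)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(value);
            out.close();
            entries.put(key, bytes.toByteArray());
        }
        catch (IOException e)
        {
            throw new IllegalStateException(e);
        }
    }

    // Returns a fresh deserialized copy on every call, or null for an unknown key.
    AtomicInteger retrieve(String key)
    {
        byte[] data = entries.get(key);
        if (data == null)
        {
            return null;
        }
        try
        {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data));
            return (AtomicInteger) in.readObject();
        }
        catch (IOException e)
        {
            throw new IllegalStateException(e);
        }
        catch (ClassNotFoundException e)
        {
            throw new IllegalStateException(e);
        }
    }

    void remove(String key)
    {
        entries.remove(key);
    }
}

final class CounterDemo
{
    // Increments only the retrieved copy; returns the value the store still holds.
    static int incrementInPlace(CopyingCounterStore store, String id)
    {
        AtomicInteger counter = store.retrieve(id);
        counter.incrementAndGet();
        return store.retrieve(id).get();
    }

    // Same sequence as the revision's incrementCounter: find, remove, increment, store.
    static int incrementAndStore(CopyingCounterStore store, String id)
    {
        AtomicInteger counter = store.retrieve(id);
        if (counter == null)
        {
            counter = new AtomicInteger();
        }
        else
        {
            store.remove(id);
        }
        counter.incrementAndGet();
        store.store(id, counter);
        return store.retrieve(id).get();
    }
}
```

With a copying store, incrementInPlace leaves the stored count untouched, while incrementAndStore makes the new count visible to the next retrieve. That appears to be the reason the in-place incrementAndGet() from the previous revision was not enough.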




Pablo La Greca

Aug 17, 2012, 1:24:22 PM
to d...@mule.codehaus.org
Comments below:

On Fri, Aug 17, 2012 at 1:36 PM, Pablo Kraan <pablo...@mulesoft.com> wrote:
Some comments:

Too many magic numbers: return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );

Not part of my changes 

Don't use imports with "*"

Will do. 

Replace System.setProperty(MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY,"utf-8"); with a SystemProperty rule

Will do. 

Refactor to remove duplicate code on testMessageRedeliveryUsingMemory and testMessageRedeliveryUsingSerializationStore tests.

Will do. 

I think you can replace the InMemoryObjectStore defined in the test class with SimpleMemoryObjectStore

I prefer not to make my code depend on  SimpleMemoryObjectStore

Don't want my test to rely on something 
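
On the System.setProperty point discussed above: the concern with setting a system property in a @Before method is that it leaks into later tests. A rule of the kind suggested would set the value before a test and restore the previous one afterwards. A minimal JDK-only sketch of that set-and-restore behaviour follows. ScopedSystemProperty is a hypothetical name, not Mule's SystemProperty rule, and AutoCloseable requires Java 7 or later.

```java
// Hypothetical helper: sets a system property and restores the previous value on close().
final class ScopedSystemProperty implements AutoCloseable
{
    private final String name;
    private final String previousValue;

    ScopedSystemProperty(String name, String value)
    {
        this.name = name;
        // Remember what was there before so close() can put it back.
        this.previousValue = System.getProperty(name);
        System.setProperty(name, value);
    }

    @Override
    public void close()
    {
        if (previousValue == null)
        {
            // The property was not set before, so remove it again.
            System.clearProperty(name);
        }
        else
        {
            System.setProperty(name, previousValue);
        }
    }
}
```

In a test this could wrap the body in a try-with-resources block, using MuleProperties.MULE_ENCODING_SYSTEM_PROPERTY as the name and "utf-8" as the value. A JUnit rule does the same thing around each test method.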

Pablo Kraan

Aug 17, 2012, 2:18:50 PM
to d...@mule.codehaus.org

Comments inline

On Fri, Aug 17, 2012 at 2:24 PM, Pablo La Greca <pablo....@mulesource.com> wrote:
Comments below:

On Fri, Aug 17, 2012 at 1:36 PM, Pablo Kraan <pablo...@mulesoft.com> wrote:
Some comments:

Too many magic numbers: return objectStoreManager.getObjectStore(flowConstruct.getName() + "." + getClass().getName(), false, -1, 60 * 5 * 1000, 6000 );

Not part of my changes 

This is part of the revision:

Pablo La Greca

Aug 17, 2012, 3:15:36 PM
to d...@mule.codehaus.org
This has been fixed in 24766 and 24767.