no rollback when using Spring, ActiveMQ, JOOQ, and Postgres

403 views
Skip to first unread message

james.m.d...@gmail.com

unread,
Sep 25, 2015, 12:31:43 PM9/25/15
to jOOQ User Group
Database writes are not rolling back as I expected.
I've spent many hours reading software documentation and web postings.
I have not been able to resolve the issue.
I'm hoping you folks can help me.

Scenario
. My application pulls a message from a queue, extracts data from the message, and writes it to a database.
. The method that writes to the database does 2 SQL inserts.
. The second insert gets an exception:
   org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "table2_PK"
. However, the first insert is still getting committed to the database.

Relevant Software
. spring-boot 1.2.5.RELEASE
. atomikos-util 3.9.3 (from spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
. activemq-client 5.1.2
. jooq 3.6.2
. postgresql 9.4-1201-jdbc41

Application Code
I've pasted the relevant parts of my code below.
 1. GdmServer - my "server" class, which also declares Spring bean configurations
 2. PortSIQueue - my JMS MessageListener class
 3. Kernel - my worker class, a Spring bean invoked by my MessageListener, i.e. code that writes to database

I'd appreciate any help anyone can offer.
Thanks

------------------------------------------------------------
package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {
    
    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisConfig                       gisConfig;
    
    /**
     * Starts the GDM Server
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }
    
    // -------------------------------------------------------------------------
    // Spring bean configurations
    // -------------------------------------------------------------------------
    
    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }
    
    @Bean
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction   ( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        return manager;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }
    
    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );
    
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        return xaDataSource;
    }
    
    @Bean
    DSLContext dslContext() {
        DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES);
        return dslContext;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );
        
        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }
    
    @Bean
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
        DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
        messageSource.setSessionTransacted(true);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        return messageSource;
    }
    
    @Bean
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
 
}

------------------------------------------------------------
package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {
    
    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisMessageMarshaler             queueMessageMashaler;
    @Autowired
    Kernel                          kernel;
    
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {
        
        TextMessage jmsTextMessage = (TextMessage) jmsMessage;
        
        // Extract JMS message body...
        String jmsPayload = "";
        try {
            jmsPayload = jmsTextMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        
        // Marshal XML text to object...
        Object gisMessage = queueMessageMashaler.toObject(jmsPayload);
        
        kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );
    }

}

------------------------------------------------------------
package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    DSLContext                      dslContext;

<snip>
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

            dslContext.insertInto(table1)
                .set(...)
                .set(...)
            .execute();

            dslContext.insertInto(table2)
                .set(...)
                .set(...)
            .execute();
    }
<snip>
}

------------------------------------------------------------

Lukas Eder

unread,
Sep 28, 2015, 12:19:23 PM9/28/15
to jooq...@googlegroups.com
Hello

I suspect that your exception translator might be wrong. Could you please post that as well?

Cheers,
Lukas

(For the record, this issue is also being discussed here: http://stackoverflow.com/q/32797078/521799)

--
You received this message because you are subscribed to the Google Groups "jOOQ User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to jooq-user+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

james.m.d...@gmail.com

unread,
Sep 28, 2015, 12:58:13 PM9/28/15
to jOOQ User Group
Hi Lukas - 
Hmmm. I did not implement an exception translator. 
Perhaps that's the key point I missed.
I presume I'm currently using using the default behavior.
Sounds like I should have implemented an exception translator, is that correct?
Thanks,
Jim

Lukas Eder

unread,
Sep 29, 2015, 4:03:28 AM9/29/15
to jooq...@googlegroups.com
Hi Jim,

Yes - there's an example here:

Spring Boot 1.3 also works with jOOQ out of the box - perhaps that might be an alternative way to go:

Hope this helps.
Let us know if you run into any further issues.

james.m.d...@gmail.com

unread,
Sep 29, 2015, 11:20:30 AM9/29/15
to jOOQ User Group
Hi Lukas - 
Unfortunately, implementing an exception translator didn't resolve the issue.
Or perhaps I did something wrong.

I implemented the following exception translator... 

package com.sm.gis.gdm;

import org.jooq.ExecuteContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultExecuteListener;

import org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator;
import org.springframework.jdbc.support.SQLExceptionTranslator;
import org.springframework.jdbc.support.SQLStateSQLExceptionTranslator;

public class JooqToSpringExceptionTransformer extends DefaultExecuteListener {
    
    private static final long serialVersionUID = 1L;

    @Override
    public void exception(ExecuteContext ctx) {

        if (ctx.sqlException() != null) {
            SQLDialect dialect = ctx.configuration().dialect();
            SQLExceptionTranslator translator = null;
            if ( dialect != null ) {
                translator = new SQLErrorCodeSQLExceptionTranslator(dialect.name());
            } else {
                translator = new SQLStateSQLExceptionTranslator();
            }
            ctx.exception(translator.translate("jOOQ", ctx.sql(), ctx.sqlException()));
        }
    }
    
}


    @Bean
    DSLContext dslContext() throws Exception {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
        jooqConfiguration.set(SQLDialect.POSTGRES_9_4);
        jooqConfiguration.set(atomikosJdbcConnectionFactory());
        jooqConfiguration.set(new DefaultExecuteListenerProvider(new JooqToSpringExceptionTransformer()));
        DSLContext dslContext = new DefaultDSLContext(jooqConfiguration);
        return dslContext;
    }

I can see the exception is translated...

    at org.springframework.jdbc.support.SQLStateSQLExceptionTranslator.doTranslate(SQLStateSQLExceptionTranslator.java:102)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:73)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
    at com.sm.gis.gdm.JooqToSpringExceptionTransformer.exception(JooqToSpringExceptionTransformer.java:26)
    at org.jooq.impl.ExecuteListeners.exception(ExecuteListeners.java:246)
    at org.jooq.impl.AbstractQuery.execute(AbstractQuery.java:357)
    at org.jooq.impl.AbstractDelegatingQuery.execute(AbstractDelegatingQuery.java:133)
    at com.sm.gis.gdm.kernel.Kernel.receiveCreateGenomicTestOrderInGIS(Kernel.java:932)
    at com.sm.gis.gdm.sports.PortSIQueue.onMessage(PortSIQueue.java:53)


If I can, I'd prefer to wait for official release of spring boot 1.3. 
Of course, I'll try 1.3, if you think that is my best bet.
I did upgrade from spring-boot 1.2.5.RELEASE to spring-boot 1.2.6.RELEASE.
The upgrade to 1.2.6 had no effect.

I do appreciate the help.
Any other insights or suggestions?

Thanks,
Jim

Lukas Eder

unread,
Sep 29, 2015, 11:33:18 AM9/29/15
to jooq...@googlegroups.com
Thank you for the update.

I'm afraid that I don't know what the issue could be right now. Have you debugged through the depths of Spring yet, to see if there is perhaps a commit issued after your first insert at some point?

And just to rule out things, are there perhaps any triggers or other database objects that might create such a commit? And also, to be sure: You are sure that your connection pool doesn't provide you with auto-commit connections?

Cheers,
Lukas

james.m.d...@gmail.com

unread,
Sep 30, 2015, 11:25:06 AM9/30/15
to jOOQ User Group
I'm an idiot.
Turns out the issue was due to a defect in my application logic.
The ActiveMQ component retries a message if the first attempt to process the message fails with an exception. The transaction created for the first attempt rolled back correctly. It was the second attempt that succeeded. The retry succeeded because a database sequence number was incremented by the application logic during the first attempt, and the second attempt did not result in a duplicate key violation. After correcting the application logic defect, since in my application no message is retry-able anyway, I turned off retry, too.
I apologize for wasting the time of those who read my post.

Along the way, I did make some changes to the implementation. The changes make certain default values explicit choices. I left those changes in because I believe they will make it easier for other developers on my team to understand what's happening more quickly. I also left the JOOQ exception translation code in place because it would be needed in other circumstances and appears to be best practice anyway.

I've included the modified code in this post, in case others might find it useful.

------------------------------------------------------------
package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.jooq.impl.DefaultExecuteListenerProvider;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement(proxyTargetClass=true)
public class GdmServer {
    
    @Autowired
    ConfigurableApplicationContext   context;
    @Autowired
    GisConfig                        gisConfig;
    
    /**
     * Starts the GDM Server
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }
    
    // -------------------------------------------------------------------------
    // Spring bean configurations
    // -------------------------------------------------------------------------
    
    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }
    
    @Bean
    @DependsOn({ "atomikosUserTransactionManager", "atomikosUserTransaction", "atomikosJdbcConnectionFactory", "atomikosJmsConnectionFactory" })
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        manager.afterPropertiesSet();
        return manager;
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }
    
    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }
    
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() throws Exception {
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );
    
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        xaDataSource.setTestQuery("SELECT 1");
        xaDataSource.afterPropertiesSet();
        return xaDataSource;
    }
    
    @Bean
    @DependsOn({ "atomikosJdbcConnectionFactory" })
    DSLContext dslContext() throws Exception {
        DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
        jooqConfiguration.set( SQLDialect.POSTGRES_9_4 );
        jooqConfiguration.set( atomikosJdbcConnectionFactory() );
        jooqConfiguration.set( new DefaultExecuteListenerProvider(new JooqToSpringExceptionTransformer()) );
        DSLContext dslContext = new DefaultDSLContext(jooqConfiguration);
        return dslContext;
    }


    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setRedeliveryDelay(0);
        redeliveryPolicy.setUseExponentialBackOff(false);
        redeliveryPolicy.setMaximumRedeliveries(0);
        
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );
        activeMQXAConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        
        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }
    
    @Bean
    @DependsOn({ "transactionManager" })
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
        DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
        messageSource.setSessionTransacted(true);
        messageSource.setSessionAcknowledgeMode(0);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        messageSource.afterPropertiesSet();
        return messageSource;
    }
    
    @Bean
    @DependsOn({ "transactionManager" })
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        jmsTemplate.setSessionAcknowledgeMode(0);
        return jmsTemplate;
    }
 
}

------------------------------------------------------------
package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {
    
    @Autowired
    ConfigurableApplicationContext    context;
    @Autowired
    GisMessageMarshaler               queueMessageMashaler;
    @Autowired
    Kernel                            kernel;
    
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {
        
        TextMessage jmsTextMessage = (TextMessage) jmsMessage;
        
        // Extract JMS message body...
        String jmsPayload = "";
        try {
            jmsPayload = jmsTextMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        
        // Marshal XML text to object...
        Object gisMessage = queueMessageMashaler.toObject(jmsPayload);

        kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );

}

------------------------------------------------------------
package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;

@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    DSLContext                      dslContext;

<snip>
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

        dslContext.insertInto(table1)
            .set(...)
            .set(...)
        .execute();

        dslContext.insertInto(table2)
            .set(...)
            .set(...)
        .execute();
    }
<snip>
}

------------------------------------------------------------

Lukas Eder

unread,
Sep 30, 2015, 11:29:02 AM9/30/15
to jooq...@googlegroups.com
Thank you for the update. Don't worry, it happens to the best :)

Reply all
Reply to author
Forward
0 new messages