RabbitMQ inside Apache Tomcat - can't shutdown or redeploy

Rob Wilson

Oct 15, 2014, 10:26:41 AM
to rabbitm...@googlegroups.com
Hi,

Short story:  
When I use RabbitMQ within Apache Tomcat, it prevents the application container from undeploying war files; in particular, my context listener's 'contextDestroyed' method is never executed.  Combined with NetBeans wanting to redeploy after code changes, I'm left with a clunky environment.  If I remove RabbitMQ, everything returns to normal.

In depth:
I have successfully set up RabbitMQ between two different Java Web Applications, one 'war' is the consumer, the other 'war' is the publisher.  Initial deployment is good, the producer successfully sends a message and the consumer receives it.

However, when I make code changes in NetBeans, it tries to redeploy the latest code to Tomcat, but Tomcat fails to undeploy the running war file.  Even after clearing out all war files and deploying my consumer for the first time, I see 'Deployment error: Deployment timeout has exceeded. See the server log for details.', yet there's nothing in the log files.

To simplify this, I've manually shut down and undeployed the producer war, so only the consumer war is running.  I would expect to be able to shut down Tomcat or redeploy the unchanged consumer war file, but it consistently doesn't work. The contextDestroyed method is never called.  If I remove RabbitMQ from my code, it works as expected.

From what I understand, there may be a problem with the threads RabbitMQ uses within Tomcat, so I created my own thread factory to ensure that only daemon threads are used and passed it into a custom ExecutorService.  None of this has improved the situation, so I guess I've overlooked something.  If you can spot the problem, I'd appreciate the feedback!

Please help!

Many thanks,
Rob.


ServletListener.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import javax.servlet.http.HttpSessionEvent;
import javax.servlet.http.HttpSessionListener;
...

@WebListener
public class ServletListener implements ServletContextListener, HttpSessionListener {

    private SessionManager sessionManager = ObjectFactory.getSessionManager();
    private RabbitMessageHandler rabbitMessageHandler = null;

    ThreadFactory threadFactory = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            final String threadName = "ThreadFactoryGenerated-" + System.currentTimeMillis();
            Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "ThreadFactory is creating a new thread (Daemon) name:" + threadName);
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName(threadName);
            Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "ThreadFactory daemon created.");
            return t;
        }
    };
    
    final ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory);

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "*** contextInitializing ***");

        rabbitMessageHandler = new RabbitMessageHandler(executorService, threadFactory); // <-- My own class

        Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "contextInitialized");
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
 
       // I don't see these in the logs when RabbitMQ has been initialised, it works otherwise...

        Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "*** contextDestroying ***");
        try {
            Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "shutting down thread pool");
            executorService.shutdownNow();
            Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "awaiting termination");
            executorService.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Logger.getLogger(ServletListener.class.getName()).log(Level.SEVERE, "Problem awaiting termination", ex);
        }

        if (rabbitMessageHandler != null) {
            rabbitMessageHandler.kill();
            this.sessionManager.clear();
        }
        
        Logger.getLogger(ServletListener.class.getName()).log(Level.INFO, "contextDestroyed");
    }
}

RabbitMessageHandler.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author Rob
 */
public class RabbitMessageHandler {

    private static String COLLECTION_CHANGED_QUEUE = "COLLECTION_CHANGED_QUEUE";
    private boolean initialised = false;
    private Connection connection = null;
    private Channel channel = null;
    private ExecutorService executorService;
    private ThreadFactory threadFactory;

    public RabbitMessageHandler(ExecutorService executorService, ThreadFactory threadFactory) {
        this.executorService = executorService;
        this.threadFactory = threadFactory;
        initialiseRabbit();
    }

    private synchronized void initialiseRabbit() {

        Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit called!");
        try {
            if (initialised == true) {
                Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.WARNING, "initialiseRabbit called when it's already initialised - returning early!");
                return;
            }

            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit creating connection factory");

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setAutomaticRecoveryEnabled(true);
            factory.setSharedExecutor(this.executorService); // <-- Tried with and without this
            factory.setThreadFactory(this.threadFactory);
            
            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit creating connection");
            connection = factory.newConnection(this.executorService);

            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit creating channel");
            channel = connection.createChannel();

            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit declaring queue");
            channel.queueDeclare(COLLECTION_CHANGED_QUEUE, false, false, false, null);

            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit creating queing consumer");
            QueueingConsumer consumer = new QueueingConsumer(channel);

            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit associating queue with queing consumer");
            channel.basicConsume(COLLECTION_CHANGED_QUEUE, true, consumer);

            initialised = true;

            while (initialised) {
                Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit waiting for a message");
                QueueingConsumer.Delivery delivery = consumer.nextDelivery(5000);
                if (delivery != null && delivery.getBody() != null && delivery.getBody().length > 0) {
                    String message = new String(delivery.getBody());
                    Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "Rabbit message received:" + message);
                } else {
                    Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit timedout waiting for a message");
                }
            }
        } catch (IOException ex) {
            handleRabbitError(connection, channel, ex);
        } catch (InterruptedException ex) {
            handleRabbitError(connection, channel, ex);
        } catch (ShutdownSignalException ex) {
            handleRabbitError(connection, channel, ex);
        } catch (ConsumerCancelledException ex) {
            handleRabbitError(connection, channel, ex);
        }

        Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "initialiseRabbit exit!");

    }

    private synchronized void handleRabbitError(Connection connection, Channel channel, Exception ex) {
        Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.SEVERE, null, ex);

        try {
            channel.close();
        } catch (IOException ex1) {
            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.SEVERE, "handleRabbitError couldn't close the channel", ex1);
        }

        try {
            connection.close();
        } catch (IOException ex1) {
            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.SEVERE, "handleRabbitError couldn't close the connection", ex1);
        }

        Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "handleRabbitError has reset the rabbit connection/channel");
        this.initialised = false;

    }

    public void kill() {
        if (initialised) {
            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "killing channel/connection...");

            if (channel != null) {
                Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "killing channel");
                try {
                    channel.abort();
                } catch (IOException ex) {
                    Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.SEVERE, null, ex);
                }
            }

            if (connection != null) {
                Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "killing connection");
                connection.abort();
            }
            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "killing completed");
        } else {
            Logger.getLogger(RabbitMessageHandler.class.getName()).log(Level.INFO, "kill - nothing to kill detected");
        }
    }

}

Tomcat server log file - seems healthy to me

15-Oct-2014 14:09:49.446 INFO [main] org.apache.catalina.core.StandardServer.await A valid shutdown command was received via the shutdown port. Stopping the Server instance.

15-Oct-2014 14:09:49.446 INFO [main] org.apache.coyote.AbstractProtocol.pause Pausing ProtocolHandler ["http-nio-8084"]

15-Oct-2014 14:09:49.498 INFO [main] org.apache.coyote.AbstractProtocol.pause Pausing ProtocolHandler ["ajp-nio-8009"]

15-Oct-2014 14:09:49.550 INFO [main] org.apache.catalina.core.StandardService.stopInternal Stopping service Catalina

15-Oct-2014 14:09:49.564 INFO [main] org.apache.coyote.AbstractProtocol.stop Stopping ProtocolHandler ["http-nio-8084"]

15-Oct-2014 14:09:49.566 INFO [main] org.apache.coyote.AbstractProtocol.stop Stopping ProtocolHandler ["ajp-nio-8009"]

15-Oct-2014 14:09:49.566 INFO [main] org.apache.coyote.AbstractProtocol.destroy Destroying ProtocolHandler ["http-nio-8084"]

15-Oct-2014 14:09:49.567 INFO [main] org.apache.coyote.AbstractProtocol.destroy Destroying ProtocolHandler ["ajp-nio-8009"]


15-Oct-2014 14:10:05.336 SEVERE [main] org.apache.catalina.startup.Catalina.initDirs Cannot find specified temporary folder at /Users/Rob/Library/Application Support/NetBeans/8.0/apache-tomcat-8.0.3.0_base/temp

15-Oct-2014 14:10:05.829 INFO [main] org.apache.catalina.core.AprLifecycleListener.init The APR based Apache Tomcat Native library which allows optimal performance in production environments was not found on the java.library.path: /Applications/NetBeans/NetBeans 8.0.app/Contents/Resources/NetBeans/webcommon/bin::/Users/Rob/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.

15-Oct-2014 14:10:06.123 INFO [main] org.apache.coyote.AbstractProtocol.init Initializing ProtocolHandler ["http-nio-8084"]

15-Oct-2014 14:10:06.155 INFO [main] org.apache.tomcat.util.net.NioSelectorPool.getSharedSelector Using a shared selector for servlet write/read

15-Oct-2014 14:10:06.163 INFO [main] org.apache.coyote.AbstractProtocol.init Initializing ProtocolHandler ["ajp-nio-8009"]

15-Oct-2014 14:10:06.167 INFO [main] org.apache.tomcat.util.net.NioSelectorPool.getSharedSelector Using a shared selector for servlet write/read

15-Oct-2014 14:10:06.170 INFO [main] org.apache.catalina.startup.Catalina.load Initialization processed in 835 ms

15-Oct-2014 14:10:06.230 INFO [main] org.apache.catalina.core.StandardService.startInternal Starting service Catalina

15-Oct-2014 14:10:06.230 INFO [main] org.apache.catalina.core.StandardEngine.startInternal Starting Servlet Engine: Apache Tomcat/8.0.3

15-Oct-2014 14:10:06.243 INFO [localhost-startStop-1] org.apache.catalina.startup.HostConfig.deployDescriptor Deploying configuration descriptor /Users/Rob/Library/Application Support/NetBeans/8.0/apache-tomcat-8.0.3.0_base/conf/Catalina/localhost/manager.xml

15-Oct-2014 14:10:07.051 INFO [localhost-startStop-1] org.apache.jasper.servlet.TldScanner.scanJars At least one JAR was scanned for TLDs yet contained no TLDs. Enable debug logging for this logger for a complete list of JARs that were scanned but no TLDs were found in them. Skipping unneeded JARs during scanning can improve startup time and JSP compilation time.

15-Oct-2014 14:10:07.130 INFO [main] org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler ["http-nio-8084"]

15-Oct-2014 14:10:07.135 INFO [main] org.apache.coyote.AbstractProtocol.start Starting ProtocolHandler ["ajp-nio-8009"]

15-Oct-2014 14:10:07.135 INFO [main] org.apache.catalina.startup.Catalina.start Server startup in 965 ms

15-Oct-2014 14:10:08.107 INFO [http-nio-8084-exec-5] org.apache.catalina.startup.HostConfig.deployDescriptor Deploying configuration descriptor /Users/Rob/Library/Application Support/NetBeans/8.0/apache-tomcat-8.0.3.0_base/conf/Catalina/localhost/SocialScheduler.xml

15-Oct-2014 14:10:08.111 WARNING [http-nio-8084-exec-5] org.apache.catalina.startup.SetContextPropertiesRule.begin [SetContextPropertiesRule]{Context} Setting property 'antiJARLocking' to 'true' did not find a matching property.

Application Logs

Oct 15, 2014 2:10:11 PM org.glassfish.jersey.servlet.init.JerseyServletContainerInitializer addServletWithApplication

INFO: Registering the Jersey servlet application, named uk.co.devology.socialscheduler.RestResourceConfig, at the servlet mapping /rest/*, with the Application class of the same name.

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.security.SessionManager <init>

INFO: SessionManager constructed

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.security.ServletListener contextInitialized

INFO: *** contextInitializing ***

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit called!

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit creating connection factory

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit creating connection

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.security.ServletListener$1 newThread

INFO: ThreadFactory is creating a new thread (Daemon) name:ThreadFactoryGenerated-1413382211726

Oct 15, 2014 2:10:11 PM uk.co.devology.socialscheduler.security.ServletListener$1 newThread

INFO: ThreadFactory daemon created.

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.security.ServletListener$1 newThread

INFO: ThreadFactory is creating a new thread (Daemon) name:ThreadFactoryGenerated-1413382212228

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.security.ServletListener$1 newThread

INFO: ThreadFactory daemon created.

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit creating channel

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit declaring queue

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit creating queing consumer

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit associating queue with queing consumer

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.security.ServletListener$1 newThread

INFO: ThreadFactory is creating a new thread (Daemon) name:ThreadFactoryGenerated-1413382212273

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.security.ServletListener$1 newThread

INFO: ThreadFactory daemon created.

Oct 15, 2014 2:10:12 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit waiting for a message

Oct 15, 2014 2:10:17 PM uk.co.devology.socialscheduler.rabbit.RabbitMessageHandler initialiseRabbit

INFO: initialiseRabbit timedout waiting for a message

The last two lines in the log (waiting... timedout) repeat indefinitely, even after I try to undeploy the war file, which suggests that Tomcat doesn't like something about the threading, perhaps?
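
In the application log above, "*** contextInitializing ***" is logged but "contextInitialized" never appears, which is consistent with the receive loop in initialiseRabbit() running on the thread that called contextInitialized and never returning. A minimal sketch, assuming that is the cause, moves a blocking receive loop onto a background thread so the start-up call returns immediately and the loop can be stopped later. BackgroundPoller and its in-memory queue are hypothetical stand-ins (the queue plays the role of a consumer polled with a timeout); no RabbitMQ calls are made.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a message handler; not part of the RabbitMQ client.
class BackgroundPoller {
    // In-memory queue standing in for a consumer that is polled with a timeout.
    private final BlockingQueue<String> deliveries = new LinkedBlockingQueue<>();
    private final AtomicInteger received = new AtomicInteger();

    // Single daemon worker thread that owns the receive loop.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "poller");
        t.setDaemon(true);
        return t;
    });

    // Returns immediately; the loop runs on the worker thread, not the caller's.
    void start() {
        worker.submit(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String msg = deliveries.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        received.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                // shutdownNow() interrupts the poll; restore the flag and exit.
                Thread.currentThread().interrupt();
            }
        });
    }

    void offer(String msg) {
        deliveries.add(msg);
    }

    int receivedCount() {
        return received.get();
    }

    // Interrupts the loop and waits for the worker to finish.
    boolean stop() {
        worker.shutdownNow();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a listener shaped like the one above, start() would be called from contextInitialized and stop() from contextDestroyed; whether that resolves the undeploy problem in this particular setup is untested.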


Michael Klishin

Oct 15, 2014, 12:48:45 PM
to Rob Wilson, rabbitm...@googlegroups.com
Java client connection factory in 3.4.0 (about to ship) has a flag for daemon threads. Your understanding of what's going on sounds reasonable.

MK

Rob Wilson

Oct 15, 2014, 4:00:23 PM
to rabbitm...@googlegroups.com, net...@gmail.com
Thanks!

I've noticed that one war file doesn't exhibit this problem, but in that war file I open a connection, send a message, then close the connection.  In the problematic war file, by contrast, I open the connection and wait/block for messages as per the tutorials (Producer - Consumer).

Is there an ETA on the ship date - would you advise using the nightly builds to try it out (non production of course)... Would there be any API documentation on when & how to set the daemon flags?

Thanks,
Rob.

Michael Klishin

Oct 19, 2014, 6:11:23 AM
to rabbitm...@googlegroups.com, Rob Wilson
On 16 October 2014 at 00:00:29, Rob Wilson (net...@gmail.com) wrote:
> Is there an ETA on the ship date - would you advise using the nightly
> builds to try it out (non production of course)... Would there
> be any API documentation on when & how to set the daemon flags?

Actually, I see that a couple of bugs related to daemon threads were
closed as WONTFIX for various implementation reasons.

So you should provide your own thread factory that would create daemon
threads, see Custom Thread Factories on http://www.rabbitmq.com/api-guide.html.
The factory has to adhere to the java.util.concurrent.ThreadFactory interface.
--
MK

Staff Software Engineer, Pivotal/RabbitMQ
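
The custom thread factory suggested above could look roughly like the following. This is a minimal sketch using only java.util.concurrent; the class name DaemonThreadFactory and the name prefix are hypothetical and not part of the RabbitMQ client.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; not part of the RabbitMQ client library.
class DaemonThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    DaemonThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // A counter gives each thread a unique, readable name.
        Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
        // Daemon threads do not keep the JVM alive on their own.
        t.setDaemon(true);
        return t;
    }
}
```

An instance would be handed to the client with factory.setThreadFactory(...), the same call used in RabbitMessageHandler earlier in the thread. The daemon flag only stops these threads from keeping the JVM alive; it does not by itself end a loop that is still running.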