Clojure and JMS - Sample program


jsrodrigues

Mar 21, 2010, 1:09:09 AM
to Clojure
I wrote this program to help me explore and understand JMS queues
(javax.jms.Queue). I was working with Open MQ, the JMS provider that
comes bundled with the GlassFish application server (running on
Windows). In addition to installing JDK-EE 1.5+ and GlassFish, you will
need to include the following jars in your Clojure classpath to get
this program to work:

set ssdk_lib=C:\Sun\SDK\lib
set jms_lib=%ssdk_lib%\install\applications\jmsra\imqjmsra.jar;%ssdk_lib%\appserv-rt.jar;%ssdk_lib%\javaee.jar;%ssdk_lib%\appserv-admin.jar

You will also need to define the following JMS resources:
"QueueConnectionFactory" (javax.jms.QueueConnectionFactory) and
"TestQueue" (javax.jms.Queue). I call the main function this way:
user=> (jms.jms-test/main)

Next, I will be writing some more code to understand JMS-Topic
(javax.jms.Topic). I will post that as well.

Regards,
John

Here is the code:

(ns jms.jms-test
  (:import (javax.naming InitialContext)
           (java.util Properties)
           (javax.jms Session MessageListener)))

(defn get-initial-context []
  ;; JNDI settings for a GlassFish instance running on localhost
  (let [props (doto (Properties.)
                (.setProperty "java.naming.factory.initial"
                              "com.sun.enterprise.naming.SerialInitContextFactory")
                (.setProperty "org.omg.CORBA.ORBInitialHost" "localhost")
                (.setProperty "org.omg.CORBA.ORBInitialPort" "3700"))]
    (InitialContext. props)))

(defn get-message-text []
  ;; Test payload tagged with a random number below 10000
  (format "This is a test message: %d" (rand-int 10000)))

(defn send-message-to-queue [qSender qSession]
  (let [message (.createTextMessage qSession)]
    (.setText message (get-message-text))
    (.send qSender message)))

(defn send-term-message-to-queue [qSender qSession]
  ;; A plain (non-text) Message tells the receiver nothing more is coming
  (.send qSender (.createMessage qSession)))

(defn send-n-messages-to-queue [qSender qSession num-messages]
  (dotimes [_ num-messages]
    (send-message-to-queue qSender qSession))
  (send-term-message-to-queue qSender qSession)
  (println "*Done putting messages on queue!*"))

(defn process-queue-messages [qReceiver qConnection]
  (let [done-processing (ref false)]
    (.setMessageListener qReceiver
      (proxy [MessageListener] []
        (onMessage [message]
          (if (instance? javax.jms.TextMessage message)
            (println (format "Read message: %s" (.getText message)))
            ;; any non-text message is treated as the terminator
            (dosync (ref-set done-processing true))))))
    (.start qConnection)
    ;; poll until the listener has seen the terminator
    (while (not @done-processing)
      (Thread/sleep 1000)))
  (.close qConnection)
  (println "==Read all messages off the queue!!=="))

(defn main []
  (let [ctx (get-initial-context)
        qConFactory (.lookup ctx "QueueConnectionFactory")
        qConnection (.createQueueConnection qConFactory)
        ;; Note: sender and receiver share this one session; JMS treats a
        ;; Session as single-threaded, so separate sessions would be safer
        qSession (.createQueueSession qConnection false Session/AUTO_ACKNOWLEDGE)
        queue (.createQueue qSession "TestQueue")
        qSender (.createSender qSession queue)
        qReceiver (.createReceiver qSession queue)]
    (.start (Thread. (fn [] (send-n-messages-to-queue qSender qSession 10))))
    (.start (Thread. (fn [] (process-queue-messages qReceiver qConnection))))))
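
A possible variation on the polling loop in process-queue-messages, as a minimal sketch rather than what the program above does: a promise lets the waiting thread block until the terminator arrives instead of waking up every second to check a ref. To keep it runnable without a JMS broker, the messages here are plain Clojure values, and `make-handler` and `run-demo` are hypothetical names standing in for the body of onMessage and for main. It assumes Clojure 1.3 or later for the timeout arity of `deref`.

```clojure
;; Sketch: signal "all messages read" with a promise instead of polling.
;; Strings stand in for TextMessages; anything else acts as the terminator.
(defn make-handler
  "Returns a function playing the role of onMessage."
  [done received]
  (fn [message]
    (if (string? message)
      (swap! received conj message)   ; a "text message": record it
      (deliver done :finished))))     ; the terminator: release the waiter

(defn run-demo []
  (let [done     (promise)
        received (atom [])
        handler  (make-handler done received)
        messages ["first" "second" :term]]
    ;; deliver the messages from another thread, as a JMS provider would
    (.start (Thread. (fn [] (doseq [m messages] (handler m)))))
    ;; block until the terminator is seen, or give up after 5 seconds
    (let [status (deref done 5000 :timed-out)]
      {:status status :received @received})))
```

In the JMS version the same `deliver` call would sit in the non-text branch of onMessage, and `(deref done)` would replace the `while` loop before closing the connection.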

jsrodrigues

Mar 21, 2010, 11:20:42 PM
to Clojure
This is the follow-up code I wrote to help me understand JMS-Topic
(javax.jms.Topic).

Regards,
John

(ns jms.jms-topic
  (:use [jms.jms-test :only (get-initial-context get-message-text)])
  (:import (javax.jms Session MessageListener)))

(defn publish-term-message-to-topic [tPublisher tSession]
  ;; A plain (non-text) Message marks the end of the stream
  (.publish tPublisher (.createMessage tSession)))

(defn publish-message-to-topic [tPublisher tSession]
  (let [message (.createTextMessage tSession)]
    (.setText message (get-message-text))
    (println (format "Publishing message - %s ..." (.getText message)))
    (.publish tPublisher message)))

(defn publish-n-messages-to-topic [tPublisher tSession num-messages
                                   qReceiver qConnection]
  ;; Wait for a subscriber to announce itself on the control queue
  (println "Waiting for subscribers...")
  (let [subscriber-available (ref false)]
    (.setMessageListener qReceiver
      (proxy [MessageListener] []
        (onMessage [message]
          (dosync (ref-set subscriber-available true)))))
    (.start qConnection)
    (while (not @subscriber-available)
      (Thread/sleep 1000))
    (.close qConnection)
    (println "Hoo-rah!! We have a subscriber"))
  (dotimes [_ num-messages]
    (publish-message-to-topic tPublisher tSession))
  (publish-term-message-to-topic tPublisher tSession)
  (println "*Done publishing messages to topic!*"))

(defn process-topic-messages [tSubscriber tConnection qSender qSession]
  ;; Snooze a bit before we are ready to process topic messages
  (Thread/sleep (* 10 1000))
  (println "-- Subscriber is now online --")
  ;; Tell the publisher, via the control queue, that it can start
  (.send qSender (.createMessage qSession))
  (let [done-processing (ref false)]
    (.setMessageListener tSubscriber
      (proxy [MessageListener] []
        (onMessage [message]
          (if (instance? javax.jms.TextMessage message)
            (println (format "Read message: %s" (.getText message)))
            ;; any non-text message is treated as the terminator
            (dosync (ref-set done-processing true))))))
    (.start tConnection)
    (while (not @done-processing)
      (Thread/sleep 1000)))
  (.close tConnection)
  (println "==Read all messages off the topic!!=="))

(defn main []
  (let [ctx (get-initial-context)
        ;; topic side: carries the actual messages
        tConFactory (.lookup ctx "TopicConnectionFactory")
        tConnection (.createTopicConnection tConFactory)
        tSession (.createTopicSession tConnection false Session/AUTO_ACKNOWLEDGE)
        topic (.createTopic tSession "TestTopic")
        tPublisher (.createPublisher tSession topic)
        tSubscriber (.createSubscriber tSession topic)
        ;; queue side: control channel the subscriber uses to say it is online
        qConFactory (.lookup ctx "QueueConnectionFactory")
        qConnection (.createQueueConnection qConFactory)
        qSession (.createQueueSession qConnection false Session/AUTO_ACKNOWLEDGE)
        queue (.createQueue qSession "controlQueue")
        qSender (.createSender qSession queue)
        qReceiver (.createReceiver qSession queue)]
    (.start (Thread. (fn [] (publish-n-messages-to-topic tPublisher tSession 10
                                                         qReceiver qConnection))))
    (.start (Thread. (fn [] (process-topic-messages tSubscriber tConnection
                                                    qSender qSession))))))
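
The control queue above works as a "subscriber is ready" handshake, and it would still work if publisher and subscriber lived in different processes. Since both threads here run in the same JVM, the same ordering could also be sketched with a java.util.concurrent.CountDownLatch. This is only an illustration of the handshake idea under that assumption, with no JMS involved; `run-handshake` and the event keywords are hypothetical names.

```clojure
(import 'java.util.concurrent.CountDownLatch)

;; Sketch: the publisher thread waits on a latch that the subscriber
;; thread releases once it is ready, mirroring the control-queue message.
(defn run-handshake []
  (let [^CountDownLatch ready (CountDownLatch. 1)
        events     (atom [])
        publisher  (Thread. (fn []
                              (.await ready)                    ; wait for subscriber
                              (swap! events conj :published)))
        subscriber (Thread. (fn []
                              (Thread/sleep 200)                ; simulate slow start-up
                              (swap! events conj :subscriber-online)
                              (.countDown ready)))]             ; release the publisher
    (.start publisher)
    (.start subscriber)
    (.join publisher)
    (.join subscriber)
    @events))
```

Calling (run-handshake) returns the events in the order they happened, with the subscriber coming online before anything is published.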
