Rabbitmq JMS client get TextMessage

1,298 views
Skip to first unread message

Mudit Mishra

unread,
Mar 25, 2018, 7:36:19 AM3/25/18
to rabbitmq-users
We have an application engine that supports only JMS interface and hence we have to use JMS client of Rabbitmq to read messages.

Our application expects JMS message to be type TextMessage. Rabbitmq message receive function returns message of type RMQBytesMessage. 

How can message type be changed to TextMessage? 

Error:


03-25 22:35:01,685 [main][ERROR SendReceiveJMSExample] - Error

java.lang.ClassCastException: com.rabbitmq.jms.client.message.RMQBytesMessage cannot be cast to javax.jms.TextMessage

at SendReceiveJMSExample.runTest(SendReceiveJMSExample.java:54)

at SendReceiveJMSExample.main(SendReceiveJMSExample.java:24)

Exception in thread "main" java.lang.NullPointerException

at SendReceiveJMSExample.runTest(SendReceiveJMSExample.java:72)

at SendReceiveJMSExample.main(SendReceiveJMSExample.java:24)



export CLASSPATH=.:/tmp/rabbitmq/jars/jdk17/rabbitmq-jms-1.7.0.jar:\
/tmp/rabbitmq/jars/jdk17/amqp-client-4.2.0.jar:\
/tmp/rabbitmq/jars/slf4j-log4j12.jar:\
/tmp/rabbitmq/jars/jdk17/rabbitmq-jms-1.7.0.jar:\
/tmp/rabbitmq/jars/providerutil-1.2.1.jar:\
/tmp/rabbitmq/jars/geronimo-jms_1.1_spec-1.1.1.jar:\
/tmp/rabbitmq/jars/slf4j-api.jar:\
/tmp/rabbitmq/jars/fscontext-4.6-b01.jar:\
/tmp/rabbitmq/jars/log4j.jar

$ javac SendReceiveJMSExample.java 

$ java -Dlog4j.configuration=file:"/opt/sas/vicpol/sashome/SASFoundation/9.4/misc/tkjava/sas.log4j.properties" SendReceiveJMSExample


Java code sample:


import javax.jms.*;

//import com.rabbitmq.*;

import javax.naming.Context;

import javax.naming.InitialContext;

import java.util.Properties;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



public class SendReceiveJMSExample {


    protected final Logger logger = LoggerFactory.getLogger(SendReceiveJMSExample.class);

    private Context ctx;

    private Connection connection;

    private MessageConsumer messageConsumer;

    private MessageProducer messageProducer;

    private Session session;


    public SendReceiveJMSExample() {

    }


    public static void main(String[] args) throws Exception {

        SendReceiveJMSExample example = new SendReceiveJMSExample();

        example.runTest();

    }


    private void runTest() throws Exception {

      Properties env = new Properties();

      env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");

      env.put(Context.PROVIDER_URL, "file:///tmp/rabbitmq/jndi");

      Properties properties = new Properties();


      try {

        ctx = new InitialContext(env);

        ConnectionFactory connectionFactory

            = (ConnectionFactory) ctx.lookup("ConnectionFactory");

        connection = connectionFactory.createConnection();

        connection.start();

        session = connection.createSession(true, Session.SESSION_TRANSACTED);

        Queue queue = (Queue) ctx.lookup("sas.audit.queue");


        messageConsumer = session.createConsumer(queue);

        MessageProducer messageProducer = session.createProducer(queue);


        TextMessage message = session.createTextMessage("Hello world!");

        messageProducer.send(message);

        session.commit();


        //RMQBytesMessage message_bt;

        //Class cls = messageConsumer.receive().getClass();

        //Class cls = message_bt.getClass();

        //System.out.println("The type of the object is: " + cls.getName());


        message = (TextMessage)messageConsumer.receive(1000);

        //Message message_cls = messageConsumer.receive(1000);

        //System.out.println("JMSType = " + message_cls.getJMSType());

        session.commit();

        System.out.println(message.getText());


        session.close();

        messageConsumer.close();

        messageProducer.close();

        connection.close();

        ctx.close();

  }

      catch (Exception e) {

        //System.out.println(e);

         //e.printStackTrace();

         logger.error("Error", e);

         session.close();

         messageConsumer.close();

         messageProducer.close();

         connection.close();

         ctx.close();

      }

      finally {

        connection.close();

        ctx.close();

      }


      //properties.load(this.getClass().getResourceAsStream("/tmp/rabbitmq/jndi/helloworld.properties"));

      //Context context = new InitialContext(properties);






    }

}




Mudit Mishra

unread,
Mar 25, 2018, 11:45:54 PM3/25/18
to rabbitmq-users
Added JMSType header to the message and then message could be read by the below code. We cannot set JMSType header for every message. 

Arnaud Cogoluègnes

unread,
Mar 26, 2018, 8:58:41 AM3/26/18
to rabbitm...@googlegroups.com
Can you share the definition of the queue? I think you defined it as an AMQP resource, this may be not what you want.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Mudit Mishra

unread,
Mar 26, 2018, 7:52:33 PM3/26/18
to rabbitm...@googlegroups.com

[root@rabbitmq1 rabbitmq]# rabbitmqctl report

Reporting server status of node rabbit@rabbitmq1 ...


Status of node rabbit@rabbitmq1 ...

[{pid,2052},

 {running_applications,

     [{rabbitmq_management,"RabbitMQ Management Console","3.7.4"},

      {amqp_client,"RabbitMQ AMQP Client","3.7.4"},

      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.7.4"},

      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.7.4"},

      {rabbitmq_jms_topic_exchange,

          "RabbitMQ JMS topic selector exchange plugin","3.7.4"},

      {rabbit,"RabbitMQ","3.7.4"},

      {rabbit_common,

          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",

          "3.7.4"},

      {xmerl,"XML parser","1.3.16"},

      {ranch_proxy_protocol,"Ranch Proxy Protocol Transport","1.4.4"},

      {cowboy,"Small, fast, modern HTTP server.","2.2.2"},

      {ranch,"Socket acceptor pool for TCP protocols.","1.4.0"},

      {ssl,"Erlang/OTP SSL application","8.2.4"},

      {public_key,"Public key infrastructure","1.5.2"},

      {asn1,"The Erlang ASN1 compiler version 5.0.5","5.0.5"},

      {recon,"Diagnostic tools for production use","2.3.2"},

      {cowlib,"Support library for manipulating Web protocols.","2.1.0"},

      {crypto,"CRYPTO","4.2.1"},

      {os_mon,"CPO  CXC 138 46","2.4.4"},

      {jsx,"a streaming, evented json parsing toolkit","2.8.2"},

      {inets,"INETS  CXC 138 49","6.5"},

      {mnesia,"MNESIA  CXC 138 12","4.15.3"},

      {lager,"Erlang logging framework","3.5.1"},

      {goldrush,"Erlang event stream processor","0.1.9"},

      {compiler,"ERTS  CXC 138 10","7.1.5"},

      {syntax_tools,"Syntax tools","2.1.4"},

      {sasl,"SASL  CXC 138 11","3.1.1"},

      {stdlib,"ERTS  CXC 138 10","3.4.4"},

      {kernel,"ERTS  CXC 138 10","5.4.3"}]},

 {os,{unix,linux}},

 {erlang_version,

     "Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:64] [hipe] [kernel-poll:true]\n"},

 {memory,

     [{connection_readers,0},

      {connection_writers,0},

      {connection_channels,0},

      {connection_other,24672},

      {queue_procs,22576},

      {queue_slave_procs,0},

      {plugins,1794256},

      {other_proc,25587848},

      {metrics,197664},

      {mgmt_db,390184},

      {mnesia,81704},

      {other_ets,2194168},

      {binary,560752},

      {msg_index,28912},

      {code,28471743},

      {atom,1123529},

      {other_system,9654848},

      {allocated_unused,17914760},

      {reserved_unallocated,0},

      {strategy,rss},

      {total,[{erlang,70132856},{rss,81543168},{allocated,88047616}]}]},

 {alarms,[]},

 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]},

 {vm_memory_calculation_strategy,rss},

 {vm_memory_high_watermark,0.4},

 {vm_memory_limit,205558579},

 {disk_free_limit,50000000},

 {disk_free,33685524480},

 {file_descriptors,

     [{total_limit,924},{total_used,3},{sockets_limit,829},{sockets_used,0}]},

 {processes,[{limit,1048576},{used,369}]},

 {run_queue,0},

 {uptime,60649},

 {kernel,{net_ticktime,60}}]


Cluster status of node rabbit@rabbitmq1 ...

[{nodes,[{disc,[rabbit@rabbitmq1]}]},

 {running_nodes,[rabbit@rabbitmq1]},

 {cluster_name,<<"rab...@rabbitmq1.intranet.local">>},

 {partitions,[]},

 {alarms,[{rabbit@rabbitmq1,[]}]}]


Application environment of node rabbit@rabbitmq1 ...

[{amqp_client,[{prefer_ipv6,false},{ssl_options,[]}]},

 {asn1,[]},

 {compiler,[]},

 {cowboy,[]},

 {cowlib,[]},

 {crypto,[{fips_mode,false}]},

 {goldrush,[]},

 {inets,[]},

 {jsx,[]},

 {kernel,

     [{error_logger,tty},

      {inet_default_connect_options,[{nodelay,true}]},

      {inet_dist_listen_max,25672},

      {inet_dist_listen_min,25672}]},

 {lager,

     [{async_threshold,20},

      {async_threshold_window,5},

      {colored,false},

      {colors,

          [{debug,"\e[0;38m"},

           {info,"\e[1;37m"},

           {notice,"\e[1;36m"},

           {warning,"\e[1;33m"},

           {error,"\e[1;31m"},

           {critical,"\e[1;35m"},

           {alert,"\e[1;44m"},

           {emergency,"\e[1;41m"}]},

      {crash_log,"log/crash.log"},

      {crash_log_count,5},

      {crash_log_date,"$D0"},

      {crash_log_msg_size,65536},

      {crash_log_size,10485760},

      {error_logger_format_raw,true},

      {error_logger_hwm,50},

      {error_logger_hwm_original,50},

      {error_logger_redirect,true},

      {extra_sinks,

          [{error_logger_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_channel_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_connection_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_mirroring_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_queue_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_federation_lager_event,

               [{handlers,[{lager_forwarder_backend,[lager_event,inherit]}]},

                {rabbit_handlers,

                    [{lager_forwarder_backend,[lager_event,inherit]}]}]},

           {rabbit_log_upgrade_lager_event,

               [{handlers,

                    [{lager_file_backend,

                         [{date,[]},

                          {file,

                              "/var/log/rabbitmq/rabbit@rabbitmq1_upgrade.log"},

                          {formatter_config,

                              [date," ",time," ",color,"[",severity,"] ",

                               {pid,[]},

                               " ",message,"\n"]},

                          {level,info},

                          {size,0}]}]},

                {rabbit_handlers,

                    [{lager_file_backend,

                         [{date,[]},

                          {file,

                              "/var/log/rabbitmq/rabbit@rabbitmq1_upgrade.log"},

                          {formatter_config,

                              [date," ",time," ",color,"[",severity,"] ",

                               {pid,[]},

                               " ",message,"\n"]},

                          {level,info},

                          {size,0}]}]}]}]},

      {handlers,

          [{lager_file_backend,

               [{date,[]},

                {file,"/var/log/rabbitmq/rab...@rabbitmq1.log"},

                {formatter_config,

                    [date," ",time," ",color,"[",severity,"] ",

                     {pid,[]},

                     " ",message,"\n"]},

                {level,info},

                {size,0}]}]},

      {log_root,"/var/log/rabbitmq"},

      {rabbit_handlers,

          [{lager_file_backend,

               [{date,[]},

                {file,"/var/log/rabbitmq/rab...@rabbitmq1.log"},

                {formatter_config,

                    [date," ",time," ",color,"[",severity,"] ",

                     {pid,[]},

                     " ",message,"\n"]},

                {level,info},

                {size,0}]}]}]},

 {mnesia,[{dir,"/var/lib/rabbitmq/mnesia/rabbit@rabbitmq1"}]},

 {os_mon,

     [{start_cpu_sup,false},

      {start_disksup,false},

      {start_memsup,false},

      {start_os_sup,false}]},

 {public_key,[]},

 {rabbit,

     [{auth_backends,[rabbit_auth_backend_internal]},

      {auth_mechanisms,['PLAIN','AMQPLAIN']},

      {autocluster,

          [{peer_discovery_backend,rabbit_peer_discovery_classic_config}]},

      {background_gc_enabled,false},

      {background_gc_target_interval,60000},

      {backing_queue_module,rabbit_priority_queue},

      {channel_max,0},

      {channel_operation_timeout,15000},

      {cluster_keepalive_interval,10000},

      {cluster_nodes,{[],disc}},

      {cluster_partition_handling,ignore},

      {collect_statistics,fine},

      {collect_statistics_interval,5000},

      {config_entry_decoder,

          [{cipher,aes_cbc256},

           {hash,sha512},

           {iterations,1000},

           {passphrase,undefined}]},

      {connection_max,infinity},

      {credit_flow_default_credit,{400,200}},

      {default_consumer_prefetch,{false,0}},

      {default_permissions,[<<".*">>,<<".*">>,<<".*">>]},

      {default_user,<<"guest">>},

      {default_user_tags,[administrator]},

      {default_vhost,<<"/">>},

      {delegate_count,16},

      {disk_free_limit,50000000},

      {disk_monitor_failure_retries,10},

      {disk_monitor_failure_retry_interval,120000},

      {enabled_plugins_file,"/etc/rabbitmq/enabled_plugins"},

      {fhc_read_buffering,false},

      {fhc_write_buffering,true},

      {frame_max,131072},

      {halt_on_upgrade_failure,true},

      {handshake_timeout,10000},

      {heartbeat,60},

      {hipe_compile,false},

      {hipe_modules,

          [rabbit_reader,rabbit_channel,gen_server2,rabbit_exchange,

           rabbit_command_assembler,rabbit_framing_amqp_0_9_1,rabbit_basic,

           rabbit_event,lists,queue,priority_queue,rabbit_router,rabbit_trace,

           rabbit_misc,rabbit_binary_parser,rabbit_exchange_type_direct,

           rabbit_guid,rabbit_net,rabbit_amqqueue_process,

           rabbit_variable_queue,rabbit_binary_generator,rabbit_writer,

           delegate,gb_sets,lqueue,sets,orddict,rabbit_amqqueue,

           rabbit_limiter,gb_trees,rabbit_queue_index,

           rabbit_exchange_decorator,gen,dict,ordsets,file_handle_cache,

           rabbit_msg_store,array,rabbit_msg_store_ets_index,rabbit_msg_file,

           rabbit_exchange_type_fanout,rabbit_exchange_type_topic,mnesia,

           mnesia_lib,rpc,mnesia_tm,qlc,sofs,proplists,credit_flow,pmon,

           ssl_connection,tls_connection,ssl_record,tls_record,gen_fsm,ssl]},

      {lager_default_file,"/var/log/rabbitmq/rab...@rabbitmq1.log"},

      {lager_extra_sinks,

          [rabbit_log_lager_event,rabbit_log_channel_lager_event,

           rabbit_log_connection_lager_event,rabbit_log_mirroring_lager_event,

           rabbit_log_queue_lager_event,rabbit_log_federation_lager_event,

           rabbit_log_upgrade_lager_event]},

      {lager_log_root,"/var/log/rabbitmq"},

      {lager_upgrade_file,"/var/log/rabbitmq/rabbit@rabbitmq1_upgrade.log"},

      {lazy_queue_explicit_gc_run_operation_threshold,1000},

      {log,

          [{file,[{file,"/var/log/rabbitmq/rab...@rabbitmq1.log"}]},

           {categories,

               [{upgrade,

                    [{file,

                         "/var/log/rabbitmq/rabbit@rabbitmq1_upgrade.log"}]}]}]},

      {loopback_users,[<<"guest">>]},

      {memory_monitor_interval,2500},

      {mirroring_flow_control,true},

      {mirroring_sync_batch_size,4096},

      {mnesia_table_loading_retry_limit,10},

      {mnesia_table_loading_retry_timeout,30000},

      {msg_store_credit_disc_bound,{4000,800}},

      {msg_store_file_size_limit,16777216},

      {msg_store_index_module,rabbit_msg_store_ets_index},

      {msg_store_io_batch_size,4096},

      {num_ssl_acceptors,10},

      {num_tcp_acceptors,10},

      {password_hashing_module,rabbit_password_hashing_sha256},

      {plugins_dir,

          "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.4/plugins"},

      {plugins_expand_dir,

          "/var/lib/rabbitmq/mnesia/rabbit@rabbitmq1-plugins-expand"},

      {proxy_protocol,false},

      {queue_explicit_gc_run_operation_threshold,1000},

      {queue_index_embed_msgs_below,4096},

      {queue_index_max_journal_entries,32768},

      {reverse_dns_lookups,false},

      {server_properties,[]},

      {ssl_allow_poodle_attack,false},

      {ssl_apps,[asn1,crypto,public_key,ssl]},

      {ssl_cert_login_from,distinguished_name},

      {ssl_handshake_timeout,5000},

      {ssl_listeners,[]},

      {ssl_options,[]},

      {tcp_listen_options,

          [{backlog,128},

           {nodelay,true},

           {linger,{true,0}},

           {exit_on_close,false}]},

      {tcp_listeners,[5672]},

      {trace_vhosts,[]},

      {vhost_restart_strategy,continue},

      {vm_memory_calculation_strategy,rss},

      {vm_memory_high_watermark,0.4},

      {vm_memory_high_watermark_paging_ratio,0.5}]},

 {rabbit_common,[]},

 {rabbitmq_jms_topic_exchange,[]},

 {rabbitmq_management,

     [{cors_allow_origins,[]},

      {cors_max_age,1800},

      {http_log_dir,none},

      {listener,[{port,15672}]},

      {load_definitions,none},

      {management_db_cache_multiplier,5},

      {process_stats_gc_timeout,300000},

      {stats_event_max_backlog,250}]},

 {rabbitmq_management_agent,

     [{rates_mode,basic},

      {sample_retention_policies,

          [{global,[{605,5},{3660,60},{29400,600},{86400,1800}]},

           {basic,[{605,5},{3600,60}]},

           {detailed,[{605,5}]}]}]},

 {rabbitmq_web_dispatch,[]},

 {ranch,[]},

 {ranch_proxy_protocol,[{proxy_protocol_timeout,55000},{ssl_accept_opts,[]}]},

 {recon,[]},

 {sasl,[{errlog_type,error},{sasl_error_logger,false}]},

 {ssl,[]},

 {stdlib,[]},

 {syntax_tools,[]},

 {xmerl,[]}]


Listing connections ...


Listing channels ...


Timeout: 60.0 seconds ...

Listing queues for vhost / ...

sas.audit.queue true false [] <rab...@rabbitmq1.2.399.0> false 0 0 0 069 0 56240 running


Listing exchanges for vhost / ...

amq.fanout fanout true false false []

amq.rabbitmq.trace topic true false true []

jms.durable.queues direct true false false []

amq.topic topic true false false []

amq.match headers true false false []

direct true false false []

amq.direct direct true false false []

amq.headers headers true false false []


Listing bindings for vhost /...

exchange sas.audit.queue queue sas.audit.queue []

jms.durable.queues exchange sas.audit.queue queue sas.audit.queue []


Listing permissions for vhost "/" ...

sasdemo .* .* .*

guest .* .* .*

[root@rabbitmq1 rabbitmq]# 



To post to this group, send email to rabbitm...@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to a topic in the Google Groups "rabbitmq-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rabbitmq-users/-pQms6IkjUA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rabbitmq-users+unsubscribe@googlegroups.com.

Arnaud Cogoluègnes

unread,
Mar 27, 2018, 4:46:42 AM3/27/18
to rabbitm...@googlegroups.com
I meant the definition of this queue:
Queue queue = (Queue) ctx.lookup("sas.audit.queue");

How did you define "sas.audit.queue"?
Message has been deleted

Arnaud Cogoluègnes

unread,
Mar 28, 2018, 9:35:54 AM3/28/18
to rabbitm...@googlegroups.com
Please have a look at the "JMS and AMQP 0-9-1 Destination Interoperability" section of the JMS client web page [1]. Then feel free to post more precise questions about what you're trying to achieve and the exact error messages you're getting.



On Wed, Mar 28, 2018 at 7:42 AM, Mudit Mishra <muditm...@gmail.com> wrote:
Thank you for your response.

here is my .binding file

#This file is used by the JNDI FSContext.
ConnectionFactory/ClassName=javax.jms.ConnectionFactory
ConnectionFactory/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory
ConnectionFactory/RefAddr/0/Content=jms/ConnectionFactory
ConnectionFactory/RefAddr/0/Type=name
ConnectionFactory/RefAddr/0/Encoding=String
ConnectionFactory/RefAddr/1/Content=javax.jms.ConnectionFactory
ConnectionFactory/RefAddr/1/Type=type
ConnectionFactory/RefAddr/1/Encoding=String
ConnectionFactory/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
ConnectionFactory/RefAddr/2/Type=factory
ConnectionFactory/RefAddr/2/Encoding=String
# Change this line accordingly if the broker is not at localhost
ConnectionFactory/RefAddr/3/Content=192.168.56.111
ConnectionFactory/RefAddr/3/Type=host
ConnectionFactory/RefAddr/3/Encoding=String
ConnectionFactory/RefAddr/4/Content=5672
ConnectionFactory/RefAddr/4/Type=portNumber
ConnectionFactory/RefAddr/4/Encoding=String
ConnectionFactory/RefAddr/5/Content=sasdemo
ConnectionFactory/RefAddr/5/Type=username
ConnectionFactory/RefAddr/5/Encoding=String
ConnectionFactory/RefAddr/6/Content=Orion123
ConnectionFactory/RefAddr/6/Type=password
ConnectionFactory/RefAddr/6/Encoding=String
ConnectionFactory/RefAddr/7/Content=/
ConnectionFactory/RefAddr/7/Type=VirtualHost
ConnectionFactory/RefAddr/7/Encoding=String

hello/ClassName=javax.jms.Queue
hello/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory
hello/RefAddr/0/Content=jms/Queue
hello/RefAddr/0/Type=name
hello/RefAddr/0/Encoding=String
hello/RefAddr/1/Content=javax.jms.Queue
hello/RefAddr/1/Type=type
hello/RefAddr/1/Encoding=String
hello/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
hello/RefAddr/2/Type=factory
hello/RefAddr/2/Encoding=String
hello/RefAddr/3/Content=hello
hello/RefAddr/3/Type=destinationName
hello/RefAddr/3/Encoding=String

sas.audit.queue/ClassName=javax.jms.Queue
sas.audit.queue/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory

sas.audit.queue/RefAddr/0/Content=jms/Queue
sas.audit.queue/RefAddr/0/Type=name
sas.audit.queue/RefAddr/0/Encoding=String

sas.audit.queue/RefAddr/1/Content=javax.jms.Queue
sas.audit.queue/RefAddr/1/Type=type
sas.audit.queue/RefAddr/1/Encoding=String

sas.audit.queue/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
sas.audit.queue/RefAddr/2/Type=factory
sas.audit.queue/RefAddr/2/Encoding=String

sas.audit.queue/RefAddr/3/Content=sas.audit.queue
sas.audit.queue/RefAddr/3/Type=destinationName
sas.audit.queue/RefAddr/3/Encoding=String


sas.audit.queue/RefAddr/4/Content=true
sas.audit.queue/RefAddr/4/Type=amqp
sas.audit.queue/RefAddr/4/Encoding=String

sas.audit.queue/RefAddr/5/Content=sas.audit.queue
sas.audit.queue/RefAddr/5/Type=amqpQueueName
sas.audit.queue/RefAddr/5/Encoding=String

sas.audit.queue/RefAddr/6/Content=jms.durable.queues
sas.audit.queue/RefAddr/6/Type=amqpExchangeName
sas.audit.queue/RefAddr/6/Encoding=String

sas.audit.queue/RefAddr/7/Content=sas.audit.queue
sas.audit.queue/RefAddr/7/Type=amqpRoutingKey
sas.audit.queue/RefAddr/7/Encoding=String


If I don't use AMP I get invalid header error. 
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.

To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to a topic in the Google Groups "rabbitmq-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rabbitmq-users/-pQms6IkjUA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rabbitmq-user...@googlegroups.com.

To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.

To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Eric NICOLAS

unread,
Apr 12, 2018, 11:53:40 AM4/12/18
to rabbitmq-users

Hi Arnaud,

I didn't find the reply buttin but just reply to the author.
I have similar the same problem : having some clients having jms code and don't want to rewrite code.
Setting the JMSType in message header when publishing a message in Rabbit works fine .

I have a standalone client (but normaly use jndi resources)

Following the code :

        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPort(5662);
        connectionFactory.setHost("XX.XX.XX.XX");
       
       
        RMQDestination jmsDestination = new RMQDestination();
        jmsDestination.setAmqp(true);
        jmsDestination.setQueue(true);
        jmsDestination.setAmqpExchangeName("direct.exchange");
        jmsDestination.setAmqpRoutingKey("rk_jms");
        jmsDestination.setDestinationName("queue_jms");
        jmsDestination.setAmqpQueueName("queue_jms");

Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Destination dest = session.createQueue("queue2");
//MessageConsumer consumer = (MessageConsumer) session.createConsumer(dest);

MessageConsumer consumer = (MessageConsumer) session.createConsumer(jmsDestination);
consumer.setMessageListener(new MessageListener() {

    //@Override
    public void onMessage(Message message) {
        try {
            if (message instanceof RMQBytesMessage) {
                System.out.println("RMQBytesMessage");
            }
            else if (message instanceof TextMessage){
                    System.out.println("TextMessage");
            }
            else  {
                System.out.println("Other message instance");
            }
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
});

I always get RMQBytesMessage. How can i define my destination to have a TextMessage?

Trying to convert RMQBytesMessage to TextMessage, i have an error :
Caused by: java.lang.ClassCastException: com.rabbitmq.jms.client.message.RMQBytesMessage cannot be cast to javax.jms.TextMessage

Thanks for yout help

Eric

Arnaud Cogoluègnes

unread,
Apr 13, 2018, 4:49:54 AM4/13/18
to rabbitm...@googlegroups.com
If a JMS client sends a TextMessage and you want a JMS consumer to receive this TextMessage, the JMS queue shouldn't be an AMQP resource, i.e. jmsDestination.setAmqp(false).

JMS AMQP resources (jmsDestination.setAmqp(true)) are meant to provide interoperability between JMS and AMQP clients. A JMS client consuming a message (possibility from an AMQP client) from a JMS AMQP resource will always see a RMQBytesMessage and will have to make sense out of it. There's an exception: if the message has been published with the "JMSType" AMQP header set to "TextMessage", the byte-array payload will be interpreted as UTF-8 string and the message will be converted into a JMS TextMessage by the driver.

In your example, if you want to keep the queue as an AMQP resource and exchange TextMessages between JMS clients, you need to set up the JMSType property to TextMessage:

MessageProducer producer = session.createProducer(jmsDestination);
TextMessage message = session.createTextMessage("Hello");
message.setJMSType("TextMessage");
producer.send(message);

This is covered in the "JMS and AMQP 0-9-1 Destination Interoperability" section of [1], but it says "AMQP message property" instead of "AMQP message header". I'll fix it, thanks for bringing that up.

[1] http://www.rabbitmq.com/jms-client.html


To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

Eric NICOLAS

unread,
Apr 13, 2018, 10:01:19 AM4/13/18
to rabbitmq-users
Thanks for your reply.

My producer publish in AMQP. i have several clients. Clients migrate progessively from JMS to AMQP.
The solution to put the
message.setJMSType("TextMessage");
will works fine (otherwise i cannot get the message as Text) .

But all my clients should migrate, and then the setJMSType in the publisher code will not be more usefull
Also if a client wants to consume in JMS my message, if the publish method do not specify the setJMSType, it couldn't get the message as Text?

Thanks for your reply

Eric

Arnaud Cogoluègnes

unread,
Apr 13, 2018, 10:48:56 AM4/13/18
to rabbitm...@googlegroups.com
Try to set the JMSType header in a transversal way, e.g. with a global system of message post-processing. Then getting rid of the JMSType header will be much easier.

If you're sure the body of the message is a readable string, you can do something like the following:

BytesMessage msg = (BytesMessage) message;
byte [] content = new byte[(int) msg.getBodyLength()];
msg.readBytes(content);
String textContent = new String(content);

To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages