Informatica connection to RabbitMQ


Mark Reedman

Aug 9, 2018, 4:07:38 AM
to rabbitmq-users
Hello,

I'm new to using message queues like RabbitMQ.  I have a request to connect Informatica to RabbitMQ to ingest messages within a Queue.  Reading the limited Informatica documentation it appears that I can use the JMS component of Informatica.

I've set up the RabbitMQ server - the easy bit.  I've downloaded what I believe to be all the relevant JAR files for the client and enabled the rabbitmq_jms_topic_exchange plugin.

I've created a .bindings file and the JNDI and JMS connections.  I can see the connection is active in the RabbitMQ server.  When I publish a message I get this:

Severity Timestamp Node Thread Message Code Message
ERROR 8/9/2018 9:03:40 AM node01 READER_1_1_1 JMS_2050 [ERROR] Source Qualifier [SQ_MyQueue_To_Oracle] encountered an error while receiving the JMS message. Reason: [invalid stream header: 7B0D0A20Exception Stack: com.rabbitmq.jms.util.RMQJMSException: invalid stream header: 7B0D0A20
at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1090)
at com.rabbitmq.jms.client.RMQMessage.convertJmsMessage(RMQMessage.java:907)
at com.rabbitmq.jms.client.RMQMessage.convertMessage(RMQMessage.java:902)
at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:322)
at com.rabbitmq.jms.client.RMQMessageConsumer.receive(RMQMessageConsumer.java:237)
at com.informatica.powerconnect.jms.server.reader.JMSReaderPartitionDriver.run(JMSReaderPartitionDriver.java:1750)
Caused by: java.io.StreamCorruptedException: invalid stream header: 7B0D0A20
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at com.rabbitmq.jms.client.RMQMessage.fromMessage(RMQMessage.java:1065)
... 5 more
.].

I'm missing some configuration I think but I really don't know what it could be.  Any help would be much appreciated.

Thanks in advance.

Regards
Mark

Arnaud Cogoluègnes

Aug 9, 2018, 5:35:19 AM
to rabbitm...@googlegroups.com
Can you provide the version of the broker and the JMS client you're using?

How do you publish a message? Can you share the code/steps?

On consumption, the JMS client expects a specific format and it
doesn't recognize the format of the consumed message.
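
To illustrate the format mismatch: the header bytes reported in the stack trace, 7B 0D 0A 20, are the ASCII characters '{', CR, LF and space, which looks like the start of a plain-text body rather than a serialized Java object (Java serialization streams begin with AC ED 00 05). The sketch below uses only JDK classes and a made-up payload; the class and method names are hypothetical and it is not the JMS client's actual code, just a way to reproduce the same error message.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

public class StreamHeaderDemo {

    /** Tries to open the bytes as a Java serialization stream; returns the error text, or "ok". */
    static String tryDeserialize(byte[] body) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return "ok";
        } catch (StreamCorruptedException e) {
            // Thrown by the constructor when the first four bytes are not AC ED 00 05.
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical payload: plain text starting with '{', CR, LF, space (hex 7B 0D 0A 20).
        byte[] body = "{\r\n \"greeting\": \"Hello World\"\r\n}".getBytes(StandardCharsets.UTF_8);
        System.out.println(tryDeserialize(body));
    }
}
```

Running it prints the same "invalid stream header: 7B0D0A20" text as the log above, which suggests the consumed message was plain text that the client tried to read as a serialized JMS message.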

Mark Reedman

Aug 9, 2018, 6:10:56 AM
to rabbitmq-users
Hi,

Thanks for the reply.  This is a test environment, so I'm simply pushing a "Hello World" message from the RabbitMQ console.

I hadn't created a JMS exchange, which I believe I need for this to work.  So I've created an x-jms-topic exchange, but I can't bind it to my queue.  I now get the following error:

Severity Timestamp Node Thread Message Code Message
ERROR 8/9/2018 11:03:23 AM node01 READER_1_1_1 JAVA PLUGIN_1762 [ERROR] java.lang.NullPointerException

Versions:
Informatica 10.1.0
amqp-client-3.6.2.jar
rabbitmq-jms-1.4.7.jar
slf4j-api-1.7.12.jar

Thanks for your help.

Regards
Mark

Arnaud Cogoluègnes

Aug 9, 2018, 7:15:40 AM
to rabbitm...@googlegroups.com
You really need to upgrade: the broker and the client are a couple of years old.

You can indeed publish a message from the web console on the queue the
JMS client is listening on. You can try to set the JMSType header to
TextMessage. If this is an AMQP destination, the JMS client won't
assume any specific format other than a plain text string.

The declaration of the queue the JMS client is listening on would be useful too.

Mark Reedman

Aug 9, 2018, 7:40:33 AM
to rabbitmq-users
Hi,

I've updated to use the following:

rabbitmq-jms-1.10.0
amqp-client-5.3.0

and changed to text message.  I'm a little further forward, as it doesn't crash, but it does generate some non-fatal errors:

Severity Timestamp Node Thread Message Code Message
INFO 8/9/2018 12:31:24 PM node01 WRITER_1_*_1 WRT_8226 Target Load Order Group [1] is set for real-time flushing. Target based commit is switched to source based commit and target based commit interval is used as source based commit interval.
ERROR 8/9/2018 12:31:25 PM node01 READER_1_1_1 JMS_2028 [ERROR] The message received by Source Qualifier [SQ_MyQueue_To_Oracle] does not match the body definition.
ERROR 8/9/2018 12:31:25 PM node01 READER_1_1_1 JMS_2091 [ERROR] The mismatched body type row is : [com.rabbitmq.jms.client.message.RMQBytesMessage@1f]
ERROR 8/9/2018 12:32:04 PM node01 READER_1_1_1 JMS_2028 [ERROR] The message received by Source Qualifier [SQ_MyQueue_To_Oracle] does not match the body definition.
ERROR 8/9/2018 12:32:04 PM node01 READER_1_1_1 JMS_2091 [ERROR] The mismatched body type row is : [com.rabbitmq.jms.client.message.RMQBytesMessage@1f]

It mentions BytesMessage, but switching to that results in a NullPointerException.

Thanks again for taking an interest.

Regards
Mark

Arnaud Cogoluègnes

Aug 9, 2018, 9:45:48 AM
to rabbitm...@googlegroups.com
The latest version of the JMS client is 1.9.0; it uses Java client 4.6.0.

I can't say much with this amount of information: I don't know what
Informatica expects. Please provide more information about what is
supposed to be done with the consumed message, a procedure to
reproduce, or more detailed logs.

Mark Reedman

Aug 10, 2018, 5:24:57 AM
to rabbitmq-users
Hi Arnaud,

I've downloaded the versions you suggested.  Informatica is a bit of a black box when it comes to connectors.  I can't really glean anything from the log files it produces.  All I know is that we definitely have a connection and it tries to consume the message.  If I set the source to be a TextMessage in Informatica, I get a "does not match the body definition" error that points to a BytesMessage, which seems correct.  As soon as I change it to BytesMessage, I get a NullPointerException once I publish a message, with this traceback:

Severity Timestamp Node Thread Message Code Message
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 JAVA PLUGIN_1762 [ERROR] java.lang.NullPointerException
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 JAVA PLUGIN_1762 [ERROR]  at com.informatica.powerconnect.jms.server.reader.JMSReaderPartitionDriver.onMessage(JMSReaderPartitionDriver.java:2086)
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 JAVA PLUGIN_1762 [ERROR]  at com.informatica.powerconnect.jms.server.reader.JMSReaderPartitionDriver.run(JMSReaderPartitionDriver.java:1752)
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 JMS_2050 [ERROR] Source Qualifier [SQ_MyQueue_To_Oracle] encountered an error while receiving the JMS message. Reason: [null].
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 JAVA PLUGIN_1762 [ERROR] Source Qualifier [SQ_MyQueue_To_Oracle] encountered an error while receiving the JMS message. Reason: [null].
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 JAVA PLUGIN_1762 [ERROR]  at com.informatica.powerconnect.jms.server.reader.JMSReaderPartitionDriver.run(JMSReaderPartitionDriver.java:1858)
ERROR 8/10/2018 10:13:19 AM node01 READER_1_1_1 SDKS_38200 Partition-level [SQ_MyQueue_To_Oracle]: Plug-in #300800 failed in run().


The expected message structure, as defined in Informatica, is the following:

Severity Timestamp Node Thread Message Code Message
INFO 8/10/2018 10:11:47 AM node01 MAPPING CMN_1053 SQ_MyQueue_To_Oracle:Exchange: Row Definition Info:(
[0]:Names(JMSCorrelationID->JMSCorrelationID)
 Native-type:300810 (N/A) C-type:1014 (UniChar) Precision:255 Scale:0
[1]:Names(JMSMessageID->JMSMessageID)
 Native-type:300810 (N/A) C-type:1014 (UniChar) Precision:255 Scale:0
[2]:Names(JMSType->JMSType)
 Native-type:300810 (N/A) C-type:1014 (UniChar) Precision:255 Scale:0
[3]:Names(JMSDestination->JMSDestination)
 Native-type:300810 (N/A) C-type:1014 (UniChar) Precision:255 Scale:0
[4]:Names(JMSReplyTo->JMSReplyTo)
 Native-type:300810 (N/A) C-type:1014 (UniChar) Precision:255 Scale:0
[5]:Names(JMSDeliveryMode->JMSDeliveryMode)
 Native-type:300805 (N/A) C-type:1002 (Int) Precision:10 Scale:0
[6]:Names(JMSExpiration->JMSExpiration)
 Native-type:300811 (N/A) C-type:1007 (Date/Time) Precision:29 Scale:9
[7]:Names(JMSPriority->JMSPriority)
 Native-type:300805 (N/A) C-type:1002 (Int) Precision:10 Scale:0
[8]:Names(JMSTimeStamp->JMSTimeStamp)
 Native-type:300811 (N/A) C-type:1007 (Date/Time) Precision:29 Scale:9
[9]:Names(JMSRedelivered->JMSRedelivered)
 Native-type:300801 (N/A) C-type:1014 (UniChar) Precision:10 Scale:0
[10]:Names(BodyBytes->BodyBytes)
 Native-type:300809 (N/A) C-type:1006 (Raw) Precision:1024 Scale:0
)


I've searched Informatica pages and found this: https://kb.informatica.com/solution/23/Pages/54/337078.aspx - I'm really not sure if this is related.  

I really hope you can help. 

Thanks again for your time.

Regards
Mark

Arnaud Cogoluègnes

Aug 10, 2018, 10:22:36 AM
to rabbitm...@googlegroups.com
The error occurs in the JMSReaderPartitionDriver class. I don't have
access to it, so I can't know what it expects. Can you get in touch
with Informatica support to find out more about this Java component?

Mark Reedman

Aug 13, 2018, 9:38:16 AM
to rabbitmq-users
Hello again,

I just wanted to confirm my configuration is correct before I embark on asking Informatica Support.

I have the following jar files:

rabbitmq-client.jar
amqp-client-4.6.0.jar
geronimo-jms_1.1_spec-1.1.1.jar
rabbitmq-jms-1.9.0.jar

I've set up RabbitMQ Server 3.7.7 and enabled the topic exchange plug-in, which created the jms.durable.queues exchange.  I created a new queue MyQ which is bound to jms.durable.queues.

I created a new .bindings file which contains the following:

# -----.bindings file------------------------------------------------------------------
ConnectionFactory/ClassName=javax.jms.QueueConnectionFactory
ConnectionFactory/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory

ConnectionFactory/RefAddr/0/Content=jms/ConnectionFactory
ConnectionFactory/RefAddr/0/Type=name
ConnectionFactory/RefAddr/0/Encoding=String

ConnectionFactory/RefAddr/1/Content=javax.jms.ConnectionFactory
ConnectionFactory/RefAddr/1/Type=type
ConnectionFactory/RefAddr/1/Encoding=String

ConnectionFactory/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
ConnectionFactory/RefAddr/2/Type=factory
ConnectionFactory/RefAddr/2/Encoding=String

# Change this line accordingly if the broker is not at localhost
ConnectionFactory/RefAddr/3/Content=localhost
ConnectionFactory/RefAddr/3/Type=host
ConnectionFactory/RefAddr/3/Encoding=String

myQueue/ClassName=javax.jms.Queue
myQueue/FactoryName=com.rabbitmq.jms.admin.RMQObjectFactory

myQueue/RefAddr/0/Content=jms/Queue
myQueue/RefAddr/0/Type=name
myQueue/RefAddr/0/Encoding=String

myQueue/RefAddr/1/Content=javax.jms.Queue
myQueue/RefAddr/1/Type=type
myQueue/RefAddr/1/Encoding=String

myQueue/RefAddr/2/Content=com.rabbitmq.jms.admin.RMQObjectFactory
myQueue/RefAddr/2/Type=factory
myQueue/RefAddr/2/Encoding=String

myQueue/RefAddr/3/Content=MyQ
myQueue/RefAddr/3/Type=destinationName
myQueue/RefAddr/3/Encoding=String

myQueue/RefAddr/4/Content=true
myQueue/RefAddr/4/Type=amqp
myQueue/RefAddr/4/Encoding=String

myQueue/RefAddr/5/Content=MyQ
myQueue/RefAddr/5/Type=amqpQueueName
myQueue/RefAddr/5/Encoding=String

myQueue/RefAddr/6/Content=jms.durable.queues
myQueue/RefAddr/6/Type=amqpExchangeName
myQueue/RefAddr/6/Encoding=String

myQueue/RefAddr/7/Content=MyQ
myQueue/RefAddr/7/Type=amqpRoutingKey
myQueue/RefAddr/7/Encoding=String
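
One way to sanity-check a file like this outside Informatica is to read it back with plain JDK classes. The sketch below assumes the .bindings file is in Java properties format (which is, as far as I know, how the file system JNDI provider stores it); the class and method names (BindingsCheck, refAddrs) are made up for illustration. It lists the RefAddr Type/Content pairs for one bound name, so a misspelled or missing property such as amqp or amqpQueueName is easy to spot.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class BindingsCheck {

    /** Collects RefAddr "Type" -> "Content" pairs for one bound name, e.g. "myQueue". */
    static Map<String, String> refAddrs(Properties bindings, String name) {
        Map<String, String> result = new TreeMap<>();
        String prefix = name + "/RefAddr/";
        for (String key : bindings.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.endsWith("/Type")) {
                // Swap the trailing "Type" for "Content" to find the matching value.
                String contentKey = key.substring(0, key.length() - "Type".length()) + "Content";
                result.put(bindings.getProperty(key), bindings.getProperty(contentKey));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Excerpt of the file above; in practice, load the real file with a FileReader.
        String excerpt = String.join("\n",
            "myQueue/RefAddr/3/Content=MyQ",
            "myQueue/RefAddr/3/Type=destinationName",
            "myQueue/RefAddr/4/Content=true",
            "myQueue/RefAddr/4/Type=amqp",
            "myQueue/RefAddr/5/Content=MyQ",
            "myQueue/RefAddr/5/Type=amqpQueueName");
        Properties bindings = new Properties();
        bindings.load(new StringReader(excerpt));
        System.out.println(refAddrs(bindings, "myQueue"));
    }
}
```

This only checks that the file parses and which properties it defines; it does not prove that RMQObjectFactory accepts them.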

I've created a JNDI connector which has the following settings:

JNDI Context Factory: com.sun.jndi.fscontext.RefFSContextFactory
JNDI Provider: file:Z:/InfraSRV/jndi
JNDI UserName: guest
JNDI Password: guest

Also created a JMS connector with the following settings:

JMS Destination Type: QUEUE
JMS Connection Factory: ConnectionFactory
JMS Destination: myQueue

By Source Qualifier is of type BytesMessage.

Please let me know if these look correct or if I've made a mess of it.

Thanks in advance.

Regards
Mark

Arnaud Cogoluègnes

Aug 14, 2018, 4:54:35 AM
to rabbitm...@googlegroups.com
You may want to add the SLF4J jar file; this is a dependency of the JMS client [1].

You may also browse another thread that deals with Informatica [2].

I don't know what "By Source Qualifier is of type BytesMessage."
means, this must be Informatica-specific.

Apart from that, the configuration looks OK to me.

[1] http://mvnrepository.com/artifact/com.rabbitmq.jms/rabbitmq-jms/1.9.0
[2] https://groups.google.com/d/msg/rabbitmq-users/q-5CwEEM22Q/6dCxW7q_BAAJ

Mark Reedman

Aug 14, 2018, 5:16:26 AM
to rabbitmq-users
Good Morning Arnaud,

Sorry, I didn’t mention the SLF4J.  I do have that one too.

JMS uses 4 message types, of which only two are supported: TextMessage and BytesMessage.  So the structure of a JMS message that uses BytesMessage (the last field being the only one that changes based on the JMSType) should be as follows:

JMSCorrelationID
JMSMessageID
JMSType
JMSDestination
JMSReplyTo
JMSDeliveryMode
JMSExpiration
JMSPriority
JMSTimeStamp
JMSRedelivered
BodyBytes (Raw/Binary)

I’ve also looked at the thread you mentioned and even contacted the developers directly but they have unfortunately abandoned using RabbitMQ.

Regards
Mark