Kafka Connect connectors for IBM MQ


andrew_s...@uk.ibm.com

Sep 14, 2017, 6:48:30 AM
to Confluent Platform

I've written a couple of connectors using the Kafka Connect framework that work with IBM MQ.

Kafka Connect MQ Sink: 
https://github.com/ibm-messaging/kafka-connect-mq-sink
Kafka Connect MQ Source: 
https://github.com/ibm-messaging/kafka-connect-mq-source

Please could they be added to the "additional connectors" list here: https://www.confluent.io/product/connectors


They're pretty basic right now, but I'd like to enhance them in the future to the point where they can become certified.


Thanks,
Andrew Schofield
Event Services, IBM Watson and Cloud Platform

Robin Moffatt

Sep 14, 2017, 6:53:32 AM
to confluent...@googlegroups.com
Thanks Andrew, we'll get them added. 




tomonaga1377

Nov 2, 2017, 11:33:54 AM
to Confluent Platform
Hi Andrew,

Thank you for the work that you did on the IBM MQ - Kafka connector. It is something that I am looking into: how to get existing infrastructure to connect with Kafka. We would like to connect our IBM MQ queues to Kafka so the data can be used by other applications.

Once the connector is built, I'd like to test it out with one of our Development MQ Queues and Kafka development sandbox. 

How do I go about doing this? Do you have any step-by-step documentation on how you set up your testing on the connector?


Thanks a lot,

Jeff Nagata

andrew_s...@uk.ibm.com

Nov 2, 2017, 2:11:33 PM
to Confluent Platform
Hi Jeff,
Glad to hear that the MQ connectors are of interest.

I gave instructions to build the connector but not really to run it. Actually, to configure any connector with Kafka Connect, I think you do need to invest quite a bit of time understanding how it all works so you can get the right configuration, particularly if you want data formats to be handled properly.

You'll need:
  • A queue manager with a server-connection channel, running listener and appropriate authentication
  • A Kafka cluster
  • The Kafka Connect runtime - either standalone or distributed

Then, you set the configuration for the worker to point to your Kafka cluster, the configuration for the connector to point to your queue manager (with the credentials if necessary) and then start up the connector.
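
As a rough illustration (not step-by-step documentation), a standalone-mode connector properties file for the source connector might look something like the sketch below - the queue manager, channel, host, queue and topic names are placeholders to replace with your own:

    name=mq-source
    connector.class=com.ibm.mq.kafkaconnect.MQSourceConnector
    tasks.max=1
    mq.queue.manager=QM1
    mq.connection.name.list=mqhost.example.com(1414)
    mq.channel.name=MYSVRCONN
    mq.queue=MY.SOURCE.QUEUE
    mq.record.builder=com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder
    topic=TSOURCE

You would start it with the standalone worker, for example bin/connect-standalone.sh config/connect-standalone.properties mq-source.properties, where the worker properties file carries the bootstrap.servers setting for your Kafka cluster.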

If you need more detail, what form would work? Blog post or something like that? Have you used Kafka Connect before?

Thanks,
Andrew Schofield
Event Services, IBM Cloud

tomonaga1377

Nov 3, 2017, 11:13:21 AM
to Confluent Platform
Hi Andrew,

Thank you for your response, much appreciated.

A blog post on how to start up and test the MQ to Kafka Connector would be great. I've never used Kafka Connect in a live environment before and Kafka is fairly new to our Dev environment.

Your instructions to build the connector were very detailed and easy to follow, and I believe that I have the connector built.

We are an IBM mainframe shop, and I was tasked with setting up a connection from MQ queues to Kafka.

Thanks again,

Jeff Nagata 

andrew_s...@uk.ibm.com

Nov 7, 2017, 3:49:58 AM
to Confluent Platform
Hi Jeff,
I'm happy to write some instructions for getting it up and running, but I'm not going to get around to it that quickly.

I'd be happy to get on the phone and help you get started if that would help. If that's of interest, you can email me at andrew_schofield @ uk DOT ibm DOT com to arrange it.

Thanks,
Andrew

andrew_s...@uk.ibm.com

Nov 14, 2017, 3:14:38 PM
to Confluent Platform

tomonaga1377

Nov 24, 2017, 10:34:24 AM
to Confluent Platform
Thanks Andrew. I'll keep you posted on my progress to test the MQ connector in our environment.

Much appreciated!

Jeff

tomonaga1377

Nov 30, 2017, 3:17:45 PM
to Confluent Platform
Hi Andrew,

A couple of things that I have run into.


I am having a problem with the CHCKCLNT(REQUIRED) entry as part of the set channel authentication rules. I believe it's complaining about the CHCKCLNT command.


Set Identity - AUTHTYPE(IDPWOS) - Syntax error AUTHTYPE(I     )  


REFRESH SECURITY TYPE(CONNAUTH) - Syntax error, TYPE(CONN     ).


All other commands run successfully. Just wondering what the impact is of not running these commands, or if there is a workaround.


I've set up Kafka on my own machine and have tested simple feeds through it.


I have also created the SOURCE Connector.


Thanks


Jeff

andrew_s...@uk.ibm.com

Dec 1, 2017, 6:12:58 AM
to Confluent Platform
Hi Jeff,
It sounds like you've got it working, which is great news. I think the errors that you've described are probably to do with the version and/or platform of your MQ. The instructions are specifically for MQ v9 on Linux, and I think they'd apply equally well to MQ v8. Versions earlier than v8 do not have CONNAUTH support, and I expect there are some other slight differences between platforms and so on. I think the connectors would technically work with MQ v7, but probably nothing older.

The instructions showed one way of setting up MQ to give a secure channel between MQ and Kafka, but there are certainly other ways that would work too. I suggest following whatever standards your organisation uses for securing MQ client connections and configuring the connector to match.
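
For reference, on MQ v8 or later on a distributed platform, the connection-authentication part of that kind of setup usually amounts to something roughly like the following MQSC (the object and channel names here are only examples):

    DEFINE AUTHINFO(USE.PW) AUTHTYPE(IDPWOS) CHCKCLNT(REQUIRED)
    ALTER QMGR CONNAUTH(USE.PW)
    REFRESH SECURITY TYPE(CONNAUTH)
    SET CHLAUTH('MYSVRCONN') TYPE(ADDRESSMAP) ADDRESS('*') USERSRC(CHANNEL) CHCKCLNT(REQUIRED)

On a platform or version where IDPWOS and CONNAUTH aren't recognised, you would lean on whatever channel security you already have in place instead.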

Andrew Schofield
Event Services, IBM Cloud

tomonaga1377

Dec 1, 2017, 9:57:21 AM
to Confluent Platform
Hi Andrew,

Thank you for the explanation. I'll just work around the differences.

Much appreciated,

Jeff

venkat sampath

Feb 24, 2018, 10:19:24 AM
to Confluent Platform
Hi andrew,

How do I use the MQ source JAR file to connect to a remote Kafka cluster and publish to a topic by retrieving messages from a queue?
Our Kafka solution is running in an on-prem Azure cloud. I need to connect from my MQ environment and then post the messages to a Kafka topic.
I don't think I will have access to the Kafka root directory; I will have access only to the endpoint URL and topic name.



andrew_s...@uk.ibm.com

Feb 26, 2018, 4:31:11 AM
to Confluent Platform
Hi,
I think there are two ways forward.

1) Download Apache Kafka onto an on-prem server that you will use to host the Kafka Connect workers. Then you'll be able to run the MQ connector on that server, making connections to your queue manager and to your Kafka cluster running in Azure. I personally have run the MQ connector like this on my laptop, connecting to a cloud-hosted Kafka cluster.
2) If the Azure Kafka installation is able to host Kafka Connect connectors, you could perhaps build the MQ connector's JAR and install it into your Azure environment. I'm not sure whether this approach is possible or whether your Azure installation is too locked down to make it viable.

Option #1 is probably the easiest to get working.
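
For option 1, the main piece on the on-prem side is the Connect worker configuration pointing at the remote cluster, roughly like this (the host name is a placeholder, and if the Azure endpoint requires TLS or SASL you would add the corresponding security.protocol and related client settings):

    bootstrap.servers=your-kafka-endpoint.example.com:9092
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    offset.storage.file.filename=/tmp/connect.offsets

The connector configuration itself then points at your queue manager in the usual way.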

Thanks,
Andrew Schofield
Event Services, IBM Cloud

Sai Prasad

Apr 20, 2018, 5:24:21 PM
to Confluent Platform
Hi Andrew,

Thank you so much for the connectors. I was able to use both the source and sink MQ connectors successfully. I have a requirement to modify the MQ header during sink. I tried to add the lines below in JMSWriter.java just to test the JMS API.


    public void send(SinkRecord r) throws ConnectException, RetriableException {
        connectInternal();

        try {
            Message m = builder.fromSinkRecord(jmsCtxt, r);

            m.setStringProperty("StrucId", "MD");
            m.setJMSExpiration(-1);
            m.setJMSType("8");
            m.setStringProperty("Format", "MQHRF2");
            
            m.setStringProperty("StrucId", "RFH");
            m.setLongProperty("Encoding", 546L);
            m.setLongProperty("version", 2L);
            m.setStringProperty("Format", "MQSTR");
            m.setStringProperty("NameValueData", "AlertEvent.TAP");
            inflight = true;
            jmsProd.send(queue, m);
        }
        catch (JMSRuntimeException|JMSException jmse) {
            log.debug("JMS exception {}", jmse);
            throw handleException(jmse);
        }
    }

I am not sure if I am doing it correctly. I am looking specifically to add MQHRF2 -> NameValueData. Thank you.

Sai Prasad

andrew_s...@uk.ibm.com

Apr 26, 2018, 9:56:08 AM
to Confluent Platform
Hi,
The connector uses a regular MQ JMS client, so you should be able to use the usual MQ JMS techniques to do what you want. Every message you send to MQ using JMS will automatically have an MQMD, so you will not need to build that yourself. For example, if you set the JMSExpiration header field, it will be reflected in MQMD.Expiry in the actual MQ message. Similarly, if you use a JMS TextMessage, that is reflected in the formatting of the actual MQ message - typically it will contain an MQRFH2 with an embedded Format of "MQSTR".
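
As a sketch of what I mean - this is just an illustration of the plain JMS API, not the connector's own code, and the property name is invented - application-defined properties end up in the usr folder of the MQRFH2 (i.e. in its NameValueData), and expiry is driven by the producer's time-to-live:

    import javax.jms.JMSContext;
    import javax.jms.JMSProducer;
    import javax.jms.Queue;
    import javax.jms.TextMessage;

    public class MqJmsHeaderSketch {
        public void send(JMSContext ctx, Queue queue) {
            // A TextMessage becomes an MQ message containing an MQRFH2 whose embedded
            // Format is "MQSTR"; the MQMD is built by the MQ JMS client, not by hand.
            TextMessage m = ctx.createTextMessage("hello");

            JMSProducer producer = ctx.createProducer();
            // Application-defined properties are carried in the <usr> folder of the MQRFH2.
            producer.setProperty("AlertEvent", "TAP");
            // The time-to-live determines JMSExpiration, which maps to MQMD.Expiry.
            producer.setTimeToLive(60000L);
            producer.send(queue, m);
        }
    }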

I suggest you read the MQ documentation quite closely here because it's pretty complicated: https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q031990_.htm

Hope this helps,
Andrew

conta...@gmail.com

Jun 29, 2018, 4:40:24 PM
to Confluent Platform
Hi Andrew

I tried to use your connector and am getting an error with the following message: "ERROR Couldn't instantiate task test1-ibmmq-0  because it has an invalid task configuration"

{
        "name": "test1-ibmmq",
        "config": {
                "connector.class": "com.ibm.mq.kafkaconnect.MQSourceConnector",
                "tasks.max": 1,
                "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
                "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
                "mq.record.builder":"com.ibm.mq.kafkaconnect.builders.DefaultRecordBuilder",
                "topic": "accounts-test",
                "mq.connection.name.list": "test.test.com(1402)",
                "mq.queue.manager": "manager",
                "mq.channel.name":"chanel",
                "mq.queue":"MyTopic",
                "mq.user.name":"xxxxxxx",
                "mq.password":"xxxxxx"
        }
}

Do you see anything wrong in the configuration? Also, how does the connector differentiate between a queue and a topic?

thank you



andrew_s...@uk.ibm.com

Jul 1, 2018, 3:50:19 PM
to Confluent Platform
Hi,
The only thing that I can think of is that the name is at a different level from the rest of the configuration. That nested layout is what the distributed worker's REST API expects, but with the standalone worker the properties file is flat and name sits alongside the other settings. If you're using the distributed worker, you can use the REST API to validate the configuration, which is sometimes helpful for diagnosing the problem. In short, your configuration looks pretty good to me, so I expect it's almost right.
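
For example, assuming the distributed worker is listening on localhost:8083 and you have saved just the flat contents of your config section (including connector.class) in a file called mq-source-config.json, the validation call would look roughly like this:

    curl -s -X PUT -H "Content-Type: application/json" \
         --data @mq-source-config.json \
         http://localhost:8083/connector-plugins/MQSourceConnector/config/validate

The response lists any errors against each property.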

You also asked about differentiating between queue and topic. The connector only supports MQ queues, not topics. The topic config refers to a Kafka topic. If you want data from an MQ topic to be handled by the connector, create an administrative subscription (DEFINE SUB in MQSC for example) using a regular local queue. Then set up the connector to read from that queue.
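
In MQSC that would look something like this, with the queue, subscription and topic string as examples only:

    DEFINE QLOCAL(MY.SUB.QUEUE)
    DEFINE SUB(MY.SUB) TOPICSTR('some/topic/string') DEST(MY.SUB.QUEUE)

The connector's mq.queue setting would then be MY.SUB.QUEUE.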

Hope this helps,
Andrew Schofield