Multiple consumers in one Bootique application

Faezeh Eshragh

Oct 18, 2021, 10:25:05 AM
to Bootique User Group
Hi,

I need to create multiple consumers (running in parallel) in one Bootique application. 

Here is what I did so far:

1- The sample consumer code provided by Bootique uses a Bootique command. I could run it for one consumer and it works well. However, with two or more consumers, the commands do not run in parallel. I tried a few suggestions from your documentation, but they didn't work for me, and I'm not sure how to get the commands to run in parallel.

2- I have also tried to implement the idea using some suggestions from your GitHub page here: https://github.com/bootique/bootique-kafka#streams-configuration
It seems that with the new version of Bootique (3.0.M1-SNAPSHOT) we can use KafkaPollingTracker, which starts the consumer in the background. The problem with this approach is that 3.0.M1-SNAPSHOT is not available in the Maven repository. So I cloned the original code (for Bootique-bom and Bootique-Kafka) from GitHub and tried to build it. It went well with Bootique-Kafka, and I could refer to the locally generated jar file from the pom.xml of my project. However, for Bootique-bom, I couldn't build a jar file. All I get is a pom.xml file that cannot be used directly, as it references some properties. Next, I tried copying some libs from the pom.xml of the Bootique-bom code into the pom.xml of my project. After a few tries (adding and removing entries) it finally worked, so I could run the code with the newest Bootique-kafka library. The rest of the code works fine (e.g. the producer), but the consumers still do not work as expected.

Can you please help me understand how to code the consumers (and the rest of the app) so that the consumers can run in parallel?

Thanks,
Faezeh

Andrus Adamchik

Oct 20, 2021, 1:40:17 AM
to Bootique User Group
Hi Faezeh,

A couple of points:

* Indeed you should have more success with 3.0 than 2.0. The new API is more aligned with Kafka philosophy.

* While we don't yet have an official release of 3.0.M1 (we should have one soon), we do post SNAPSHOT jar artifacts to a Maven repo at https://oss.sonatype.org/content/repositories/snapshots. So you can add this repo to the list of repositories in your pom.xml and get instant access to all the snapshots.
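
For reference, a snapshot repository entry in pom.xml might look like the sketch below. The URL is the one given above; the `id` value is an arbitrary label chosen here for illustration, and turning releases off for this repository is an optional assumption, not something the thread requires.

```xml
<repositories>
    <repository>
        <!-- "sonatype-snapshots" is a hypothetical id; any unique label works -->
        <id>sonatype-snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
```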

Could you please share a code snippet of how you consume from Kafka with 3.0 API and where it doesn't work?

Thanks,
Andrus



--
You received this message because you are subscribed to the Google Groups "Bootique User Group" group.
To unsubscribe from this group and stop receiving emails from it, send an email to bootique-use...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/bootique-user/728251d1-cfe9-4ec9-ae11-78c6e18e53cbn%40googlegroups.com.

Faezeh Eshragh

Oct 20, 2021, 3:32:16 PM
to Andrus Adamchik, Bootique User Group
Hi Andrus,

Regarding your Maven repository: for bootique-bom, should I use the pom file from the repo? There is no jar file, and the pom file from the Bootique-bom code on GitHub didn't work for me, since it references some properties. I'm just wondering how to get and use Bootique-bom 3.0.M1.

About the code, here is the consumer code:

public class TypeBConsumer {

    @Inject
    KafkaConsumerFactory factory;

    public void runConsumer() {
        try {
            // this will start the consumer in the background
            KafkaPollingTracker poll = factory
                    // configure consumer
                    .charValueConsumer()
                    .cluster("localhost:9092")
                    .group("test1")
                    .topics("my-other-topic")
                    // start consumption in the background
                    .consume((c, data) -> {
                        data.forEach(
                                r -> System.out.println("consumerB" + r.topic() + "_" + r.offset() + ": " + r.value()));
                    }, Duration.ofSeconds(10));
            // Close when we need to stop consumption. With no explicit close, Bootique will
            // close the consumer before the app exits
            // poll.close();
        } catch (Exception e) {
            // print the full stack trace; println(e.getStackTrace()) only prints an array reference
            e.printStackTrace();
        }
    }
}

I was not sure how I should run the consumers. I just added them as modules. Here is the code:

public static void main(String[] args) {

    BQModule typeAConsumerModule = binder ->
            JerseyModule.extend(binder).addResource(TypeAConsumer.class);

    // register TypeBConsumer here (the original snippet repeated TypeAConsumer)
    BQModule typeBConsumerModule = binder ->
            JerseyModule.extend(binder).addResource(TypeBConsumer.class);

    BQModule helloResource = binder ->
            JerseyModule.extend(binder).addResource(HelloResource.class);

    Bootique
            .app(args)
            .module(helloResource)
            .module(typeAConsumerModule)
            .module(typeBConsumerModule)
            .autoLoadModules()
            .exec()
            .exit();
}

I also tried to define the consumers as a job and run them in parallel:

public class TypeAConsumer extends BaseJob {

    @Inject
    KafkaConsumerFactory factory;

    public TypeAConsumer() {
        super(JobMetadata.build(TypeAConsumer.class));
    }

    @Override
    public JobResult run(Map<String, Object> params) {
        try {
            // this will start the consumer in the background
            KafkaPollingTracker poll = factory
                    // configure consumer
                    .charValueConsumer()
                    .cluster("localhost:9092")
                    .group("test1")
                    .topics("my-topic")
                    // start consumption in the background
                    .consume((c, data) -> {
                        data.forEach(
                                r -> System.out.println("consumerA" + r.topic() + "_" + r.offset() + ": " + r.value()));
                    }, Duration.ofSeconds(10));
            // Close when we need to stop consumption. With no explicit close, Bootique will
            // close the consumer before the app exits
            // poll.close();
        } catch (Exception e) {
            // print the full stack trace; println(e.getStackTrace()) only prints an array reference
            e.printStackTrace();
        }
        return JobResult.success(getMetadata());
    }
}

And here is how I tried to run it:
@Override
public void configure(Binder binder) {
    // contribute available jobs to the JobModule
    JobModule.extend(binder)
            .addJob(TypeAConsumer.class)
            // .addJob(TypeBConsumer.class)
            ;
}

It would be great if you could help me figure out how to run these consumers.

Thanks,
Faezeh

Andrus Adamchik

Oct 21, 2021, 2:37:13 AM
to Bootique User Group
Hi Faezeh,

You'd include the BOM the same way you do for a release BOM. The version must be "3.0.M1-SNAPSHOT":

<dependency>
      <groupId>io.bootique.bom</groupId>
      <artifactId>bootique-bom</artifactId>
      <version>3.0.M1-SNAPSHOT</version>
      <type>pom</type>
      <scope>import</scope>
</dependency>
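
In a Maven build, a BOM with `import` scope belongs inside `<dependencyManagement>`, which is why no jar is needed for it; individual modules can then be declared without a version. A sketch of how the two pieces might fit together follows. The `bootique-kafka-client` coordinates are an assumption here and should be checked against the bootique-kafka README.

```xml
<dependencyManagement>
    <dependencies>
        <!-- the BOM only supplies versions; it is a pom, not a jar -->
        <dependency>
            <groupId>io.bootique.bom</groupId>
            <artifactId>bootique-bom</artifactId>
            <version>3.0.M1-SNAPSHOT</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- assumed coordinates; the version is inherited from the BOM -->
    <dependency>
        <groupId>io.bootique.kafka</groupId>
        <artifactId>bootique-kafka-client</artifactId>
    </dependency>
</dependencies>
```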

Now the consumers... Adding a consumer (or a pool of consumers) as a Job may actually work depending on the circumstances. But if all your app does is consume from Kafka, I recommend writing a custom "command".

1. Command class

public class StartConsumersCommand implements Command {

   // inject stuff you need in the command
   // ...

   // implement the run method to start consumers in the background
   @Override
   public CommandOutcome run(Cli cli) {

       // start one or more consumers
       // ....

       // tell Bootique that stuff is running in the background
       return CommandOutcome.succeededAndForkedToBackground();
   }
}


2. Register the command with Bootique in your app module:

public void configure(Binder binder) {
    BQCoreModule.extend(binder)
            .addCommand(StartConsumersCommand.class);
}

3. Use command name when starting the app:

java -jar myapp.jar --start-consumers
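
The pattern behind the steps above is "start several pollers, each on its own thread, and return right away while they keep running". Below is a rough plain-JDK sketch of that idea. It is not Bootique or Kafka code: `BackgroundConsumersSketch`, `PollingHandle`, `start` and `runDemo` are hypothetical names, and in-memory queues stand in for topics. In a real command, the consumer factory calls shown earlier in this thread would presumably take the place of `start(...)`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-JDK illustration only: none of these types are Bootique or Kafka classes.
class BackgroundConsumersSketch {

    // Hypothetical stand-in for a polling handle: wraps the thread that polls one source.
    static final class PollingHandle implements AutoCloseable {
        private final Thread thread;

        private PollingHandle(Thread thread) {
            this.thread = thread;
        }

        // Stops the polling loop and waits for its thread to finish.
        @Override
        public void close() {
            thread.interrupt();
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Starts a background polling loop and returns immediately with a handle to it.
    static PollingHandle start(String name, BlockingQueue<String> source, Consumer<String> callback) {
        Thread thread = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String record = source.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        callback.accept(record);
                    }
                }
            } catch (InterruptedException e) {
                // close() was called while waiting for data: leave the loop
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
        return new PollingHandle(thread);
    }

    // Runs two consumers in parallel over two in-memory "topics" and collects what each received.
    static Map<String, List<String>> runDemo() {
        BlockingQueue<String> topicA = new LinkedBlockingQueue<>(List.of("a1", "a2"));
        BlockingQueue<String> topicB = new LinkedBlockingQueue<>(List.of("b1", "b2"));

        Map<String, List<String>> received = new ConcurrentHashMap<>();
        received.put("consumerA", new CopyOnWriteArrayList<>());
        received.put("consumerB", new CopyOnWriteArrayList<>());
        CountDownLatch done = new CountDownLatch(4); // four records in total

        // Both start(...) calls return at once; the handles are closed when the block exits.
        try (PollingHandle a = start("consumerA", topicA, r -> {
                 received.get("consumerA").add(r);
                 done.countDown();
             });
             PollingHandle b = start("consumerB", topicB, r -> {
                 received.get("consumerB").add(r);
                 done.countDown();
             })) {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Each consumer owns its thread, so neither one blocks the other, and the caller of `start` is free to return as soon as the handles exist. That is the same reason the command above can report that work has been forked to the background instead of waiting for consumption to end.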


Andrus

Faezeh Eshragh

Oct 26, 2021, 10:33:23 AM
to Andrus Adamchik, Bootique User Group
Thanks for the reply Andrus. I will give this a try and get back to you.

Faezeh
 
