Does anyone have a working Kafka example?


zhuang wu

Aug 24, 2018, 6:06:14 AM
to Axon Framework Users
Can anyone help me? Kafka is not receiving messages.

Marinko Babic

Aug 24, 2018, 1:51:50 PM
to axonfr...@googlegroups.com
Does the following example work on your machine?


Pratik Panchal

Aug 27, 2018, 7:25:43 AM
to Axon Framework Users
I implemented this example and it works. However, when I try to add a saga to the same example, the SagaEventHandler does not receive the event that was published by the consumer.

Here is my application.yml:
 
axon:
  eventhandling:
    processors:
      "[WalletProcessor]":
        source: kafkaMessageSource
        mode: TRACKING
  kafka:
#    client-id: myproducer
    default-topic: testqueue
    producer:
      retries: 5
      bootstrap-servers:
        - 127.0.0.1:9092
      transaction-id-prefix: clxtrx
    consumer:
      group-id: axon1
      bootstrap-servers:
        - 127.0.0.1:9092
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/producerdemo?createDatabaseIfNotExist=true&useSSL=false
    username: root
    password: root
  jpa:
    database-platform: org.hibernate.dialect.MySQL5Dialect
    hibernate.ddl-auto: update  
server:
  port: 9000    

MyConfig class:
/*
 * © 2018 CREALOGIX. All rights reserved.
 */
package com.axonkafka;

import org.axonframework.boot.autoconfig.AxonAutoConfiguration;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
This configuration is only needed because of the issue
 */
@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class MyConfig {

    // Registers a converter only if no other KafkaMessageConverter bean is defined.
    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
            @Qualifier("eventSerializer") Serializer eventSerializer) {
        return new DefaultKafkaMessageConverter(eventSerializer);
    }
}


UserSaga Class

package com.axonkafka.saga;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.eventhandling.saga.EndSaga;
import org.axonframework.eventhandling.saga.SagaEventHandler;
import org.axonframework.eventhandling.saga.SagaLifecycle;
import org.axonframework.eventhandling.saga.StartSaga;
import org.axonframework.spring.stereotype.Saga;
import org.springframework.beans.factory.annotation.Autowired;

import com.axonkafka.command.CreateWalletCommand;
import com.axonkafka.events.UserCreatedEvent;
import com.axonkafka.events.WalletCreatedEvent;

@Saga
public class UserSaga {

    @Autowired
    private transient CommandGateway commandGateway;

    private String userId;
    private String name;
    private String walletId;

    public UserSaga() {
    }

    // Starts the saga and requests a wallet for the new user.
    @StartSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(UserCreatedEvent event) {
        System.out.println("Saga start");
        this.userId = event.getUserId();
        this.name = event.getName();
        commandGateway.send(new CreateWalletCommand(event.getUserId(), event.getName()));
    }

    // Ends the saga once the wallet has been created.
    @EndSaga
    @SagaEventHandler(associationProperty = "userId")
    public void on(WalletCreatedEvent event) {
        System.out.println("Saga end");
        this.userId = event.getUserId();
        this.walletId = event.getWalletId();
        //SagaLifecycle.end();
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getWalletId() {
        return walletId;
    }

    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }
}





Allard Buijze

Sep 4, 2018, 7:09:19 AM
to axonfr...@googlegroups.com
Hi,

You have configured a Tracking Processor to consume events from Kafka, but the Saga is currently not part of that processor. With this configuration, the Saga gets the default SubscribingProcessor instead.

You can create a SagaConfiguration bean (called userSagaConfiguration, or specify the bean name in your @Saga annotation) and configure it to track from the Kafka message source. That should have the Saga read from Kafka as well.
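
A rough, untested sketch of what such a bean might look like. It assumes Axon Framework 3.3 with the axon-kafka module, that SagaConfiguration.trackingSagaManager accepts a function returning the message source, and that the Kafka source is exposed as an org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource bean. The class name SagaConfig is hypothetical; only the bean name userSagaConfiguration comes from the advice above.

```java
package com.axonkafka;

import org.axonframework.config.SagaConfiguration;
import org.axonframework.kafka.eventhandling.consumer.KafkaMessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.axonkafka.saga.UserSaga;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class SagaConfig {

    // The bean name (taken from the method name) is "userSagaConfiguration",
    // the name suggested above for the UserSaga.
    @Bean
    public SagaConfiguration<UserSaga> userSagaConfiguration(KafkaMessageSource kafkaMessageSource) {
        // Assumption: this overload takes the saga type and a function that
        // supplies the streamable source to track. Here the saga tracks the
        // Kafka source instead of subscribing to the local event bus.
        return SagaConfiguration.trackingSagaManager(UserSaga.class, c -> kafkaMessageSource);
    }
}
```

If the injected type or the method signature differs in your version, check the SagaConfiguration class in the Axon release you are using before relying on this.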

Hope this helps.
Cheers,

Allard
--
Allard Buijze
CTO

E: allard...@axoniq.io
T: +31 6 34 73 99 89