Who can help me ,Kafka could not receive messages.
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
axon:eventhandling:processors:"[WalletProcessor]":source: kafkaMessageSourcemode: TRACKINGkafka:# client-id: myproducerdefault-topic: testqueueproducer:retries: 5bootstrap-servers:transaction-id-prefix: clxtrxconsumer:group-id: axon1bootstrap-servers:spring:datasource:url: jdbc:mysql://localhost:3306/producerdemo?createDatabaseIfNotExist=true&useSSL=falseusername: rootpassword: rootjpa:database-platform: org.hibernate.dialect.MySQL5Dialecthibernate.ddl-auto: updateserver:port: 9000
/* * © 2018 CREALOGIX. All rights reserved. */package com.axonkafka;
import org.axonframework.boot.autoconfig.AxonAutoConfiguration;import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;import org.axonframework.kafka.eventhandling.KafkaMessageConverter;import org.axonframework.serialization.Serializer;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.boot.autoconfigure.AutoConfigureAfter;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;
/*This configuration is only needed because of the issue */@Configuration@AutoConfigureAfter(AxonAutoConfiguration.class)public class MyConfig {
@ConditionalOnMissingBean @Bean public KafkaMessageConverter<String, byte[]> kafkaMessageConverter( @Qualifier("eventSerializer") Serializer eventSerializer) { return new DefaultKafkaMessageConverter(eventSerializer); }}
package com.axonkafka.saga;
import org.axonframework.commandhandling.gateway.CommandGateway;import org.axonframework.eventhandling.saga.EndSaga;import org.axonframework.eventhandling.saga.SagaEventHandler;import org.axonframework.eventhandling.saga.SagaLifecycle;import org.axonframework.eventhandling.saga.StartSaga;import org.axonframework.spring.stereotype.Saga;import org.springframework.beans.factory.annotation.Autowired;
import com.axonkafka.command.CreateWalletCommand;import com.axonkafka.events.UserCreatedEvent;import com.axonkafka.events.WalletCreatedEvent;
@Sagapublic class UserSaga {
@Autowired private transient CommandGateway commandGateway;
private String userId; private String name; private String walletId;
public UserSaga() {
}
@StartSaga @SagaEventHandler(associationProperty = "userId") public void on(UserCreatedEvent event) { System.out.println("Saga start"); this.userId = event.getUserId(); this.name = event.getName(); commandGateway.send(new CreateWalletCommand(event.getUserId(), event.getName())); }
@EndSaga @SagaEventHandler(associationProperty = "userId") public void on(WalletCreatedEvent event) { System.out.println("Saga end"); this.userId = event.getUserId(); this.walletId = event.getWalletId(); //SagaLifecycle.end(); }
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
public String getWalletId() { return walletId; }
public void setWalletId(String walletId) { this.walletId = walletId; }}