Migration from Reactor v1.x to v2.x problem - consumer doesn't run after EventBus.notify

13 views
Skip to first unread message

Ivo Šmajstrla

unread,
Feb 21, 2017, 9:38:10 AM2/21/17
to reactor-framework
Hi,

I'm trying to upgrade from Reactor v1.x. At first I wanted to go directly to 3.0.4 but for various reasons I decided to upgrade to 2.x (2.0.8 specifically, and 2.0.7 of reactor-spring) only for now. Unfortunately I still have some serious problems with it.
Basic idea of app - MessageListener consumes messages, extracts core data from them and according to type of data it passes them to number of topics. There is number of processors (@Consumer) registered to topics which makes further processing.

Already found a supplement to Reactor etc. But now I have a problem with consuming messages sent through EventBus.notify. Below is simplified code both original (_v1) and new (_v2).
The main problem I have is that while on _v1 it was possible to not have consumers explicitly bound to listener or wrapping app, now it seems it is NOT possible(?). Without marked lines, after invoking EventBus.notify, nothing happens and consumer doesn't get notified and doesn't run. Which is really bad. I'm looking for a way how to make it working same way as in _v1 without having to have autowired and explicitly registered a zillion of consumers. Sometime it is not even a possible to do it.
So can you please give me a hint how to achieve this?

Thanks!

BTW: A few times mentioned link to documentation (http://projectreactor.io/docs/reference/) currently leads to nowhere - effectively 404.

*************************
Reactor 1.x code
*************************
public class App_v1 {
   
    @Autowired
    private Publisher_v1 publisher;
   
    public static void main(String... args) {
        publisher.start(); //simplified
    }
}

@Component
public class Publisher_v1 implements MessageListener {

    @Autowired
    private Reactor reactor;

    public void onMessage(Message message) {
        MarketData market = extract(message);//simplification
        reactor.notify("process.market", Event.wrap(market));
    }
}

@Component
@Consumer
public class MarketProcessor_v1 {

    @Selector(value = "process.market", reactor = "@rootReactor")
    public void process(Event<MarketData> reactEvent) {...}
}

*************************
Reactor 2.x code
*************************
public class App_v2 {
   
    @Autowired
    private Publisher_v2 publisher;
   
    @Autowired
    private MarketProcessor_v2 marketProcessor;
//How to get rid of this??
   
    @Autowired
    private EventBus eventBus;
   
    public static void main(String... args) {
        publisher.start(); //simplified

        eventBus.on($("process.market"), marketProcessor);//How to get rid of this?? Consumer doesn't receive the event without this line :(
    }
}

@Service
public class Publisher_v2 implements MessageListener {

    @Autowired
    private EventBus eventBus;

    public void onMessage(Message message) {
        MarketData market = extract(message);//simplification
        eventBus.notify("process.market", Event.wrap(market));
    }
}

@Service
public class MarketProcessor_v2 implements Consumer<Event<MarketData>> {

    @Selector(value = "process.market", eventBus = "@rootReactor") //tried also eventBus="eventBus" and eventBus="@eventBus" but neither helps
    public void accept(Event<MarketData> reactEvent) {...}
}


Reply all
Reply to author
Forward
0 new messages