Hi,
Like Prem, I also need to audit events that are processed by listeners and track the commands that are dispatched as a result. After looking at CorrelationDataHolder, I’ve decided to set thread-local data at the start of the listener thread that processes the event. My custom command gateway will then read the thread-local data and attach it as command metadata. My main concern right now is setting and clearing the thread-local data in the right place. I have the following code so far:
class TrackingEventProcessor extends EventProcessor {

    public TrackingEventProcessor(Executor executor, ShutdownCallback shutDownCallback, ErrorHandler errorHandler,
                                  UnitOfWorkFactory unitOfWorkFactory, Set<EventListener> eventListeners,
                                  MultiplexingEventProcessingMonitor eventProcessingMonitor) {
        super(executor, shutDownCallback, errorHandler, unitOfWorkFactory, eventListeners, eventProcessingMonitor);
    }

    @Override
    protected ProcessingResult doHandle(EventMessage<?> event) {
        try {
            // BEST PLACE TO SET THREAD LOCAL?? (will probably end up using CorrelationDataHolder instead)
            AuditingContext.setFlowId((String) event.getMetaData().get(AuditingContext.FLOW_ID));
            AuditingContext.setOriginatorId(event.getIdentifier());
            return super.doHandle(event);
        } finally {
            // BEST PLACE TO CLEAR THREAD LOCAL??
            AuditingContext.clear();
        }
    }
}

class TrackingAsynchronousCluster extends AsynchronousCluster {

    private final Executor executor;
    private final UnitOfWorkFactory unitOfWorkFactory;
    private final ErrorHandler errorHandler;

    public TrackingAsynchronousCluster(String name, Executor executor, UnitOfWorkFactory unitOfWorkFactory,
                                       SequencingPolicy<? super EventMessage<?>> sequencingPolicy, ErrorHandler errorHandler) {
        super(name, executor, unitOfWorkFactory, sequencingPolicy, errorHandler);
        this.executor = executor;
        this.unitOfWorkFactory = unitOfWorkFactory;
        this.errorHandler = errorHandler;
    }

    @Override
    protected EventProcessor newProcessingScheduler(
            EventProcessor.ShutdownCallback shutDownCallback, Set<EventListener> eventListeners,
            MultiplexingEventProcessingMonitor eventProcessingMonitor) {
        return new TrackingEventProcessor(executor,
                                          shutDownCallback,
                                          errorHandler,
                                          unitOfWorkFactory,
                                          eventListeners,
                                          eventProcessingMonitor);
    }
}
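
For context, here is a rough sketch of the kind of thread-local holder that AuditingContext stands for in the code above. It is only an illustration and has no Axon dependency. The key values "flowId" and "originatorId", the ORIGINATOR_ID constant, the getters and the asMetaData() helper are assumptions made for this sketch; only setFlowId, setOriginatorId, clear and FLOW_ID are used in the code above. It also assumes Java 8 for ThreadLocal.withInitial.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a per-thread holder for auditing data (assumed design).
final class AuditingContext {

    // Metadata keys. The actual string values are assumptions for this sketch.
    static final String FLOW_ID = "flowId";
    static final String ORIGINATOR_ID = "originatorId"; // hypothetical constant

    // Each thread gets its own map, so listener threads do not see each other's data.
    private static final ThreadLocal<Map<String, Object>> DATA =
            ThreadLocal.withInitial(HashMap::new);

    private AuditingContext() {
    }

    static void setFlowId(String flowId) {
        put(FLOW_ID, flowId);
    }

    static void setOriginatorId(String originatorId) {
        put(ORIGINATOR_ID, originatorId);
    }

    static String getFlowId() {
        return (String) DATA.get().get(FLOW_ID);
    }

    static String getOriginatorId() {
        return (String) DATA.get().get(ORIGINATOR_ID);
    }

    // Hypothetical helper: a copy of the current thread's data that a command
    // gateway could merge into the metadata of the commands it dispatches.
    static Map<String, Object> asMetaData() {
        return new HashMap<>(DATA.get());
    }

    // Removes the current thread's data so pooled threads do not leak it
    // into the next event they handle.
    static void clear() {
        DATA.remove();
    }

    // Null values are dropped rather than stored, e.g. when an event carries no flow id.
    private static void put(String key, Object value) {
        if (value == null) {
            DATA.get().remove(key);
        } else {
            DATA.get().put(key, value);
        }
    }
}
```

With a holder like this, the custom command gateway would call AuditingContext.asMetaData() on the dispatching thread, which only works as long as the command is dispatched from the same thread that handled the event.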
Does this look right? Is there anything I’m missing here?