I wonder if someone could help me with the following issue.
I have a service that receives events via Sculptor's embedded event bus. Events are received on the main thread, which is persistence-aware and is started from the service's JUnit test. The service then hands each event to a worker thread for handling.
Handling an event requires calling the repository's save() method from the worker thread, but this results in the following exception:
15:35:46.937 [pool-1-thread-2] ERROR o.t.m.r.EngineRepositoryImpl - [org.sculptor.framework.errorhandling.DatabaseAccessException] : org.springframework.dao.InvalidDataAccessApiUsageException: No transactional EntityManager available; nested exception is javax.persistence.TransactionRequiredException: No transactional EntityManager available
org.springframework.dao.InvalidDataAccessApiUsageException: No transactional EntityManager available; nested exception is javax.persistence.TransactionRequiredException: No transactional EntityManager available
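My understanding, which may be incomplete, is that Spring keeps the transactional EntityManager in thread-bound storage, so a pool thread does not see what the main test thread set up. Below is a minimal plain-Java sketch of that effect. It does not use Spring or Sculptor: the ThreadLocal is only a stand-in for the real resource binding, and ThreadBoundDemo, CURRENT_TX and readInWorker are hypothetical names I made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundDemo {
    // Stand-in (assumption) for the thread-bound storage a transaction manager uses.
    public static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Reads the thread-bound value from a pool thread and returns what that thread saw.
    public static String readInWorker() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = CURRENT_TX::get;
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        CURRENT_TX.set("tx-main"); // bound on the main thread only
        System.out.println("main thread sees:   " + CURRENT_TX.get());
        System.out.println("worker thread sees: " + readInWorker());
    }
}
```

The main thread sees "tx-main" while the worker sees null, which looks like the same situation as my pool-1-thread-2 having no transactional EntityManager.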
Do you know how to solve this problem?
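One direction I am considering is to begin and end the transaction on the worker thread itself, around each task. The sketch below shows only the shape of that idea in plain Java. Everything in it is hypothetical (TxPerTaskSketch, save, inTransaction, runOnWorker), the ThreadLocal again stands in for the real transaction binding, and in a Spring application I assume something like TransactionTemplate would play the role of inTransaction. I have not verified this against Sculptor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TxPerTaskSketch {
    // Stand-in (assumption) for the thread-bound transaction context.
    public static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Hypothetical repository call: fails when no transaction is bound to the calling thread.
    public static String save(String value) {
        String tx = CURRENT_TX.get();
        if (tx == null) {
            throw new IllegalStateException("No transaction bound to " + Thread.currentThread().getName());
        }
        return value + " saved in " + tx;
    }

    // Wraps a task so the stand-in transaction begins and ends on the thread that runs it.
    public static <T> Callable<T> inTransaction(Callable<T> work) {
        return () -> {
            CURRENT_TX.set("tx-worker");
            try {
                return work.call();
            } finally {
                CURRENT_TX.remove(); // do not leak the binding to the next pooled task
            }
        };
    }

    // Runs the task on a single pool thread and returns its result.
    public static String runOnWorker(Callable<String> task) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(task).get();
        } finally {
            pool.shutdown();
        }
    }
}
```

With this, runOnWorker(inTransaction(() -> save("event"))) succeeds, whereas runOnWorker(() -> save("event")) fails with an ExecutionException whose cause is the IllegalStateException thrown by save.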
My code looks like this:
model.btdesign:
Service EngineService {
    inject @EngineRepository
    start();
}
JUnit test:
public class EngineServiceTest extends AbstractDbUnitJpaTests implements UIEngineTestBase {

    @Autowired
    protected EngineService engineService;

    @Test
    public void testStart() throws Exception {
        engineService.start();
    }
}
service:
@Service("engineService")
public class EngineServiceImpl extends EngineServiceImplBase implements EventSubscriber {

    // Task run on a pool thread; dispatches the event to the matching consume(...) method.
    private class Handler implements Runnable {
        private final Object targetObj;
        private final Event event;

        public Handler(Object targetObj, Event event) {
            this.targetObj = targetObj;
            this.event = event;
        }

        @Override
        public void run() {
            DynamicMethodDispatcher.dispatch(targetObj, event, "consume");
        }
    }

    private static final BlockingQueue<Event> commandQueue = new LinkedBlockingQueue<Event>(100);
    private ExecutorService executor;

    @Autowired
    @Qualifier("commandBus")
    private EventBus commandBus;

    public EngineServiceImpl() {
    }

    // Called by the event bus; offer() returns false (event dropped) if the queue is full.
    @Override
    public void receive(Event event) {
        commandQueue.offer(event);
    }

    // Called from a pool thread via Handler.run(); this is where the exception is thrown.
    public void consume(Event any) {
        getEngineRepository().save(any.getOccurred());
    }

    @Override
    public void start(ServiceContext ctx) {
        commandBus.subscribe("topic", this);
        executor = Executors.newFixedThreadPool(4);
        try {
            // Loops on the calling (test) thread and hands each event to the pool.
            for (;;) {
                Event event = commandQueue.poll(3, TimeUnit.SECONDS);
                if (event != null) {
                    executor.execute(new Handler(this, event));
                } else {
                    //TODO add discovery and management of time consuming operations
                }
            }
        } catch (Exception ex) {
            executor.shutdown();
        }
    }