Quartz integration

已查看 52 次
跳至第一个未读帖子

hbf

未读,
2009年8月17日 14:06:232009/8/17
收件人 warp-core
Hi everybody,

My application uses OpenSymphony's Quartz and it seems that Quartz
hooks into Spring's transaction management – using warp-persist, I do
not get transactional behaviour when I schedule jobs.

Is there any easy way to get support for Quartz?

Thanks,
Kaspar

Andreas Petersson

未读,
2009年8月17日 16:25:292009/8/17
收件人 warp...@googlegroups.com

> Is there any easy way to get support for Quartz?
>

yes.

its basically just three very small classes:
1) create a custom Provider<Scheduler> and bind it in the module
2) create an implementation of GuiceJobFactory (GuiceJobFactory)
3) bind Scheduler.class to SchedulerProvider
4) somewhere in your init code inject a scheduler and start it up. -
don't forget to shut it down correctly, too.


-----------------

bind(Scheduler.class).toProvider(SchedulerProvider.class).asEagerSingleton();

--------------
public class SchedulerProvider implements Provider<Scheduler> {

private final GuiceJobFactory guiceFactory;

@Inject
public SchedulerProvider(GuiceJobFactory factory) {
this.guiceFactory = factory;
}

public Scheduler get() {
try {
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.setJobFactory(guiceFactory);
return scheduler;
} catch (SchedulerException ex) {
throw new InvalidConfigurationException("unable to
instantiate Quartz Scheduler.", ex);
}
}
}
--------------

public class GuiceJobFactory implements JobFactory {

private final Injector injector;

@Inject
public GuiceJobFactory(Injector inj) {
injector = inj;
}

public Job newJob(TriggerFiredBundle triggerFiredBundle) throws
SchedulerException {
try {
return injector.getInstance((Class<Job>)
triggerFiredBundle.getJobDetail().getJobClass()); // yay for synergies.
} catch (Exception ex) {
throw new SchedulerException(ex);
}
}
}
-------------------------
//startup the scheduler
public void contextInitialized(ServletContextEvent
servletContextEvent) {
final Scheduler scheduler = injector.getInstance(Scheduler.class);
scheduler.start();
}
//shutdown
public void contextDestroyed(ServletContextEvent servletContextEvent) {
getInjector().getInstance(Scheduler.class).shutdown();
//thats ok, scheduler is singleton..
}


Kaspar Fischer (Gmail)

未读,
2009年8月18日 01:50:442009/8/18
收件人 warp...@googlegroups.com
Andreas, thanks for your quick reply.

On 17.08.2009, at 22:25, Andreas Petersson wrote:

>> Is there any easy way to get support for Quartz?
>
> yes.
>
> its basically just three very small classes:
> 1) create a custom Provider<Scheduler> and bind it in the module
> 2) create an implementation of GuiceJobFactory (GuiceJobFactory)
> 3) bind Scheduler.class to SchedulerProvider
> 4) somewhere in your init code inject a scheduler and start it up. -
> don't forget to shut it down correctly, too.

I could get Quartz to "run". However, all calls I make to Quartz (like
triggering a job, etc.) run in "auto commit" mode. So if you trigger a
job in a @Tansactional-annotated method and after the triggering call
to Quartz, a rollback occurs, the job WILL run all the same. Using
Spring transactions, the job will only get scheduled on successful
completion of the @Transactional method.

I am not a Spring or Quartz expert, so I do not fully understand how
Quartz hooks into Spring transactions. But do you see the
transactional behaviour I describe above when you run your code?

Thanks,
Kaspar

--
P.S. I attach a little test I use myself to verify this:

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import java.util.List;

import org.hbf.testing.AbstractGuiceInjectedTest;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.StatefulJob;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.transaction.annotation.Transactional;
import org.testng.annotations.Test;

import com.google.inject.Inject;
import com.google.inject.Scopes;

public class SimpleQuartzTransactionTest extends AbstractGuiceInjectedTest
{
@BeforeClass
protected void configureDependencies() throws Exception
{
// FILL IN WHATEVER IS NEEDED TO GET HIBERNATE RUNNING

bind(TestService.class).to(TestServiceImpl.class).in(Scopes.SINGLETON);
Scheduler schedulerSingleton = new StdSchedulerFactory().getScheduler();
bind(Scheduler.class).toInstance(schedulerSingleton);
}

@Inject
private TestService testService;
@Inject
private Scheduler scheduler;

@Test
public void startJobAndRollback() throws InterruptedException,
SchedulerException
{
scheduler.start();

// Create a task and schedule it, but provoke a rollback afterwards
JobDetail jobDetail = new JobDetail("job0",
Scheduler.DEFAULT_GROUP, LongJob.class, false, true, true);
try
{
System.err.println("Calling startJob()...");
testService.startJob(jobDetail, true);
assertTrue(false); // We should never get here.
}
catch (RuntimeException e)
{
assertEquals("Oops, an exception! Job should not be started!",
e.getMessage());
}
finally
{
System.err.println("Returned from startJob()...");
}

// And now start a job with a commit
jobDetail = new JobDetail("job1", Scheduler.DEFAULT_GROUP,
LongJob.class, false, true, true);
testService.startJob(jobDetail, false);

// Wait for any tasks to start
for (int numberOfTries = 0; numberOfTries < 60 &&
scheduler.getCurrentlyExecutingJobs().size() == 0; ++numberOfTries)
{
System.err.println("Waited for " + numberOfTries + "s for any
jobs to start; will wait at most for 60s...");
Thread.sleep(1000);
}

// Make sure exactly one task got started
List<JobExecutionContext> currentlyExecutingJobs =
scheduler.getCurrentlyExecutingJobs();
System.err.println("Number of running jobs: " +
currentlyExecutingJobs.size());
assertEquals(currentlyExecutingJobs.size(), 1);

// Wait some more
Thread.sleep(5000);

// Make sure exactly one task got started
currentlyExecutingJobs = scheduler.getCurrentlyExecutingJobs();
System.err.println("Number of running jobs: " +
currentlyExecutingJobs.size());
assertEquals(currentlyExecutingJobs.size(), 1);

// Remove job
testService.removeJob("job1", Scheduler.DEFAULT_GROUP);
}

public static class LongJob implements Job, StatefulJob
{
public void execute(JobExecutionContext arg0) throws JobExecutionException
{
for (int i = 0; i <= 10; ++i)
{
System.err.println("TestJob in phase " + i + " of 10.");
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
System.err.println("TestJob got interrupted - ignoring.");
Thread.currentThread().interrupt();
}
}
System.err.println("TestJob finished.");
}
}

public static interface TestService
{
public void startJob(JobDetail jobDetail, boolean fail) throws
SchedulerException;

public void removeJob(String string, String defaultGroup) throws
SchedulerException;
}

public static class TestServiceImpl implements TestService
{
private final Scheduler scheduler;

@Inject
public TestServiceImpl(Scheduler scheduler)
{
this.scheduler = scheduler;
}

@Transactional
public void startJob(JobDetail jobDetail, boolean fail) throws
SchedulerException
{
try
{
System.err.println("Entering startJob()...");
try
{
scheduler.addJob(jobDetail, false);
scheduler.triggerJobWithVolatileTrigger(jobDetail.getName(),
jobDetail.getGroup());
}
catch (Exception e)
{
throw new RuntimeException(
"job could not be scheduled; if you are using an
existing database make sure it is does not contain old jobs",
e);
}

// Wait a bit
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
throw new IllegalStateException("got interrupted");
}

// Fail if needed
if (fail)
{
throw new RuntimeException("Oops, an exception! Job should
not be started!");
}
}
finally
{
System.err.println("Leaving startJob()...");
}
}

@Transactional
public void removeJob(String jobName, String groupName) throws
SchedulerException
{
scheduler.deleteJob(jobName, groupName);
}
}
}

Dhanji R. Prasanna

未读,
2009年8月18日 02:02:292009/8/18
收件人 warp...@googlegroups.com
This sounds like a problem with your hibernate configuration. Can you post that here? Are you sure it is working outside quartz--in other threads without autocommit?

Dhanji.

Kaspar Fischer (Gmail)

未读,
2009年8月18日 04:16:152009/8/18
收件人 warp...@googlegroups.com
On Tue, Aug 18, 2009 at 8:02 AM, Dhanji R. Prasanna<dha...@gmail.com> wrote:
> This sounds like a problem with your hibernate configuration. Can you post
> that here? Are you sure it is working outside quartz--in other threads
> without autocommit?
> Dhanji.

Thanks for jumping in, Dhanji. You are right, something with my
configuration is not good. I attach a self-contained test together
with the Hibernate config I am using.

With the Hibernate config as shown below
(hibernate.current_session_context_class=managed) I get:

org.hibernate.HibernateException: No session currently bound to
execution context
at org.hibernate.context.ManagedSessionContext.currentSession(ManagedSessionContext.java:74)
at org.hibernate.impl.SessionFactoryImpl.getCurrentSession(SessionFactoryImpl.java:622)
at com.wideplay.warp.hibernate.SessionProvider.get(SessionProvider.java:40)
at com.wideplay.warp.hibernate.SessionProvider.get(SessionProvider.java:30)
at com.wideplay.warp.hibernate.HibernateLocalTxnInterceptor.invoke(HibernateLocalTxnInterceptor.java:50)
at com.google.inject.InterceptorStackCallback$InterceptedMethodInvocation.proceed(InterceptorStackCallback.java:64)
at com.google.inject.InterceptorStackCallback.intercept(InterceptorStackCallback.java:44)
at org.hbf.core.hibernate.module.warppersist.SimpleWarpPersistTest$TestService$$EnhancerByGuice$$bde9a035.add(<generated>)
at org.hbf.core.hibernate.module.warppersist.SimpleWarpPersistTest.populateTest(SimpleWarpPersistTest.java:108)
... Removed 22 stack frames

When I set hibernate.current_session_context_class=thread, the test succeeds.

As the warp-persist tutorials indicate that managed needs to be used,
I must be doing something wrong. Can you spot the problem?

Many thanks,
Kaspar

--
package org.hbf.core.hibernate.module.warppersist;

import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.Properties;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

import org.hibernate.Session;
import org.hibernate.cfg.AnnotationConfiguration;
import org.hibernate.cfg.Configuration;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Provider;
import com.wideplay.warp.persist.PersistenceService;
import com.wideplay.warp.persist.Transactional;
import com.wideplay.warp.persist.UnitOfWork;

public class SimpleWarpPersistTest
{
public static Properties loadProperties(InputStream stream)
{
try
{
Properties p = new Properties();
p.load(stream);
return p;
}
catch (IOException e)
{
throw new IllegalStateException("Properties could not be loaded.", e);
}
}

@BeforeClass
public void setUp()
{
Injector injector = Guice.createInjector(new AbstractModule()
{
@Override
protected void configure()
{
install(PersistenceService.usingHibernate().across(UnitOfWork.REQUEST).buildModule());
AnnotationConfiguration configuration = new AnnotationConfiguration();
configuration.setProperties(loadProperties(SimpleWarpPersistTest.class.getResourceAsStream("testH2Hibernate.properties")));
configuration.addAnnotatedClass(SampleEntity.class);
bind(Configuration.class).toInstance(configuration);
}
});
injector.injectMembers(this);
}

@Entity(name = "SampleEntity")
public static class SampleEntity
{
@Id
@GeneratedValue
private int id;

public int getId()
{
return id;
}

public void setId(int id)
{
this.id = id;
}
}

public static class TestService
{
@Inject
private Provider<Session> session;

@Transactional
public void add(boolean fail)
{
SampleEntity e = new SampleEntity();
session.get().save(e);
if (fail) throw new IllegalStateException("Haha!");
}

@Transactional
public long count()
{
return (Long) session.get().createQuery("select count(*) from
SampleEntity").uniqueResult();
}
}

@Inject
private TestService testService;

@Test
public void populateTest() throws SQLException
{
try
{
testService.add(true);
Assert.assertTrue(false);
}
catch (IllegalStateException e)
{
Assert.assertEquals(e.getMessage(), "Haha!");
}
testService.add(false);
Assert.assertEquals(testService.count(), 1);
}
}

// Config file loaded in code:
hibernate.connection.driver_class=org.h2.Driver
hibernate.dialect=org.hibernate.dialect.H2Dialect

# Needed for warp-persist
hibernate.current_session_context_class=managed

# Each test has its own in-memory database, see
http://www.h2database.com/html/features.html#memory_only_databases
hibernate.connection.url=jdbc:h2:mem:

hibernate.connection.provider_class=org.hibernate.connection.C3P0ConnectionProvider

# See https://www.hibernate.org/214.html
hibernate.c3p0.acquire_increment=1
hibernate.c3p0.idle_test_period=100
hibernate.c3p0.max_size=100
hibernate.c3p0.max_statements=0
hibernate.c3p0.min_size=10
hibernate.c3p0.timeout=100

hibernate.cache.provider_class=net.sf.ehcache.hibernate.EhCacheProvider
hibernate.hbm2ddl.auto=create

hibernate.show_sql=true
hibernate.format_sql=true
hibernate.use_sql_comments=true

Dhanji R. Prasanna

未读,
2009年8月18日 04:20:172009/8/18
收件人 warp...@googlegroups.com
You will need to use WorkManager to begin() and end() a session before running any transactional methods. In an Http request, this is done automatically for you via the PersistenceFilter.
Then you can set the context as managed and it will continue to work.

Dhanji.

Kaspar Fischer (Gmail)

未读,
2009年8月18日 05:30:212009/8/18
收件人 warp...@googlegroups.com
Thanks.

The following makes the test code from the previous mail work:

@Inject
private WorkManager workManager;

@Test
public void populateTest() throws SQLException
{
try
{

workManager.beginWork();
testService.add(true);
workManager.endWork();


Assert.assertTrue(false);
}
catch (IllegalStateException e)
{
Assert.assertEquals(e.getMessage(), "Haha!");
}

workManager.beginWork();
testService.add(false);
workManager.endWork();
workManager.beginWork();
Assert.assertEquals(testService.count(), 1);
workManager.endWork();

Kaspar Fischer (Gmail)

未读,
2009年8月18日 06:14:532009/8/18
收件人 warp...@googlegroups.com
On Tue, Aug 18, 2009 at 7:50 AM, Kaspar Fischer
(Gmail)<kaspar....@gmail.com> wrote:

> I could get Quartz to "run". However, all calls I make to Quartz (like
> triggering a job, etc.) run in "auto commit" mode. So if you trigger a
> job in a @Tansactional-annotated method and after the triggering call
> to Quartz, a rollback occurs, the job WILL run all the same. Using
> Spring transactions, the job will only get scheduled on successful
> completion of the @Transactional method.

On the original question, the transactional behaviour: I have looked
at the Spring code, see Spring's LocalDataSourceJobStore:

http://www.koders.com/java/fid9827F58DA190D3EEF02A219E3762F493D99FED63.aspx?s=JobStoreCMT#L25

I have the feeling you need to configure the StdSchedulerFactory when
you instantiate it; something like

Scheduler scheduler = new StdSchedulerFactory().getScheduler();

is probably not enough to get the transactional behaviour described above.

I am not a Quartz guru and do not really understand the code in
Spring's LocalDataSourceJobStore which uses two data-sources.

Maybe somebody with a more solid Quartz background has an idea how to
fully integrate Quartz into warp-persist?

Kaspar

回复全部
回复作者
转发
0 个新帖子