MassTransit Quartz integration: MassTransitJobFactory


Mike Bridge

Jun 13, 2014, 6:17:07 PM
to masstrans...@googlegroups.com
I am able to queue MassTransit jobs with Quartz.Net, but I can't figure out how to set up MassTransitJobFactory to process them with my producer. Currently the messages that are taken out of the database all end up in my producer's error queue, without any error message that I can find.

Here's generally how I set this up:

            NameValueCollection properties = new NameValueCollection();

            properties["quartz.scheduler.instanceName"] = MASSTRANSIT_QUARTZ;
            // "AUTO" lets Quartz generate a unique id for this scheduler instance
            properties["quartz.scheduler.instanceId"] = "AUTO";
            // Persist jobs and triggers in the database through the ADO.NET job store
            properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
            properties["quartz.jobStore.dataSource"] = "default";
            properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
            properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";
            properties["quartz.dataSource.default.connectionString"] = _myConnectionString;
            properties["quartz.dataSource.default.provider"] = "SqlServer-20";

            ISchedulerFactory schedulerFactory = new StdSchedulerFactory(properties);
            var scheduler = schedulerFactory.GetScheduler();

            // Jobs fired by Quartz are created by the MassTransit job factory, which is given the bus
            var bus = _busCreator.CreateBus(MASSTRANSIT_QUARTZ+"-Trolley");
            scheduler.JobFactory = new MassTransitJobFactory(bus);

            scheduler.Start();
            // ...

Is there anything else I need to do to get this to work? It looks to me like the purpose of the MassTransitJobFactory is to take any message that comes from Quartz and place it back on the bus for reprocessing.

And will I be able to use one MassTransitJobFactory to reprocess all my various queued messages, or do I need several (and therefore several Quartz schedulers)?

Thanks,

-Mike

Chris Patterson

Jun 19, 2014, 1:18:28 PM
to masstrans...@googlegroups.com
The sample Quartz service configuration shown for the service should work for you.

You can also host this in your own service if you want. Multiple instances can be hosted, since Quartz supports clustering.
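As a rough sketch of what running several instances usually involves on the Quartz.NET side: the ADO.NET job store is marked as clustered and each instance gets its own id. The helper class and method names below are hypothetical. Only the two property keys are Quartz.NET settings, and this assumes the same NameValueCollection style of configuration used in the first post.

```csharp
using System.Collections.Specialized;

// Hypothetical helper, not part of Quartz.NET or MassTransit.
public static class QuartzClusterSettings
{
    // Adds the settings that let several scheduler instances share one job store.
    public static NameValueCollection Apply(NameValueCollection properties)
    {
        // Each clustered instance needs a unique id; "AUTO" lets Quartz generate one.
        properties["quartz.scheduler.instanceId"] = "AUTO";
        // Tell the ADO.NET job store that other instances use the same tables.
        properties["quartz.jobStore.clustered"] = "true";
        return properties;
    }
}
```

All instances would point at the same database and use the same scheduler instance name.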

Sample App.Config settings:

The service (Topshelf-style) that is available out of the box (just not on the NuGet package):


The zip file should include the compiled service:




Mike Bridge

Jun 23, 2014, 1:32:27 PM
to masstrans...@googlegroups.com
I'm trying to run the part that takes the Quartz job from an MSSQL database and sends it as a MassTransit job with RabbitMQ in an existing Topshelf service, but so far with no luck. I'm using essentially the same code as ScheduleMessageService, but the messages all end up in an *_error queue. As I understand it, that means that the consumer isn't registered correctly (http://stackoverflow.com/questions/20514457/masstransit-with-rabbitmq-when-is-a-message-moved-to-the-error-queue). But my consumer is correctly consuming messages that are not scheduled via Quartz, so I'm confused.

When the Quartz jobs are deserialized by the MassTransitJobFactory, how do they make their way from there to the consumer? Does the Quartz service need to initialize a copy of that bus so that it can transform Quartz jobs into MassTransit messages (i.e. does it know how to produce messages, or does it just queue deserialized messages to RabbitMQ directly)? I wrapped the MassTransitJobFactory in a decorator to see what it was generating, and my job looks like it was deserialized from Quartz correctly. When I take my message out of the error queue in RabbitMQ it also seems to be intact.

I can't find any documentation or blog entries on this anywhere, just a single post on Stack Overflow.
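For anyone wanting to repeat the inspection step described above, a logging decorator around a job factory could look roughly like this. It is only a sketch: LoggingJobFactory is a hypothetical name, and it assumes a Quartz.NET 2.2-style IJobFactory with NewJob and ReturnJob, so it may need adjusting for other versions.

```csharp
using System;
using System.Collections.Generic;
using Quartz;
using Quartz.Spi;

// Hypothetical decorator, not part of MassTransit or Quartz.NET.
// It logs what Quartz hands to the wrapped factory, then delegates to it.
public class LoggingJobFactory : IJobFactory
{
    private readonly IJobFactory _inner;
    private readonly Action<string> _log;

    public LoggingJobFactory(IJobFactory inner, Action<string> log)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (log == null) throw new ArgumentNullException("log");
        _inner = inner;
        _log = log;
    }

    public IJob NewJob(TriggerFiredBundle bundle, IScheduler scheduler)
    {
        IJobDetail detail = bundle.JobDetail;
        _log(string.Format("Creating job {0} of type {1}", detail.Key, detail.JobType));

        // Dump the job data so the stored values can be checked by eye.
        IDictionary<string, object> data = detail.JobDataMap;
        foreach (KeyValuePair<string, object> entry in data)
        {
            _log(string.Format("  {0} = {1}", entry.Key, entry.Value));
        }

        // The wrapped factory still creates the actual job.
        return _inner.NewJob(bundle, scheduler);
    }

    public void ReturnJob(IJob job)
    {
        _inner.ReturnJob(job);
    }
}
```

It would be wired in with something like scheduler.JobFactory = new LoggingJobFactory(new MassTransitJobFactory(bus), Console.WriteLine);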

Pavlo Tkhir

Dec 5, 2014, 11:17:39 AM
to masstrans...@googlegroups.com
Hi Mike!

Have you managed to solve this issue? I had the same issue, except I was not using an MSSQL db. I found that I was able to handle the actual message published by Quartz in the same service where it was emitted (the publisher). I downloaded the source code for ScheduleMessageConsumer and ScheduledMessageJob, made a couple of modifications, and then it started to work as expected.

Here are the lines I modified:



public class ScheduleMessageConsumer<T> : Consumes<ScheduleMessage<T>>.Context where T : class

public class ScheduledMessageJob<T> : IJob



Inside the ScheduledMessageJob.Execute method I changed "IEndpoint endpoint = _bus.GetEndpoint(destinationAddress);" to "IEndpoint endpoint = _bus.GetEndpoint(typeof(T));"

I'm really not sure whether that is the correct solution in general, but it fixed the problem. (I'm new to distributed programming, so I'm just playing around with a "hello, MassTransit" project.)

