I've spent some time today hunting around the Hazelcast source code trying to work out how to implement a QueueStore and QueueStoreFactory. To save others the same difficulty here's what I found. Hazelcast people, please correct me where I'm wrong. :)
First off, here's an example of a configuration using the new factory-class-name directive available in 3.1.3. You can use either factory-class-name or class-name but not both. class-name is chosen first if it's there.
<queue name="test">
<max-size>500</max-size>
<backup-count>1</backup-count>
<async-backup-count>0</async-backup-count>
<empty-queue-ttl>-1</empty-queue-ttl>
<queue-store>
<factory-class-name>com.test.OracleQueueFactory</factory-class-name>
<properties>
<property name="binary">false</property>
<property name="memory-limit">500</property>
<property name="bulk-load">500</property>
<property name="dburl">put your dbURL here</property>
<property name="dbuser">put your db user here</property>
<property name="dbpassword">put your db password here</property>
</properties>
</queue-store>
</queue>
The factory is required if you want to pass your own configuration properties into your QueueStore. In either case (QueueStoreFactory or QueueStore) if they are specified in the configuration then they must have a constructor with no arguments. If you need configuration arguments to your QueueStore implementation then you must use a QueueStoreFactory implementation. In the setup above the dburl,dbuser and dbpassword are my configuration parameters. The factory is pretty simple:
public class OracleQueueFactory
implements QueueStoreFactory<String>
{
@Override
public QueueStore<String> newQueueStore(String name, Properties properties)
{
return new OracleQueueStore(properties);
}
}
I won't write in the QueueStore implementation. It just a "public class OracleQueueStore implements QueueStore<String>", although there's a couple of things which I haven't seen documented about the QueueStore
1. If you have a problem and need to return an error you must throw an exception which extends RuntimeException. Hazelcast has a try/catch(Exception e) around every call made to the QueueStore. The interface doesn't specify any exceptions, so RuntimeException is the only option. If you extend Throwable your exception won't get caught and dealt with correctly.
2. If you want to do logging you can use the Hazelcast logging framework. Just use Logger.getLogger(getClass()) in the constructor. This works, but not sure if Logger is supposed to be exposed to developers.
With that in mind writing the methods for QueueStore aren't that difficult. Here's an the store method:
@Override
public void store(Long key, String value)
{
logger.info("QueueStore - store");
Connection conn = null;
PreparedStatement preparedStatement = null;
try
{
conn = connectionPool.getConnection();
String deleteStatement = "insert into MY_QUEUE (key,value) values (?,?)";
preparedStatement = conn.prepareStatement(deleteStatement);
preparedStatement.setLong(1, key);
preparedStatement.setString(2, value);
preparedStatement.execute();
conn.commit();
}
catch (SQLException e)
{
throw new OracleQueueStoreException(e); // OracleQueueStoreException extends RuntimeException
}
finally
{
closeConnection(conn,preparedStatement,null);
}
}
Hope that helps someone.
Regards,
David.