Human readable Events in SQL event store

expanding...@gmail.com

Sep 19, 2016, 10:46:56 AM
to Axon Framework Users

Hi,

We are developing a solution based on Axon Framework version 2.4.1.
We are getting close to our go-live date, so we cannot realistically move to a different Axon version.

The events we currently see in our event data store (PostgreSQL) are not human readable.
We have implemented an Event Visitor which can retrieve events, and everything looks fine, so the event content itself is intact.

However, if we look directly at the domainevententry table, an event payload looks something like this:

\x3c636f6d2e76616c6578612e737562726f676174696f6e2e636f72652e636f6d706f6e656e742e636f6d6d6f6e2e6576656e742e526f6c654d656e754f7074696f6e437265617465644576656e743e3c757569643e63356638366530382d356165632d343265332d393562372d3837323930376233653863633c2f757569643e3c7265717565737455736572555549443e61666139613031352d353035612d346431322d623431352d3065333737656632663834373c2f7265717565737455736572555549443e3c72657175657374446174653e323031362d30392d31395431353a32393a34372e3535332b30313a30303c2f72657175657374446174653e3c6576656e744c6576656c3e414c4c3c2f6576656e744c6576656c3e3c6576656e744465736372697074696f6e3e41206d656e75206f7074696f6e20686173206265656e206372656174656420666f72207468697320726f6c652e3c2f6576656e744465736372697074696f6e3e3c6576656e744e616d653e526f6c654d656e754f7074696f6e437265617465644576656e743c2f6576656e744e616d653e3c7065726d697373696f6e547970653e5045524d495353494f4e5f4d454e555f4f5054494f4e3c2f7065726d697373696f6e547970653e3c6d656e754f7074696f6e3e4d454e555f574f524b5f5155455545533c2f6d656e754f7074696f6e3e3c726f6c654e616d653e524f4c455f53595354454d5f41444d494e4953545241544f523c2f726f6c654e616d653e3c2f636f6d2e76616c6578612e737562726f676174696f6e2e636f72652e636f6d706f6e656e742e636f6d6d6f6e2e6576656e742e526f6c654d656e754f7074696f6e437265617465644576656e743e 


Is this expected?

Is there a way of improving the human readability of these raw domain events (e.g. to show the event payload as XML)?

A key concern from the System Support team is that they do not feel they could support the system if they cannot understand the stored event data.


Many Thanks,

M

 




Steven Grimm

Sep 19, 2016, 10:59:11 AM
to axonfr...@googlegroups.com
That's the PostgreSQL client showing the data of a binary column as hexadecimal. The Axon SQL schema uses a binary column because the application might decide to use a non-human-readable serialization format, e.g., the Java serializer.

You can try the built-in PostgreSQL "encode" function to tell it to render the binary data as text, e.g.,

SELECT encode(payload, 'escape') FROM domainevententry;

-Steve
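
For anyone who wants to check a single value outside the database, the hex form can also be decoded client-side. This is only a minimal sketch, assuming the payload is UTF-8 text (as with an XML or JSON serializer) and not Java-serialized objects; the class name ByteaHexDecoder is made up for illustration.

```java
import java.nio.charset.StandardCharsets;

/** Decodes PostgreSQL's hex rendering of a bytea value (the "\x..." form) into text. */
final class ByteaHexDecoder {

    static String decode(String hex) {
        // Drop the leading "\x" marker that PostgreSQL puts in front of hex output.
        String digits = hex.startsWith("\\x") ? hex.substring(2) : hex;
        if (digits.length() % 2 != 0) {
            throw new IllegalArgumentException("hex string must have an even number of digits");
        }
        byte[] bytes = new byte[digits.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            // Each pair of hex digits is one byte of the serialized payload.
            bytes[i] = (byte) Integer.parseInt(digits.substring(2 * i, 2 * i + 2), 16);
        }
        // Assumption: the bytes are UTF-8 text; binary serialization formats will not decode usefully.
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // First bytes of the payload quoted in the question.
        System.out.println(decode("\\x3c636f6d2e76616c657861")); // prints "<com.valexa"
    }
}
```

The same idea applies to the whole column value: the stored bytes are the serializer's output, and the hex is only how the client displays them.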


expanding...@gmail.com

Sep 19, 2016, 11:50:44 AM
to Axon Framework Users


On Monday, September 19, 2016 at 3:59:11 PM UTC+1, Steven Grimm wrote:

You can try the built-in PostgreSQL "encode" function to tell it to render the binary data as text, e.g.,

SELECT encode(payload, 'escape') FROM domainevententry;

-Steve


You, sir, are an absolute star. Many thanks!

Patrick Haas

Sep 19, 2016, 5:44:25 PM
to Axon Framework Users
Hi,

Since you're using PostgreSQL, you might consider:

- Using Jackson/JSON as the domain event serializer
- Storing the data in a JSONB column

Getting Axon to support JSONB data types requires implementing an adapter, but it's pretty
straightforward when using the JDBC event store. In my version (see below)
I've also modified the column names, but that's not necessary, so you can get away with
overriding much less code.

I've found JSONB to be very helpful: it lets you run meaningful queries when you're, e.g., wondering
what types of event payloads are actually being used, or when you need to rewrite events whose values
cannot be upcast in your upcaster chain.

Once that's done, you can easily query, e.g.:

select payload->>'menuOption', count(*) from .. events .. group by payload->>'menuOption';
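
To run that kind of aggregate from application code instead of a SQL console, a plain JDBC sketch could look like the following. Assumptions: payload is a JSONB column, the table name is supplied by the caller (domain_event_entry in the comment is only an example), and the names PayloadFieldCounts, buildSql and countByField are made up for illustration.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Counts how often each value of one top-level JSON field occurs in the event payloads. */
final class PayloadFieldCounts {

    static String buildSql(String table) {
        // Table names cannot be bound as parameters, so accept only plain,
        // optionally schema-qualified identifiers (e.g. schemaname.domain_event_entry).
        if (!table.matches("[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)?")) {
            throw new IllegalArgumentException("unexpected table name: " + table);
        }
        // The JSON key is bound as a parameter; GROUP BY 1 refers to the first select column.
        return "SELECT payload ->> CAST(? AS text) AS field_value, count(*) AS occurrences FROM "
                + table + " GROUP BY 1 ORDER BY 2 DESC";
    }

    static Map<String, Long> countByField(Connection connection, String table, String field)
            throws SQLException {
        Map<String, Long> counts = new LinkedHashMap<>();
        try (PreparedStatement statement = connection.prepareStatement(buildSql(table))) {
            statement.setString(1, field);
            try (ResultSet rows = statement.executeQuery()) {
                while (rows.next()) {
                    // Events that lack the field come back with a null key.
                    counts.put(rows.getString(1), rows.getLong(2));
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Without a database at hand, just show the statement that would be prepared.
        System.out.println(buildSql("schemaname.domain_event_entry"));
    }
}
```

Binding the key as a parameter keeps the statement reusable for any field, e.g. countByField(connection, table, "menuOption").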



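import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.axonframework.eventstore.jdbc.GenericEventSqlSchema;
import org.axonframework.eventstore.jdbc.SchemaConfiguration;
import org.joda.time.DateTime;
import org.postgresql.util.PGobject;

/**
 * SQL schema supporting postgres databases and our JPA naming convention.
 * <p>
 * Payload and metadata are stored either as byte[] (DB2, H2) or JSONB (postgresql).
 *
 * @author Patrick Haas
 */
public class CustomEventSqlSchema extends GenericEventSqlSchema<String> {

    private static final String STD_FIELDS = "event_identifier, aggregate_identifier, sequence_number, time_stamp, "
            + "payload_type, payload_revision, payload, meta_data";

    private final boolean useJSONB;

    public CustomEventSqlSchema(SchemaConfiguration schemaConfiguration, String dialect) {
        super(String.class, schemaConfiguration);
        this.useJSONB = dialect.toLowerCase().contains("postgres");
    }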


    @Override
    protected PreparedStatement doInsertEventEntry(String tableName, Connection connection, String eventIdentifier,
                                                   String aggregateIdentifier,
                                                   long sequenceNumber, DateTime timestamp, String eventType,
                                                   String eventRevision,
                                                   String eventPayload, String eventMetaData, String aggregateType)
            throws SQLException {
        final String sql = "INSERT INTO " + tableName
                + " (event_identifier, type, aggregate_identifier, sequence_number, time_stamp, payload_type, "
                + "payload_revision, payload, meta_data) VALUES (?,?,?,?,?,?,?,?,?)";
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, eventIdentifier);
        preparedStatement.setString(2, aggregateType);
        preparedStatement.setString(3, aggregateIdentifier);
        preparedStatement.setLong(4, sequenceNumber);
        preparedStatement.setString(5, sql_dateTime(timestamp));
        preparedStatement.setString(6, eventType);
        preparedStatement.setString(7, eventRevision);

        if (useJSONB) {
            PGobject payload = new PGobject();
            payload.setType("jsonb");
            payload.setValue(eventPayload);

            PGobject metaData = new PGobject();
            metaData.setType("jsonb");
            metaData.setValue(eventMetaData);

            preparedStatement.setObject(8, payload);
            preparedStatement.setObject(9, metaData);
        } else {
            preparedStatement.setBytes(8, eventPayload.getBytes(StandardCharsets.UTF_8));
            preparedStatement.setBytes(9, eventMetaData.getBytes(StandardCharsets.UTF_8));
        }
        return preparedStatement;
    }

    @Override
    public PreparedStatement sql_fetchFromSequenceNumber(Connection connection, String type, Object aggregateIdentifier,
                                                         long firstSequenceNumber) throws SQLException {
        final String sql = "SELECT " + STD_FIELDS + " FROM " + schemaConfiguration.domainEventEntryTable()
                + " WHERE aggregate_identifier = ? AND type = ?"
                + " AND sequence_number >= ?"
                + " ORDER BY sequence_number ASC";
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, aggregateIdentifier.toString());
        preparedStatement.setString(2, type);
        preparedStatement.setLong(3, firstSequenceNumber);
        return preparedStatement;
    }

    @Override
    public PreparedStatement sql_getFetchAll(Connection connection, String whereClause,
                                             Object[] params) throws SQLException {
        final String sql = "select " + STD_FIELDS + " from " + schemaConfiguration.domainEventEntryTable()
                + " e " + whereClause
                + " ORDER BY e.time_stamp ASC, e.sequence_number ASC, e.aggregate_identifier ASC ";
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        for (int i = 0; i < params.length; i++) {
            Object param = params[i];
            if (param instanceof DateTime) {
                param = sql_dateTime((DateTime) param);
            }

            if (param instanceof byte[]) {
                preparedStatement.setBytes(i + 1, (byte[]) param);
            } else {
                preparedStatement.setObject(i + 1, param);
            }
        }
        return preparedStatement;
    }

    @Override
    public PreparedStatement sql_loadLastSnapshot(Connection connection, Object identifier, String aggregateType)
            throws SQLException {
        final String s = "SELECT " + STD_FIELDS + " FROM " + schemaConfiguration.snapshotEntryTable()
                + " WHERE aggregate_identifier = ? AND type = ? ORDER BY sequence_number DESC";
        PreparedStatement statement = connection.prepareStatement(s);
        statement.setString(1, identifier.toString());
        statement.setString(2, aggregateType);
        return statement;
    }

    @Override
    public PreparedStatement sql_pruneSnapshots(Connection connection, String type, Object aggregateIdentifier,
                                                long sequenceOfFirstSnapshotToPrune) throws SQLException {
        PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM " + schemaConfiguration.snapshotEntryTable()
                + " WHERE type = ?"
                + " AND aggregate_identifier = ?"
                + " AND sequence_number <= ?");
        preparedStatement.setString(1, type);
        preparedStatement.setString(2, aggregateIdentifier.toString());
        preparedStatement.setLong(3, sequenceOfFirstSnapshotToPrune);
        return preparedStatement;
    }

    @Override
    public PreparedStatement sql_findSnapshotSequenceNumbers(Connection connection, String type,
                                                             Object aggregateIdentifier) throws SQLException {
        final String sql = "SELECT sequence_number FROM " + schemaConfiguration.snapshotEntryTable()
                + " WHERE type = ? AND aggregate_identifier = ?"
                + " ORDER BY sequence_number DESC";
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        preparedStatement.setString(1, type);
        preparedStatement.setString(2, aggregateIdentifier.toString());
        return preparedStatement;
    }

    @Override
    public PreparedStatement sql_createSnapshotEventEntryTable(Connection connection) throws SQLException {
        final String sql = "create table " + schemaConfiguration.snapshotEntryTable() + " (" +
                " aggregate_identifier varchar(255) not null," +
                " sequence_number bigint not null," +
                " type varchar(255) not null," +
                " event_identifier varchar(255) not null," +
                " meta_data jsonb," +
                " payload jsonb not null," +
                " payload_revision varchar(255)," +
                " payload_type varchar(255) not null," +
                " time_stamp varchar(255) not null," +
                " primary key (aggregate_identifier, sequence_number, type)" +
                " );";
        return connection.prepareStatement(sql);
    }

    @Override
    public PreparedStatement sql_createDomainEventEntryTable(Connection connection) throws SQLException {
        final String sql = "create table " + schemaConfiguration.domainEventEntryTable() + " (" +
                " aggregate_identifier varchar(255) not null," +
                " sequence_number bigint not null," +
                " type varchar(255) not null," +
                " event_identifier varchar(255) not null," +
                " meta_data jsonb," +
                " payload jsonb not null," +
                " payload_revision varchar(255)," +
                " payload_type varchar(255) not null," +
                " time_stamp varchar(255) not null," +
                " primary key (aggregate_identifier, sequence_number, type)" +
                " );";
        return connection.prepareStatement(sql);
    }

    @Override
    protected Object readTimeStamp(ResultSet resultSet, int columnIndex) throws SQLException {
        return new DateTime(resultSet.getString(columnIndex));
    }

    @Override
    protected String readPayload(ResultSet resultSet, int columnIndex) throws SQLException {
        if (useJSONB) {
            // Postgres maps JSONB to strings.
            return resultSet.getString(columnIndex);
        } else {
            // DB2 stores a byte stream (BLOB).
            // H2 works with either code path.
            return new String(resultSet.getBytes(columnIndex), StandardCharsets.UTF_8);
        }
    }
}

Kirk Daries

Sep 29, 2016, 9:02:02 AM
to Axon Framework Users
Hi Patrick,

I'm interested in using Postgres with JSONB for our event store...
Because, well ---> JSON :)

Could you tell me exactly how one goes about plugging in the custom GenericEventSqlSchema and serializing to JSON?

Thanks,
--KD

Patrick Haas

Sep 29, 2016, 3:31:38 PM
to Axon Framework Users
Hi Kirk,

The basic documentation for setting up the JDBC event store is here:



My particular configuration looks like this:

<bean id="eventStoreSqlSchema" class="com.expd.rates.infrastructure.orm.CustomEventSqlSchema">
    <constructor-arg name="schemaConfiguration">
        <bean class="org.axonframework.eventstore.jdbc.SchemaConfiguration">
            <constructor-arg name="eventEntryTable" value="schemaname.domain_event_entry"/>
            <constructor-arg name="snapshotEntryTable" value="schemaname.snapshot_event_entry"/>
        </bean>
    </constructor-arg>
    <constructor-arg name="dialect" value="${datasource.dialect}" />
</bean>

<bean id="axonConnectionProvider" class="org.axonframework.common.jdbc.SpringDataSourceConnectionProvider">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="eventStore" primary="true" class="org.axonframework.eventstore.jdbc.JdbcEventStore">
    <constructor-arg name="eventEntryStore">
        <bean class="org.axonframework.eventstore.jdbc.DefaultEventEntryStore">
            <constructor-arg name="connectionProvider" ref="axonConnectionProvider" />
            <constructor-arg name="sqlSchema" ref="eventStoreSqlSchema" />
        </bean>
    </constructor-arg>
    <constructor-arg name="serializer" ref="axonJsonSerializer"/>
    <!--<property name="persistenceExceptionResolver" ref="axonPersistenceExceptionResolver" />-->
    <property name="upcasterChain" ref="upcasterChain"/>
</bean>

~Patrick