\x3c636f6d2e76616c6578612e737562726f676174696f6e2e636f72652e636f6d706f6e656e742e636f6d6d6f6e2e6576656e742e526f6c654d656e754f7074696f6e437265617465644576656e743e3c757569643e63356638366530382d356165632d343265332d393562372d3837323930376233653863633c2f757569643e3c7265717565737455736572555549443e61666139613031352d353035612d346431322d623431352d3065333737656632663834373c2f7265717565737455736572555549443e3c72657175657374446174653e323031362d30392d31395431353a32393a34372e3535332b30313a30303c2f72657175657374446174653e3c6576656e744c6576656c3e414c4c3c2f6576656e744c6576656c3e3c6576656e744465736372697074696f6e3e41206d656e75206f7074696f6e20686173206265656e206372656174656420666f72207468697320726f6c652e3c2f6576656e744465736372697074696f6e3e3c6576656e744e616d653e526f6c654d656e754f7074696f6e437265617465644576656e743c2f6576656e744e616d653e3c7065726d697373696f6e547970653e5045524d495353494f4e5f4d454e555f4f5054494f4e3c2f7065726d697373696f6e547970653e3c6d656e754f7074696f6e3e4d454e555f574f524b5f5155455545533c2f6d656e754f7074696f6e3e3c726f6c654e616d653e524f4c455f53595354454d5f41444d494e4953545241544f523c2f726f6c654e616d653e3c2f636f6d2e76616c6578612e737562726f676174696f6e2e636f72652e636f6d706f6e656e742e636f6d6d6f6e2e6576656e742e526f6c654d656e754f7074696f6e437265617465644576656e743e
Is this expected ?
Is there a way of improving the human readability of these raw domain events (e.g. to show the event payload as XML) ?
A key concern from the System Support team is that the do not feel that they could support the system if they cannot understand the stored event data.
Many Thanks,
M
September 19, 2016 at 7:46 AM
--
You received this message because you are subscribed to the Google Groups "Axon Framework Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to axonframewor...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
You can try the built-in PostgreSQL "encode" function to tell it to render the binary data as text, e.g.,
SELECT encode(payload, 'escape') FROM domainevententry;
-Steve
/**
* SQL schema supporting postgres databases and our JPA naming convention.
* <p>
* Payload and metadata are stored either as byte[] (DB2, H2) or JSONB (postgresql).
*
* @author Patrick Haas
*/
public class CustomEventSqlSchema extends GenericEventSqlSchema<String> {
private static final String STD_FIELDS = "event_identifier, aggregate_identifier, sequence_number, time_stamp, "
+ "payload_type, payload_revision, payload, meta_data";
private final boolean useJSONB;
public CustomEventSqlSchema(SchemaConfiguration schemaConfiguration, String dialect) {
super(String.class, schemaConfiguration);
this.useJSONB = dialect.toLowerCase().contains("postgres");
}
@Override
protected PreparedStatement doInsertEventEntry(String tableName, Connection connection, String eventIdentifier,
String aggregateIdentifier,
long sequenceNumber, DateTime timestamp, String eventType,
String eventRevision,
String eventPayload, String eventMetaData, String aggregateType)
throws SQLException {
final String sql = "INSERT INTO " + tableName
+ " (event_identifier, type, aggregate_identifier, sequence_number, time_stamp, payload_type, "
+ "payload_revision, payload, meta_data) VALUES (?,?,?,?,?,?,?,?,?)";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, eventIdentifier);
preparedStatement.setString(2, aggregateType);
preparedStatement.setString(3, aggregateIdentifier);
preparedStatement.setLong(4, sequenceNumber);
preparedStatement.setString(5, sql_dateTime(timestamp));
preparedStatement.setString(6, eventType);
preparedStatement.setString(7, eventRevision);
if (useJSONB) {
PGobject payload = new PGobject();
payload.setType("jsonb");
payload.setValue(eventPayload);
PGobject metaData = new PGobject();
metaData.setType("jsonb");
metaData.setValue(eventMetaData);
preparedStatement.setObject(8, payload);
preparedStatement.setObject(9, metaData);
} else {
preparedStatement.setBytes(8, eventPayload.getBytes(StandardCharsets.UTF_8));
preparedStatement.setBytes(9, eventMetaData.getBytes(StandardCharsets.UTF_8));
}
return preparedStatement;
}
@Override
public PreparedStatement sql_fetchFromSequenceNumber(Connection connection, String type, Object aggregateIdentifier,
long firstSequenceNumber) throws SQLException {
final String sql = "SELECT " + STD_FIELDS + " FROM " + schemaConfiguration.domainEventEntryTable()
+ " WHERE aggregate_identifier = ? AND type = ?"
+ " AND sequence_number >= ?"
+ " ORDER BY sequence_number ASC";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, aggregateIdentifier.toString());
preparedStatement.setString(2, type);
preparedStatement.setLong(3, firstSequenceNumber);
return preparedStatement;
}
@Override
public PreparedStatement sql_getFetchAll(Connection connection, String whereClause,
Object[] params) throws SQLException {
final String sql = "select " + STD_FIELDS + " from " + schemaConfiguration.domainEventEntryTable()
+ " e " + whereClause
+ " ORDER BY e.time_stamp ASC, e.sequence_number ASC, e.aggregate_identifier ASC ";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
for (int i = 0; i < params.length; i++) {
Object param = params[i];
if (param instanceof DateTime) {
param = sql_dateTime((DateTime) param);
}
if (param instanceof byte[]) {
preparedStatement.setBytes(i + 1, (byte[]) param);
} else {
preparedStatement.setObject(i + 1, param);
}
}
return preparedStatement;
}
@Override
public PreparedStatement sql_loadLastSnapshot(Connection connection, Object identifier, String aggregateType)
throws SQLException {
final String s = "SELECT " + STD_FIELDS + " FROM " + schemaConfiguration.snapshotEntryTable()
+ " WHERE aggregate_identifier = ? AND type = ? ORDER BY sequence_number DESC";
PreparedStatement statement = connection.prepareStatement(s);
statement.setString(1, identifier.toString());
statement.setString(2, aggregateType);
return statement;
}
@Override
public PreparedStatement sql_pruneSnapshots(Connection connection, String type, Object aggregateIdentifier,
long sequenceOfFirstSnapshotToPrune) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement("DELETE FROM " + schemaConfiguration.snapshotEntryTable()
+ " WHERE type = ?"
+ " AND aggregate_identifier = ?"
+ " AND sequence_number <= ?");
preparedStatement.setString(1, type);
preparedStatement.setString(2, aggregateIdentifier.toString());
preparedStatement.setLong(3, sequenceOfFirstSnapshotToPrune);
return preparedStatement;
}
@Override
public PreparedStatement sql_findSnapshotSequenceNumbers(Connection connection, String type,
Object aggregateIdentifier) throws SQLException {
final String sql = "SELECT sequence_number FROM " + schemaConfiguration.snapshotEntryTable()
+ " WHERE type = ? AND aggregate_identifier = ?"
+ " ORDER BY sequence_number DESC";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
preparedStatement.setString(1, type);
preparedStatement.setString(2, aggregateIdentifier.toString());
return preparedStatement;
}
@Override
public PreparedStatement sql_createSnapshotEventEntryTable(Connection connection) throws SQLException {
final String sql = "create table " + schemaConfiguration.snapshotEntryTable() + " (" +
" aggregate_identifier varchar(255) not null," +
" sequence_number bigint not null," +
" type varchar(255) not null," +
" event_identifier varchar(255) not null," +
" meta_data jsonb," +
" payload jsonb not null," +
" payload_revision varchar(255)," +
" payload_type varchar(255) not null," +
" time_stamp varchar(255) not null," +
" primary key (aggregate_identifier, sequence_number, type)" +
" );";
return connection.prepareStatement(sql);
}
@Override
public PreparedStatement sql_createDomainEventEntryTable(Connection connection) throws SQLException {
final String sql = "create table " + schemaConfiguration.domainEventEntryTable() + " (" +
" aggregate_identifier varchar(255) not null," +
" sequence_number bigint not null," +
" type varchar(255) not null," +
" event_identifier varchar(255) not null," +
" meta_data jsonb," +
" payload jsonb not null," +
" payload_revision varchar(255)," +
" payload_type varchar(255) not null," +
" time_stamp varchar(255) not null," +
" primary key (aggregate_identifier, sequence_number, type)" +
" );";
return connection.prepareStatement(sql);
}
@Override
protected Object readTimeStamp(ResultSet resultSet, int columnIndex) throws SQLException {
return new DateTime(resultSet.getString(columnIndex));
}
@Override
protected String readPayload(ResultSet resultSet, int columnIndex) throws SQLException {
if (useJSONB) {
// Postgres maps JSONB to strings.
return resultSet.getString(columnIndex);
} else {
// DB2 stores a byte stream (BLOB).
// H2 works with either code path..
return new String(resultSet.getBytes(columnIndex), StandardCharsets.UTF_8);
}
}
}
<bean id="eventStoreSqlSchema" class="com.expd.rates.infrastructure.orm.CustomEventSqlSchema">
<constructor-arg name="schemaConfiguration">
<bean class="org.axonframework.eventstore.jdbc.SchemaConfiguration">
<constructor-arg name="eventEntryTable" value="schemaname.domain_event_entry"/>
<constructor-arg name="snapshotEntryTable" value="schemaname.snapshot_event_entry"/>
</bean>
</constructor-arg>
<constructor-arg name="dialect" value="${datasource.dialect}" />
</bean>
<bean id="axonConnectionProvider" class="org.axonframework.common.jdbc.SpringDataSourceConnectionProvider">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="eventStore" primary="true" class="org.axonframework.eventstore.jdbc.JdbcEventStore">
<constructor-arg name="eventEntryStore">
<bean class="org.axonframework.eventstore.jdbc.DefaultEventEntryStore">
<constructor-arg name="connectionProvider" ref="axonConnectionProvider" />
<constructor-arg name="sqlSchema" ref="eventStoreSqlSchema" />
</bean>
</constructor-arg>
<constructor-arg name="serializer" ref="axonJsonSerializer"/>
<!--<property name="persistenceExceptionResolver" ref="axonPersistenceExceptionResolver" />-->
<property name="upcasterChain" ref="upcasterChain"/>
</bean>
~Patrick