Revision: 2810
Author:
robert...@continuent.com
Date: Thu Apr 9 18:48:16 2015 UTC
Log: Update Issue 1113
Added support for padding BINARY column values to add trailing null
characters when converting to hex.
https://code.google.com/p/tungsten-replicator/source/detail?r=2810
Modified:
/branches/3.0.0-maintenance/replicator/samples/extensions/javascript/fixmysqlstrings.js
=======================================
---
/branches/3.0.0-maintenance/replicator/samples/extensions/javascript/fixmysqlstrings.js
Fri Aug 8 18:55:23 2014 UTC
+++
/branches/3.0.0-maintenance/replicator/samples/extensions/javascript/fixmysqlstrings.js
Thu Apr 9 18:48:16 2015 UTC
@@ -7,15 +7,21 @@
*
* This filter fixes MySQL strings by converting byte values to either a
* normal Java String or a Hex'ed string if the source type is VARBINARY
- * or BINARY. For this to work you must:
- *
- * 1.) Run colnames filter before it in the filter chain. This filter
- * depends on the type description being filled in.
+ * or BINARY. After fixing strings it marks the strings=utf8 option so
+ * that downstream filters can see strings have been fixed. It also
+ * removes the ##charset tag from row events as this is no longer needed
+ * after strings are converted.
*
- * 2.) Run before pkey. The filter will break if pkey removes key columns.
- *
- * 3.) Set the MySQL extractor to use bytes for strings, e.g.:
- * replicator.extractor.dbms.usingBytesForString=true
+ * The filter is designed to be used in place of the MySQL extractor option
+ * usingBytesForString=true, which cannot be used in cases where the
+ * master replicator is also generating log records to be applied to other
+ * MySQL servers.
+ *
+ * IMPORTANT: For this script to work you must run the colnames filter
+ * to fill in the type description. It can run anywhere upstream as the
+ * value is now preserved in the log. The tpm --enable-heterogeneous-master
+ * option will cause colnames to be applied and is the recommended method
+ * to enable metadata collection.
*
* @author <a href="mailto:
eric....@continuent.com">Eric M. Stone</a>
* @author <a href="mailto:
robert...@continuent.com">Robert M.
Hodges</a>
@@ -48,6 +54,29 @@
* @see org.apache.log4j.Logger
*/
function filter(event) {
+ // Ensure that we are dealing with a MySQL event. If not, we can stop.
+ raw_event = event.getDBMSEvent();
+ if (raw_event == null)
+ {
+ //
logger.info("Event was null!");
+ return;
+ }
+ dbms_type = raw_event.getMetadataOptionValue("dbms_type");
+ if (dbms_type != "mysql")
+ {
+ //
logger.info("Event was not mysql: [" + dbms_type + "]");
+ return;
+ }
+
+ // Next see strings need fixing up. If 'strings=utf8' is set, there is
+ // no further work required.
+ strings = raw_event.getMetadataOptionValue("strings");
+ if (strings == "utf8")
+ {
+ //
logger.info("string was already fixed!");
+ return;
+ }
+
// Get the data.
data = event.getData();
if (data != null) {
@@ -58,18 +87,31 @@
// Determine the underlying type of DBMSData event.
if (d != null && d instanceof
com.continuent.tungsten.replicator.dbms.StatementData) {
- // We can't do anything about statements.
+ // Convert statement data from bytes to string.
+ query = d.getQuery();
+ d.setQuery(query);
} else if (d != null && d instanceof
com.continuent.tungsten.replicator.dbms.RowChangeData) {
processRowChanges(event, d);
}
}
}
+
+ // If we made it this far, byte values are properly translated to UTF8.
+ // Make a note to that effect.
+ raw_event.setMetaDataOption("strings", "utf8");
}
// Convert String bytes to blob or string based on type description.
function processRowChanges(event, d) {
rowChanges = d.getRowChanges();
+ // Find the applicable charset for converting strings, if known.
+ charsetName = d.getOption("##charset");
+ if (charsetName == null)
+ charset = null;
+ else
+ charset = java.nio.charset.Charset.forName(charsetName);
+
// One RowChangeData may contain many OneRowChange events.
for (j = 0; j < rowChanges.size(); j++) {
// Get com.continuent.tungsten.replicator.dbms.OneRowChange
@@ -78,17 +120,23 @@
var table = oneRowChange.getTableName();
var columns = oneRowChange.getColumnSpec();
var columnValues = oneRowChange.getColumnValues();
- fixUpStrings(schema, table, columns, columnValues);
+ fixUpStrings(schema, table, columns, columnValues, charset);
// Iterate through its keys if any
keys = oneRowChange.getKeySpec();
keyValues = oneRowChange.getKeyValues();
- fixUpStrings(schema, table, keys, keyValues);
+ fixUpStrings(schema, table, keys, keyValues, charset);
}
+
+ // If the character set is set, try to remove the option from the
+ // row change event, as it is no longer needed now that strings are
+ // converted.
+ if (charsetName != null)
+ d.removeOption("##charset");
}
// Look for strings to fix up.
-function fixUpStrings(schema, table, columns, columnValues)
+function fixUpStrings(schema, table, columns, columnValues, charset)
{
for (c = 0; c < columns.size(); c++) {
columnSpec = columns.get(c);
@@ -102,20 +150,51 @@
logger.debug("Found a VARCHAR column that may need sorting: column="
+ colName + ' table=' + schema + '.' + table);
// Iterate through the rows.
for (row = 0; row < columnValues.size(); row++) {
+ // Ensure values are actually there--for insert keys they may not
be.
values = columnValues.get(row);
+ if (row >= values.size())
+ break;
+
+ // Fetch the values.
value = values.get(c);
raw_v = value.getValue();
if (raw_v == null || colDesc == null) {
+ // Do nothing; we have a null value or are missing column data.
logger.debug('value: NULL');
- // Do nothing
+ }
+ else if (raw_v instanceof java.lang.String) {
+ // It's already a string, so we cannot convert it...
+ logger.debug('value: ' + raw_v);
} else {
if (colDesc.startsWith("BINARY") ||
colDesc.startsWith("VARBINARY")) {
// Convert to a hexadecimal string.
hex = javax.xml.bind.DatatypeConverter.printHexBinary(raw_v);
+
logger.info(hex);
+ if (colDesc.startsWith("B")) {
+ // Conversion cuts off trailing x'00' bytes in BINARY
strings.
+ // We compute the proper length from the type name and append
+ // the missing null values.
+ re = /BINARY\(([0-9]+)\)/;
+ match = re.exec(colDesc);
+ expectLen = match[1] * 2;
+ actLen = hex.length();
+ diff = expectLen - actLen;
+ if (diff > 0)
+ {
+ extraZeroes = "";
+ for (zeroes = 0; zeroes < diff; zeroes++) {
+ extraZeroes += "0";
+ }
+ hex += extraZeroes;
+ }
+ }
value.setValue(hex);
}
else {
- value.setValue(new java.lang.String(raw_v));
+ if (charset == null)
+ value.setValue(new java.lang.String(raw_v));
+ else
+ value.setValue(new java.lang.String(raw_v, charset));
}
}
}