Hi all,
In a custom scheme, we are trying to have the overridden source() method still accept records whose tuple.size() is greater than the source fields.size() when the scheme is set to a 'not strict' mode. So far I've been unsuccessful (working truncation, discarding, and trapping solutions aside). Here is roughly what I'm attempting within source():
TupleEntry entry = sourceCall.getIncomingEntry();
ListWritable<Text> values = (ListWritable<Text>) context[1];
Fields fields = getSourceFields();
...
int checkIndexDiff = values.size() - fields.size();
// build placeholder field names and null values for the overflow columns
String[] names = new String[checkIndexDiff];
String[] vals = new String[checkIndexDiff];
for (int i = 0; i < checkIndexDiff; i++) {
  names[i] = String.format("col%s", indices.size() + i); // indices is a member of our scheme
  vals[i] = getNullString();
}
Fields newFields = new Fields(names);
fields = fields.append(newFields); // Fields is immutable; append() returns a new instance
Tuple tuple = new Tuple(entry.getTuple());
TupleEntry newTe = new TupleEntry(fields, tuple);
// point the local reference at a widened copy of the incoming entry
entry = entry.appendNew(new TupleEntry(newFields, new Tuple(vals)));
...
// for each value in values
for (int i = 0; i < values.size(); i++)
  entry.setString(i, values.get(i).toString());
return true;
This parses all tuples, but for any record whose value count (values.size()) exceeds the declared field count (getSourceFields().size()), something very odd happens:
It writes a tuple, but with the values of the previously retrieved tuple...
example:
input
id,user,date
1,bob,10/23/2014
2,tom,10/23/2013,extra extra
3,jim,10/11/2012
output
id,user,date
1,bob,10/23/2014
1,bob,10/23/2014
3,jim,10/11/2012
When stepping through, the code correctly updates the entry object with the correct values. But after returning to cascading.tuple.TupleEntrySchemeIterator#getNext, which executes cascading.tuple.TupleEntryIterator#getTupleEntry, the TupleEntry values are correct for every tuple except those whose length is greater than the fields'. I have already come up with working solutions that trap or truncate these records, but we want to accept inconsistent-length records at the scheme level and then separate valid and invalid tuples with a SubAssembly before sinking to 'Clean' and 'Dirty' sinks.
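For reference, the truncating variant that does work looks roughly like this inside source() (just a sketch, reusing the entry/values/fields locals from the snippet above, and assuming ListWritable exposes get(int) like a List):

int limit = Math.min(values.size(), fields.size());
// keep only as many values as there are declared source fields
for (int i = 0; i < limit; i++)
  entry.setString(i, values.get(i).toString());
// pad short records with our null placeholder
for (int i = values.size(); i < fields.size(); i++)
  entry.setString(i, getNullString());
return true;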
Is what I'm trying to do impossible? Or is the way I'm going about it just ill-advised...
My other idea is to not solve this problem in the scheme at all (in case that is bad practice, though in theory we prefer handling it there) and instead initially load the full dataset, including dirty data, using Hfs(TextDelimited()), then have a cleanDirtyData subassembly that splits the data into Dirty and Clean pipes. Then sink Clean to Hfs(CustomCsvScheme()), Dirty to Hfs(TextDelimited()), and go from there...
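If we go that route, the cleanDirtyData subassembly would look roughly like this (a sketch; DirtyRow stands in for a hypothetical Filter we would still have to write, one whose isRemove() returns true for records carrying overflow values):

import cascading.operation.Filter;
import cascading.operation.filter.Not;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;

public class CleanDirtyData extends SubAssembly {
  public CleanDirtyData(Pipe input, Filter dirtyRow) {
    // a Filter removes tuples for which isRemove() returns true,
    // so the clean branch applies the dirty-row filter directly...
    Pipe clean = new Each(new Pipe("clean", input), dirtyRow);
    // ...and the dirty branch keeps the complement via Not
    Pipe dirty = new Each(new Pipe("dirty", input), new Not(dirtyRow));
    setTails(clean, dirty);
  }
}

The 'clean' tail would then sink to Hfs(CustomCsvScheme()) and the 'dirty' tail to Hfs(TextDelimited()), as above.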
Any guidance on what is advisable/possible at the scheme level would be much appreciated!
-- Harrison Cavallero
Full Stack Engineer
DataScience