NIFI-4024 Ensuring InputStream gets closed and cleaning up complex field handling

This closes #1961.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Bryan Bende 2017-08-03 13:00:25 -04:00
parent 496a32e12c
commit f8f1cc8d0d
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed file with 61 additions and 43 deletions

View File

@ -40,6 +40,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -146,7 +147,6 @@ public class PutHBaseRecord extends AbstractPutHBase {
return columns; return columns;
} }
private RecordReaderFactory recordParserFactory;
@Override @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
@ -176,7 +176,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
} }
PutFlowFile last = null; PutFlowFile last = null;
try (RecordReader reader = recordParserFactory.createRecordReader(flowFile, session.read(flowFile), getLogger())) { try (final InputStream in = session.read(flowFile);
final RecordReader reader = recordParserFactory.createRecordReader(flowFile, in, getLogger())) {
Record record; Record record;
if (startIndex >= 0) { if (startIndex >= 0) {
while ( index++ < startIndex && (reader.nextRecord()) != null) {} while ( index++ < startIndex && (reader.nextRecord()) != null) {}
@ -242,15 +243,29 @@ public class PutHBaseRecord extends AbstractPutHBase {
byte[] retVal; byte[] retVal;
if (asString) { if (asString) {
retVal = clientService.toBytes(record.getAsString(field)); switch (fieldType) {
case RECORD:
case CHOICE:
case ARRAY:
case MAP:
retVal = handleComplexField(record, field, complexFieldStrategy);
break;
default:
final String value = record.getAsString(field);
retVal = clientService.toBytes(value);
break;
}
} else { } else {
switch (fieldType) { switch (fieldType) {
case RECORD:
case CHOICE:
case ARRAY:
case MAP:
retVal = handleComplexField(record, field, complexFieldStrategy);
break;
case BOOLEAN: case BOOLEAN:
retVal = clientService.toBytes(record.getAsBoolean(field)); retVal = clientService.toBytes(record.getAsBoolean(field));
break; break;
case CHAR:
retVal = clientService.toBytes(record.getAsString(field));
break;
case DOUBLE: case DOUBLE:
retVal = clientService.toBytes(record.getAsDouble(field)); retVal = clientService.toBytes(record.getAsDouble(field));
break; break;
@ -264,31 +279,37 @@ public class PutHBaseRecord extends AbstractPutHBase {
retVal = clientService.toBytes(record.getAsLong(field)); retVal = clientService.toBytes(record.getAsLong(field));
break; break;
default: default:
retVal = null; final String value = record.getAsString(field);
switch (complexFieldStrategy) { retVal = clientService.toBytes(value);
case FAIL_VALUE: break;
getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
case WARN_VALUE:
getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
break;
case TEXT_VALUE:
retVal = clientService.toBytes(record.getAsString(field));
break;
case IGNORE_VALUE:
// silently skip
break;
default:
break;
}
} }
} }
return retVal; return retVal;
} }
/**
 * Produces the bytes to store for a complex-typed field, as dictated by the
 * supplied complex field strategy.
 *
 * <ul>
 *   <li>{@code TEXT_VALUE}: the field is read with {@code record.getAsString(field)}
 *       and converted with {@code clientService.toBytes}.</li>
 *   <li>{@code WARN_VALUE}: a warning is logged and {@code null} is returned.</li>
 *   <li>{@code FAIL_VALUE}: an error is logged and an exception is thrown.</li>
 *   <li>{@code IGNORE_VALUE} or any other strategy: {@code null} is returned
 *       without logging.</li>
 * </ul>
 *
 * @param record the record holding the field
 * @param field the name of the field to convert
 * @param complexFieldStrategy the strategy selecting one of the behaviors above
 * @return the converted bytes, or {@code null} when the strategy yields no value
 * @throws PutCreationFailedInvokedException when the strategy is {@code FAIL_VALUE}
 */
private byte[] handleComplexField(Record record, String field, String complexFieldStrategy) throws PutCreationFailedInvokedException {
    // Stays null unless the strategy explicitly yields a value.
    byte[] converted = null;

    switch (complexFieldStrategy) {
        case TEXT_VALUE:
            converted = clientService.toBytes(record.getAsString(field));
            break;
        case WARN_VALUE:
            getLogger().warn("Complex value found for {}; skipping", new Object[]{field});
            break;
        case FAIL_VALUE:
            getLogger().error("Complex value found for {}; routing to failure", new Object[]{field});
            throw new PutCreationFailedInvokedException(String.format("Complex value found for %s; routing to failure", field));
        case IGNORE_VALUE:
        default:
            // Silently skip: nothing is logged and no value is produced.
            break;
    }

    return converted;
}
protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy) throws PutCreationFailedInvokedException { String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy)
throws PutCreationFailedInvokedException {
PutFlowFile retVal = null; PutFlowFile retVal = null;
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
@ -296,29 +317,26 @@ public class PutHBaseRecord extends AbstractPutHBase {
final byte[] fam = clientService.toBytes(columnFamily); final byte[] fam = clientService.toBytes(columnFamily);
//try { if (record != null) {
if (record != null) { List<PutColumn> columns = new ArrayList<>();
List<PutColumn> columns = new ArrayList<>(); for (String name : schema.getFieldNames()) {
for (String name : schema.getFieldNames()) { if (name.equals(rowFieldName)) {
if (name.equals(rowFieldName)) { continue;
continue;
}
columns.add(new PutColumn(fam, clientService.toBytes(name), asBytes(name,
schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy)));
} }
String rowIdValue = record.getAsString(rowFieldName);
if (rowIdValue == null) {
throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
}
byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
retVal = new PutFlowFile(tableName, rowId, columns, flowFile); final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
if (fieldValueBytes != null) {
columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes));
}
} }
String rowIdValue = record.getAsString(rowFieldName);
if (rowIdValue == null) {
throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
}
byte[] rowId = getRow(rowIdValue, rowEncodingStrategy);
/* } catch (Exception ex) { retVal = new PutFlowFile(tableName, rowId, columns, flowFile);
getLogger().error("Error running createPuts", ex); }
throw new RuntimeException(ex);
}*/
return retVal; return retVal;
} }