NIFI-8259: Infer CSV field types as strings if no records are present

This commit is contained in:
Matthew Burgess 2021-02-25 11:30:33 -05:00 committed by markap14
parent 93b1a05dc3
commit 29cc3886fc
3 changed files with 44 additions and 1 deletions

View File

@ -17,9 +17,12 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
@ -931,6 +934,34 @@ public class TestQueryRecord {
out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n"); out.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"49\"\n");
} }
@Test
public void testNoRecordsInput() throws InitializationException, IOException, SQLException {
    // Verifies that a header-only CSV (no data rows) read with an inferred schema can still be
    // queried, and that a zero-record FlowFile is routed to the query's relationship.
    final TestRunner runner = getRunner();

    // Register the reader exactly once, then configure it to infer its schema from the data.
    final CSVReader csvReader = new CSVReader();
    runner.addControllerService("csv-reader", csvReader);
    runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);

    final MockRecordWriter writer = new MockRecordWriter("\"name\",\"age\"");
    runner.addControllerService("writer", writer);

    runner.enableControllerService(csvReader);
    runner.enableControllerService(writer);

    runner.setProperty(REL_NAME, "select name from FLOWFILE WHERE age > 23");
    runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "csv-reader");
    runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
    // Without this the assertion on a transferred (but empty) result FlowFile below would not apply.
    runner.setProperty(QueryRecord.INCLUDE_ZERO_RECORD_FLOWFILES, "true");

    // Input consists of the header line only: field names are known, but there are no records.
    runner.enqueue("name,age\n");
    runner.run();

    runner.assertTransferCount(REL_NAME, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
    // Only the header string given to the mock writer is expected; no record lines follow it.
    out.assertContentEquals("\"name\",\"age\"\n");
}
@Test @Test
public void testTransformCalc() throws InitializationException, IOException, SQLException { public void testTransformCalc() throws InitializationException, IOException, SQLException {

View File

@ -65,4 +65,8 @@ public class CSVRecordSource implements RecordSource<CSVRecordAndFieldNames> {
return null; return null;
} }
/**
 * Returns the field names held by this record source.
 * <p>
 * NOTE(review): presumably these are the CSV header column names; confirm against where
 * {@code fieldNames} is populated. The list is returned as-is, without a defensive copy.
 *
 * @return the {@code fieldNames} list of this source
 */
public List<String> getFieldNames() {
    return this.fieldNames;
}
} }

View File

@ -50,12 +50,20 @@ public class CSVSchemaInference implements SchemaInferenceEngine<CSVRecordAndFie
while (true) { while (true) {
final CSVRecordAndFieldNames recordAndFieldNames = recordSource.next(); final CSVRecordAndFieldNames recordAndFieldNames = recordSource.next();
if (recordAndFieldNames == null) { if (recordAndFieldNames == null) {
// If there are no records, assume the datatypes of all fields are strings
if (typeMap.isEmpty()) {
if (recordSource instanceof CSVRecordSource) {
CSVRecordSource csvRecordSource = (CSVRecordSource) recordSource;
for (String fieldName : csvRecordSource.getFieldNames()) {
typeMap.put(fieldName, new FieldTypeInference());
}
}
}
break; break;
} }
inferSchema(recordAndFieldNames, typeMap); inferSchema(recordAndFieldNames, typeMap);
} }
return createSchema(typeMap); return createSchema(typeMap);
} }