mirror of https://github.com/apache/nifi.git
NIFI-13335 Added ability for the XMLRecordReader to handle where an array of data has different types.
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #9356.
This commit is contained in:
parent
5f6bdc6243
commit
92c6ddd98e
|
@ -277,6 +277,7 @@
|
|||
|
||||
<exclude>src/test/resources/text/testschema</exclude>
|
||||
|
||||
<exclude>src/test/resources/xml/dataWithArrayOfDifferentTypes.xml</exclude>
|
||||
<exclude>src/test/resources/xml/field_with_sub-element.xml</exclude>
|
||||
<exclude>src/test/resources/xml/people.xml</exclude>
|
||||
<exclude>src/test/resources/xml/people2.xml</exclude>
|
||||
|
|
|
@ -539,24 +539,11 @@ public class XMLRecordReader implements RecordReader {
|
|||
}
|
||||
|
||||
private Object parseStringForType(String data, String fieldName, DataType dataType) {
|
||||
switch (dataType.getFieldType()) {
|
||||
case BOOLEAN:
|
||||
case BYTE:
|
||||
case CHAR:
|
||||
case DECIMAL:
|
||||
case DOUBLE:
|
||||
case FLOAT:
|
||||
case INT:
|
||||
case LONG:
|
||||
case SHORT:
|
||||
case STRING:
|
||||
case DATE:
|
||||
case TIME:
|
||||
case TIMESTAMP: {
|
||||
return DataTypeUtils.convertType(data, dataType, Optional.ofNullable(dateFormat), Optional.ofNullable(timeFormat), Optional.ofNullable(timestampFormat), fieldName);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
return switch (dataType.getFieldType()) {
|
||||
case BOOLEAN, BYTE, CHAR, CHOICE, DECIMAL, DOUBLE, FLOAT, INT, LONG, SHORT, STRING, DATE, TIME, TIMESTAMP ->
|
||||
DataTypeUtils.convertType(data, dataType, Optional.ofNullable(dateFormat), Optional.ofNullable(timeFormat), Optional.ofNullable(timestampFormat), fieldName);
|
||||
default -> null;
|
||||
};
|
||||
}
|
||||
|
||||
private void skipElement() throws XMLStreamException {
|
||||
|
|
|
@ -46,11 +46,18 @@ public class TestXMLReader {
|
|||
private final String EVALUATE_IS_ARRAY = "xml.stream.is.array";
|
||||
|
||||
private TestRunner setup(Map<PropertyDescriptor, String> xmlReaderProperties) throws InitializationException {
|
||||
return setup(xmlReaderProperties, null);
|
||||
}
|
||||
|
||||
private TestRunner setup(Map<PropertyDescriptor, String> xmlReaderProperties, String recordFieldNameToGetAsString) throws InitializationException {
|
||||
TestRunner runner = TestRunners.newTestRunner(TestXMLReaderProcessor.class);
|
||||
XMLReader reader = new XMLReader();
|
||||
|
||||
runner.addControllerService("xml_reader", reader);
|
||||
runner.setProperty(TestXMLReaderProcessor.XML_READER, "xml_reader");
|
||||
if (recordFieldNameToGetAsString != null) {
|
||||
runner.setProperty(TestXMLReaderProcessor.RECORD_FIELD_TO_GET_AS_STRING, recordFieldNameToGetAsString);
|
||||
}
|
||||
|
||||
for (Map.Entry<PropertyDescriptor, String> entry : xmlReaderProperties.entrySet()) {
|
||||
runner.setProperty(reader, entry.getKey(), entry.getValue());
|
||||
|
@ -298,4 +305,23 @@ public class TestXMLReader {
|
|||
String actualContent = out.getContent();
|
||||
assertEquals(expectedContent, actualContent);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testInferSchemaWhereNameValuesHasMixedTypes() throws Exception {
|
||||
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||
xmlReaderProperties.put(XMLReader.PARSE_XML_ATTRIBUTES, "true");
|
||||
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "Value");
|
||||
TestRunner runner = setup(xmlReaderProperties, "Data");
|
||||
|
||||
final InputStream is = new FileInputStream("src/test/resources/xml/dataWithArrayOfDifferentTypes.xml");
|
||||
runner.enqueue(is);
|
||||
runner.run();
|
||||
|
||||
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||
final String expectedContent = "[MapRecord[{Name=Param1, Value=String1}], MapRecord[{Name=Param2, Value=2}], MapRecord[{Name=Param3, Value=String3}]]";
|
||||
final String actualContent = out.getContent();
|
||||
assertEquals(expectedContent, actualContent);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.RecordReaderFactory;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
|
@ -31,6 +32,7 @@ import org.apache.nifi.util.StringUtils;
|
|||
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
@ -44,12 +46,20 @@ public class TestXMLReaderProcessor extends AbstractProcessor {
|
|||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor RECORD_FIELD_TO_GET_AS_STRING = new PropertyDescriptor.Builder()
|
||||
.name("record_field_to_get_as_string")
|
||||
.description("record_field_to_get_as_string")
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("success").build();
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
final RecordReaderFactory readerFactory = context.getProperty(XML_READER).asControllerService(RecordReaderFactory.class);
|
||||
final String recordFieldAsString = context.getProperty(RECORD_FIELD_TO_GET_AS_STRING).getValue();
|
||||
|
||||
final List<String> records = new ArrayList<>();
|
||||
|
||||
|
@ -57,7 +67,11 @@ public class TestXMLReaderProcessor extends AbstractProcessor {
|
|||
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
|
||||
Record record;
|
||||
while ((record = reader.nextRecord()) != null) {
|
||||
records.add(record.toString());
|
||||
if (recordFieldAsString == null) {
|
||||
records.add(record.toString());
|
||||
} else {
|
||||
records.add(record.getAsString(recordFieldAsString));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -69,7 +83,7 @@ public class TestXMLReaderProcessor extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return new ArrayList<PropertyDescriptor>() {{ add(XML_READER); }};
|
||||
return Arrays.asList(XML_READER, RECORD_FIELD_TO_GET_AS_STRING);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
<UserData>
|
||||
<Data Name="Param1">String1</Data>
|
||||
<Data Name="Param2">2</Data>
|
||||
<Data Name="Param3">String3</Data>
|
||||
</UserData>
|
Loading…
Reference in New Issue