mirror of
https://github.com/apache/nifi.git
synced 2025-03-03 16:09:19 +00:00
NIFI-13850 Simplified TextXMLReader using Current Language and Library Practices (#9361)
Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
6eab0a5f44
commit
f41f6fea4f
@ -18,18 +18,17 @@
|
|||||||
package org.apache.nifi.xml;
|
package org.apache.nifi.xml;
|
||||||
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
|
||||||
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||||
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
|
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -39,119 +38,106 @@ import java.util.Map;
|
|||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
public class TestXMLReader {
|
class TestXMLReader {
|
||||||
|
|
||||||
private final String ATTRIBUTE_PREFIX = "attribute_prefix";
|
private static final String CONTENT_NAME = "content_field";
|
||||||
private final String CONTENT_NAME = "content_field";
|
private static final String EVALUATE_IS_ARRAY = "xml.stream.is.array";
|
||||||
private final String EVALUATE_IS_ARRAY = "xml.stream.is.array";
|
private static String testSchemaText;
|
||||||
|
private static Path personRecord;
|
||||||
|
private static Path fieldWithSubElement;
|
||||||
|
private static Path people;
|
||||||
|
|
||||||
private TestRunner setup(Map<PropertyDescriptor, String> xmlReaderProperties) throws InitializationException {
|
private TestRunner runner;
|
||||||
return setup(xmlReaderProperties, null);
|
private XMLReader reader;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void setUpBeforeAll() throws Exception {
|
||||||
|
testSchemaText = getSchemaText("src/test/resources/xml/testschema");
|
||||||
|
personRecord = Paths.get("src/test/resources/xml/person_record.xml");
|
||||||
|
fieldWithSubElement = Paths.get("src/test/resources/xml/field_with_sub-element.xml");
|
||||||
|
people = Paths.get("src/test/resources/xml/people.xml");
|
||||||
}
|
}
|
||||||
|
|
||||||
private TestRunner setup(Map<PropertyDescriptor, String> xmlReaderProperties, String recordFieldNameToGetAsString) throws InitializationException {
|
@BeforeEach
|
||||||
TestRunner runner = TestRunners.newTestRunner(TestXMLReaderProcessor.class);
|
void setUp() throws Exception {
|
||||||
XMLReader reader = new XMLReader();
|
runner = TestRunners.newTestRunner(TestXMLReaderProcessor.class);
|
||||||
|
reader = new XMLReader();
|
||||||
|
|
||||||
runner.addControllerService("xml_reader", reader);
|
runner.addControllerService("xml_reader", reader);
|
||||||
runner.setProperty(TestXMLReaderProcessor.XML_READER, "xml_reader");
|
runner.setProperty(TestXMLReaderProcessor.XML_READER, "xml_reader");
|
||||||
if (recordFieldNameToGetAsString != null) {
|
|
||||||
runner.setProperty(TestXMLReaderProcessor.RECORD_FIELD_TO_GET_AS_STRING, recordFieldNameToGetAsString);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Map.Entry<PropertyDescriptor, String> entry : xmlReaderProperties.entrySet()) {
|
|
||||||
runner.setProperty(reader, entry.getKey(), entry.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
runner.enableControllerService(reader);
|
|
||||||
return runner;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecordFormatDeterminedBasedOnAttribute() throws IOException, InitializationException {
|
void testRecordFormatDeterminedBasedOnAttribute() throws Exception {
|
||||||
String outputSchemaPath = "src/test/resources/xml/testschema";
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
String outputSchemaText = new String(Files.readAllBytes(Paths.get(outputSchemaPath)));
|
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, testSchemaText);
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_EVALUATE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_EVALUATE.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/people.xml")) {
|
runner.enqueue(people, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
|
||||||
runner.enqueue(is, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n"));
|
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final List<String> records = getRecords(flowFile);
|
||||||
|
|
||||||
assertEquals(4, records.size());
|
assertEquals(4, records.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecordFormatArray() throws IOException, InitializationException {
|
void testRecordFormatArray() throws Exception {
|
||||||
String outputSchemaPath = "src/test/resources/xml/testschema";
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
String outputSchemaText = new String(Files.readAllBytes(Paths.get(outputSchemaPath)));
|
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, testSchemaText);
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/people.xml")) {
|
runner.enqueue(people, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
|
||||||
runner.enqueue(is, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
List<String> records = Arrays.asList((new String(runner.getContentAsByteArray(flowFile.get(0)))).split("\n"));
|
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final List<String> records = getRecords(flowFile);
|
||||||
|
|
||||||
assertEquals(4, records.size());
|
assertEquals(4, records.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRecordFormatNotArray() throws IOException, InitializationException {
|
void testRecordFormatNotArray() throws Exception {
|
||||||
String outputSchemaPath = "src/test/resources/xml/testschema";
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
String outputSchemaText = new String(Files.readAllBytes(Paths.get(outputSchemaPath)));
|
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, testSchemaText);
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/person.xml")) {
|
final Path data = Paths.get("src/test/resources/xml/person.xml");
|
||||||
runner.enqueue(is, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
|
runner.enqueue(data, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
|
||||||
runner.run();
|
runner.run();
|
||||||
}
|
|
||||||
|
|
||||||
List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
List<String> records = Arrays.asList(new String(runner.getContentAsByteArray(flowFile.get(0))).split("\n"));
|
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final List<String> records = getRecords(flowFile);
|
||||||
|
|
||||||
assertEquals(1, records.size());
|
assertEquals(1, records.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAttributePrefix() throws IOException, InitializationException {
|
void testAttributePrefix() throws Exception {
|
||||||
String outputSchemaPath = "src/test/resources/xml/testschema";
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
String outputSchemaText = new String(Files.readAllBytes(Paths.get(outputSchemaPath)));
|
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, testSchemaText);
|
||||||
xmlReaderProperties.put(XMLReader.ATTRIBUTE_PREFIX, "${" + ATTRIBUTE_PREFIX + "}");
|
final String attributePrefix = "attribute_prefix";
|
||||||
|
xmlReaderProperties.put(XMLReader.ATTRIBUTE_PREFIX, "${" + attributePrefix + "}");
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/people.xml")) {
|
runner.enqueue(people, Collections.singletonMap(attributePrefix, "ATTR_"));
|
||||||
runner.enqueue(is, Collections.singletonMap(ATTRIBUTE_PREFIX, "ATTR_"));
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
List<String> records = Arrays.asList(new String(runner.getContentAsByteArray(flowFile.get(0))).split("\n"));
|
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final List<String> records = getRecords(flowFile);
|
||||||
|
|
||||||
assertEquals(4, records.size());
|
assertEquals(4, records.size());
|
||||||
assertEquals("MapRecord[{ATTR_ID=P1, NAME=Cleve Butler, AGE=42, COUNTRY=USA}]", records.get(0));
|
assertEquals("MapRecord[{ATTR_ID=P1, NAME=Cleve Butler, AGE=42, COUNTRY=USA}]", records.get(0));
|
||||||
@ -161,24 +147,22 @@ public class TestXMLReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testContentField() throws IOException, InitializationException {
|
void testContentField() throws Exception {
|
||||||
String outputSchemaPath = "src/test/resources/xml/testschema2";
|
final String outputSchemaText = getSchemaText("src/test/resources/xml/testschema2");
|
||||||
String outputSchemaText = new String(Files.readAllBytes(Paths.get(outputSchemaPath)));
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
||||||
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "${" + CONTENT_NAME + "}");
|
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "${" + CONTENT_NAME + "}");
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/people_tag_in_characters.xml")) {
|
final Path data = Paths.get("src/test/resources/xml/people_tag_in_characters.xml");
|
||||||
runner.enqueue(is, Collections.singletonMap(CONTENT_NAME, "CONTENT"));
|
runner.enqueue(data, Collections.singletonMap(CONTENT_NAME, "CONTENT"));
|
||||||
runner.run();
|
runner.run();
|
||||||
}
|
|
||||||
|
|
||||||
List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
List<String> records = Arrays.asList(new String(runner.getContentAsByteArray(flowFile.get(0))).split("\n"));
|
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final List<String> records = getRecords(flowFile);
|
||||||
|
|
||||||
assertEquals(5, records.size());
|
assertEquals(5, records.size());
|
||||||
assertEquals("MapRecord[{ID=P1, NAME=MapRecord[{ATTR=attr content, INNER=inner content, CONTENT=Cleve Butler}], AGE=42}]", records.get(0));
|
assertEquals("MapRecord[{ID=P1, NAME=MapRecord[{ATTR=attr content, INNER=inner content, CONTENT=Cleve Butler}], AGE=42}]", records.get(0));
|
||||||
@ -189,120 +173,108 @@ public class TestXMLReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInferSchema() throws InitializationException, IOException {
|
void testInferSchema() throws Exception {
|
||||||
String expectedContent = "MapRecord[{num=123, name=John Doe, software=MapRecord[{favorite=true, " + CONTENT_NAME + "=Apache NiFi}]}]";
|
final String expectedContent = "MapRecord[{num=123, name=John Doe, software=MapRecord[{favorite=true, " + CONTENT_NAME + "=Apache NiFi}]}]";
|
||||||
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, CONTENT_NAME);
|
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, CONTENT_NAME);
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/person_record.xml")) {
|
runner.enqueue(personRecord);
|
||||||
runner.enqueue(is);
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).get(0);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
String actualContent = out.getContent();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInferSchemaContentFieldNameNotSet() throws InitializationException, IOException {
|
void testInferSchemaContentFieldNameNotSet() throws Exception {
|
||||||
String expectedContent = "MapRecord[{num=123, name=John Doe, software=MapRecord[{favorite=true}]}]";
|
final String expectedContent = "MapRecord[{num=123, name=John Doe, software=MapRecord[{favorite=true}]}]";
|
||||||
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/person_record.xml")) {
|
runner.enqueue(personRecord);
|
||||||
runner.enqueue(is);
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).get(0);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
String actualContent = out.getContent();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInferSchemaContentFieldNameNotSetSubElementExists() throws InitializationException, IOException {
|
void testInferSchemaContentFieldNameNotSetSubElementExists() throws Exception {
|
||||||
String expectedContent = "MapRecord[{field_with_attribute=MapRecord[{attr=attr_content, value=123}]}]";
|
final String expectedContent = "MapRecord[{field_with_attribute=MapRecord[{attr=attr_content, value=123}]}]";
|
||||||
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/field_with_sub-element.xml")) {
|
runner.enqueue(fieldWithSubElement);
|
||||||
runner.enqueue(is);
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).get(0);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
String actualContent = out.getContent();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInferSchemaContentFieldNameSetSubElementExistsNameClash() throws InitializationException, IOException {
|
void testInferSchemaContentFieldNameSetSubElementExistsNameClash() throws Exception {
|
||||||
String expectedContent = "MapRecord[{field_with_attribute=MapRecord[{attr=attr_content, value=content of field}]}]";
|
final String expectedContent = "MapRecord[{field_with_attribute=MapRecord[{attr=attr_content, value=content of field}]}]";
|
||||||
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "value");
|
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "value");
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/field_with_sub-element.xml")) {
|
runner.enqueue(fieldWithSubElement);
|
||||||
runner.enqueue(is);
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).get(0);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
String actualContent = out.getContent();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInferSchemaContentFieldNameSetSubElementExistsNoNameClash() throws InitializationException, IOException {
|
void testInferSchemaContentFieldNameSetSubElementExistsNoNameClash() throws Exception {
|
||||||
String expectedContent = "MapRecord[{field_with_attribute=MapRecord[{attr=attr_content, value=123, " + CONTENT_NAME + "=content of field" +
|
final String expectedContent = String.format("MapRecord[{field_with_attribute=MapRecord[{attr=attr_content, value=123, %s=content of field}]}]", CONTENT_NAME);
|
||||||
"}]}]";
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
|
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, CONTENT_NAME);
|
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, CONTENT_NAME);
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
try (InputStream is = new FileInputStream("src/test/resources/xml/field_with_sub-element.xml")) {
|
runner.enqueue(fieldWithSubElement);
|
||||||
runner.enqueue(is);
|
runner.run();
|
||||||
runner.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).get(0);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
String actualContent = out.getContent();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInferSchemaIgnoreAttributes() throws InitializationException, IOException {
|
void testInferSchemaIgnoreAttributes() throws Exception {
|
||||||
String expectedContent = "MapRecord[{num=123, name=John Doe, software=Apache NiFi}]";
|
final String expectedContent = "MapRecord[{num=123, name=John Doe, software=Apache NiFi}]";
|
||||||
|
final Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
||||||
Map<PropertyDescriptor, String> xmlReaderProperties = new HashMap<>();
|
|
||||||
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
xmlReaderProperties.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.PARSE_XML_ATTRIBUTES, "false");
|
xmlReaderProperties.put(XMLReader.PARSE_XML_ATTRIBUTES, "false");
|
||||||
TestRunner runner = setup(xmlReaderProperties);
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
|
||||||
InputStream is = new FileInputStream("src/test/resources/xml/person_record.xml");
|
runner.enqueue(personRecord);
|
||||||
runner.enqueue(is);
|
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).get(0);
|
runner.assertTransferCount(TestXMLReaderProcessor.SUCCESS, 1);
|
||||||
String actualContent = out.getContent();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -313,10 +285,11 @@ public class TestXMLReader {
|
|||||||
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
xmlReaderProperties.put(XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE.getValue());
|
||||||
xmlReaderProperties.put(XMLReader.PARSE_XML_ATTRIBUTES, "true");
|
xmlReaderProperties.put(XMLReader.PARSE_XML_ATTRIBUTES, "true");
|
||||||
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "Value");
|
xmlReaderProperties.put(XMLReader.CONTENT_FIELD_NAME, "Value");
|
||||||
TestRunner runner = setup(xmlReaderProperties, "Data");
|
configureAndEnableXmlReader(xmlReaderProperties);
|
||||||
|
runner.setProperty(TestXMLReaderProcessor.RECORD_FIELD_TO_GET_AS_STRING, "Data");
|
||||||
|
|
||||||
final InputStream is = new FileInputStream("src/test/resources/xml/dataWithArrayOfDifferentTypes.xml");
|
final Path data = Paths.get("src/test/resources/xml/dataWithArrayOfDifferentTypes.xml");
|
||||||
runner.enqueue(is);
|
runner.enqueue(data);
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
||||||
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
final MockFlowFile out = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS).getFirst();
|
||||||
@ -324,4 +297,19 @@ public class TestXMLReader {
|
|||||||
final String actualContent = out.getContent();
|
final String actualContent = out.getContent();
|
||||||
assertEquals(expectedContent, actualContent);
|
assertEquals(expectedContent, actualContent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void configureAndEnableXmlReader(Map<PropertyDescriptor, String> xmlReaderProperties) {
|
||||||
|
for (Map.Entry<PropertyDescriptor, String> entry : xmlReaderProperties.entrySet()) {
|
||||||
|
runner.setProperty(reader, entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
runner.enableControllerService(reader);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<String> getRecords(MockFlowFile flowFile) {
|
||||||
|
return Arrays.asList(flowFile.getContent().split("\n"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String getSchemaText(String schemaPath) throws Exception {
|
||||||
|
return Files.readString(Paths.get(schemaPath));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user