NIFI-7493: When inferring schema for XML data, if we find a text element that also has attributes, infer it as a Record type, in order to match how the data will be read when using the XML Reader

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4375.
This commit is contained in:
Mark Payne 2020-06-30 15:09:22 -04:00 committed by Pierre Villard
parent 21c3085d15
commit 7e09e0db33
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 77 additions and 20 deletions

View File

@ -21,12 +21,14 @@ import org.apache.nifi.schema.inference.RecordSource;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.Attribute;
import javax.xml.stream.events.Characters;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@ -73,6 +75,13 @@ public class XmlRecordSource implements RecordSource<XmlNode> {
final StringBuilder content = new StringBuilder();
final Map<String, XmlNode> childNodes = new LinkedHashMap<>();
final Iterator<?> attributeIterator = startElement.getAttributes();
while (attributeIterator.hasNext()) {
final Attribute attribute = (Attribute) attributeIterator.next();
final String attributeName = attribute.getName().getLocalPart();
childNodes.put(attributeName, new XmlTextNode(attributeName, attribute.getValue()));
}
while (xmlEventReader.hasNext()) {
final XMLEvent xmlEvent = xmlEventReader.nextEvent();
@ -107,6 +116,13 @@ public class XmlRecordSource implements RecordSource<XmlNode> {
arrayNode.addElement(childNode);
childNodes.put(childName, arrayNode);
}
final Iterator<?> childAttributeIterator = childStartElement.getAttributes();
while (childAttributeIterator.hasNext()) {
final Attribute attribute = (Attribute) childAttributeIterator.next();
final String attributeName = attribute.getName().getLocalPart();
childNodes.put(attributeName, new XmlTextNode(attributeName, attribute.getValue()));
}
}
}
@ -114,6 +130,11 @@ public class XmlRecordSource implements RecordSource<XmlNode> {
if (childNodes.isEmpty()) {
return new XmlTextNode(nodeName, content.toString().trim());
} else {
final String textContent = content.toString().trim();
if (!textContent.equals("")) {
childNodes.put("value", new XmlTextNode("value", textContent));
}
return new XmlContainerNode(nodeName, childNodes);
}
}

View File

@ -34,12 +34,14 @@ import org.mockito.Mockito;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class TestInferXmlSchema {
@ -47,16 +49,7 @@ public class TestInferXmlSchema {
@Test
public void testFlatXml() throws IOException {
final File file = new File("src/test/resources/xml/person.xml");
final RecordSourceFactory<XmlNode> xmlSourceFactory = (var, in) -> new XmlRecordSource(in, false);
final SchemaInferenceEngine<XmlNode> schemaInference = new XmlSchemaInference(timeValueInference);
final InferSchemaAccessStrategy<XmlNode> inferStrategy = new InferSchemaAccessStrategy<>(xmlSourceFactory, schemaInference, Mockito.mock(ComponentLog.class));
final RecordSchema schema;
try (final InputStream fis = new FileInputStream(file);
final InputStream in = new BufferedInputStream(fis)) {
schema = inferStrategy.getSchema(Collections.emptyMap(), in, null);
}
final RecordSchema schema = inferSchema("src/test/resources/xml/person.xml", false);
assertSame(RecordFieldType.STRING, schema.getDataType("NAME").get().getFieldType());
assertSame(RecordFieldType.INT, schema.getDataType("AGE").get().getFieldType());
@ -69,17 +62,9 @@ public class TestInferXmlSchema {
@Test
public void testFieldsFromAllRecordsIncluded() throws IOException {
final File file = new File("src/test/resources/xml/people_nested.xml");
final RecordSourceFactory<XmlNode> xmlSourceFactory = (var, in) -> new XmlRecordSource(in, true);
final SchemaInferenceEngine<XmlNode> schemaInference = new XmlSchemaInference(timeValueInference);
final InferSchemaAccessStrategy<XmlNode> inferStrategy = new InferSchemaAccessStrategy<>(xmlSourceFactory, schemaInference, Mockito.mock(ComponentLog.class));
final RecordSchema schema;
try (final InputStream fis = new FileInputStream(file);
final InputStream in = new BufferedInputStream(fis)) {
schema = inferStrategy.getSchema(Collections.emptyMap(), in, null);
}
final RecordSchema schema = inferSchema("src/test/resources/xml/people_nested.xml", true);
assertSame(RecordFieldType.STRING, schema.getDataType("ID").get().getFieldType());
assertSame(RecordFieldType.STRING, schema.getDataType("NAME").get().getFieldType());
assertSame(RecordFieldType.INT, schema.getDataType("AGE").get().getFieldType());
assertSame(RecordFieldType.STRING, schema.getDataType("COUNTRY").get().getFieldType());
@ -102,4 +87,31 @@ public class TestInferXmlSchema {
assertSame(RecordFieldType.STRING, addressSchema.getDataType("STATE").get().getFieldType());
}
@Test
public void testStringFieldWithAttributes() throws IOException {
final RecordSchema schema = inferSchema("src/test/resources/xml/TextNodeWithAttribute.xml", true);
assertSame(RecordFieldType.INT, schema.getDataType("num").get().getFieldType());
assertSame(RecordFieldType.STRING, schema.getDataType("name").get().getFieldType());
final DataType softwareDataType = schema.getDataType("software").get();
assertSame(RecordFieldType.RECORD, softwareDataType.getFieldType());
assertTrue(softwareDataType instanceof RecordDataType);
final RecordSchema childSchema = ((RecordDataType) softwareDataType).getChildSchema();
assertSame(RecordFieldType.BOOLEAN, childSchema.getDataType("favorite").get().getFieldType());
assertSame(RecordFieldType.STRING, childSchema.getDataType("value").get().getFieldType());
}
private RecordSchema inferSchema(final String filename, final boolean ignoreWrapper) throws IOException {
final File file = new File(filename);
final RecordSourceFactory<XmlNode> xmlSourceFactory = (var, in) -> new XmlRecordSource(in, ignoreWrapper);
final SchemaInferenceEngine<XmlNode> schemaInference = new XmlSchemaInference(timeValueInference);
final InferSchemaAccessStrategy<XmlNode> inferStrategy = new InferSchemaAccessStrategy<>(xmlSourceFactory, schemaInference, Mockito.mock(ComponentLog.class));
final RecordSchema schema;
try (final InputStream fis = new FileInputStream(file);
final InputStream in = new BufferedInputStream(fis)) {
return inferStrategy.getSchema(Collections.emptyMap(), in, null);
}
}
}

View File

@ -0,0 +1,24 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<wrapper>
<record>
<num>123</num>
<name>John Doe</name>
<software favorite="true">Apache NiFi</software>
</record>
</wrapper>