This commit is contained in:
Mike Thomsen 2018-04-23 15:39:20 -04:00
commit 5ca6261de0
30 changed files with 3450 additions and 20 deletions

View File

@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@ -51,7 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Continually runs a <code>{@link Connectable}</code> component as long as the component has work to do.
 * {@link #invoke()} will return <code>{@link InvocationResult}</code> telling if the component should be yielded.
 */
public class ConnectableTask {
@ -64,7 +66,6 @@ public class ConnectableTask {
private final ProcessContext processContext;
private final FlowController flowController;
private final int numRelationships;
private final boolean hasNonLoopConnection;
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
@ -76,7 +77,6 @@ public class ConnectableTask {
this.scheduleState = scheduleState;
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
this.hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
@ -103,8 +103,40 @@ public class ConnectableTask {
return connectable.getYieldExpiration() > System.currentTimeMillis();
}
/**
 * Make sure processor has work to do. This means that it meets one of these criteria:
 * <ol>
 * <li>It is a Funnel and has incoming FlowFiles from other components, and at least one outgoing connection.</li>
 * <li>It is a 'source' component, meaning:<ul>
 * <li>It is annotated with @TriggerWhenEmpty</li>
 * <li>It has no incoming connections</li>
 * <li>All incoming connections are self-loops</li>
 * </ul></li>
 * <li>It has data in incoming connections to process</li>
 * </ol>
 * @return true if there is work to do, otherwise false
 */
private boolean isWorkToDo() {
    // Computed per-invocation (not cached in the constructor) so that connection
    // changes made after scheduling are observed.
    final boolean hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);

    if (connectable.getConnectableType() == ConnectableType.FUNNEL) {
        // Handle Funnel as a special case because it will never be a 'source' component,
        // and also its outgoing connections can not be terminated.
        // Incoming FlowFiles from other components, and at least one outgoing connection are required.
        return connectable.hasIncomingConnection()
                && hasNonLoopConnection
                && !connectable.getConnections().isEmpty()
                && Connectables.flowFilesQueued(connectable);
    }

    final boolean isSourceComponent = connectable.isTriggerWhenEmpty()
            // No input connections
            || !connectable.hasIncomingConnection()
            // Every incoming connection loops back to itself, no inputs from other components
            || !hasNonLoopConnection;

    // If it is not a 'source' component, it requires a FlowFile to process.
    return isSourceComponent || Connectables.flowFilesQueued(connectable);
}
private boolean isBackPressureEngaged() {
@ -129,11 +161,7 @@ public class ConnectableTask {
return InvocationResult.DO_NOT_YIELD;
}
// Make sure processor has work to do. See isWorkToDo() for the full criteria.
if (!isWorkToDo()) {
return InvocationResult.yield("No work to do");
}

View File

@ -21,12 +21,17 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
@ -41,15 +46,9 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestConnectableTask {
@Test
public void testIsWorkToDo() {
final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
final Processor processor = Mockito.mock(Processor.class);
Mockito.when(procNode.getIdentifier()).thenReturn("123");
Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
private ConnectableTask createTask(final Connectable connectable) {
final FlowController flowController = Mockito.mock(FlowController.class);
Mockito.when(flowController.getStateManagerProvider()).thenReturn(Mockito.mock(StateManagerProvider.class));
@ -61,9 +60,22 @@ public class TestConnectableTask {
final LifecycleState scheduleState = new LifecycleState();
final StringEncryptor encryptor = Mockito.mock(StringEncryptor.class);
ConnectableTask task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
return new ConnectableTask(Mockito.mock(SchedulingAgent.class), connectable,
flowController, contextFactory, scheduleState, encryptor);
}
@Test
public void testIsWorkToDo() {
final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
final Processor processor = Mockito.mock(Processor.class);
Mockito.when(procNode.getIdentifier()).thenReturn("123");
Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
// There is work to do because there are no incoming connections.
final ConnectableTask task = createTask(procNode);
assertFalse(task.invoke().isYield());
// Test with only a single connection that is self-looping and empty
@ -93,8 +105,6 @@ public class TestConnectableTask {
when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
// Create a new ConnectableTask because we want to have a different value for the 'hasNonLoopConnection' value, which is calculated in the task's constructor.
task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
assertTrue(task.invoke().isYield());
// test when the queue has data
@ -106,4 +116,67 @@ public class TestConnectableTask {
assertFalse(task.invoke().isYield());
}
/**
 * Verifies ConnectableTask's work-to-do decision for Funnels: a Funnel is only executed
 * when it has (1) an incoming connection from another component, (2) at least one outgoing
 * connection, and (3) queued FlowFiles. Mockito stubs are re-applied between invoke() calls,
 * so statement order in this test is significant.
 */
@Test
public void testIsWorkToDoFunnels() {
    final Funnel funnel = Mockito.mock(Funnel.class);
    Mockito.when(funnel.hasIncomingConnection()).thenReturn(false);
    // A Funnel is its own runnable component (unlike a ProcessorNode wrapping a Processor).
    Mockito.when(funnel.getRunnableComponent()).thenReturn(funnel);
    Mockito.when(funnel.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
    Mockito.when(funnel.getIdentifier()).thenReturn("funnel-1");

    final ConnectableTask task = createTask(funnel);
    assertTrue("If there is no incoming connection, it should be yielded.", task.invoke().isYield());

    // Test with only a single connection that is self-looping and empty.
    // Actually, this self-loop input can not be created for Funnels using NiFi API because an outer layer check condition does not allow it.
    // But test it anyways.
    final Connection selfLoopingConnection = Mockito.mock(Connection.class);
    when(selfLoopingConnection.getSource()).thenReturn(funnel);
    when(selfLoopingConnection.getDestination()).thenReturn(funnel);

    when(funnel.hasIncomingConnection()).thenReturn(true);
    when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));

    final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
    when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
    when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);

    // Mutable set so later steps can add an outgoing connection without re-stubbing.
    final Set<Connection> outgoingConnections = new HashSet<>();
    outgoingConnections.add(selfLoopingConnection);
    when(funnel.getConnections()).thenReturn(outgoingConnections);

    assertTrue("If there is no incoming connection from other components, it should be yielded.", task.invoke().isYield());

    // Add an incoming connection from another component.
    final ProcessorNode inputProcessor = Mockito.mock(ProcessorNode.class);
    final Connection incomingFromAnotherComponent = Mockito.mock(Connection.class);
    when(incomingFromAnotherComponent.getSource()).thenReturn(inputProcessor);
    when(incomingFromAnotherComponent.getDestination()).thenReturn(funnel);
    when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(emptyQueue);

    when(funnel.hasIncomingConnection()).thenReturn(true);
    when(funnel.getIncomingConnections()).thenReturn(Arrays.asList(selfLoopingConnection, incomingFromAnotherComponent));

    assertTrue("Even if there is an incoming connection from another component," +
            " it should be yielded because there's no outgoing connections.", task.invoke().isYield());

    // Add an outgoing connection to another component.
    final ProcessorNode outputProcessor = Mockito.mock(ProcessorNode.class);
    final Connection outgoingToAnotherComponent = Mockito.mock(Connection.class);
    when(outgoingToAnotherComponent.getSource()).thenReturn(funnel);
    when(outgoingToAnotherComponent.getDestination()).thenReturn(outputProcessor);
    outgoingConnections.add(outgoingToAnotherComponent);

    assertTrue("Even if there is an incoming connection from another component and an outgoing connection as well," +
            " it should be yielded because there's no incoming FlowFiles to process.", task.invoke().isYield());

    // Adding input FlowFiles.
    final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
    when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
    when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
    assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
            task.invoke().isYield());
}
}

View File

@ -122,6 +122,26 @@
<exclude>src/test/resources/json/single-element-nested-array.json</exclude>
<exclude>src/test/resources/json/single-element-nested.json</exclude>
<exclude>src/test/resources/json/output/dataTypes.json</exclude>
<exclude>src/test/resources/xml/people.xml</exclude>
<exclude>src/test/resources/xml/people2.xml</exclude>
<exclude>src/test/resources/xml/people3.xml</exclude>
<exclude>src/test/resources/xml/people_array.xml</exclude>
<exclude>src/test/resources/xml/people_array_simple.xml</exclude>
<exclude>src/test/resources/xml/people_cdata.xml</exclude>
<exclude>src/test/resources/xml/people_complex1.xml</exclude>
<exclude>src/test/resources/xml/people_complex2.xml</exclude>
<exclude>src/test/resources/xml/people_empty.xml</exclude>
<exclude>src/test/resources/xml/people_invalid.xml</exclude>
<exclude>src/test/resources/xml/people_map.xml</exclude>
<exclude>src/test/resources/xml/people_map2.xml</exclude>
<exclude>src/test/resources/xml/people_namespace.xml</exclude>
<exclude>src/test/resources/xml/people_nested.xml</exclude>
<exclude>src/test/resources/xml/people_no_attributes.xml</exclude>
<exclude>src/test/resources/xml/people_tag_in_characters.xml</exclude>
<exclude>src/test/resources/xml/people_with_header_and_comments.xml</exclude>
<exclude>src/test/resources/xml/person.xml</exclude>
<exclude>src/test/resources/xml/testschema</exclude>
<exclude>src/test/resources/xml/testschema2</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.xml;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SchemaRegistryService;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Tags({"xml", "record", "reader", "parser"})
@CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of " +
        "XML data, embedded in an enclosing root tag.")
public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {

    // Each FlowFile is exactly one record; no wrapper element is expected.
    public static final AllowableValue RECORD_SINGLE = new AllowableValue("false", "false",
            "Each FlowFile will consist of a single record without any sort of \"wrapper\".");

    // Each FlowFile is an array of records; the outermost element is a wrapper and is skipped.
    public static final AllowableValue RECORD_ARRAY = new AllowableValue("true", "true",
            "Each FlowFile will consist of zero or more records. The outer-most XML element is expected to be a \"wrapper\" and will be ignored.");

    // Decide per FlowFile: the default value is an Expression Language reference that resolves
    // to the FlowFile's 'xml.stream.is.array' attribute at read time.
    public static final AllowableValue RECORD_EVALUATE = new AllowableValue("${xml.stream.is.array}", "Use attribute 'xml.stream.is.array'",
            "Whether to treat a FlowFile as a single Record or an array of multiple Records is determined by the value of the 'xml.stream.is.array' attribute. "
            + "If the value of the attribute is 'true' (case-insensitive), then the XML Reader will treat the FlowFile as a series of Records with the outer element being ignored. "
            + "If the value of the attribute is 'false' (case-insensitive), then the FlowFile is treated as a single Record and no wrapper element is assumed. "
            + "If the attribute is missing or its value is anything other than 'true' or 'false', then an Exception will be thrown and no records will be parsed.");

    public static final PropertyDescriptor RECORD_FORMAT = new PropertyDescriptor.Builder()
            .name("record_format")
            .displayName("Expect Records as Array")
            .description("This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\". Because XML does not "
                    + "provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire "
                    + "XML blob with a \"wrapper element\". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\" "
                    + "that will be ignored.")
            .allowableValues(RECORD_SINGLE, RECORD_ARRAY, RECORD_EVALUATE)
            .defaultValue(RECORD_SINGLE.getValue())
            .required(true)
            .build();

    // Optional prefix prepended to XML attribute names when they are stored as record fields.
    public static final PropertyDescriptor ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder()
            .name("attribute_prefix")
            .displayName("Attribute Prefix")
            .description("If this property is set, the name of attributes will be prepended with a prefix when they are added to a record.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    // Optional field name under which an element's text content is stored when the element
    // is mapped to a nested record (e.g. a tag carrying both attributes and text).
    public static final PropertyDescriptor CONTENT_FIELD_NAME = new PropertyDescriptor.Builder()
            .name("content_field_name")
            .displayName("Field Name for Content")
            .description("If tags with content (e. g. <field>content</field>) are defined as nested records in the schema, " +
                    "the name of the tag will be used as name for the record and the value of this property will be used as name for the field. " +
                    "If tags with content shall be parsed together with attributes (e. g. <field attribute=\"123\">content</field>), " +
                    "they have to be defined as records. For additional information, see the section of processor usage.")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(false)
            .build();

    // Date/time format strings captured once when the service is enabled; volatile because
    // onEnabled and createRecordReader may run on different threads.
    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;

    /**
     * Caches the configured date/time/timestamp format strings when the controller service is enabled.
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
        this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
        this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
    }

    /**
     * Exposes this reader's properties in addition to those inherited from SchemaRegistryService.
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
        properties.add(RECORD_FORMAT);
        properties.add(ATTRIBUTE_PREFIX);
        properties.add(CONTENT_FIELD_NAME);
        properties.add(DateTimeUtils.DATE_FORMAT);
        properties.add(DateTimeUtils.TIME_FORMAT);
        properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
        return properties;
    }

    /**
     * Creates an XMLRecordReader for the given input stream.
     *
     * @param variables FlowFile attributes / variables used for Expression Language evaluation
     * @param in        the XML content to read
     * @param logger    logger handed to the record reader
     * @throws IOException if the 'Expect Records as Array' value evaluates to something other than 'true'/'false'
     */
    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger)
            throws IOException, SchemaNotFoundException, MalformedRecordException {
        final ConfigurationContext context = getConfigurationContext();

        final RecordSchema schema = getSchema(variables, in, null);

        // NOTE(review): if the Expression Language for these optional properties evaluates to null,
        // getValue() may return null and trim() would NPE — confirm EL always yields a non-null string here.
        final String attributePrefix = context.getProperty(ATTRIBUTE_PREFIX).isSet()
                ? context.getProperty(ATTRIBUTE_PREFIX).evaluateAttributeExpressions(variables).getValue().trim() : null;

        final String contentFieldName = context.getProperty(CONTENT_FIELD_NAME).isSet()
                ? context.getProperty(CONTENT_FIELD_NAME).evaluateAttributeExpressions(variables).getValue().trim() : null;

        // Resolve the single-record vs. array decision; only the literal strings 'true'/'false'
        // (case-insensitive) are accepted, anything else is an error per the property description.
        final boolean isArray;
        final String recordFormat = context.getProperty(RECORD_FORMAT).evaluateAttributeExpressions(variables).getValue().trim();
        if ("true".equalsIgnoreCase(recordFormat)) {
            isArray = true;
        } else if ("false".equalsIgnoreCase(recordFormat)) {
            isArray = false;
        } else {
            throw new IOException("Cannot parse XML Records because the '" + RECORD_FORMAT.getDisplayName() + "' property evaluates to '"
                    + recordFormat + "', which is neither 'true' nor 'false'");
        }

        return new XMLRecordReader(in, schema, isArray, attributePrefix, contentFieldName, dateFormat, timeFormat, timestampFormat, logger);
    }
}

View File

@ -0,0 +1,568 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.xml;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.Attribute;
import javax.xml.stream.events.Characters;
import javax.xml.stream.events.StartElement;
import javax.xml.stream.events.XMLEvent;
import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
public class XMLRecordReader implements RecordReader {
private final ComponentLog logger;
private final RecordSchema schema;
private final String attributePrefix;
private final String contentFieldName;
private StartElement currentRecordStartTag;
private final XMLEventReader xmlEventReader;
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
/**
 * Creates a reader positioned at the first record's start element.
 *
 * @param in              XML content to parse (FlowFile content — treated as untrusted input)
 * @param schema          record schema used to coerce field values
 * @param isArray         when true, the outermost (wrapper) element is skipped before locating the first record
 * @param attributePrefix optional prefix for attribute-derived field names, or null
 * @param contentFieldName optional field name for element text content, or null
 * @throws MalformedRecordException if the stream cannot be parsed as XML
 */
public XMLRecordReader(InputStream in, RecordSchema schema, boolean isArray, String attributePrefix, String contentFieldName,
        final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException {
    this.schema = schema;
    this.attributePrefix = attributePrefix;
    this.contentFieldName = contentFieldName;
    this.logger = logger;

    // Formats are resolved once here; the suppliers simply return the pre-built instances.
    final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
    final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
    final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);

    LAZY_DATE_FORMAT = () -> df;
    LAZY_TIME_FORMAT = () -> tf;
    LAZY_TIMESTAMP_FORMAT = () -> tsf;

    try {
        final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
        // Harden against XXE: the input is untrusted FlowFile content, so refuse DTDs and
        // external entity resolution instead of relying on the platform factory defaults.
        xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
        xmlInputFactory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
        xmlEventReader = xmlInputFactory.createXMLEventReader(in);

        if (isArray) {
            // Consume the wrapper element so the cursor lands on the first record element.
            skipNextStartTag();
        }

        setNextRecordStartTag();
    } catch (XMLStreamException e) {
        throw new MalformedRecordException("Could not parse XML", e);
    }
}
/**
 * Advances the event reader until one start element has been consumed,
 * or the stream is exhausted.
 */
private void skipNextStartTag() throws XMLStreamException {
    boolean consumedStartTag = false;
    while (!consumedStartTag && xmlEventReader.hasNext()) {
        consumedStartTag = xmlEventReader.nextEvent().isStartElement();
    }
}
/**
 * Positions {@code currentRecordStartTag} on the next start element in the stream,
 * or sets it to null when no further start element exists (end of input).
 */
private void setNextRecordStartTag() throws XMLStreamException {
    while (xmlEventReader.hasNext()) {
        final XMLEvent event = xmlEventReader.nextEvent();
        if (event.isStartElement()) {
            currentRecordStartTag = event.asStartElement();
            return;
        }
    }
    // Stream exhausted: null signals nextRecord() to stop.
    currentRecordStartTag = null;
}
/**
 * Returns the next record from the stream, or null when the input is exhausted.
 * An element that yields no parsed values is returned as an empty record rather
 * than null, so that an empty record does not terminate iteration early.
 *
 * @throws MalformedRecordException if the underlying XML cannot be parsed
 */
@Override
public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
    if (currentRecordStartTag == null) {
        return null;
    }
    try {
        final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields);

        // Advance the cursor to the following record before returning.
        setNextRecordStartTag();
        if (record != null) {
            return record;
        } else {
            // Use the typed emptyMap() factory instead of the raw EMPTY_MAP constant.
            return new MapRecord(this.schema, Collections.emptyMap());
        }
    } catch (XMLStreamException e) {
        throw new MalformedRecordException("Could not parse XML", e);
    }
}
/**
 * Parses the content of the element opened by {@code startElement} according to the
 * schema-declared {@code dataType} of the field, consuming events up to and including
 * the element's end tag (for scalar and MAP types) or delegating to the record/field
 * parsers for nested types.
 *
 * @param startElement the already-consumed start tag of the field's element
 * @param fieldName    the record field name (used for type-conversion error context)
 * @param dataType     the declared type that drives how events are interpreted
 * @param recordValues the enclosing record's values so far; read for ARRAY accumulation
 * @param dropUnknown  whether fields absent from the schema are discarded
 * @return the converted value, or null when the element holds no usable content
 */
private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType, Map<String, Object> recordValues,
        boolean dropUnknown) throws XMLStreamException, MalformedRecordException {
    switch (dataType.getFieldType()) {
        // All scalar types: gather non-whitespace character data until the end tag,
        // then convert the accumulated text to the declared type.
        case BOOLEAN:
        case BYTE:
        case CHAR:
        case DOUBLE:
        case FLOAT:
        case INT:
        case LONG:
        case SHORT:
        case STRING:
        case DATE:
        case TIME:
        case TIMESTAMP: {
            StringBuilder content = new StringBuilder();

            while (xmlEventReader.hasNext()) {
                XMLEvent xmlEvent = xmlEventReader.nextEvent();
                if (xmlEvent.isCharacters()) {
                    final Characters characters = xmlEvent.asCharacters();
                    if (!characters.isWhiteSpace()) {
                        content.append(characters.getData());
                    }
                } else if (xmlEvent.isEndElement()) {
                    final String contentToReturn = content.toString();

                    if (!StringUtils.isBlank(contentToReturn)) {
                        return DataTypeUtils.convertType(content.toString(), dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
                    } else {
                        return null;
                    }

                } else if (xmlEvent.isStartElement()) {
                    // A scalar field cannot contain child elements; discard them entirely.
                    this.skipElement();
                }
            }
            break;
        }

        case ARRAY: {
            // Parse one element with the array's element type; repeated sibling elements
            // with the same name are accumulated into a List across calls, since the
            // caller stores the returned value back into recordValues under fieldName.
            final DataType arrayDataType = ((ArrayDataType) dataType).getElementType();

            final Object newValue = parseFieldForType(startElement, fieldName, arrayDataType, recordValues, dropUnknown);
            final Object oldValues = recordValues.get(fieldName);

            if (newValue != null) {
                if (oldValues != null) {
                    if (oldValues instanceof List) {
                        ((List) oldValues).add(newValue);
                    } else {
                        // A previous scalar occurrence: promote it and the new value to a list.
                        List<Object> arrayValues = new ArrayList<>();
                        arrayValues.add(oldValues);
                        arrayValues.add(newValue);
                        return arrayValues;
                    }
                } else {
                    List<Object> arrayValues = new ArrayList<>();
                    arrayValues.add(newValue);
                    return arrayValues;
                }
            }
            return oldValues;
        }

        case RECORD: {
            final RecordSchema childSchema;
            if (dataType instanceof RecordDataType) {
                childSchema = ((RecordDataType) dataType).getChildSchema();
            } else {
                // Declared RECORD but no child schema available; nothing can be parsed.
                return null;
            }
            return parseRecord(startElement, childSchema, true, dropUnknown);
        }

        case MAP: {
            // Each child element becomes a map entry keyed by its tag name,
            // with the value parsed as the map's declared value type.
            final DataType mapDataType = ((MapDataType) dataType).getValueType();
            final Map<String,Object> embeddedMap = new HashMap<>();

            while (xmlEventReader.hasNext()) {
                XMLEvent xmlEvent = xmlEventReader.nextEvent();

                if (xmlEvent.isStartElement()) {
                    final StartElement subStartElement = xmlEvent.asStartElement();
                    final String subFieldName = subStartElement.getName().getLocalPart();

                    final Object mapValue = parseFieldForType(subStartElement, subFieldName, mapDataType, embeddedMap, dropUnknown);
                    embeddedMap.put(subFieldName, mapValue);

                } else if (xmlEvent.isEndElement()) {
                    break;
                }
            }

            if (embeddedMap.size() > 0) {
                return embeddedMap;
            } else {
                return null;
            }
        }
        case CHOICE: {
            // field choice will parse the entire tree of a field
            return parseUnknownField(startElement, false, null);
        }
    }
    return null;
}
/**
 * Parses an element whose structure is not fully described by the schema, consuming
 * events up to and including its end tag. Attributes become map entries (optionally
 * prefixed), repeated child elements are accumulated and finally converted to arrays,
 * and text content is either returned directly (leaf element) or stored under
 * {@code contentFieldName} when the element also has fields.
 *
 * @param startElement the already-consumed start tag of the element
 * @param dropUnknown  when true, attributes/children absent from {@code schema} are discarded
 * @param schema       schema used for the dropUnknown filtering, may be null
 * @return a String (pure text), a MapRecord (structured content), or null (empty element)
 */
private Object parseUnknownField(StartElement startElement, boolean dropUnknown, RecordSchema schema) throws XMLStreamException {
    // parse attributes
    final Map<String, Object> recordValues = new HashMap<>();
    final Iterator iterator = startElement.getAttributes();
    while (iterator.hasNext()) {
        final Attribute attribute = (Attribute) iterator.next();
        final String attributeName = attribute.getName().toString();

        if (dropUnknown) {
            if (schema != null) {
                final Optional<RecordField> field = schema.getField(attributeName);
                if (field.isPresent()){
                    recordValues.put(attributePrefix == null ? attributeName : attributePrefix + attributeName, attribute.getValue());
                }
            }
        } else {
            recordValues.put(attributePrefix == null ? attributeName : attributePrefix + attributeName, attribute.getValue());
        }
    }

    // parse fields
    StringBuilder content = new StringBuilder();
    while (xmlEventReader.hasNext()) {
        final XMLEvent xmlEvent = xmlEventReader.nextEvent();

        if (xmlEvent.isCharacters()) {
            final Characters characters = xmlEvent.asCharacters();
            if (!characters.isWhiteSpace()) {
                content.append(characters.getData());
            }
        } else if (xmlEvent.isStartElement()){
            final StartElement subStartElement = xmlEvent.asStartElement();
            final String subFieldName = subStartElement.getName().getLocalPart();

            if (dropUnknown) {
                if (schema != null) {
                    final Optional<RecordField> field = schema.getField(subFieldName);
                    if (field.isPresent()){
                        // subElements of subStartElement can only be known if there is a corresponding field in the schema defined as record
                        final DataType dataType = field.get().getDataType();
                        RecordSchema childSchema = null;

                        if (dataType instanceof RecordDataType) {
                            childSchema = ((RecordDataType) dataType).getChildSchema();
                        } else if (dataType instanceof ArrayDataType) {
                            DataType typeOfArray = ((ArrayDataType) dataType).getElementType();
                            if (typeOfArray instanceof RecordDataType) {
                                childSchema = ((RecordDataType) typeOfArray).getChildSchema();
                            }
                        }

                        final Object value = parseUnknownField(subStartElement, true, childSchema);
                        if (value != null) {
                            putUnknownTypeInMap(recordValues, subFieldName, value);
                        }
                    } else {
                        skipElement();
                    }
                } else {
                    skipElement();
                }
            } else {
                final Object value = parseUnknownField(subStartElement, dropUnknown, schema);
                if (value != null) {
                    putUnknownTypeInMap(recordValues, subFieldName, value);
                }
            }
        } else if (xmlEvent.isEndElement()) {
            break;
        }
    }

    // Convert accumulated repeated-element Lists to arrays. Use Entry.setValue rather than
    // Map.put while iterating the entry set: put-on-existing-key happens to be safe for
    // HashMap, but setValue is the supported way to replace a value mid-iteration.
    for (final Map.Entry<String,Object> entry : recordValues.entrySet()) {
        if (entry.getValue() instanceof List) {
            entry.setValue(((List) entry.getValue()).toArray());
        }
    }

    final boolean hasContent = content.length() > 0;
    final boolean hasFields = recordValues.size() > 0;

    if (hasContent) {
        if (!hasFields) {
            // Pure text element: return the text itself.
            return content.toString();
        } else {
            if (contentFieldName != null) {
                recordValues.put(contentFieldName, content.toString());
            } else {
                logger.debug("Found content for field that has to be parsed as record but property \"Field Name for Content\" is not set. " +
                        "The content will not be added to the record.");
            }
            return new MapRecord(new SimpleRecordSchema(Collections.emptyList()), recordValues);
        }
    } else {
        if (hasFields) {
            return new MapRecord(new SimpleRecordSchema(Collections.emptyList()), recordValues);
        } else {
            return null;
        }
    }
}
/**
 * Parses the XML element represented by {@code startElement} into a record conforming to the
 * given schema. Attributes of the element are parsed first, then child elements and character
 * content are consumed from {@code xmlEventReader} until the element's end tag is reached.
 *
 * @param startElement the element to parse; its attributes are read here, its children are
 *                     consumed from the shared {@code xmlEventReader}
 * @param schema       schema describing the expected fields
 * @param coerceTypes  if true, values are converted into the data types declared in the schema
 * @param dropUnknown  if true, fields not present in the schema are skipped
 * @return the parsed record, or {@code null} if no values were found
 * @throws XMLStreamException       if the XML stream cannot be read
 * @throws MalformedRecordException if a nested field cannot be parsed against the schema
 */
private Record parseRecord(StartElement startElement, RecordSchema schema, boolean coerceTypes, boolean dropUnknown) throws XMLStreamException, MalformedRecordException {
    final Map<String, Object> recordValues = new HashMap<>();

    // parse attributes
    final Iterator iterator = startElement.getAttributes();
    while (iterator.hasNext()) {
        final Attribute attribute = (Attribute) iterator.next();
        final String attributeName = attribute.getName().toString();

        // the configured attribute prefix is applied only to the record field name,
        // not to the schema lookup (the schema is defined without the prefix)
        final String targetFieldName = attributePrefix == null ? attributeName : attributePrefix + attributeName;

        if (dropUnknown) {
            final Optional<RecordField> field = schema.getField(attributeName);
            if (field.isPresent()){

                // dropUnknown == true && coerceTypes == true
                if (coerceTypes) {
                    final Object value;
                    final DataType dataType = field.get().getDataType();
                    if ((value = parseStringForType(attribute.getValue(), attributeName, dataType)) != null) {
                        recordValues.put(targetFieldName, value);
                    }

                // dropUnknown == true && coerceTypes == false
                } else {
                    recordValues.put(targetFieldName, attribute.getValue());
                }
            }
        } else {

            // dropUnknown == false && coerceTypes == true
            if (coerceTypes) {
                final Object value;
                final Optional<RecordField> field = schema.getField(attributeName);
                if (field.isPresent()){
                    if ((value = parseStringForType(attribute.getValue(), attributeName, field.get().getDataType())) != null) {
                        recordValues.put(targetFieldName, value);
                    }
                } else {
                    recordValues.put(targetFieldName, attribute.getValue());
                }

            // dropUnknown == false && coerceTypes == false
            } else {
                recordValues.put(targetFieldName, attribute.getValue());
            }
        }
    }

    // parse fields
    StringBuilder content = new StringBuilder();
    while(xmlEventReader.hasNext()){
        final XMLEvent xmlEvent = xmlEventReader.nextEvent();

        if (xmlEvent.isStartElement()) {
            final StartElement subStartElement = xmlEvent.asStartElement();
            final String fieldName = subStartElement.getName().getLocalPart();

            final Optional<RecordField> field = schema.getField(fieldName);

            if (dropUnknown) {
                if (field.isPresent()) {
                    // dropUnknown == true && coerceTypes == true
                    if (coerceTypes) {
                        final Object value = parseFieldForType(subStartElement, fieldName, field.get().getDataType(), recordValues, true);
                        if (value != null) {
                            recordValues.put(fieldName, value);
                        }

                    // dropUnknown == true && coerceTypes == false
                    // subElements of subStartElement can only be known if there is a corresponding field in the schema defined as record
                    } else {
                        final DataType dataType = field.get().getDataType();
                        RecordSchema childSchema = null;

                        if (dataType instanceof RecordDataType) {
                            childSchema = ((RecordDataType) dataType).getChildSchema();
                        } else if (dataType instanceof ArrayDataType) {
                            DataType typeOfArray = ((ArrayDataType) dataType).getElementType();
                            if (typeOfArray instanceof RecordDataType) {
                                childSchema = ((RecordDataType) typeOfArray).getChildSchema();
                            }
                        }

                        final Object value = parseUnknownField(subStartElement, true, childSchema);
                        if (value != null) {
                            putUnknownTypeInMap(recordValues, fieldName, value);
                        }
                    }
                } else {
                    // unknown field while dropUnknown == true: consume and discard the whole element
                    skipElement();
                }
            } else {

                // dropUnknown == false && coerceTypes == true
                if (coerceTypes) {
                    if (field.isPresent()) {
                        final Object value = parseFieldForType(subStartElement, fieldName, field.get().getDataType(), recordValues, false);
                        if (value != null) {
                            recordValues.put(fieldName, value);
                        }
                    } else {
                        final Object value = parseUnknownField(subStartElement, false, null);
                        if (value != null) {
                            putUnknownTypeInMap(recordValues, fieldName, value);
                        }
                    }

                // dropUnknown == false && coerceTypes == false
                } else {
                    final Object value = parseUnknownField(subStartElement, false, null);
                    if (value != null) {
                        putUnknownTypeInMap(recordValues, fieldName, value);
                    }
                }
            }

        } else if (xmlEvent.isEndElement()) {
            // end tag of the record element itself: the record is complete
            break;

        } else if (xmlEvent.isCharacters()) {
            final Characters characters = xmlEvent.asCharacters();
            if (!characters.isWhiteSpace()) {
                content.append(characters.getData());
            }
        }
    }

    // character content of the element itself is only kept if "Field Name for Content" is configured
    // and that field exists in the schema
    if (content.length() > 0) {
        if (contentFieldName != null) {
            final Optional<RecordField> field = schema.getField(contentFieldName);
            if (field.isPresent()) {
                Object value = parseStringForType(content.toString(), contentFieldName, field.get().getDataType());
                recordValues.put(contentFieldName, value);
            }
        } else {
            logger.debug("Found content for field that is defined as record but property \"Field Name for Content\" is not set. " +
                    "The content will not be added to record.");
        }
    }

    // repeated fields were collected into Lists; expose them as arrays.
    // Use entry.setValue instead of map.put while iterating over the entry set —
    // put() during iteration is fragile even when it does not structurally modify the map.
    for (final Map.Entry<String,Object> entry : recordValues.entrySet()) {
        if (entry.getValue() instanceof List) {
            entry.setValue(((List) entry.getValue()).toArray());
        }
    }

    if (recordValues.size() > 0) {
        return new MapRecord(schema, recordValues);
    } else {
        return null;
    }
}
/**
 * Stores a value for a field that is not typed by the schema. The first occurrence of a
 * field name is stored as-is; any further occurrence converts the entry into a List and
 * appends to it, so repeated XML tags accumulate instead of overwriting each other.
 */
private void putUnknownTypeInMap(Map<String, Object> values, String fieldName, Object fieldValue) {
    final Object existing = values.get(fieldName);

    if (existing == null) {
        // first occurrence of this field
        values.put(fieldName, fieldValue);
    } else if (existing instanceof List) {
        // already collecting repeated values; just append
        ((List) existing).add(fieldValue);
    } else {
        // second occurrence: promote the single value to a list of values
        final List<Object> collected = new ArrayList<>();
        collected.add(existing);
        collected.add(fieldValue);
        values.put(fieldName, collected);
    }
}
/**
 * Converts raw string data into an object of the given data type. Only scalar field types
 * can be coerced directly from a string; for any other field type {@code null} is returned.
 */
private Object parseStringForType(String data, String fieldName, DataType dataType) {
    final boolean isScalarType;
    switch (dataType.getFieldType()) {
        case BOOLEAN:
        case BYTE:
        case CHAR:
        case DOUBLE:
        case FLOAT:
        case INT:
        case LONG:
        case SHORT:
        case STRING:
        case DATE:
        case TIME:
        case TIMESTAMP:
            isScalarType = true;
            break;
        default:
            isScalarType = false;
            break;
    }

    return isScalarType
            ? DataTypeUtils.convertType(data, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName)
            : null;
}
/**
 * Consumes events from the stream until the end tag matching the already-consumed start tag
 * is reached, discarding everything in between (including nested elements).
 */
private void skipElement() throws XMLStreamException {
    // depth 1 = the element whose start tag has already been consumed by the caller
    int depth = 1;
    while (xmlEventReader.hasNext()) {
        final XMLEvent event = xmlEventReader.nextEvent();
        if (event.isStartElement()) {
            depth++;
        } else if (event.isEndElement() && --depth == 0) {
            return;
        }
    }
}
@Override
public RecordSchema getSchema() {
    // The schema is fixed when the reader is constructed; simply hand it back.
    return this.schema;
}
@Override
public void close() throws IOException {
    try {
        xmlEventReader.close();
    } catch (XMLStreamException e) {
        // log the exception itself, not just a message — otherwise the cause of
        // the close failure is silently lost
        logger.error("Unable to close XMLEventReader", e);
    }
}
}

View File

@ -25,4 +25,6 @@ org.apache.nifi.csv.CSVRecordSetWriter
org.apache.nifi.grok.GrokReader
org.apache.nifi.text.FreeFormTextRecordSetWriter
org.apache.nifi.text.FreeFormTextRecordSetWriter
org.apache.nifi.xml.XMLReader

View File

@ -0,0 +1,433 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8"/>
<title>XMLReader</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
</head>
<body>
<p>
The XMLReader Controller Service reads XML content and creates Record objects. The Controller Service
must be configured with a schema that describes the structure of the XML data. Fields in the XML data
that are not defined in the schema will be skipped. Depending on whether the property "Expect Records as Array"
is set to "false" or "true", the reader either expects a single record or an array of records for each FlowFile.
</p>
<p>
Example: Single record
</p>
<code>
<pre>
&lt;record&gt;
&lt;field1&gt;content&lt;/field1&gt;
&lt;field2&gt;content&lt;/field2&gt;
&lt;/record&gt;
</pre>
</code>
<p>
An array of records has to be enclosed by a root tag.
Example: Array of records
</p>
<code>
<pre>
&lt;root&gt;
&lt;record&gt;
&lt;field1&gt;content&lt;/field1&gt;
&lt;field2&gt;content&lt;/field2&gt;
&lt;/record&gt;
&lt;record&gt;
&lt;field1&gt;content&lt;/field1&gt;
&lt;field2&gt;content&lt;/field2&gt;
&lt;/record&gt;
&lt;/root&gt;
</pre>
</code>
<h2>Example: Simple Fields</h2>
<p>
The simplest kind of data within XML data are tags / fields only containing content (no attributes, no embedded tags).
They can be described in the schema by simple types (e. g. INT, STRING, ...).
</p>
<code>
<pre>
&lt;root&gt;
&lt;record&gt;
&lt;simple_field&gt;content&lt;/simple_field&gt;
&lt;/record&gt;
&lt;/root&gt;
</pre>
</code>
<p>
This record can be described by a schema containing one field (e. g. of type string). By providing this schema,
the reader expects zero or one occurrences of "simple_field" in the record.
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "simple_field", "type": "string" }
]
}
</pre>
</code>
<h2>Example: Arrays with Simple Fields</h2>
<p>
Arrays are considered as repetitive tags / fields in XML data. For the following XML data, "array_field" is considered
to be an array enclosing simple fields, whereas "simple_field" is considered to be a simple field not enclosed in
an array.
</p>
<code>
<pre>
&lt;record&gt;
&lt;array_field&gt;content&lt;/array_field&gt;
&lt;array_field&gt;content&lt;/array_field&gt;
&lt;simple_field&gt;content&lt;/simple_field&gt;
&lt;/record&gt;
</pre>
</code>
<p>
This record can be described by the following schema:
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "array_field", "type":
{ "type": "array", "items": "string" }
},
{ "name": "simple_field", "type": "string" }
]
}
</pre>
</code>
<p>
If a field in a schema is embedded in an array, the reader expects zero, one or more occurrences of the field
in a record. The field "array_field" principally also could be defined as a simple field, but then the second occurrence
of this field would replace the first in the record object. Moreover, the field "simple_field" could also be defined
as an array. In this case, the reader would put it into the record object as an array with one element.
</p>
<h2>Example: Tags with Attributes</h2>
<p>
XML fields frequently not only contain content, but also attributes. The following record contains a field with
an attribute "attr" and content:
</p>
<code>
<pre>
&lt;record&gt;
&lt;field_with_attribute attr="attr_content"&gt;content of field&lt;/field_with_attribute&gt;
&lt;/record&gt;
</pre>
</code>
<p>
To parse the content of the field "field_with_attribute" together with the attribute "attr", two requirements have
to be fulfilled:
</p>
<ul>
<li>In the schema, the field has to be defined as record.</li>
<li>The property "Field Name for Content" has to be set.</li>
<li>As an option, the property "Attribute Prefix" also can be set.</li>
</ul>
<p>
For the example above, the following property settings are assumed:
</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>Field Name for Content</td>
<td><code>field_name_for_content</code></td>
</tr>
<tr>
<td>Attribute Prefix</td>
<td><code>prefix_</code></td>
</tr>
</table>
<p>
The schema can be defined as follows:
</p>
<code>
<pre>
{
"name": "test",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "field_with_attribute",
"type": {
"name": "RecordForTag",
"type": "record",
"fields" : [
{"name": "attr", "type": "string"},
{"name": "field_name_for_content", "type": "string"}
]
}
}
]
}
</pre>
</code>
<p>
Note that the field "field_name_for_content" not only has to be defined in the property section, but also in the
schema, whereas the prefix for attributes is not part of the schema. It will be appended when an attribute named
"attr" is found at the respective position in the XML data and added to the record. The record object of the above
example will be structured as follows:
</p>
<code>
<pre>
Record (
Record "field_with_attribute" (
RecordField "prefix_attr" = "attr_content",
RecordField "field_name_for_content" = "content of field"
)
)
</pre>
</code>
<p>
Principally, the field "field_with_attribute" could also be defined as a simple field. In this case, the attributes
simply would be ignored. Vice versa, the simple field in the first example above could also be defined as a record (assuming that
the property "Field Name for Content" is set).
</p>
<h2>Example: Tags within tags</h2>
<p>
XML data is frequently nested. In this case, tags enclose other tags:
</p>
<code>
<pre>
&lt;record&gt;
&lt;field_with_embedded_fields attr=&quot;attr_content&quot;&gt;
&lt;embedded_field&gt;embedded content&lt;/embedded_field&gt;
&lt;another_embedded_field&gt;another embedded content&lt;/another_embedded_field&gt;
&lt;/field_with_embedded_fields&gt;
&lt;/record&gt;
</pre>
</code>
<p>
The enclosing fields always have to be defined as records, irrespective whether they include attributes to be
parsed or not. In this example, the tag "field_with_embedded_fields" encloses the fields "embedded_field" and
"another_embedded_field", which are both simple fields. The schema can be defined as follows:
</p>
<code>
<pre>
{
"name": "test",
"namespace": "nifi",
"type": "record",
"fields": [
{
"name": "field_with_embedded_fields",
"type": {
"name": "RecordForEmbedded",
"type": "record",
"fields" : [
{"name": "attr", "type": "string"},
{"name": "embedded_field", "type": "string"},
{"name": "another_embedded_field", "type": "string"}
]
}
}
]
}
</pre>
</code>
<p>
Notice that this case does not require the property "Field Name for Content" to be set as this is only required
for tags containing attributes and content.
</p>
<h2>Example: Array of records</h2>
<p>
For further explanation of the logic of this reader, an example of an array of records shall be demonstrated.
The following record contains the field "array_field", which repeatedly occurs. The field contains two
embedded fields.
</p>
<code>
<pre>
&lt;record&gt;
&lt;array_field&gt;
&lt;embedded_field&gt;embedded content 1&lt;/embedded_field&gt;
&lt;another_embedded_field&gt;another embedded content 1&lt;/another_embedded_field&gt;
&lt;/array_field&gt;
&lt;array_field&gt;
&lt;embedded_field&gt;embedded content 2&lt;/embedded_field&gt;
&lt;another_embedded_field&gt;another embedded content 2&lt;/another_embedded_field&gt;
&lt;/array_field&gt;
&lt;/record&gt;
</pre>
</code>
<p>
This XML data can be parsed similarly to the data in the example "Tags within tags" above. However, the record
defined in the schema of that example has to be embedded in an array.
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "array_field",
"type": {
"type": "array",
"items": {
"name": "RecordInArray",
"type": "record",
"fields" : [
{"name": "embedded_field", "type": "string"},
{"name": "another_embedded_field", "type": "string"}
]
}
}
}
]
}
</pre>
</code>
<h2>Example: Array in record</h2>
<p>
In XML data, arrays are frequently enclosed by tags:
</p>
<code>
<pre>
&lt;record&gt;
&lt;field_enclosing_array&gt;
&lt;element&gt;content 1&lt;/element&gt;
&lt;element&gt;content 2&lt;/element&gt;
&lt;/field_enclosing_array&gt;
&lt;field_without_array&gt; content 3&lt;/field_without_array&gt;
&lt;/record&gt;
</pre>
</code>
<p>
For the schema, embedded tags have to be described by records. Therefore, the field "field_enclosing_array"
is a record that embeds an array with elements of type string:
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "field_enclosing_array",
"type": {
"name": "EmbeddedRecord",
"type": "record",
"fields" : [
{
"name": "element",
"type": {
"type": "array",
"items": "string"
}
}
]
}
},
{ "name": "field_without_array", "type": "string" }
]
}
</pre>
</code>
<h2>Example: Maps</h2>
<p>
A map is a field embedding fields with different names:
</p>
<code>
<pre>
&lt;record&gt;
&lt;map_field&gt;
&lt;field1&gt;content&lt;/field1&gt;
&lt;field2&gt;content&lt;/field2&gt;
...
&lt;/map_field&gt;
&lt;simple_field&gt;content&lt;/simple_field&gt;
&lt;/record&gt;
</pre>
</code>
<p>
This data can be processed using the following schema:
</p>
<code>
<pre>
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "map_field", "type":
{ "type": "map", "values": "string" }
},
{ "name": "simple_field", "type": "string" }
]
}
</pre>
</code>
</body>
</html>

View File

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.xml;
import static junit.framework.TestCase.assertEquals;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestXMLReader {

    private XMLReader reader;

    // flow file attribute names used to resolve expression language in reader properties
    private final String ATTRIBUTE_PREFIX = "attribute_prefix";
    private final String CONTENT_NAME = "content_field";
    private final String EVALUATE_IS_ARRAY = "xml.stream.is.array";

    /**
     * Creates a TestRunner whose XMLReader controller service is configured with the schema
     * text read from the given file. The service is registered but not enabled, so individual
     * tests can still set reader properties before enabling it.
     */
    public TestRunner setup(String filePath) throws InitializationException, IOException {
        TestRunner runner = TestRunners.newTestRunner(TestXMLReaderProcessor.class);
        reader = new XMLReader();
        runner.addControllerService("xml_reader", reader);
        runner.setProperty(TestXMLReaderProcessor.XML_READER, "xml_reader");

        // Decode with an explicit charset rather than the platform default:
        // the test fixtures contain non-ASCII characters (e.g. "Amélie").
        final String outputSchemaText = new String(Files.readAllBytes(Paths.get(filePath)), StandardCharsets.UTF_8);
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
        return runner;
    }

    // Returns the processor output split into one string per record.
    private List<String> getRecords(TestRunner runner) {
        List<MockFlowFile> flowFile = runner.getFlowFilesForRelationship(TestXMLReaderProcessor.SUCCESS);
        return Arrays.asList(new String(runner.getContentAsByteArray(flowFile.get(0)), StandardCharsets.UTF_8).split("\n"));
    }

    @Test
    public void testRecordFormat() throws IOException, InitializationException {
        TestRunner runner = setup("src/test/resources/xml/testschema");

        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_EVALUATE);
        runner.enableControllerService(reader);

        InputStream is = new FileInputStream("src/test/resources/xml/people.xml");
        runner.enqueue(is, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
        runner.run();

        List<String> records = getRecords(runner);
        assertEquals(4, records.size());
    }

    @Test
    public void testRecordFormat2() throws IOException, InitializationException {
        TestRunner runner = setup("src/test/resources/xml/testschema");

        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
        runner.enableControllerService(reader);

        InputStream is = new FileInputStream("src/test/resources/xml/people.xml");
        runner.enqueue(is, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
        runner.run();

        List<String> records = getRecords(runner);
        assertEquals(4, records.size());
    }

    @Test
    public void testRecordFormat3() throws IOException, InitializationException {
        TestRunner runner = setup("src/test/resources/xml/testschema");

        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_SINGLE);
        runner.enableControllerService(reader);

        InputStream is = new FileInputStream("src/test/resources/xml/person.xml");
        runner.enqueue(is, Collections.singletonMap(EVALUATE_IS_ARRAY, "true"));
        runner.run();

        List<String> records = getRecords(runner);
        assertEquals(1, records.size());
    }

    @Test
    public void testAttributePrefix() throws IOException, InitializationException {
        TestRunner runner = setup("src/test/resources/xml/testschema");

        // the prefix is supplied via expression language from a flow file attribute
        runner.setProperty(reader, XMLReader.ATTRIBUTE_PREFIX, "${" + ATTRIBUTE_PREFIX + "}");
        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
        runner.enableControllerService(reader);

        InputStream is = new FileInputStream("src/test/resources/xml/people.xml");
        runner.enqueue(is, Collections.singletonMap(ATTRIBUTE_PREFIX, "ATTR_"));
        runner.run();

        List<String> records = getRecords(runner);
        assertEquals(4, records.size());
        assertEquals("MapRecord[{COUNTRY=USA, ATTR_ID=P1, NAME=Cleve Butler, AGE=42}]", records.get(0));
        assertEquals("MapRecord[{COUNTRY=UK, ATTR_ID=P2, NAME=Ainslie Fletcher, AGE=33}]", records.get(1));
        assertEquals("MapRecord[{COUNTRY=FR, ATTR_ID=P3, NAME=Amélie Bonfils, AGE=74}]", records.get(2));
        assertEquals("MapRecord[{COUNTRY=USA, ATTR_ID=P4, NAME=Elenora Scrivens, AGE=16}]", records.get(3));
    }

    @Test
    public void testContentField() throws IOException, InitializationException {
        TestRunner runner = setup("src/test/resources/xml/testschema2");

        // the content field name is supplied via expression language from a flow file attribute
        runner.setProperty(reader, XMLReader.CONTENT_FIELD_NAME, "${" + CONTENT_NAME + "}");
        runner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
        runner.enableControllerService(reader);

        InputStream is = new FileInputStream("src/test/resources/xml/people_tag_in_characters.xml");
        runner.enqueue(is, Collections.singletonMap(CONTENT_NAME, "CONTENT"));
        runner.run();

        List<String> records = getRecords(runner);
        assertEquals(5, records.size());
        assertEquals("MapRecord[{ID=P1, NAME=MapRecord[{CONTENT=Cleve Butler, ATTR=attr content, INNER=inner content}], AGE=42}]", records.get(0));
        assertEquals("MapRecord[{ID=P2, NAME=MapRecord[{CONTENT=Ainslie Fletcher, ATTR=attr content, INNER=inner content}], AGE=33}]", records.get(1));
        assertEquals("MapRecord[{ID=P3, NAME=MapRecord[{CONTENT=Amélie Bonfils, ATTR=attr content, INNER=inner content}], AGE=74}]", records.get(2));
        assertEquals("MapRecord[{ID=P4, NAME=MapRecord[{CONTENT=Elenora Scrivens, ATTR=attr content, INNER=inner content}], AGE=16}]", records.get(3));
        assertEquals("MapRecord[{ID=P5, NAME=MapRecord[{INNER=inner content}]}]", records.get(4));
    }
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.xml;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StringUtils;
/**
 * Minimal processor used by the XMLReader tests: reads all records from the incoming flow file
 * with the configured reader and writes their string representations back, one per line.
 */
public class TestXMLReaderProcessor extends AbstractProcessor {

    static final PropertyDescriptor XML_READER = new PropertyDescriptor.Builder()
            .name("xml_reader")
            .description("xml_reader")
            .identifiesControllerService(XMLReader.class)
            .required(true)
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("success").build();

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            // nothing queued for this trigger
            return;
        }

        final RecordReaderFactory readerFactory = context.getProperty(XML_READER).asControllerService(RecordReaderFactory.class);
        final List<String> records = new ArrayList<>();

        try (final InputStream in = session.read(flowFile);
             final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
            Record record;
            while ((record = reader.nextRecord()) != null) {
                records.add(record.toString());
            }
        } catch (final Exception e) {
            // fail the test visibly instead of swallowing the error (was e.printStackTrace())
            throw new ProcessException("Could not read records from " + flowFile, e);
        }

        // write with an explicit charset so output matches what the tests decode
        flowFile = session.write(flowFile, (out) -> out.write(StringUtils.join(records, "\n").getBytes(StandardCharsets.UTF_8)));
        session.transfer(flowFile, SUCCESS);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // plain list instead of the double-brace initialization anti-pattern
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(XML_READER);
        return descriptors;
    }

    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(SUCCESS);
        return relationships;
    }
}

View File

@ -0,0 +1,22 @@
<PEOPLE attr="attr1">
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
<PERSON ID="P2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</PERSON>
<PERSON ID="P3">
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
</PERSON>
<PERSON ID="P4">
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,12 @@
<PEOPLE attr="attr1">
<PERSON ID="1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
<PERSON ID="2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,12 @@
<PEOPLE>
<PERSON>
<NAME ID="name1">Cleve Butler</NAME>
<AGE ID="age1">42</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
<PERSON>
<NAME ID="name2">Ainslie Fletcher</NAME>
<AGE ID="age2">33</AGE>
<COUNTRY>UK</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,37 @@
<PEOPLE attr="attr1">
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
<CHILDREN>
<CHILD>child1</CHILD>
<CHILD>child2</CHILD>
</CHILDREN>
</PERSON>
<PERSON ID="P2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
<CHILDREN>
<CHILD>child1</CHILD>
</CHILDREN>
</PERSON>
<PERSON ID="P3">
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
<CHILDREN>
<CHILD>child1</CHILD>
<CHILD>child2</CHILD>
<CHILD>child3</CHILD>
<CHILD></CHILD>
</CHILDREN>
</PERSON>
<PERSON ID="P4">
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
<CHILDREN>
</CHILDREN>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,28 @@
<PEOPLE attr="attr1">
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
<CHILD>child1</CHILD>
<CHILD>child2</CHILD>
</PERSON>
<PERSON ID="P2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
<CHILD>child1</CHILD>
</PERSON>
<PERSON ID="P3">
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
<CHILD>child1</CHILD>
<CHILD>child2</CHILD>
<CHILD>child3</CHILD>
</PERSON>
<PERSON ID="P4">
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,22 @@
<PEOPLE>
<PERSON>
<NAME><![CDATA[Cleve Butler]]></NAME>
<AGE><![CDATA[42]]></AGE>
<COUNTRY><![CDATA[USA]]></COUNTRY>
</PERSON>
<PERSON>
<NAME><![CDATA[Ainslie Fletcher]]></NAME>
<AGE><![CDATA[33]]></AGE>
<COUNTRY><![CDATA[UK]]></COUNTRY>
</PERSON>
<PERSON>
<NAME><![CDATA[Amélie Bonfils]]></NAME>
<AGE><![CDATA[74]]></AGE>
<COUNTRY><![CDATA[FR]]></COUNTRY>
</PERSON>
<PERSON>
<NAME><![CDATA[Elenora Scrivens]]></NAME>
<AGE><![CDATA[16]]></AGE>
<COUNTRY><![CDATA[USA]]></COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,33 @@
<PEOPLE>
<PERSON ID="1">
<CHILDREN ID="first husband">
<CHILD ID="1-1-1">
<NAME>Selina</NAME>
<ROLE>daughter</ROLE>
</CHILD>
<CHILD ID="1-1-2">
<NAME>Hans</NAME>
<ROLE>son</ROLE>
</CHILD>
</CHILDREN>
<CHILDREN ID="second husband">
<CHILD ID="1-2-1">
<NAME>Selina2</NAME>
<ROLE>daughter</ROLE>
</CHILD>
<CHILD ID="1-2-2">
<NAME>Hans2</NAME>
<ROLE>son</ROLE>
</CHILD>
</CHILDREN>
</PERSON>
<PERSON ID="2">
<NAME>Tom</NAME>
<CHILDREN ID="first wife">
<CHILD ID="2-1-1">
<NAME>Selina3</NAME>
<ROLE>daughter</ROLE>
</CHILD>
</CHILDREN>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,73 @@
<PEOPLE>
<PERSON ID="1">
<NAME>Lisa</NAME>
<ROLE>grandmother</ROLE>
<CHILDREN SPOUSE="husband1">
<CHILD ID="1-1">
<NAME>Anna</NAME>
<ROLE>mother</ROLE>
<CHILDREN ID="first husband">
<CHILD ID="1-1-1">
<NAME>Selina</NAME>
<ROLE>daughter</ROLE>
</CHILD>
<CHILD ID="1-1-2">
<NAME>Hans</NAME>
<ROLE>son</ROLE>
</CHILD>
</CHILDREN>
</CHILD>
<CHILD ID="1-2">
<NAME>Catrina</NAME>
<ROLE>mother</ROLE>
<CHILDREN ID="first husband">
<CHILD ID="1-2-1">
<NAME>Maria</NAME>
<ROLE>daughter</ROLE>
</CHILD>
</CHILDREN>
<CHILDREN ID="second husband">
<CHILD ID="1-2-2">
<NAME>Sarah</NAME>
<ROLE>daughter</ROLE>
</CHILD>
<CHILD ID="1-2-3">
<NAME>Jacob</NAME>
<ROLE>son</ROLE>
</CHILD>
</CHILDREN>
</CHILD>
</CHILDREN>
<CHILDREN SPOUSE="husband2">
<CHILD ID="1-3">
<NAME>Anna2</NAME>
<ROLE>mother</ROLE>
<CHILDREN ID="first husband">
<CHILD ID="1-3-1">
<NAME>Selina2</NAME>
<ROLE>daughter</ROLE>
</CHILD>
<CHILD ID="1-3-2">
<NAME>Hans2</NAME>
<ROLE>son</ROLE>
</CHILD>
</CHILDREN>
</CHILD>
</CHILDREN>
</PERSON>
<PERSON ID="2">
<NAME>Tom</NAME>
<CHILDREN SPOUSE="wife1">
<CHILD ID="2-1">
<NAME>Anna3</NAME>
<ROLE>father</ROLE>
<CHILDREN ID="first wife">
<CHILD ID="2-1-1">
<NAME>Selina3</NAME>
<ROLE>daughter</ROLE>
</CHILD>
</CHILDREN>
</CHILD>
</CHILDREN>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,12 @@
<PEOPLE>
<PERSON ID="">
<NAME></NAME>
<AGE></AGE>
<COUNTRY></COUNTRY>
</PERSON>
<PERSON>
<NAME/>
<AGE/>
<COUNTRY/>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,21 @@
<PEOPLE attr="attr1">
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
<PERSON ID="P2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</PERSON>
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
</PERSON>
<PERSON ID="P4">
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,18 @@
<PEOPLE>
<PERSON>
<ID>P1</ID>
<MAP>
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</MAP>
</PERSON>
<PERSON>
<ID>P2</ID>
<MAP>
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</MAP>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,32 @@
<PEOPLE>
<PERSON>
<ID>P1</ID>
<MAP>
<ENTRY>
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</ENTRY>
<ENTRY2>
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</ENTRY2>
</MAP>
</PERSON>
<PERSON>
<ID>P2</ID>
<MAP>
<ENTRY>
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
</ENTRY>
<ENTRY2>
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
</ENTRY2>
</MAP>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,22 @@
<PEOPLE xmlns:F="http://www.nifi-testfile.org">
<PERSON>
<F:NAME>Cleve Butler</F:NAME>
<F:AGE>42</F:AGE>
<F:COUNTRY>USA</F:COUNTRY>
</PERSON>
<PERSON>
<F:NAME>Ainslie Fletcher</F:NAME>
<F:AGE>33</F:AGE>
<F:COUNTRY>UK</F:COUNTRY>
</PERSON>
<PERSON>
<F:NAME>Amélie Bonfils</F:NAME>
<F:AGE>74</F:AGE>
<F:COUNTRY>FR</F:COUNTRY>
</PERSON>
<PERSON>
<F:NAME>Elenora Scrivens</F:NAME>
<F:AGE>16</F:AGE>
<F:COUNTRY>USA</F:COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,38 @@
<PEOPLE attr="attr1">
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
<ADDRESS>
<STREET>292 West Street</STREET>
<CITY>Jersey City</CITY>
</ADDRESS>
</PERSON>
<PERSON ID="P2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
<ADDRESS>
<STREET>123 6th St.</STREET>
<CITY>Seattle</CITY>
</ADDRESS>
</PERSON>
<PERSON ID="P3">
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
<ADDRESS>
<STREET>44 Shirley Ave.</STREET>
<CITY>Los Angeles</CITY>
</ADDRESS>
</PERSON>
<PERSON ID="P4">
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
<ADDRESS>
<STREET>70 Bowman St.</STREET>
<CITY>Columbus</CITY>
</ADDRESS>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,22 @@
<PEOPLE>
<PERSON>
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
<PERSON>
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</PERSON>
<PERSON>
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
</PERSON>
<PERSON>
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,23 @@
<PEOPLE attr="attr1">
<PERSON ID="P1">
<NAME ATTR="attr content">Cleve <INNER>inner content</INNER>Butler</NAME>
<AGE>42</AGE>
</PERSON>
<PERSON ID="P2">
<NAME ATTR="attr content">Ainslie <INNER>inner content</INNER>Fletcher</NAME>
<AGE>33</AGE>
</PERSON>
<PERSON ID="P3">
<NAME ATTR="attr content">Amélie <INNER>inner content</INNER>Bonfils</NAME>
<AGE>74</AGE>
</PERSON>
<PERSON ID="P4">
<NAME ATTR="attr content">Elenora <INNER>inner content</INNER>Scrivens</NAME>
<AGE>16</AGE>
</PERSON>
<PERSON ID="P5">
<NAME><INNER>inner content</INNER></NAME>
<AGE></AGE>
<COUNTRY></COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="utf-8"?>
<PEOPLE attr="attr1">
<!-- something -->
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
<!-- something -->
</PERSON>
<!-- something -->
<PERSON ID="P2">
<NAME>Ainslie Fletcher</NAME>
<AGE>33</AGE>
<COUNTRY>UK</COUNTRY>
</PERSON>
<PERSON ID="P3">
<!-- something -->
<NAME>Amélie Bonfils</NAME>
<AGE>74</AGE>
<COUNTRY>FR</COUNTRY>
</PERSON>
<PERSON ID="P4">
<NAME>Elenora Scrivens</NAME>
<AGE>16</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>
</PEOPLE>

View File

@ -0,0 +1,5 @@
<PERSON ID="P1">
<NAME>Cleve Butler</NAME>
<AGE>42</AGE>
<COUNTRY>USA</COUNTRY>
</PERSON>

View File

@ -0,0 +1,11 @@
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "NAME", "type": "string" },
{ "name": "AGE", "type": "int" },
{ "name": "COUNTRY", "type": "string" }
]
}

View File

@ -0,0 +1,19 @@
{
"namespace": "nifi",
"name": "test",
"type": "record",
"fields": [
{ "name": "ID", "type": "string" },
{ "name": "NAME", "type": {
"type": "record",
"name": "nested",
"fields": [
{ "name": "ATTR", "type": "string" },
{ "name": "INNER", "type": "string" },
{ "name": "CONTENT", "type": "string" }
]
}
},
{ "name": "AGE", "type": "int" }
]
}