diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index b0ba446484..1a98ee3006 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@@ -51,7 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Continually runs a processor as long as the processor has work to do. {@link #call()} will return true if the processor should be yielded, false otherwise.
+ * Continually runs a {@link Connectable} component as long as the component has work to do.
+ * {@link #invoke()} will return an {@link InvocationResult} indicating whether the component should be yielded.
*/
public class ConnectableTask {
@@ -64,7 +66,6 @@ public class ConnectableTask {
private final ProcessContext processContext;
private final FlowController flowController;
private final int numRelationships;
- private final boolean hasNonLoopConnection;
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
@@ -76,7 +77,6 @@ public class ConnectableTask {
this.scheduleState = scheduleState;
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
- this.hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
@@ -103,8 +103,40 @@ public class ConnectableTask {
return connectable.getYieldExpiration() > System.currentTimeMillis();
}
+ /**
+ * Make sure processor has work to do. This means that it meets one of these criteria:
+ * <ol>
+ * <li>It is a Funnel and has incoming FlowFiles from other components, and at least one outgoing connection.</li>
+ * <li>It is a 'source' component, meaning:
+ * <ul>
+ * <li>It is annotated with @TriggerWhenEmpty</li>
+ * <li>It has no incoming connections</li>
+ * <li>All incoming connections are self-loops</li>
+ * </ul>
+ * </li>
+ * <li>It has data in incoming connections to process</li>
+ * </ol>
+ *
+ * @return true if there is work to do, otherwise false
+ */
private boolean isWorkToDo() {
- return connectable.isTriggerWhenEmpty() || !connectable.hasIncomingConnection() || !hasNonLoopConnection || Connectables.flowFilesQueued(connectable);
+ boolean hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
+
+ if (connectable.getConnectableType() == ConnectableType.FUNNEL) {
+ // Handle Funnel as a special case because it will never be a 'source' component,
+ // and its outgoing connections cannot be terminated.
+ // It requires incoming FlowFiles from other components and at least one outgoing connection.
+ return connectable.hasIncomingConnection()
+ && hasNonLoopConnection
+ && !connectable.getConnections().isEmpty()
+ && Connectables.flowFilesQueued(connectable);
+ }
+
+ final boolean isSourceComponent = connectable.isTriggerWhenEmpty()
+ // No input connections
+ || !connectable.hasIncomingConnection()
+ // Every incoming connection loops back to itself, no inputs from other components
+ || !hasNonLoopConnection;
+
+ // If it is not a 'source' component, it requires a FlowFile to process.
+ return isSourceComponent || Connectables.flowFilesQueued(connectable);
}
private boolean isBackPressureEngaged() {
@@ -129,11 +161,7 @@ public class ConnectableTask {
return InvocationResult.DO_NOT_YIELD;
}
- // Make sure processor has work to do. This means that it meets one of these criteria:
- // * It is annotated with @TriggerWhenEmpty
- // * It has data in an incoming Connection
- // * It has no incoming connections
- // * All incoming connections are self-loops
+ // Make sure processor has work to do.
if (!isWorkToDo()) {
return InvocationResult.yield("No work to do");
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
index 3ff95800c7..7214b8079e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
@@ -21,12 +21,17 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -41,15 +46,9 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestConnectableTask {
- @Test
- public void testIsWorkToDo() {
- final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
- Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
- final Processor processor = Mockito.mock(Processor.class);
- Mockito.when(procNode.getIdentifier()).thenReturn("123");
- Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
+ private ConnectableTask createTask(final Connectable connectable) {
final FlowController flowController = Mockito.mock(FlowController.class);
Mockito.when(flowController.getStateManagerProvider()).thenReturn(Mockito.mock(StateManagerProvider.class));
@@ -61,9 +60,22 @@ public class TestConnectableTask {
final LifecycleState scheduleState = new LifecycleState();
final StringEncryptor encryptor = Mockito.mock(StringEncryptor.class);
- ConnectableTask task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
+
+ return new ConnectableTask(Mockito.mock(SchedulingAgent.class), connectable,
+ flowController, contextFactory, scheduleState, encryptor);
+ }
+
+ @Test
+ public void testIsWorkToDo() {
+ final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
+ Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
+
+ final Processor processor = Mockito.mock(Processor.class);
+ Mockito.when(procNode.getIdentifier()).thenReturn("123");
+ Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
// There is work to do because there are no incoming connections.
+ final ConnectableTask task = createTask(procNode);
assertFalse(task.invoke().isYield());
// Test with only a single connection that is self-looping and empty
@@ -93,8 +105,6 @@ public class TestConnectableTask {
when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
- // Create a new ConnectableTask because we want to have a different value for the 'hasNonLoopConnection' value, which is calculated in the task's constructor.
- task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
assertTrue(task.invoke().isYield());
// test when the queue has data
@@ -106,4 +116,67 @@ public class TestConnectableTask {
assertFalse(task.invoke().isYield());
}
+ @Test
+ public void testIsWorkToDoFunnels() {
+ final Funnel funnel = Mockito.mock(Funnel.class);
+ Mockito.when(funnel.hasIncomingConnection()).thenReturn(false);
+ Mockito.when(funnel.getRunnableComponent()).thenReturn(funnel);
+ Mockito.when(funnel.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
+ Mockito.when(funnel.getIdentifier()).thenReturn("funnel-1");
+
+ final ConnectableTask task = createTask(funnel);
+ assertTrue("If there is no incoming connection, it should be yielded.", task.invoke().isYield());
+
+ // Test with only a single connection that is self-looping and empty.
+ // Such a self-loop input cannot actually be created for a Funnel through the NiFi API because an
+ // outer-layer validation check prevents it, but test the behavior anyway.
+ final Connection selfLoopingConnection = Mockito.mock(Connection.class);
+ when(selfLoopingConnection.getSource()).thenReturn(funnel);
+ when(selfLoopingConnection.getDestination()).thenReturn(funnel);
+
+ when(funnel.hasIncomingConnection()).thenReturn(true);
+ when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
+
+ final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
+ when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
+ when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);
+
+ final Set<Connection> outgoingConnections = new HashSet<>();
+ outgoingConnections.add(selfLoopingConnection);
+ when(funnel.getConnections()).thenReturn(outgoingConnections);
+
+ assertTrue("If there is no incoming connection from other components, it should be yielded.", task.invoke().isYield());
+
+ // Add an incoming connection from another component.
+ final ProcessorNode inputProcessor = Mockito.mock(ProcessorNode.class);
+ final Connection incomingFromAnotherComponent = Mockito.mock(Connection.class);
+ when(incomingFromAnotherComponent.getSource()).thenReturn(inputProcessor);
+ when(incomingFromAnotherComponent.getDestination()).thenReturn(funnel);
+ when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(emptyQueue);
+
+ when(funnel.hasIncomingConnection()).thenReturn(true);
+ when(funnel.getIncomingConnections()).thenReturn(Arrays.asList(selfLoopingConnection, incomingFromAnotherComponent));
+
+ assertTrue("Even if there is an incoming connection from another component," +
+ " it should be yielded because there's no outgoing connections.", task.invoke().isYield());
+
+ // Add an outgoing connection to another component.
+ final ProcessorNode outputProcessor = Mockito.mock(ProcessorNode.class);
+ final Connection outgoingToAnotherComponent = Mockito.mock(Connection.class);
+ when(outgoingToAnotherComponent.getSource()).thenReturn(funnel);
+ when(outgoingToAnotherComponent.getDestination()).thenReturn(outputProcessor);
+ outgoingConnections.add(outgoingToAnotherComponent);
+
+ assertTrue("Even if there is an incoming connection from another component and an outgoing connection as well," +
+ " it should be yielded because there's no incoming FlowFiles to process.", task.invoke().isYield());
+
+ // Add incoming FlowFiles to process.
+ final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
+ when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+ when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+ assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
+ task.invoke().isYield());
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
old mode 100644
new mode 100755
index 4c958f5623..4d47701ff6
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -122,6 +122,26 @@
 <exclude>src/test/resources/json/single-element-nested-array.json</exclude>
 <exclude>src/test/resources/json/single-element-nested.json</exclude>
 <exclude>src/test/resources/json/output/dataTypes.json</exclude>
+ <exclude>src/test/resources/xml/people.xml</exclude>
+ <exclude>src/test/resources/xml/people2.xml</exclude>
+ <exclude>src/test/resources/xml/people3.xml</exclude>
+ <exclude>src/test/resources/xml/people_array.xml</exclude>
+ <exclude>src/test/resources/xml/people_array_simple.xml</exclude>
+ <exclude>src/test/resources/xml/people_cdata.xml</exclude>
+ <exclude>src/test/resources/xml/people_complex1.xml</exclude>
+ <exclude>src/test/resources/xml/people_complex2.xml</exclude>
+ <exclude>src/test/resources/xml/people_empty.xml</exclude>
+ <exclude>src/test/resources/xml/people_invalid.xml</exclude>
+ <exclude>src/test/resources/xml/people_map.xml</exclude>
+ <exclude>src/test/resources/xml/people_map2.xml</exclude>
+ <exclude>src/test/resources/xml/people_namespace.xml</exclude>
+ <exclude>src/test/resources/xml/people_nested.xml</exclude>
+ <exclude>src/test/resources/xml/people_no_attributes.xml</exclude>
+ <exclude>src/test/resources/xml/people_tag_in_characters.xml</exclude>
+ <exclude>src/test/resources/xml/people_with_header_and_comments.xml</exclude>
+ <exclude>src/test/resources/xml/person.xml</exclude>
+ <exclude>src/test/resources/xml/testschema</exclude>
+ <exclude>src/test/resources/xml/testschema2</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
new file mode 100755
index 0000000000..d8216df480
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.xml;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.DateTimeUtils;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryService;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"xml", "record", "reader", "parser"})
+@CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of " +
+ "XML data, embedded in an enclosing root tag.")
+public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {
+
+ public static final AllowableValue RECORD_SINGLE = new AllowableValue("false", "false",
+ "Each FlowFile will consist of a single record without any sort of \"wrapper\".");
+ public static final AllowableValue RECORD_ARRAY = new AllowableValue("true", "true",
+ "Each FlowFile will consist of zero or more records. The outer-most XML element is expected to be a \"wrapper\" and will be ignored.");
+ public static final AllowableValue RECORD_EVALUATE = new AllowableValue("${xml.stream.is.array}", "Use attribute 'xml.stream.is.array'",
+ "Whether to treat a FlowFile as a single Record or an array of multiple Records is determined by the value of the 'xml.stream.is.array' attribute. "
+ + "If the value of the attribute is 'true' (case-insensitive), then the XML Reader will treat the FlowFile as a series of Records with the outer element being ignored. "
+ + "If the value of the attribute is 'false' (case-insensitive), then the FlowFile is treated as a single Record and no wrapper element is assumed. "
+ + "If the attribute is missing or its value is anything other than 'true' or 'false', then an Exception will be thrown and no records will be parsed.");
+
+ public static final PropertyDescriptor RECORD_FORMAT = new PropertyDescriptor.Builder()
+ .name("record_format")
+ .displayName("Expect Records as Array")
+ .description("This property defines whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\". Because XML does not "
+ + "provide for a way to read a series of XML documents from a stream directly, it is common to combine many XML documents by concatenating them and then wrapping the entire "
+ + "XML blob with a \"wrapper element\". This property dictates whether the reader expects a FlowFile to consist of a single Record or a series of Records with a \"wrapper element\" "
+ + "that will be ignored.")
+ .allowableValues(RECORD_SINGLE, RECORD_ARRAY, RECORD_EVALUATE)
+ .defaultValue(RECORD_SINGLE.getValue())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor ATTRIBUTE_PREFIX = new PropertyDescriptor.Builder()
+ .name("attribute_prefix")
+ .displayName("Attribute Prefix")
+ .description("If this property is set, the name of attributes will be prepended with a prefix when they are added to a record.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ public static final PropertyDescriptor CONTENT_FIELD_NAME = new PropertyDescriptor.Builder()
+ .name("content_field_name")
+ .displayName("Field Name for Content")
+ .description("If tags with content (e. g. content) are defined as nested records in the schema, " +
+ "the name of the tag will be used as name for the record and the value of this property will be used as name for the field. " +
+ "If tags with content shall be parsed together with attributes (e. g. content), " +
+ "they have to be defined as records. For additional information, see the section of processor usage.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .build();
+
+ private volatile String dateFormat;
+ private volatile String timeFormat;
+ private volatile String timestampFormat;
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
+ this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
+ this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(RECORD_FORMAT);
+ properties.add(ATTRIBUTE_PREFIX);
+ properties.add(CONTENT_FIELD_NAME);
+ properties.add(DateTimeUtils.DATE_FORMAT);
+ properties.add(DateTimeUtils.TIME_FORMAT);
+ properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
+ return properties;
+ }
+
+ @Override
+ public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger)
+ throws IOException, SchemaNotFoundException, MalformedRecordException {
+ final ConfigurationContext context = getConfigurationContext();
+
+ final RecordSchema schema = getSchema(variables, in, null);
+
+ final String attributePrefix = context.getProperty(ATTRIBUTE_PREFIX).isSet()
+ ? context.getProperty(ATTRIBUTE_PREFIX).evaluateAttributeExpressions(variables).getValue().trim() : null;
+
+ final String contentFieldName = context.getProperty(CONTENT_FIELD_NAME).isSet()
+ ? context.getProperty(CONTENT_FIELD_NAME).evaluateAttributeExpressions(variables).getValue().trim() : null;
+
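+ // The property value is evaluated against the FlowFile attributes, so the 'Use attribute' option
+ // lets each FlowFile decide; anything other than 'true' or 'false' is rejected here.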
+ final boolean isArray;
+ final String recordFormat = context.getProperty(RECORD_FORMAT).evaluateAttributeExpressions(variables).getValue().trim();
+ if ("true".equalsIgnoreCase(recordFormat)) {
+ isArray = true;
+ } else if ("false".equalsIgnoreCase(recordFormat)) {
+ isArray = false;
+ } else {
+ throw new IOException("Cannot parse XML Records because the '" + RECORD_FORMAT.getDisplayName() + "' property evaluates to '"
+ + recordFormat + "', which is neither 'true' nor 'false'");
+ }
+
+ return new XMLRecordReader(in, schema, isArray, attributePrefix, contentFieldName, dateFormat, timeFormat, timestampFormat, logger);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
new file mode 100755
index 0000000000..e819b92dea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordReader.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.xml;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StringUtils;
+
+import javax.xml.stream.XMLEventReader;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.events.Attribute;
+import javax.xml.stream.events.Characters;
+import javax.xml.stream.events.StartElement;
+import javax.xml.stream.events.XMLEvent;
+import java.io.IOException;
+import java.io.InputStream;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+public class XMLRecordReader implements RecordReader {
+
+ private final ComponentLog logger;
+ private final RecordSchema schema;
+ private final String attributePrefix;
+ private final String contentFieldName;
+
+ private StartElement currentRecordStartTag;
+
+ private final XMLEventReader xmlEventReader;
+
+ private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+ private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+ private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+
+ public XMLRecordReader(InputStream in, RecordSchema schema, boolean isArray, String attributePrefix, String contentFieldName,
+ final String dateFormat, final String timeFormat, final String timestampFormat, final ComponentLog logger) throws MalformedRecordException {
+ this.schema = schema;
+ this.attributePrefix = attributePrefix;
+ this.contentFieldName = contentFieldName;
+ this.logger = logger;
+
+ final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+ final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+ final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
+
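+ // The DateFormat instances are created once here; the suppliers simply hand the cached
+ // instances to DataTypeUtils on demand.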
+ LAZY_DATE_FORMAT = () -> df;
+ LAZY_TIME_FORMAT = () -> tf;
+ LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
+ try {
+ final XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
+
+ xmlEventReader = xmlInputFactory.createXMLEventReader(in);
+
+ if (isArray) {
+ skipNextStartTag();
+ }
+
+ setNextRecordStartTag();
+ } catch (XMLStreamException e) {
+ throw new MalformedRecordException("Could not parse XML", e);
+ }
+ }
+
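+ // Consume events up to and including the next start element; used to step over the outer
+ // "wrapper" tag when the FlowFile is treated as an array of records.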
+ private void skipNextStartTag() throws XMLStreamException {
+ while (xmlEventReader.hasNext()) {
+ final XMLEvent xmlEvent = xmlEventReader.nextEvent();
+ if (xmlEvent.isStartElement()) {
+ return;
+ }
+ }
+ }
+
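+ // Advance to the start tag of the next record; null indicates that the stream is exhausted.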
+ private void setNextRecordStartTag() throws XMLStreamException {
+ while (xmlEventReader.hasNext()) {
+ final XMLEvent xmlEvent = xmlEventReader.nextEvent();
+ if (xmlEvent.isStartElement()) {
+ final StartElement startElement = xmlEvent.asStartElement();
+ currentRecordStartTag = startElement;
+ return;
+ }
+ }
+ currentRecordStartTag = null;
+ }
+
+ @Override
+ public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
+ if (currentRecordStartTag == null) {
+ return null;
+ }
+ try {
+ final Record record = parseRecord(currentRecordStartTag, this.schema, coerceTypes, dropUnknownFields);
+ setNextRecordStartTag();
+ if (record != null) {
+ return record;
+ } else {
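+ // parseRecord can return null for a record tag with no parseable children; emit an empty
+ // record instead of null, since null would signal the end of the stream.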
+ return new MapRecord(this.schema, Collections.EMPTY_MAP);
+ }
+ } catch (XMLStreamException e) {
+ throw new MalformedRecordException("Could not parse XML", e);
+ }
+ }
+
+ private Object parseFieldForType(StartElement startElement, String fieldName, DataType dataType, Map<String, Object> recordValues,
+ boolean dropUnknown) throws XMLStreamException, MalformedRecordException {
+ switch (dataType.getFieldType()) {
+ case BOOLEAN:
+ case BYTE:
+ case CHAR:
+ case DOUBLE:
+ case FLOAT:
+ case INT:
+ case LONG:
+ case SHORT:
+ case STRING:
+ case DATE:
+ case TIME:
+ case TIMESTAMP: {
+
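+ // Simple types carry their value as character data: collect it up to the end tag and convert.
+ // A nested start element cannot map to a simple type, so it is skipped entirely.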
+ StringBuilder content = new StringBuilder();
+
+ while (xmlEventReader.hasNext()) {
+ XMLEvent xmlEvent = xmlEventReader.nextEvent();
+ if (xmlEvent.isCharacters()) {
+ final Characters characters = xmlEvent.asCharacters();
+ if (!characters.isWhiteSpace()) {
+ content.append(characters.getData());
+ }
+ } else if (xmlEvent.isEndElement()) {
+ final String contentToReturn = content.toString();
+
+ if (!StringUtils.isBlank(contentToReturn)) {
+ return DataTypeUtils.convertType(content.toString(), dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+ } else {
+ return null;
+ }
+
+ } else if (xmlEvent.isStartElement()) {
+ this.skipElement();
+ }
+ }
+ break;
+ }
+
+ case ARRAY: {
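+ // Array elements arrive as repeated sibling tags. Parse this occurrence as the element type
+ // and merge it with any value(s) already collected under the same field name.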
+ final DataType arrayDataType = ((ArrayDataType) dataType).getElementType();
+
+ final Object newValue = parseFieldForType(startElement, fieldName, arrayDataType, recordValues, dropUnknown);
+ final Object oldValues = recordValues.get(fieldName);
+
+ if (newValue != null) {
+ if (oldValues != null) {
+ if (oldValues instanceof List) {
+ ((List) oldValues).add(newValue);
+ } else {
+ List