mirror of https://github.com/apache/nifi.git
NIFI-5113: Add XMLRecordSetWriter
This closes #2675. Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
620d446860
commit
2c8c9374af
|
@ -86,12 +86,29 @@
|
|||
<artifactId>avro</artifactId>
|
||||
<version>1.8.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>net.java.dev.stax-utils</groupId>
|
||||
<artifactId>stax-utils</artifactId>
|
||||
<version>20070216</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.bea.xml</groupId>
|
||||
<artifactId>jsr173-ri</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<version>1.7.0-SNAPSHOT</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.xmlunit</groupId>
|
||||
<artifactId>xmlunit-matchers</artifactId>
|
||||
<version>2.2.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
@ -148,6 +165,7 @@
|
|||
<exclude>src/test/resources/xml/person.xml</exclude>
|
||||
<exclude>src/test/resources/xml/testschema</exclude>
|
||||
<exclude>src/test/resources/xml/testschema2</exclude>
|
||||
<exclude>src/test/resources/xml/testschema3</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.record.NullSuppression;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Optional;
|
|||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.nifi.record.NullSuppression;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.json;
|
||||
package org.apache.nifi.record;
|
||||
|
||||
public enum NullSuppression {
|
||||
ALWAYS_SUPPRESS,
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.xml;
|
||||
|
||||
public enum ArrayWrapping {
|
||||
USE_PROPERTY_AS_WRAPPER,
|
||||
USE_PROPERTY_FOR_ELEMENTS,
|
||||
NO_WRAPPING
|
||||
}
|
|
@ -0,0 +1,653 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.xml;
|
||||
|
||||
import javanet.staxutils.IndentingXMLStreamWriter;
|
||||
import org.apache.nifi.record.NullSuppression;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.schema.access.SchemaAccessWriter;
|
||||
import org.apache.nifi.serialization.AbstractRecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.RawRecordWriter;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.type.ArrayDataType;
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.serialization.record.type.MapDataType;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
import javax.xml.stream.XMLOutputFactory;
|
||||
import javax.xml.stream.XMLStreamException;
|
||||
import javax.xml.stream.XMLStreamWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.text.DateFormat;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import static org.apache.nifi.xml.XMLRecordSetWriter.RECORD_TAG_NAME;
|
||||
import static org.apache.nifi.xml.XMLRecordSetWriter.ROOT_TAG_NAME;
|
||||
|
||||
|
||||
public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
|
||||
|
||||
private final ComponentLog logger;
|
||||
private final RecordSchema recordSchema;
|
||||
private final SchemaAccessWriter schemaAccess;
|
||||
private final XMLStreamWriter writer;
|
||||
private final NullSuppression nullSuppression;
|
||||
private final ArrayWrapping arrayWrapping;
|
||||
private final String arrayTagName;
|
||||
private final String recordTagName;
|
||||
private final String rootTagName;
|
||||
private final String charSet;
|
||||
private final boolean allowWritingMultipleRecords;
|
||||
private boolean hasWrittenRecord;
|
||||
|
||||
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
|
||||
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
|
||||
|
||||
public WriteXMLResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
|
||||
final NullSuppression nullSuppression, final ArrayWrapping arrayWrapping, final String arrayTagName, final String rootTagName, final String recordTagName,
|
||||
final String charSet, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
|
||||
|
||||
super(out);
|
||||
|
||||
this.logger = logger;
|
||||
this.recordSchema = recordSchema;
|
||||
this.schemaAccess = schemaAccess;
|
||||
this.nullSuppression = nullSuppression;
|
||||
|
||||
this.arrayWrapping = arrayWrapping;
|
||||
this.arrayTagName = arrayTagName;
|
||||
|
||||
this.rootTagName = rootTagName;
|
||||
|
||||
if (recordTagName != null) {
|
||||
this.recordTagName = recordTagName;
|
||||
} else {
|
||||
Optional<String> recordTagNameOptional = recordSchema.getIdentifier().getName();
|
||||
if (recordTagNameOptional.isPresent()) {
|
||||
this.recordTagName = recordTagNameOptional.get();
|
||||
} else {
|
||||
StringBuilder message = new StringBuilder();
|
||||
message.append("The property \'")
|
||||
.append(RECORD_TAG_NAME.getDisplayName())
|
||||
.append("\' has not been set and the writer does not find a record name in the schema.");
|
||||
throw new IOException(message.toString());
|
||||
}
|
||||
}
|
||||
|
||||
this.allowWritingMultipleRecords = !(this.rootTagName == null);
|
||||
|
||||
this.charSet = charSet;
|
||||
|
||||
hasWrittenRecord = false;
|
||||
|
||||
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
|
||||
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
|
||||
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
|
||||
|
||||
LAZY_DATE_FORMAT = () -> df;
|
||||
LAZY_TIME_FORMAT = () -> tf;
|
||||
LAZY_TIMESTAMP_FORMAT = () -> tsf;
|
||||
|
||||
try {
|
||||
XMLOutputFactory factory = XMLOutputFactory.newInstance();
|
||||
|
||||
if (prettyPrint) {
|
||||
writer = new IndentingXMLStreamWriter(factory.createXMLStreamWriter(out, charSet));
|
||||
} else {
|
||||
writer = factory.createXMLStreamWriter(out, charSet);
|
||||
}
|
||||
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onBeginRecordSet() throws IOException {
|
||||
|
||||
final OutputStream out = getOutputStream();
|
||||
schemaAccess.writeHeader(recordSchema, out);
|
||||
|
||||
try {
|
||||
writer.writeStartDocument();
|
||||
|
||||
if (allowWritingMultipleRecords) {
|
||||
writer.writeStartElement(rootTagName);
|
||||
}
|
||||
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> onFinishRecordSet() throws IOException {
|
||||
|
||||
try {
|
||||
if (allowWritingMultipleRecords) {
|
||||
writer.writeEndElement();
|
||||
}
|
||||
|
||||
writer.writeEndDocument();
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
return schemaAccess.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
try {
|
||||
writer.close();
|
||||
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
|
||||
super.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
|
||||
try {
|
||||
writer.flush();
|
||||
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void checkWritingMultipleRecords() throws IOException {
|
||||
if (!allowWritingMultipleRecords && hasWrittenRecord) {
|
||||
StringBuilder message = new StringBuilder();
|
||||
message.append("The writer attempts to write multiple record although property \'")
|
||||
.append(ROOT_TAG_NAME.getDisplayName())
|
||||
.append("\' has not been set. If the XMLRecordSetWriter is supposed to write multiple records into one ")
|
||||
.append("FlowFile, this property is required to be configured.");
|
||||
throw new IOException(message.toString()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> writeRecord(Record record) throws IOException {
|
||||
|
||||
if (!isActiveRecordSet()) {
|
||||
schemaAccess.writeHeader(recordSchema, getOutputStream());
|
||||
}
|
||||
|
||||
checkWritingMultipleRecords();
|
||||
|
||||
Deque<String> tagsToOpen = new ArrayDeque<>();
|
||||
|
||||
try {
|
||||
tagsToOpen.addLast(recordTagName);
|
||||
|
||||
boolean closingTagRequired = iterateThroughRecordUsingSchema(tagsToOpen, record, recordSchema);
|
||||
if (closingTagRequired) {
|
||||
writer.writeEndElement();
|
||||
hasWrittenRecord = true;
|
||||
}
|
||||
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
return schemaAccess.getAttributes(recordSchema);
|
||||
}
|
||||
|
||||
private boolean iterateThroughRecordUsingSchema(Deque<String> tagsToOpen, Record record, RecordSchema schema) throws XMLStreamException {
|
||||
|
||||
boolean loopHasWritten = false;
|
||||
for (RecordField field : schema.getFields()) {
|
||||
|
||||
String fieldName = field.getFieldName();
|
||||
DataType dataType = field.getDataType();
|
||||
Object value = record.getValue(field);
|
||||
|
||||
final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(value, (ChoiceDataType) dataType) : dataType;
|
||||
final Object coercedValue = DataTypeUtils.convertType(value, chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
|
||||
|
||||
if (coercedValue != null) {
|
||||
boolean hasWritten = writeFieldForType(tagsToOpen, coercedValue, chosenDataType, fieldName);
|
||||
if (hasWritten) {
|
||||
loopHasWritten = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING) && recordHasField(field, record)) {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeEndElement();
|
||||
loopHasWritten = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return loopHasWritten;
|
||||
}
|
||||
|
||||
private boolean writeFieldForType(Deque<String> tagsToOpen, Object coercedValue, DataType dataType, String fieldName) throws XMLStreamException {
|
||||
switch (dataType.getFieldType()) {
|
||||
case BOOLEAN:
|
||||
case BYTE:
|
||||
case CHAR:
|
||||
case DOUBLE:
|
||||
case FLOAT:
|
||||
case INT:
|
||||
case LONG:
|
||||
case SHORT:
|
||||
case STRING: {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeCharacters(coercedValue.toString());
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
}
|
||||
case DATE: {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_DATE_FORMAT);
|
||||
writer.writeCharacters(stringValue);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
}
|
||||
case TIME: {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIME_FORMAT);
|
||||
writer.writeCharacters(stringValue);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
}
|
||||
case TIMESTAMP: {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
final String stringValue = DataTypeUtils.toString(coercedValue, LAZY_TIMESTAMP_FORMAT);
|
||||
writer.writeCharacters(stringValue);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
}
|
||||
case RECORD: {
|
||||
final Record record = (Record) coercedValue;
|
||||
final RecordDataType recordDataType = (RecordDataType) dataType;
|
||||
final RecordSchema childSchema = recordDataType.getChildSchema();
|
||||
tagsToOpen.addLast(fieldName);
|
||||
|
||||
boolean hasWritten = iterateThroughRecordUsingSchema(tagsToOpen, record, childSchema);
|
||||
|
||||
if (hasWritten) {
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
tagsToOpen.removeLast();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
case ARRAY: {
|
||||
final Object[] arrayValues;
|
||||
if (coercedValue instanceof Object[]) {
|
||||
arrayValues = (Object[]) coercedValue;
|
||||
} else {
|
||||
arrayValues = new Object[]{coercedValue.toString()};
|
||||
}
|
||||
|
||||
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
|
||||
final DataType elementType = arrayDataType.getElementType();
|
||||
|
||||
final String elementName;
|
||||
final String wrapperName;
|
||||
if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_FOR_ELEMENTS)) {
|
||||
elementName = arrayTagName;
|
||||
wrapperName = fieldName;
|
||||
} else if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_AS_WRAPPER)) {
|
||||
elementName = fieldName;
|
||||
wrapperName = arrayTagName;
|
||||
} else {
|
||||
elementName = fieldName;
|
||||
wrapperName = null;
|
||||
}
|
||||
|
||||
if (wrapperName!= null) {
|
||||
tagsToOpen.addLast(wrapperName);
|
||||
}
|
||||
|
||||
boolean loopHasWritten = false;
|
||||
for (Object element : arrayValues) {
|
||||
|
||||
final DataType chosenDataType = elementType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(element, (ChoiceDataType) elementType) : elementType;
|
||||
final Object coercedElement = DataTypeUtils.convertType(element, chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, elementName);
|
||||
|
||||
if (coercedElement != null) {
|
||||
boolean hasWritten = writeFieldForType(tagsToOpen, coercedElement, elementType, elementName);
|
||||
|
||||
if (hasWritten) {
|
||||
loopHasWritten = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeEndElement();
|
||||
loopHasWritten = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (wrapperName!= null) {
|
||||
if (loopHasWritten) {
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
tagsToOpen.removeLast();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return loopHasWritten;
|
||||
}
|
||||
}
|
||||
case MAP: {
|
||||
final MapDataType mapDataType = (MapDataType) dataType;
|
||||
final DataType valueDataType = mapDataType.getValueType();
|
||||
final Map<String,?> map = (Map<String,?>) coercedValue;
|
||||
|
||||
tagsToOpen.addLast(fieldName);
|
||||
boolean loopHasWritten = false;
|
||||
|
||||
for (Map.Entry<String,?> entry : map.entrySet()) {
|
||||
|
||||
final String key = entry.getKey();
|
||||
|
||||
final DataType chosenDataType = valueDataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType(entry.getValue(),
|
||||
(ChoiceDataType) valueDataType) : valueDataType;
|
||||
final Object coercedElement = DataTypeUtils.convertType(entry.getValue(), chosenDataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, key);
|
||||
|
||||
if (coercedElement != null) {
|
||||
boolean hasWritten = writeFieldForType(tagsToOpen, entry.getValue(), valueDataType, key);
|
||||
|
||||
if (hasWritten) {
|
||||
loopHasWritten = true;
|
||||
}
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen, key);
|
||||
writer.writeEndElement();
|
||||
loopHasWritten = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (loopHasWritten) {
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
tagsToOpen.removeLast();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
case CHOICE:
|
||||
default: {
|
||||
return writeUnknownField(tagsToOpen, coercedValue, fieldName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void writeAllTags(Deque<String> tagsToOpen, String fieldName) throws XMLStreamException {
|
||||
tagsToOpen.addLast(fieldName);
|
||||
writeAllTags(tagsToOpen);
|
||||
}
|
||||
|
||||
private void writeAllTags(Deque<String> tagsToOpen) throws XMLStreamException {
|
||||
for (String tagName : tagsToOpen) {
|
||||
writer.writeStartElement(tagName);
|
||||
}
|
||||
tagsToOpen.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult writeRawRecord(Record record) throws IOException {
|
||||
|
||||
if (!isActiveRecordSet()) {
|
||||
schemaAccess.writeHeader(recordSchema, getOutputStream());
|
||||
}
|
||||
|
||||
checkWritingMultipleRecords();
|
||||
|
||||
Deque<String> tagsToOpen = new ArrayDeque<>();
|
||||
|
||||
try {
|
||||
tagsToOpen.addLast(recordTagName);
|
||||
|
||||
boolean closingTagRequired = iterateThroughRecordWithoutSchema(tagsToOpen, record);
|
||||
if (closingTagRequired) {
|
||||
writer.writeEndElement();
|
||||
hasWrittenRecord = true;
|
||||
}
|
||||
|
||||
} catch (XMLStreamException e) {
|
||||
throw new IOException(e.getMessage());
|
||||
}
|
||||
|
||||
final Map<String, String> attributes = schemaAccess.getAttributes(recordSchema);
|
||||
return WriteResult.of(incrementRecordCount(), attributes);
|
||||
}
|
||||
|
||||
private boolean iterateThroughRecordWithoutSchema(Deque<String> tagsToOpen, Record record) throws XMLStreamException {
|
||||
|
||||
boolean loopHasWritten = false;
|
||||
|
||||
for (String fieldName : record.getRawFieldNames()) {
|
||||
Object value = record.getValue(fieldName);
|
||||
|
||||
if (value != null) {
|
||||
boolean hasWritten = writeUnknownField(tagsToOpen, value, fieldName);
|
||||
|
||||
if (hasWritten) {
|
||||
loopHasWritten = true;
|
||||
}
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeEndElement();
|
||||
loopHasWritten = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return loopHasWritten;
|
||||
}
|
||||
|
||||
private boolean writeUnknownField(Deque<String> tagsToOpen, Object value, String fieldName) throws XMLStreamException {
|
||||
|
||||
if (value instanceof Record) {
|
||||
Record valueAsRecord = (Record) value;
|
||||
tagsToOpen.addLast(fieldName);
|
||||
|
||||
boolean hasWritten = iterateThroughRecordWithoutSchema(tagsToOpen, valueAsRecord);
|
||||
|
||||
if (hasWritten) {
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
tagsToOpen.removeLast();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (value instanceof Object[]) {
|
||||
Object[] valueAsArray = (Object[]) value;
|
||||
|
||||
final String elementName;
|
||||
final String wrapperName;
|
||||
if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_FOR_ELEMENTS)) {
|
||||
elementName = arrayTagName;
|
||||
wrapperName = fieldName;
|
||||
} else if (arrayWrapping.equals(ArrayWrapping.USE_PROPERTY_AS_WRAPPER)) {
|
||||
elementName = fieldName;
|
||||
wrapperName = arrayTagName;
|
||||
} else {
|
||||
elementName = fieldName;
|
||||
wrapperName = null;
|
||||
}
|
||||
|
||||
if (wrapperName!= null) {
|
||||
tagsToOpen.addLast(wrapperName);
|
||||
}
|
||||
|
||||
boolean loopHasWritten = false;
|
||||
|
||||
for (Object element : valueAsArray) {
|
||||
if (element != null) {
|
||||
boolean hasWritten = writeUnknownField(tagsToOpen, element, elementName);
|
||||
|
||||
if (hasWritten) {
|
||||
loopHasWritten = true;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeEndElement();
|
||||
loopHasWritten = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (wrapperName!= null) {
|
||||
if (loopHasWritten) {
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
tagsToOpen.removeLast();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return loopHasWritten;
|
||||
}
|
||||
}
|
||||
|
||||
if (value instanceof Map) {
|
||||
Map<String, ?> valueAsMap = (Map<String, ?>) value;
|
||||
|
||||
tagsToOpen.addLast(fieldName);
|
||||
boolean loopHasWritten = false;
|
||||
|
||||
for (Map.Entry<String,?> entry : valueAsMap.entrySet()) {
|
||||
|
||||
final String key = entry.getKey();
|
||||
final Object entryValue = entry.getValue();
|
||||
|
||||
if (entryValue != null) {
|
||||
boolean hasWritten = writeUnknownField(tagsToOpen, entry.getValue(), key);
|
||||
|
||||
if (hasWritten) {
|
||||
loopHasWritten = true;
|
||||
}
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen, key);
|
||||
writer.writeEndElement();
|
||||
loopHasWritten = true;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (loopHasWritten) {
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
if (nullSuppression.equals(NullSuppression.NEVER_SUPPRESS) || nullSuppression.equals(NullSuppression.SUPPRESS_MISSING)) {
|
||||
writeAllTags(tagsToOpen);
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
} else {
|
||||
tagsToOpen.removeLast();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writeAllTags(tagsToOpen, fieldName);
|
||||
writer.writeCharacters(value.toString());
|
||||
writer.writeEndElement();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getMimeType() {
|
||||
return "application/xml";
|
||||
}
|
||||
|
||||
private boolean recordHasField(RecordField field, Record record) {
|
||||
Set<String> recordFieldNames = record.getRawFieldNames();
|
||||
if (recordFieldNames.contains(field.getFieldName())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
for (String alias : field.getAliases()) {
|
||||
if (recordFieldNames.contains(alias)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,210 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.xml;
|
||||
|
||||
import org.apache.nifi.record.NullSuppression;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
@Tags({"xml", "resultset", "writer", "serialize", "record", "recordset", "row"})
|
||||
@CapabilityDescription("Writes a RecordSet to XML. The records are wrapped by a root tag.")
|
||||
public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
|
||||
|
||||
public static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
|
||||
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
|
||||
public static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
|
||||
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
|
||||
public static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
|
||||
"When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
|
||||
|
||||
public static final AllowableValue USE_PROPERTY_AS_WRAPPER = new AllowableValue("use-property-as-wrapper", "Use Property as Wrapper",
|
||||
"The value of the property \"Array Tag Name\" will be used as the tag name to wrap elements of an array. The field name of the array field will be used for the tag name " +
|
||||
"of the elements.");
|
||||
public static final AllowableValue USE_PROPERTY_FOR_ELEMENTS = new AllowableValue("use-property-for-elements", "Use Property for Elements",
|
||||
"The value of the property \"Array Tag Name\" will be used for the tag name of the elements of an array. The field name of the array field will be used as the tag name " +
|
||||
"to wrap elements.");
|
||||
public static final AllowableValue NO_WRAPPING = new AllowableValue("no-wrapping", "No Wrapping",
|
||||
"The elements of an array will not be wrapped");
|
||||
|
||||
public static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
|
||||
.name("suppress_nulls")
|
||||
.displayName("Suppress Null Values")
|
||||
.description("Specifies how the writer should handle a null field")
|
||||
.allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
|
||||
.defaultValue(NEVER_SUPPRESS.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor PRETTY_PRINT_XML = new PropertyDescriptor.Builder()
|
||||
.name("pretty_print_xml")
|
||||
.displayName("Pretty Print XML")
|
||||
.description("Specifies whether or not the XML should be pretty printed")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ROOT_TAG_NAME = new PropertyDescriptor.Builder()
|
||||
.name("root_tag_name")
|
||||
.displayName("Name of Root Tag")
|
||||
.description("Specifies the name of the XML root tag wrapping the record set. This property has to be defined if " +
|
||||
"the writer is supposed to write multiple records in a single FlowFile.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor RECORD_TAG_NAME = new PropertyDescriptor.Builder()
|
||||
.name("record_tag_name")
|
||||
.displayName("Name of Record Tag")
|
||||
.description("Specifies the name of the XML record tag wrapping the record fields. If this is not set, the writer " +
|
||||
"will use the record name in the schema.")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ARRAY_WRAPPING = new PropertyDescriptor.Builder()
|
||||
.name("array_wrapping")
|
||||
.displayName("Wrap Elements of Arrays")
|
||||
.description("Specifies how the writer wraps elements of fields of type array")
|
||||
.allowableValues(USE_PROPERTY_AS_WRAPPER, USE_PROPERTY_FOR_ELEMENTS, NO_WRAPPING)
|
||||
.defaultValue(NO_WRAPPING.getValue())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor ARRAY_TAG_NAME = new PropertyDescriptor.Builder()
|
||||
.name("array_tag_name")
|
||||
.displayName("Array Tag Name")
|
||||
.description("Name of the tag used by property \"Wrap Elements of Arrays\" to write arrays")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
|
||||
.name("Character Set")
|
||||
.description("The Character set to use when writing the data to the FlowFile")
|
||||
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||
.defaultValue("UTF-8")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
properties.add(SUPPRESS_NULLS);
|
||||
properties.add(PRETTY_PRINT_XML);
|
||||
properties.add(ROOT_TAG_NAME);
|
||||
properties.add(RECORD_TAG_NAME);
|
||||
properties.add(ARRAY_WRAPPING);
|
||||
properties.add(ARRAY_TAG_NAME);
|
||||
properties.add(CHARACTER_SET);
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
if (!validationContext.getProperty(ARRAY_WRAPPING).getValue().equals(NO_WRAPPING.getValue())) {
|
||||
if (!validationContext.getProperty(ARRAY_TAG_NAME).isSet()) {
|
||||
StringBuilder explanation = new StringBuilder()
|
||||
.append("if property \'")
|
||||
.append(ARRAY_WRAPPING.getName())
|
||||
.append("\' is defined as \'")
|
||||
.append(USE_PROPERTY_AS_WRAPPER.getDisplayName())
|
||||
.append("\' or \'")
|
||||
.append(USE_PROPERTY_FOR_ELEMENTS.getDisplayName())
|
||||
.append("\' the property \'")
|
||||
.append(ARRAY_TAG_NAME.getDisplayName())
|
||||
.append("\' has to be set.");
|
||||
|
||||
return Collections.singleton(new ValidationResult.Builder()
|
||||
.subject(ARRAY_TAG_NAME.getName())
|
||||
.valid(false)
|
||||
.explanation(explanation.toString())
|
||||
.build());
|
||||
}
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
|
||||
|
||||
final String nullSuppression = getConfigurationContext().getProperty(SUPPRESS_NULLS).getValue();
|
||||
final NullSuppression nullSuppressionEnum;
|
||||
if (nullSuppression.equals(ALWAYS_SUPPRESS.getValue())) {
|
||||
nullSuppressionEnum = NullSuppression.ALWAYS_SUPPRESS;
|
||||
} else if (nullSuppression.equals(NEVER_SUPPRESS.getValue())) {
|
||||
nullSuppressionEnum = NullSuppression.NEVER_SUPPRESS;
|
||||
} else {
|
||||
nullSuppressionEnum = NullSuppression.SUPPRESS_MISSING;
|
||||
}
|
||||
|
||||
final boolean prettyPrint = getConfigurationContext().getProperty(PRETTY_PRINT_XML).getValue().equals("true");
|
||||
|
||||
final String rootTagName = getConfigurationContext().getProperty(ROOT_TAG_NAME).isSet()
|
||||
? getConfigurationContext().getProperty(ROOT_TAG_NAME).getValue() : null;
|
||||
final String recordTagName = getConfigurationContext().getProperty(RECORD_TAG_NAME).isSet()
|
||||
? getConfigurationContext().getProperty(RECORD_TAG_NAME).getValue() : null;
|
||||
|
||||
final String arrayWrapping = getConfigurationContext().getProperty(ARRAY_WRAPPING).getValue();
|
||||
final ArrayWrapping arrayWrappingEnum;
|
||||
if (arrayWrapping.equals(NO_WRAPPING.getValue())) {
|
||||
arrayWrappingEnum = ArrayWrapping.NO_WRAPPING;
|
||||
} else if (arrayWrapping.equals(USE_PROPERTY_AS_WRAPPER.getValue())) {
|
||||
arrayWrappingEnum = ArrayWrapping.USE_PROPERTY_AS_WRAPPER;
|
||||
} else {
|
||||
arrayWrappingEnum = ArrayWrapping.USE_PROPERTY_FOR_ELEMENTS;
|
||||
}
|
||||
|
||||
final String arrayTagName;
|
||||
if (getConfigurationContext().getProperty(ARRAY_TAG_NAME).isSet()) {
|
||||
arrayTagName = getConfigurationContext().getProperty(ARRAY_TAG_NAME).getValue();
|
||||
} else {
|
||||
arrayTagName = null;
|
||||
}
|
||||
|
||||
final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue();
|
||||
|
||||
return new WriteXMLResult(logger, schema, getSchemaAccessWriter(schema),
|
||||
out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet,
|
||||
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
|
||||
}
|
||||
}
|
|
@ -27,4 +27,5 @@ org.apache.nifi.grok.GrokReader
|
|||
|
||||
org.apache.nifi.text.FreeFormTextRecordSetWriter
|
||||
|
||||
org.apache.nifi.xml.XMLReader
|
||||
org.apache.nifi.xml.XMLReader
|
||||
org.apache.nifi.xml.XMLRecordSetWriter
|
|
@ -0,0 +1,211 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8"/>
|
||||
<title>XMLRecordSetWriter</title>
|
||||
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<p>
|
||||
The XMLRecordSetWriter Controller Service writes record objects to XML. The Controller Service must be configured
|
||||
with a schema that describes the structure of the record objects. Multiple records are wrapped by a root node.
|
||||
The name of the root node can be configured via property. If no root node is configured, the writer expects
|
||||
only one record for each FlowFile (that will not be wrapped). As Avro does not support defining attributes for
|
||||
records, this writer currently does not support writing XML attributes.
|
||||
</p>
|
||||
|
||||
<h2>
|
||||
Example: Simple records
|
||||
</h2>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
RecordSet (
|
||||
Record (
|
||||
Field "name1" = "value1",
|
||||
Field "name2" = 42
|
||||
),
|
||||
Record (
|
||||
Field "name1" = "value2",
|
||||
Field "name2" = 84
|
||||
)
|
||||
)
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
This record can be described by the following schema:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "name1", "type": "string" },
|
||||
{ "name": "name2", "type": "int" }
|
||||
]
|
||||
}
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
Assuming that "root_name" has been configured as the name for the root node and "record_name" has been configured
|
||||
as the name for the record nodes, the writer will write the following XML:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
<root_name>
|
||||
<record_name>
|
||||
<name1>value1</name1>
|
||||
<name2>42</name2>
|
||||
</record_name>
|
||||
<record_name>
|
||||
<name1>value2</name1>
|
||||
<name2>84</name2>
|
||||
</record_name>
|
||||
</root_name>
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
The writer furthermore can be configured how to treat null or missing values in records:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
RecordSet (
|
||||
Record (
|
||||
Field "name1" = "value1",
|
||||
Field "name2" = null
|
||||
),
|
||||
Record (
|
||||
Field "name1" = "value2",
|
||||
)
|
||||
)
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
If the writer is configured always to suppress missing or null values, only the field of name "name1" will appear
|
||||
in the XML. If the writer ist configured only to suppress missing values, the field of name "name2" will appear in
|
||||
the XML as a node without content for the first record. If the writer is configured never to suppress anything,
|
||||
the field of name "name2" will appear in the XML as a node without content for both records.
|
||||
</p>
|
||||
|
||||
<h2>
|
||||
Example: Arrays
|
||||
</h2>
|
||||
|
||||
<p>
|
||||
The writer furthermore can be configured how to write arrays:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
RecordSet (
|
||||
Record (
|
||||
Field "name1" = "value1",
|
||||
Field "array_field" = [ 1, 2, 3 ]
|
||||
)
|
||||
)
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
This record can be described by the following schema:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
{
|
||||
"name": "test",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "array_field", "type":
|
||||
{ "type": "array", "items": int }
|
||||
},
|
||||
{ "name": "name1", "type": "string" }
|
||||
]
|
||||
}
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
If the writer is configured not to wrap arrays, it will transform the record to the following XML:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
<root_name>
|
||||
<record_name>
|
||||
<name1>value1</name1>
|
||||
<array_field>1</array_field>
|
||||
<array_field>2</array_field>
|
||||
<array_field>3</array_field>
|
||||
</record_name>
|
||||
</root_name>
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
If the writer is configured to wrap arrays using the field name as wrapper and "elem" as tag name for element nodes,
|
||||
it will transform the record to the following XML:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
<root_name>
|
||||
<record_name>
|
||||
<name1>value1</field2>
|
||||
<array_field>
|
||||
<elem>1</elem>
|
||||
<elem>2</elem>
|
||||
<elem>3</elem>
|
||||
</array_field>
|
||||
</record_name>
|
||||
</root_name>
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
If the writer is configured to wrap arrays using "wrap" as wrapper and the field name as tag name for element nodes,
|
||||
it will transform the record to the following XML:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
<root_name>
|
||||
<record_name>
|
||||
<name1>value1</field2>
|
||||
<wrap>
|
||||
<array_field>1</array_field>
|
||||
<array_field>2</array_field>
|
||||
<array_field>3</array_field>
|
||||
</wrap>
|
||||
</record_name>
|
||||
</root_name>
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
</body>
|
||||
</html>
|
|
@ -38,6 +38,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
|
||||
import org.apache.nifi.record.NullSuppression;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.schema.access.SchemaNameAsAttribute;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,490 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.xml;
|
||||
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.ListRecordSet;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class TestWriteXMLResultUtils {
|
||||
|
||||
protected static final String FIELD_NAME = "NAME";
|
||||
protected static final String FIELD_AGE = "AGE";
|
||||
protected static final String FIELD_COUNTRY = "COUNTRY";
|
||||
protected static final String FIELD_ADDRESS = "ADDRESS";
|
||||
protected static final String FIELD_STREET = "STREET";
|
||||
protected static final String FIELD_CITY = "CITY";
|
||||
protected static final String FIELD_CHILDREN = "CHILDREN";
|
||||
|
||||
protected static Map<String,Object> RECORD_FIELDS_PERSON_1 = new HashMap<>();
|
||||
protected static Map<String,Object> RECORD_FIELDS_PERSON_2 = new HashMap<>();
|
||||
protected static Map<String,Object> RECORD_FIELDS_ADDRESS_1 = new HashMap<>();
|
||||
protected static Map<String,Object> RECORD_FIELDS_ADDRESS_2 = new HashMap<>();
|
||||
|
||||
protected static Object[] ARRAY_CHILDREN = {"Tom", "Anna", "Ben"};
|
||||
protected static Object[] ARRAY_CHILDREN_WITH_NULL_VALUE = {"Tom", null, "Ben"};
|
||||
protected static Object[] ARRAY_CHILDREN_ONLY_NULL_VALUES = {null, null, null};
|
||||
|
||||
static {
|
||||
RECORD_FIELDS_PERSON_1.put(FIELD_NAME, "Cleve Butler");
|
||||
RECORD_FIELDS_PERSON_1.put(FIELD_AGE, 42);
|
||||
RECORD_FIELDS_PERSON_1.put(FIELD_COUNTRY, "USA");
|
||||
RECORD_FIELDS_PERSON_2.put(FIELD_NAME, "Ainslie Fletcher");
|
||||
RECORD_FIELDS_PERSON_2.put(FIELD_AGE, 33);
|
||||
RECORD_FIELDS_PERSON_2.put(FIELD_COUNTRY, "UK");
|
||||
RECORD_FIELDS_ADDRESS_1.put(FIELD_STREET, "292 West Street");
|
||||
RECORD_FIELDS_ADDRESS_1.put(FIELD_CITY, "Jersey City");
|
||||
RECORD_FIELDS_ADDRESS_2.put(FIELD_STREET, "123 6th St.");
|
||||
RECORD_FIELDS_ADDRESS_2.put(FIELD_CITY, "Seattle");
|
||||
|
||||
RECORD_FIELDS_PERSON_1 = Collections.unmodifiableMap(RECORD_FIELDS_PERSON_1);
|
||||
RECORD_FIELDS_PERSON_2 = Collections.unmodifiableMap(RECORD_FIELDS_PERSON_2);
|
||||
RECORD_FIELDS_ADDRESS_1 = Collections.unmodifiableMap(RECORD_FIELDS_ADDRESS_1);
|
||||
RECORD_FIELDS_ADDRESS_2 = Collections.unmodifiableMap(RECORD_FIELDS_ADDRESS_2);
|
||||
}
|
||||
|
||||
protected static final SchemaIdentifier SCHEMA_IDENTIFIER_PERSON = SchemaIdentifier.builder().name("PERSON").id(0L).version(0).build();
|
||||
protected static final SchemaIdentifier SCHEMA_IDENTIFIER_RECORD = SchemaIdentifier.builder().name("RECORD").id(0L).version(0).build();
|
||||
|
||||
protected static final String DATE_FORMAT = RecordFieldType.DATE.getDefaultFormat();
|
||||
protected static final String TIME_FORMAT = RecordFieldType.TIME.getDefaultFormat();
|
||||
protected static final String TIMESTAMP_FORMAT = RecordFieldType.TIMESTAMP.getDefaultFormat();
|
||||
|
||||
public enum NullValues {
|
||||
ONLY_NULL,
|
||||
HAS_NULL,
|
||||
WITHOUT_NULL,
|
||||
EMPTY
|
||||
}
|
||||
|
||||
/*
|
||||
Simple records
|
||||
*/
|
||||
|
||||
protected static RecordSet getSingleRecord() {
|
||||
RecordSchema schema = getSimpleSchema();
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_1));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getSimpleRecords() {
|
||||
RecordSchema schema = getSimpleSchema();
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_1));
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_2));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getSimpleRecordsWithoutIdentifierInSchema() {
|
||||
RecordSchema schema = getSimpleSchemaWithoutIdentifier();
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_1));
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_2));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getSimpleRecordsWithNullValues() {
|
||||
RecordSchema schema = getSimpleSchema();
|
||||
|
||||
Map<String, Object> recordWithoutName1 = new HashMap<>(RECORD_FIELDS_PERSON_1);
|
||||
Map<String, Object> recordWithoutName2 = new HashMap<>(RECORD_FIELDS_PERSON_2);
|
||||
|
||||
recordWithoutName1.put(FIELD_NAME, null);
|
||||
recordWithoutName2.remove(FIELD_NAME);
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, recordWithoutName1));
|
||||
records.add(new MapRecord(schema, recordWithoutName2));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getEmptyRecordsWithEmptySchema() {
|
||||
RecordSchema schema = getEmptySchema();
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, Collections.emptyMap()));
|
||||
records.add(new MapRecord(schema, Collections.emptyMap()));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
protected static List<RecordField> getSimpleRecordFields() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_NAME, RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField(FIELD_AGE, RecordFieldType.INT.getDataType()));
|
||||
fields.add(new RecordField(FIELD_COUNTRY, RecordFieldType.STRING.getDataType()));
|
||||
return fields;
|
||||
}
|
||||
|
||||
protected static RecordSchema getSimpleSchema() {
|
||||
return new SimpleRecordSchema(getSimpleRecordFields(), SCHEMA_IDENTIFIER_PERSON);
|
||||
}
|
||||
|
||||
protected static RecordSchema getSimpleSchemaWithoutIdentifier() {
|
||||
return new SimpleRecordSchema(getSimpleRecordFields());
|
||||
}
|
||||
|
||||
protected static RecordSchema getEmptySchema() {
|
||||
return new SimpleRecordSchema(Collections.emptyList(), SCHEMA_IDENTIFIER_PERSON);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
Simple nested records
|
||||
*/
|
||||
|
||||
protected static RecordSet getNestedRecords() {
|
||||
RecordSchema innerSchema = getNestedSchema();
|
||||
|
||||
final DataType recordType = RecordFieldType.RECORD.getRecordDataType(innerSchema);
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_ADDRESS, recordType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
RecordSchema outerSchema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
Record innerRecord1 = new MapRecord(innerSchema, RECORD_FIELDS_ADDRESS_1);
|
||||
Record outerRecord1 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_1);
|
||||
put(FIELD_ADDRESS, innerRecord1);
|
||||
}});
|
||||
|
||||
Record innerRecord2 = new MapRecord(innerSchema, RECORD_FIELDS_ADDRESS_2);
|
||||
Record outerRecord2 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_2);
|
||||
put(FIELD_ADDRESS, innerRecord2);
|
||||
}});
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(outerRecord1);
|
||||
records.add(outerRecord2);
|
||||
|
||||
return new ListRecordSet(outerSchema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getNestedRecordsWithNullValues() {
|
||||
RecordSchema innerSchema = getNestedSchema();
|
||||
|
||||
final DataType recordType = RecordFieldType.RECORD.getRecordDataType(innerSchema);
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_ADDRESS, recordType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
RecordSchema outerSchema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
Map<String, Object> recordWithoutStreet1 = new HashMap<>(RECORD_FIELDS_ADDRESS_1);
|
||||
Map<String, Object> recordWithoutStreet2 = new HashMap<>(RECORD_FIELDS_ADDRESS_2);
|
||||
|
||||
recordWithoutStreet1.put(FIELD_STREET, null);
|
||||
recordWithoutStreet2.remove(FIELD_STREET);
|
||||
|
||||
Record innerRecord1 = new MapRecord(innerSchema, recordWithoutStreet1);
|
||||
Record outerRecord1 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_1);
|
||||
put(FIELD_ADDRESS, innerRecord1);
|
||||
}});
|
||||
|
||||
Record innerRecord2 = new MapRecord(innerSchema, recordWithoutStreet2);
|
||||
Record outerRecord2 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_2);
|
||||
put(FIELD_ADDRESS, innerRecord2);
|
||||
}});
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(outerRecord1);
|
||||
records.add(outerRecord2);
|
||||
|
||||
return new ListRecordSet(outerSchema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getNestedRecordsWithOnlyNullValues() {
|
||||
RecordSchema innerSchema = getNestedSchema();
|
||||
|
||||
final DataType recordType = RecordFieldType.RECORD.getRecordDataType(innerSchema);
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_ADDRESS, recordType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
RecordSchema outerSchema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
Map<String, Object> recordWithoutStreet1 = new HashMap<>(RECORD_FIELDS_ADDRESS_1);
|
||||
Map<String, Object> recordWithoutStreet2 = new HashMap<>(RECORD_FIELDS_ADDRESS_2);
|
||||
|
||||
recordWithoutStreet1.put(FIELD_STREET, null);
|
||||
recordWithoutStreet1.put(FIELD_CITY, null);
|
||||
recordWithoutStreet2.remove(FIELD_STREET);
|
||||
recordWithoutStreet2.remove(FIELD_CITY);
|
||||
|
||||
Record innerRecord1 = new MapRecord(innerSchema, recordWithoutStreet1);
|
||||
Record outerRecord1 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_1);
|
||||
put(FIELD_ADDRESS, innerRecord1);
|
||||
}});
|
||||
|
||||
Record innerRecord2 = new MapRecord(innerSchema, recordWithoutStreet2);
|
||||
Record outerRecord2 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_2);
|
||||
put(FIELD_ADDRESS, innerRecord2);
|
||||
}});
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(outerRecord1);
|
||||
records.add(outerRecord2);
|
||||
|
||||
return new ListRecordSet(outerSchema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getEmptyNestedRecordEmptyNestedSchema() {
|
||||
RecordSchema innerSchema = new SimpleRecordSchema(Collections.emptyList());
|
||||
|
||||
final DataType recordType = RecordFieldType.RECORD.getRecordDataType(innerSchema);
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_ADDRESS, recordType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
RecordSchema outerSchema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
Record innerRecord1 = new MapRecord(innerSchema, Collections.emptyMap());
|
||||
Record outerRecord1 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_1);
|
||||
put(FIELD_ADDRESS, innerRecord1);
|
||||
}});
|
||||
|
||||
Record innerRecord2 = new MapRecord(innerSchema, Collections.emptyMap());
|
||||
Record outerRecord2 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_2);
|
||||
put(FIELD_ADDRESS, innerRecord2);
|
||||
}});
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(outerRecord1);
|
||||
records.add(outerRecord2);
|
||||
|
||||
return new ListRecordSet(outerSchema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getEmptyNestedRecordDefinedSchema() {
|
||||
RecordSchema innerSchema = getNestedSchema();
|
||||
|
||||
final DataType recordType = RecordFieldType.RECORD.getRecordDataType(innerSchema);
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_ADDRESS, recordType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
RecordSchema outerSchema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
Record innerRecord1 = new MapRecord(innerSchema, Collections.EMPTY_MAP);
|
||||
Record outerRecord1 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_1);
|
||||
put(FIELD_ADDRESS, innerRecord1);
|
||||
}});
|
||||
|
||||
Record innerRecord2 = new MapRecord(innerSchema, Collections.EMPTY_MAP);
|
||||
Record outerRecord2 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_2);
|
||||
put(FIELD_ADDRESS, innerRecord2);
|
||||
}});
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(outerRecord1);
|
||||
records.add(outerRecord2);
|
||||
|
||||
return new ListRecordSet(outerSchema, records);
|
||||
}
|
||||
|
||||
protected static List<RecordField> getNestedRecordFields() {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_STREET, RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField(FIELD_CITY, RecordFieldType.STRING.getDataType()));
|
||||
return fields;
|
||||
}
|
||||
|
||||
protected static RecordSchema getNestedSchema() {
|
||||
return new SimpleRecordSchema(getNestedRecordFields());
|
||||
}
|
||||
|
||||
/*
|
||||
Arrays
|
||||
*/
|
||||
|
||||
protected static RecordSet getRecordWithSimpleArray(NullValues nullValues) {
|
||||
Object[] children;
|
||||
if (nullValues.equals(NullValues.HAS_NULL)) {
|
||||
children = ARRAY_CHILDREN_WITH_NULL_VALUE;
|
||||
} else if (nullValues.equals(NullValues.ONLY_NULL)) {
|
||||
children = ARRAY_CHILDREN_ONLY_NULL_VALUES;
|
||||
} else if (nullValues.equals(NullValues.EMPTY)) {
|
||||
children = new Object[]{};
|
||||
} else {
|
||||
children = ARRAY_CHILDREN;
|
||||
}
|
||||
|
||||
final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
|
||||
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_CHILDREN, arrayType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
Map<String,Object> recordFields1 = new HashMap<>();
|
||||
recordFields1.putAll(RECORD_FIELDS_PERSON_1);
|
||||
recordFields1.put(FIELD_CHILDREN, children);
|
||||
|
||||
Map<String,Object> recordFields2 = new HashMap<>();
|
||||
recordFields2.putAll(RECORD_FIELDS_PERSON_2);
|
||||
recordFields2.put(FIELD_CHILDREN, children);
|
||||
|
||||
RecordSchema schema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, recordFields1));
|
||||
records.add(new MapRecord(schema, recordFields2));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*
|
||||
Maps
|
||||
*/
|
||||
|
||||
protected static RecordSet getRecordWithSimpleMap(NullValues nullValues) {
|
||||
|
||||
List<Object> values = new ArrayList<>();
|
||||
|
||||
if (nullValues.equals(NullValues.HAS_NULL)) {
|
||||
values.addAll(Arrays.asList(ARRAY_CHILDREN_WITH_NULL_VALUE));
|
||||
} else if (nullValues.equals(NullValues.ONLY_NULL)) {
|
||||
values.addAll(Arrays.asList(ARRAY_CHILDREN_ONLY_NULL_VALUES));
|
||||
} else if (nullValues.equals(NullValues.WITHOUT_NULL)){
|
||||
values.addAll(Arrays.asList(ARRAY_CHILDREN));
|
||||
}
|
||||
|
||||
Map<String,Object> children = new HashMap<>();
|
||||
|
||||
if (!nullValues.equals(NullValues.EMPTY)) {
|
||||
|
||||
children.put("CHILD1", values.get(0));
|
||||
children.put("CHILD2", values.get(1));
|
||||
children.put("CHILD3", values.get(2));
|
||||
}
|
||||
|
||||
final DataType mapType = RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType());
|
||||
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_CHILDREN, mapType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
Map<String,Object> recordFields1 = new HashMap<>();
|
||||
recordFields1.putAll(RECORD_FIELDS_PERSON_1);
|
||||
recordFields1.put(FIELD_CHILDREN, children);
|
||||
|
||||
Map<String,Object> recordFields2 = new HashMap<>();
|
||||
recordFields2.putAll(RECORD_FIELDS_PERSON_2);
|
||||
recordFields2.put(FIELD_CHILDREN, children);
|
||||
|
||||
RecordSchema schema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, recordFields1));
|
||||
records.add(new MapRecord(schema, recordFields2));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
Choice
|
||||
*/
|
||||
|
||||
protected static RecordSet getSimpleRecordsWithChoice() {
|
||||
|
||||
final List<DataType> possibleTypes = new ArrayList<>();
|
||||
possibleTypes.add(RecordFieldType.INT.getDataType());
|
||||
possibleTypes.add(RecordFieldType.LONG.getDataType());
|
||||
possibleTypes.add(RecordFieldType.STRING.getDataType());
|
||||
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_NAME, RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField(FIELD_AGE, RecordFieldType.INT.getDataType()));
|
||||
fields.add(new RecordField(FIELD_COUNTRY, RecordFieldType.CHOICE.getChoiceDataType(possibleTypes)));
|
||||
|
||||
RecordSchema schema = new SimpleRecordSchema(getSimpleRecordFields(), SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_1));
|
||||
records.add(new MapRecord(schema, RECORD_FIELDS_PERSON_2));
|
||||
|
||||
return new ListRecordSet(schema, records);
|
||||
}
|
||||
|
||||
protected static RecordSet getNestedRecordsTypeChoice() {
|
||||
final List<DataType> possibleTypes = new ArrayList<>();
|
||||
possibleTypes.add(RecordFieldType.INT.getDataType());
|
||||
possibleTypes.add(RecordFieldType.LONG.getDataType());
|
||||
possibleTypes.add(RecordFieldType.RECORD.getDataType());
|
||||
|
||||
final DataType choiceType = RecordFieldType.CHOICE.getChoiceDataType(possibleTypes);
|
||||
List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField(FIELD_ADDRESS, choiceType));
|
||||
fields.addAll(getSimpleRecordFields());
|
||||
|
||||
RecordSchema outerSchema = new SimpleRecordSchema(fields, SCHEMA_IDENTIFIER_PERSON);
|
||||
|
||||
Record innerRecord1 = new MapRecord(new SimpleRecordSchema(Collections.emptyList()), RECORD_FIELDS_ADDRESS_1);
|
||||
Record outerRecord1 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_1);
|
||||
put(FIELD_ADDRESS, innerRecord1);
|
||||
}});
|
||||
|
||||
Record innerRecord2 = new MapRecord(new SimpleRecordSchema(Collections.emptyList()), RECORD_FIELDS_ADDRESS_2);
|
||||
Record outerRecord2 = new MapRecord(outerSchema, new HashMap<String,Object>(){{
|
||||
putAll(RECORD_FIELDS_PERSON_2);
|
||||
put(FIELD_ADDRESS, innerRecord2);
|
||||
}});
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(outerRecord1);
|
||||
records.add(outerRecord2);
|
||||
|
||||
return new ListRecordSet(outerSchema, records);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,194 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.xml;
|
||||
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.schema.access.SchemaAccessUtils;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.xmlunit.diff.DefaultNodeMatcher;
|
||||
import org.xmlunit.diff.ElementSelectors;
|
||||
import org.xmlunit.matchers.CompareMatcher;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class TestXMLRecordSetWriter {
|
||||
|
||||
private TestRunner setup(XMLRecordSetWriter writer) throws InitializationException, IOException {
|
||||
TestRunner runner = TestRunners.newTestRunner(TestXMLRecordSetWriterProcessor.class);
|
||||
|
||||
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/xml/testschema3")));
|
||||
|
||||
runner.addControllerService("xml_writer", writer);
|
||||
runner.setProperty(TestXMLRecordSetWriterProcessor.XML_WRITER, "xml_writer");
|
||||
|
||||
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(writer, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
||||
runner.setProperty(writer, XMLRecordSetWriter.PRETTY_PRINT_XML, new AllowableValue("true"));
|
||||
|
||||
runner.setProperty(writer, "Schema Write Strategy", "no-schema");
|
||||
|
||||
return runner;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefault() throws IOException, InitializationException {
|
||||
XMLRecordSetWriter writer = new XMLRecordSetWriter();
|
||||
TestRunner runner = setup(writer);
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "root");
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
runner.assertQueueEmpty();
|
||||
runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1);
|
||||
|
||||
String expected = "<root><array_record><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1><name2></name2></array_record>" +
|
||||
"<array_record><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1><name2></name2></array_record></root>";
|
||||
String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0)));
|
||||
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultSingleRecord() throws IOException, InitializationException {
|
||||
XMLRecordSetWriter writer = new XMLRecordSetWriter();
|
||||
TestRunner runner = setup(writer);
|
||||
|
||||
runner.setProperty(TestXMLRecordSetWriterProcessor.MULTIPLE_RECORDS, "false");
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
runner.assertQueueEmpty();
|
||||
runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1);
|
||||
|
||||
String expected = "<array_record><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1><name2></name2></array_record>";
|
||||
|
||||
String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0)));
|
||||
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRootAndRecordNaming() throws IOException, InitializationException {
|
||||
XMLRecordSetWriter writer = new XMLRecordSetWriter();
|
||||
TestRunner runner = setup(writer);
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "ROOT_NODE");
|
||||
runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "RECORD_NODE");
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
runner.assertQueueEmpty();
|
||||
runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1);
|
||||
|
||||
String expected = "<ROOT_NODE><RECORD_NODE><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1><name2></name2></RECORD_NODE>" +
|
||||
"<RECORD_NODE><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1><name2></name2></RECORD_NODE></ROOT_NODE>";
|
||||
String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0)));
|
||||
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullSuppression() throws IOException, InitializationException {
|
||||
XMLRecordSetWriter writer = new XMLRecordSetWriter();
|
||||
TestRunner runner = setup(writer);
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "root");
|
||||
runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "record");
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.SUPPRESS_NULLS, XMLRecordSetWriter.ALWAYS_SUPPRESS);
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
runner.assertQueueEmpty();
|
||||
runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1);
|
||||
|
||||
String expected = "<root><record><array_field>1</array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1></record>" +
|
||||
"<record><array_field>1</array_field><array_field>3</array_field>" +
|
||||
"<name1>val1</name1></record></root>";
|
||||
String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0)));
|
||||
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArrayWrapping() throws IOException, InitializationException {
|
||||
XMLRecordSetWriter writer = new XMLRecordSetWriter();
|
||||
TestRunner runner = setup(writer);
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "root");
|
||||
runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "record");
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ARRAY_WRAPPING, XMLRecordSetWriter.USE_PROPERTY_AS_WRAPPER);
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ARRAY_TAG_NAME, "wrap");
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
runner.assertQueueEmpty();
|
||||
runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1);
|
||||
|
||||
String expected = "<root><record><wrap><array_field>1</array_field><array_field></array_field><array_field>3</array_field></wrap>" +
|
||||
"<name1>val1</name1><name2></name2></record>" +
|
||||
"<record><wrap><array_field>1</array_field><array_field></array_field><array_field>3</array_field></wrap>" +
|
||||
"<name1>val1</name1><name2></name2></record></root>";
|
||||
String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0)));
|
||||
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidation() throws IOException, InitializationException {
|
||||
XMLRecordSetWriter writer = new XMLRecordSetWriter();
|
||||
TestRunner runner = setup(writer);
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "root");
|
||||
runner.setProperty(writer, XMLRecordSetWriter.RECORD_TAG_NAME, "record");
|
||||
|
||||
runner.setProperty(writer, XMLRecordSetWriter.ARRAY_WRAPPING, XMLRecordSetWriter.USE_PROPERTY_AS_WRAPPER);
|
||||
|
||||
runner.enableControllerService(writer);
|
||||
runner.enqueue("");
|
||||
|
||||
String message = "Processor has 1 validation failures:\n" +
|
||||
"'xml_writer' validated against 'xml_writer' is invalid because Controller Service is not valid: " +
|
||||
"'array_tag_name' is invalid because if property 'array_wrapping' is defined as 'Use Property as Wrapper' " +
|
||||
"or 'Use Property for Elements' the property 'Array Tag Name' has to be set.\n";
|
||||
|
||||
try {
|
||||
runner.run();
|
||||
} catch (AssertionError e) {
|
||||
Assert.assertEquals(message, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.xml;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.ListRecordSet;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class TestXMLRecordSetWriterProcessor extends AbstractProcessor {
|
||||
|
||||
static final PropertyDescriptor XML_WRITER = new PropertyDescriptor.Builder()
|
||||
.name("xml_writer")
|
||||
.identifiesControllerService(XMLRecordSetWriter.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor MULTIPLE_RECORDS = new PropertyDescriptor.Builder()
|
||||
.name("multiple_records")
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final Relationship SUCCESS = new Relationship.Builder().name("success").description("success").build();
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
FlowFile flowFile = session.get();
|
||||
|
||||
final RecordSetWriterFactory writerFactory = context.getProperty(XML_WRITER).asControllerService(RecordSetWriterFactory.class);
|
||||
flowFile = session.write(flowFile, out -> {
|
||||
try {
|
||||
|
||||
final RecordSchema schema = writerFactory.getSchema(null, null);
|
||||
|
||||
boolean multipleRecords = Boolean.parseBoolean(context.getProperty(MULTIPLE_RECORDS).getValue());
|
||||
RecordSet recordSet = getRecordSet(multipleRecords);
|
||||
|
||||
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out);
|
||||
|
||||
|
||||
writer.write(recordSet);
|
||||
writer.flush();
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new ProcessException(e.getMessage());
|
||||
}
|
||||
|
||||
});
|
||||
session.transfer(flowFile, SUCCESS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return new ArrayList<PropertyDescriptor>() {{ add(XML_WRITER); add(MULTIPLE_RECORDS); }};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return new HashSet<Relationship>() {{ add(SUCCESS); }};
|
||||
}
|
||||
|
||||
protected static RecordSet getRecordSet(boolean multipleRecords) {
|
||||
Object[] arrayVals = {1, null, 3};
|
||||
|
||||
Map<String,Object> recordFields = new HashMap<>();
|
||||
recordFields.put("name1", "val1");
|
||||
recordFields.put("name2", null);
|
||||
recordFields.put("array_field", arrayVals);
|
||||
|
||||
RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
|
||||
|
||||
List<Record> records = new ArrayList<>();
|
||||
records.add(new MapRecord(emptySchema, recordFields));
|
||||
|
||||
if (multipleRecords) {
|
||||
records.add(new MapRecord(emptySchema, recordFields));
|
||||
}
|
||||
|
||||
return new ListRecordSet(emptySchema, records);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"name": "array_record",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "array_field", "type":
|
||||
{ "type": "array", "items": "int" }
|
||||
},
|
||||
{ "name": "name1", "type": "string" },
|
||||
{ "name": "name2", "type": "string" }
|
||||
]
|
||||
}
|
Loading…
Reference in New Issue