NIFI-5728 - XML Writer to populate record tag name properly

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3098.
This commit is contained in:
Ed B 2018-10-20 21:42:21 -04:00 committed by Pierre Villard
parent d8d220ccb8
commit 2812fe60a2
5 changed files with 91 additions and 1 deletions

View File

@ -38,6 +38,8 @@ public class SimpleRecordSchema implements RecordSchema {
private final AtomicReference<String> text = new AtomicReference<>();
private final String schemaFormat;
private final SchemaIdentifier schemaIdentifier;
private String schemaName;
private String schemaNamespace;
public SimpleRecordSchema(final List<RecordField> fields) {
this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY);
@ -213,4 +215,30 @@ public class SimpleRecordSchema implements RecordSchema {
public SchemaIdentifier getIdentifier() {
return schemaIdentifier;
}
/**
* Set schema name.
* @param schemaName schema name as defined in a root record.
*/
public void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}
@Override
public Optional<String> getSchemaName() {
return Optional.ofNullable(schemaName);
}
/**
* Set schema namespace.
* @param schemaNamespace schema namespace as defined in a root record.
*/
public void setSchemaNamespace(String schemaNamespace) {
this.schemaNamespace = schemaNamespace;
}
@Override
public Optional<String> getSchemaNamespace() {
return Optional.ofNullable(schemaNamespace);
}
}

View File

@ -76,4 +76,15 @@ public interface RecordSchema {
* @return the SchemaIdentifier, which provides various attributes for identifying a schema
*/
SchemaIdentifier getIdentifier();
/**
* @return the name of the schema's root record.
*/
Optional<String> getSchemaName();
/**
* @return the namespace of the schema.
*/
Optional<String> getSchemaNamespace();
}

View File

@ -396,6 +396,8 @@ public class AvroTypeUtil {
final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId);
recordSchema.setSchemaName(avroSchema.getName());
recordSchema.setSchemaNamespace(avroSchema.getNamespace());
final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
final Map<String, DataType> knownRecords = new HashMap<>();
knownRecords.put(schemaFullName, recordSchemaType);

View File

@ -91,7 +91,7 @@ public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSet
if (recordTagName != null) {
this.recordTagName = recordTagName;
} else {
Optional<String> recordTagNameOptional = recordSchema.getIdentifier().getName();
Optional<String> recordTagNameOptional = recordSchema.getSchemaName().isPresent()? recordSchema.getSchemaName() : recordSchema.getIdentifier().getName();
if (recordTagNameOptional.isPresent()) {
this.recordTagName = recordTagNameOptional.get();
} else {

View File

@ -17,9 +17,16 @@
package org.apache.nifi.xml;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
@ -29,6 +36,7 @@ import org.xmlunit.diff.ElementSelectors;
import org.xmlunit.matchers.CompareMatcher;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
@ -116,6 +124,33 @@ public class TestXMLRecordSetWriter {
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
}
@Test
public void testSchemaRootRecordNaming() throws IOException, InitializationException {
String avroSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/xml/testschema3")));;
Schema avroSchema = new Schema.Parser().parse(avroSchemaText);
SchemaIdentifier schemaId = SchemaIdentifier.builder().name("schemaName").build();
RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, avroSchemaText, schemaId);
XMLRecordSetWriter writer = new _XMLRecordSetWriter(recordSchema);
TestRunner runner = setup(writer);
runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "ROOT_NODE");
runner.enableControllerService(writer);
runner.enqueue("");
runner.run();
runner.assertQueueEmpty();
runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1);
String expected = "<ROOT_NODE><array_record><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
"<name1>val1</name1><name2></name2></array_record>" +
"<array_record><array_field>1</array_field><array_field></array_field><array_field>3</array_field>" +
"<name1>val1</name1><name2></name2></array_record></ROOT_NODE>";
String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0)));
assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText)));
}
@Test
public void testNullSuppression() throws IOException, InitializationException {
XMLRecordSetWriter writer = new XMLRecordSetWriter();
@ -194,5 +229,19 @@ public class TestXMLRecordSetWriter {
}
}
static class _XMLRecordSetWriter extends XMLRecordSetWriter{
RecordSchema recordSchema;
_XMLRecordSetWriter(RecordSchema recordSchema){
this.recordSchema = recordSchema;
}
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out)
throws SchemaNotFoundException, IOException {
return super.createWriter(logger, this.recordSchema, out);
}
}
}