diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java index 6926c939c0..ba507e9602 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -38,6 +38,8 @@ public class SimpleRecordSchema implements RecordSchema { private final AtomicReference text = new AtomicReference<>(); private final String schemaFormat; private final SchemaIdentifier schemaIdentifier; + private String schemaName; + private String schemaNamespace; public SimpleRecordSchema(final List fields) { this(fields, createText(fields), null, false, SchemaIdentifier.EMPTY); @@ -213,4 +215,30 @@ public class SimpleRecordSchema implements RecordSchema { public SchemaIdentifier getIdentifier() { return schemaIdentifier; } + + /** + * Set schema name. + * @param schemaName schema name as defined in a root record. + */ + public void setSchemaName(String schemaName) { + this.schemaName = schemaName; + } + + @Override + public Optional getSchemaName() { + return Optional.ofNullable(schemaName); + } + + /** + * Set schema namespace. + * @param schemaNamespace schema namespace as defined in a root record. + */ + public void setSchemaNamespace(String schemaNamespace) { + this.schemaNamespace = schemaNamespace; + } + + @Override + public Optional getSchemaNamespace() { + return Optional.ofNullable(schemaNamespace); + } } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java index 367f2b0b53..cdc9a32fea 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/RecordSchema.java @@ -76,4 +76,15 @@ public interface RecordSchema { * @return the SchemaIdentifier, which provides various attributes for identifying a schema */ SchemaIdentifier getIdentifier(); + + /** + * @return the name of the schema's root record. + */ + Optional getSchemaName(); + + /** + * @return the namespace of the schema. + */ + Optional getSchemaNamespace(); + } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 2e8898a495..9e023ccfc5 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -396,6 +396,8 @@ public class AvroTypeUtil { final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); + recordSchema.setSchemaName(avroSchema.getName()); + recordSchema.setSchemaNamespace(avroSchema.getNamespace()); final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); final Map knownRecords = new HashMap<>(); knownRecords.put(schemaFullName, recordSchemaType); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java index baa3a13554..382067c829 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/WriteXMLResult.java @@ -91,7 +91,7 @@ public class WriteXMLResult extends AbstractRecordSetWriter implements RecordSet if (recordTagName != null) { this.recordTagName = recordTagName; } else { - Optional recordTagNameOptional = recordSchema.getIdentifier().getName(); + Optional recordTagNameOptional = recordSchema.getSchemaName().isPresent()? recordSchema.getSchemaName() : recordSchema.getIdentifier().getName(); if (recordTagNameOptional.isPresent()) { this.recordTagName = recordTagNameOptional.get(); } else { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java index 8008f65be0..becb3c5a91 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/xml/TestXMLRecordSetWriter.java @@ -17,9 +17,16 @@ package org.apache.nifi.xml; +import org.apache.avro.Schema; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -29,6 +36,7 @@ import org.xmlunit.diff.ElementSelectors; import org.xmlunit.matchers.CompareMatcher; import java.io.IOException; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Paths; @@ -116,6 +124,33 @@ public class TestXMLRecordSetWriter { assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText))); } + @Test + public void testSchemaRootRecordNaming() throws IOException, InitializationException { + String avroSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/xml/testschema3")));; + Schema avroSchema = new Schema.Parser().parse(avroSchemaText); + + SchemaIdentifier schemaId = SchemaIdentifier.builder().name("schemaName").build(); + RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, avroSchemaText, schemaId); + + XMLRecordSetWriter writer = new _XMLRecordSetWriter(recordSchema); + TestRunner runner = setup(writer); + + runner.setProperty(writer, XMLRecordSetWriter.ROOT_TAG_NAME, "ROOT_NODE"); + + runner.enableControllerService(writer); + runner.enqueue(""); + runner.run(); + runner.assertQueueEmpty(); + runner.assertAllFlowFilesTransferred(TestXMLRecordSetWriterProcessor.SUCCESS, 1); + + String expected = "13" + + "val1" + + "13" + + "val1"; + String actual = new String(runner.getContentAsByteArray(runner.getFlowFilesForRelationship(TestXMLRecordSetWriterProcessor.SUCCESS).get(0))); + assertThat(expected, CompareMatcher.isSimilarTo(actual).ignoreWhitespace().withNodeMatcher(new DefaultNodeMatcher(ElementSelectors.byNameAndText))); + } + @Test public void testNullSuppression() throws IOException, InitializationException { XMLRecordSetWriter writer = new XMLRecordSetWriter(); @@ -194,5 +229,19 @@ public class TestXMLRecordSetWriter { } } + static class _XMLRecordSetWriter extends XMLRecordSetWriter{ + + RecordSchema recordSchema; + + _XMLRecordSetWriter(RecordSchema recordSchema){ + this.recordSchema = recordSchema; + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) + throws SchemaNotFoundException, IOException { + return super.createWriter(logger, this.recordSchema, out); + } + } }