diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java index 4fb53717b7..78cef120d6 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java @@ -48,13 +48,8 @@ public interface SchemaIdentifier { */ Optional getBranch(); - /** - * @return the protocol used to get this schema identifier - */ - Integer getProtocol(); - - SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1); + SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null); static Builder builder() { return new StandardSchemaIdentifier.Builder(); @@ -75,8 +70,6 @@ public interface SchemaIdentifier { Builder branch(String branch); - Builder protocol(Integer protocol); - SchemaIdentifier build(); } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java index 0800982690..a9f859b17e 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java @@ -27,20 +27,14 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { private final OptionalInt version; private final OptionalLong schemaVersionId; private final Optional branch; - private final int protocol; StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, - final Long schemaVersionId, final String branch, final int protocol) { + final Long schemaVersionId, final String branch) { this.name = Optional.ofNullable(name); this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier); this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version); this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId); this.branch = Optional.ofNullable(branch); - this.protocol = protocol; - - if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) { - throw new IllegalStateException("Name or Identifier must be provided"); - } } @Override @@ -68,11 +62,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { return branch; } - @Override - public Integer getProtocol() { - return protocol; - } - @Override public int hashCode() { return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() @@ -104,8 +93,7 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { + "identifier = " + identifier + ", " + "version = " + version + ", " + "schemaVersionId = " + schemaVersionId + ", " - + "branch = " + branch + ", " - + "protocol = " + protocol + " ]"; + + "branch = " + branch + " ]"; } /** @@ -118,7 +106,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { private Long identifier; private Integer version; private Long schemaVersionId; - private Integer protocol; @Override public SchemaIdentifier.Builder name(final String name) { @@ -150,15 +137,9 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { return this; } - @Override - public SchemaIdentifier.Builder protocol(final Integer protocol) { - this.protocol = protocol; - return this; - } - @Override public SchemaIdentifier build() { - return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol); + return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch); } } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index 57ec05a64a..567d860fe6 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -21,9 +21,12 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter; import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter; import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter; @@ -42,6 +45,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA; @@ -80,6 +84,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic .identifiesControllerService(RecordSchemaCacheService.class) .build(); + static final PropertyDescriptor SCHEMA_PROTOCOL_VERSION = new Builder() + .name("schema-protocol-version") + .displayName("Schema Protocol Version") + .description("The protocol version to be used for Schema Write Strategies that require a protocol version, such as Hortonworks Schema Registry strategies. " + + "Valid protocol versions for Hortonworks Schema Registry are integer values 1, 2, or 3.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1") + .build(); + /** * This constant is just a base spec for the actual PropertyDescriptor. * As it can be overridden by subclasses with different AllowableValues and default value, @@ -97,9 +112,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private final List schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList( SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA)); + private final List schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList( SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY)); + private final Set schemaWriteStrategiesRequiringProtocolVersion = new HashSet<>(Arrays.asList( + HWX_CONTENT_ENCODED_SCHEMA.getValue(), HWX_SCHEMA_REF_ATTRIBUTES.getValue())); @Override protected List getSupportedPropertyDescriptors() { @@ -112,6 +130,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic .allowableValues(strategies) .build()); properties.add(SCHEMA_CACHE); + properties.add(SCHEMA_PROTOCOL_VERSION); properties.addAll(super.getSupportedPropertyDescriptors()); return properties; @@ -134,10 +153,18 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic public void storeSchemaWriteStrategy(final ConfigurationContext context) { this.configurationContext = context; + // If Schema Protocol Version is specified without EL then we can create it up front, otherwise when + // EL is present we will re-create it later so we can re-evaluate the EL against the incoming variables + final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); if (strategy != null) { final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class); - this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService); + + final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION); + if (!protocolVersionValue.isExpressionLanguagePresent()) { + final int protocolVersion = context.getProperty(SCHEMA_PROTOCOL_VERSION).asInteger(); + this.schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService); + } } } @@ -146,7 +173,27 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic return configurationContext; } - protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema) throws SchemaNotFoundException { + protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema, final Map variables) throws SchemaNotFoundException { + // If Schema Protocol Version is using expression language, then we reevaluate against the passed in variables + final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION); + if (protocolVersionValue.isExpressionLanguagePresent()) { + final int protocolVersion; + final String protocolVersionString = protocolVersionValue.evaluateAttributeExpressions(variables).getValue(); + try { + protocolVersion = Integer.parseInt(protocolVersionString); + } catch (NumberFormatException nfe) { + throw new SchemaNotFoundException("Unable to create Schema Write Strategy because " + SCHEMA_PROTOCOL_VERSION.getDisplayName() + + " must be a positive integer, but was '" + protocolVersionString + "'", nfe); + } + + // Now recreate the SchemaAccessWriter since we may have a new value for Schema Protocol Version + final String strategy = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); + if (strategy != null) { + final RecordSchemaCacheService recordSchemaCacheService = getConfigurationContext().getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class); + schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService); + } + } + schemaAccessWriter.validateSchema(schema); return schemaAccessWriter; } @@ -164,8 +211,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic return schemaAccessWriter; } - private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final RecordSchemaCacheService recordSchemaCacheService) { - final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy); + private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final Integer protocolVersion, final RecordSchemaCacheService recordSchemaCacheService) { + final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, protocolVersion); if (recordSchemaCacheService == null) { return writer; } else { @@ -173,15 +220,15 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic } } - private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy) { + private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final Integer protocolVersion) { if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) { return new SchemaNameAsAttribute(); } else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) { return new WriteAvroSchemaAttributeStrategy(); } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { - return new HortonworksEncodedSchemaReferenceWriter(); + return new HortonworksEncodedSchemaReferenceWriter(protocolVersion); } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { - return new HortonworksAttributeSchemaReferenceWriter(); + return new HortonworksAttributeSchemaReferenceWriter(protocolVersion); } else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { return new ConfluentSchemaRegistryWriter(); } else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) { @@ -221,6 +268,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic .build()); } + final String schemaWriteStrategy = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); + final String protocolVersion = validationContext.getProperty(SCHEMA_PROTOCOL_VERSION).getValue(); + + if (schemaWriteStrategy != null && schemaWriteStrategiesRequiringProtocolVersion.contains(schemaWriteStrategy) && protocolVersion == null) { + results.add(new ValidationResult.Builder() + .subject(SCHEMA_PROTOCOL_VERSION.getDisplayName()) + .valid(false) + .explanation("The configured Schema Write Strategy requires a Schema Protocol Version to be specified.") + .build()); + } + return results; } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java index b20d68330e..cb037787fe 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java @@ -17,6 +17,10 @@ package org.apache.nifi.schema.access; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -24,10 +28,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy { private final Set schemaFields; @@ -37,7 +37,6 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id"; private final SchemaRegistry schemaRegistry; - static final int LATEST_PROTOCOL_VERSION = 3; public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; @@ -55,44 +54,52 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess @Override public RecordSchema getSchema(Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { - final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE); - final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE); final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); - final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE); - if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) { - throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: " - + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); - } - if (!isNumber(schemaProtocol)) { throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" + schemaProtocol + "', which is not a valid Protocol Version number"); } final int protocol = Integer.parseInt(schemaProtocol); - if (protocol > LATEST_PROTOCOL_VERSION) { - throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " - + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format"); + if (protocol < HortonworksProtocolVersions.MIN_VERSION || protocol > HortonworksProtocolVersions.MAX_VERSION) { + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" + + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be a value between " + + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + "."); } SchemaIdentifier identifier; - if (!isNumber(schemaVersionId)) { - if (!isNumber(schemaIdentifier)) { - throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Schema Identifier number"); - } - if (!isNumber(schemaVersion)) { - throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Schema Version number"); - } + switch (protocol) { + case 1: + final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE); + if (!isNumber(schemaIdentifier)) { + throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_ID_ATTRIBUTE + " has a value of '" + + schemaIdentifier + "', which is not a valid Schema Identifier and is required by Protocol Version " + protocol); + } - final long schemaId = Long.parseLong(schemaIdentifier); - final int version = Integer.parseInt(schemaVersion); - identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build(); - } else { - final long svi = Long.parseLong(schemaVersionId); - identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build(); + final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE); + if (!isNumber(schemaVersion)) { + throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" + + schemaVersion + "', which is not a valid Schema Version and is required by Protocol Version " + protocol); + } + + final long schemaId = Long.parseLong(schemaIdentifier); + final int version = Integer.parseInt(schemaVersion); + identifier = SchemaIdentifier.builder().id(schemaId).version(version).build(); + break; + case 2: + case 3: + final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE); + if (!isNumber(schemaVersionId)) { + throw new SchemaNotFoundException("Could not determine schema because " + SCHEMA_VERSION_ID_ATTRIBUTE + " has a value of '" + + schemaVersionId + "', which is not a valid Schema Version Identifier and is required by Protocol Version " + protocol); + } + + final long svi = Long.parseLong(schemaVersionId); + identifier = SchemaIdentifier.builder().schemaVersionId(svi).build(); + break; + default: + throw new SchemaNotFoundException("Unknown Protocol Version: " + protocol); } final RecordSchema schema = schemaRegistry.retrieveSchema(identifier); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java index ad4558f4d5..24de4628f7 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java @@ -17,21 +17,30 @@ package org.apache.nifi.schema.access; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.io.IOException; import java.io.OutputStream; -import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Set; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter { - private static final Set requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - static final int LATEST_PROTOCOL_VERSION = 3; + static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch"; + private final int protocolVersion; + + public HortonworksAttributeSchemaReferenceWriter(final int protocolVersion) { + this.protocolVersion = protocolVersion; + + if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) { + throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between " + + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + "."); + } + } + @Override public void writeHeader(RecordSchema schema, OutputStream out) throws IOException { } @@ -41,21 +50,29 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr final Map attributes = new HashMap<>(4); final SchemaIdentifier id = schema.getIdentifier(); - final Long schemaId = id.getIdentifier().getAsLong(); - final Integer schemaVersion = id.getVersion().getAsInt(); + switch (protocolVersion) { + case 1: + final Long schemaId = id.getIdentifier().getAsLong(); + final Integer schemaVersion = id.getVersion().getAsInt(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion)); + break; + case 2: + case 3: + final Long schemaVersionId = id.getSchemaVersionId().getAsLong(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId)); + break; + default: + // Can't reach this point + throw new IllegalStateException("Unknown Protocol Verison: " + protocolVersion); + } - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion)); - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol())); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocolVersion)); if (id.getBranch().isPresent()) { attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get()); } - if (id.getSchemaVersionId().isPresent()) { - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong())); - } - return attributes; } @@ -63,19 +80,33 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { final SchemaIdentifier identifier = schema.getIdentifier(); - if(!identifier.getSchemaVersionId().isPresent()) { - if (!identifier.getIdentifier().isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known"); - } - if (!identifier.getVersion().isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known"); - } + switch (protocolVersion) { + case 1: + if (!identifier.getIdentifier().isPresent()) { + throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Identifier " + + "is not known and is required for Protocol Version " + protocolVersion); + } + if (!identifier.getVersion().isPresent()) { + throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version " + + "is not known and is required for Protocol Version " + protocolVersion); + } + break; + case 2: + case 3: + if (!identifier.getSchemaVersionId().isPresent()) { + throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version Identifier " + + "is not known and is required for Protocol Version " + protocolVersion); + } + break; + default: + // Can't reach this point + throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion); } } @Override public Set getRequiredSchemaFields() { - return requiredSchemaFields; + return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java index 8f3c1b4e64..137cee778d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java @@ -17,6 +17,11 @@ package org.apache.nifi.schema.access; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.stream.io.StreamUtils; + import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -25,13 +30,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; -import org.apache.nifi.stream.io.StreamUtils; - public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy { - private static final int LATEST_PROTOCOL_VERSION = 3; private final Set schemaFields; private final SchemaRegistry schemaRegistry; @@ -42,6 +41,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt schemaFields = new HashSet<>(); schemaFields.add(SchemaField.SCHEMA_IDENTIFIER); schemaFields.add(SchemaField.SCHEMA_VERSION); + schemaFields.add(SchemaField.SCHEMA_VERSION_ID); schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); } @@ -58,6 +58,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt // See: https://registry-project.readthedocs.io/en/latest/serdes.html# final ByteBuffer bb = ByteBuffer.wrap(buffer); final int protocolVersion = bb.get(); + SchemaIdentifier schemaIdentifier; switch(protocolVersion) { @@ -69,11 +70,11 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt } catch (final IOException ioe) { throw new SchemaNotFoundException("Could not read bytes from stream", ioe); } - final ByteBuffer bbv1 = ByteBuffer.wrap(buffer); + final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1); final long schemaId = bbv1.getLong(); final int schemaVersion = bbv1.getInt(); - schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build(); + schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build(); return schemaRegistry.retrieveSchema(schemaIdentifier); case 2: @@ -84,10 +85,10 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt } catch (final IOException ioe) { throw new SchemaNotFoundException("Could not read bytes from stream", ioe); } - final ByteBuffer bbv2 = ByteBuffer.wrap(buffer); + final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2); final long sviLong = bbv2.getLong(); - schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build(); + schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).build(); return schemaRegistry.retrieveSchema(schemaIdentifier); case 3: @@ -98,15 +99,16 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt } catch (final IOException ioe) { throw new SchemaNotFoundException("Could not read bytes from stream", ioe); } - final ByteBuffer bbv3 = ByteBuffer.wrap(buffer); + final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3); final int sviInt = bbv3.getInt(); - schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build(); + schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).build(); return schemaRegistry.retrieveSchema(schemaIdentifier); default: - throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " - + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format"); + throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. Expected Protocol Version to be a value between " + + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + + ", but data was encoded with protocol version " + protocolVersion + "."); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java index 99dbd1fcef..504515a1cf 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java @@ -17,20 +17,28 @@ package org.apache.nifi.schema.access; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Collections; -import java.util.EnumSet; import java.util.Map; import java.util.Set; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter { - private static final Set requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - private static final int LATEST_PROTOCOL_VERSION = 3; + + private final int protocolVersion; + + public HortonworksEncodedSchemaReferenceWriter(final int protocolVersion) { + this.protocolVersion = protocolVersion; + + if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) { + throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between " + + HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + "."); + } + } @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { @@ -38,7 +46,7 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer // See: https://registry-project.readthedocs.io/en/latest/serdes.html# - switch(identifier.getProtocol()) { + switch(protocolVersion) { case 1: final Long id = identifier.getIdentifier().getAsLong(); final Integer version = identifier.getVersion().getAsInt(); @@ -49,25 +57,23 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit out.write(bbv1.array()); return; case 2: - final Long sviV2 = identifier.getIdentifier().getAsLong(); + final Long sviV2 = identifier.getSchemaVersionId().getAsLong(); final ByteBuffer bbv2 = ByteBuffer.allocate(9); bbv2.put((byte) 2); bbv2.putLong(sviV2); out.write(bbv2.array()); return; case 3: - final Long sviV3 = identifier.getIdentifier().getAsLong(); + final Long sviV3 = identifier.getSchemaVersionId().getAsLong(); final ByteBuffer bbv3 = ByteBuffer.allocate(5); bbv3.put((byte) 3); bbv3.putInt(sviV3.intValue()); out.write(bbv3.array()); return; default: - throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " - + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format"); + // Can't reach this point + throw new IllegalStateException("Unknown Protocol Version: " + this.protocolVersion); } - - } @Override @@ -79,19 +85,33 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit public void validateSchema(RecordSchema schema) throws SchemaNotFoundException { final SchemaIdentifier identifier = schema.getIdentifier(); - if(!identifier.getSchemaVersionId().isPresent()) { - if (!identifier.getIdentifier().isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known"); - } - if (!identifier.getVersion().isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known"); - } + switch (protocolVersion) { + case 1: + if (!identifier.getIdentifier().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier " + + "is not known and is required for Protocol Version " + protocolVersion); + } + if (!identifier.getVersion().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version " + + "is not known and is required for Protocol Version " + protocolVersion); + } + break; + case 2: + case 3: + if (!identifier.getSchemaVersionId().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version Identifier " + + "is not known and is required for Protocol Version " + protocolVersion); + } + break; + default: + // Can't reach this point + throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion); } } @Override public Set getRequiredSchemaFields() { - return requiredSchemaFields; + return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion); } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java new file mode 100644 index 0000000000..0fe0e83cfb --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksProtocolVersions.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * Constants related to Protocol Versions for Hortonworks Schema Registry. + */ +public class HortonworksProtocolVersions { + + /** + * The minimum valid protocol version. + */ + public static final int MIN_VERSION = 1; + + /** + * The maximum valid protocol version. + */ + public static final int MAX_VERSION = 3; + + /** + * Map from protocol version to the required schema fields for the given version. + */ + private static final Map> REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL; + static { + final Map> requiredFieldsByProtocol = new HashMap<>(); + requiredFieldsByProtocol.put(1, EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION)); + requiredFieldsByProtocol.put(2, EnumSet.of(SchemaField.SCHEMA_VERSION_ID)); + requiredFieldsByProtocol.put(3, EnumSet.of(SchemaField.SCHEMA_VERSION_ID)); + REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL = Collections.unmodifiableMap(requiredFieldsByProtocol); + } + + public static Set getRequiredSchemaFields(final Integer protocolVersion) { + return REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL.get(protocolVersion); + } + +} diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java index f613ab8389..f9dbb202ae 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java @@ -42,7 +42,11 @@ public class AbstractSchemaAccessStrategyTest { fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() - .name("person").branch("master").version(1).id(1L).build(); + .name("person") + .branch("master") + .version(1) + .id(1L) + .build(); this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java index a651a06967..f41ccdf844 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java @@ -32,7 +32,7 @@ import static org.mockito.Mockito.when; public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest { @Test - public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException { + public void testGetSchemaWithValidSchemaIdVersionAndProtocol() throws IOException, SchemaNotFoundException { final long schemaId = 123456; final int version = 2; final int protocol = 1; @@ -56,9 +56,115 @@ public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSch assertNotNull(retrievedSchema); } + @Test + public void testGetSchemaWithValidSchemaVersionIdAndProtocol() throws IOException, SchemaNotFoundException { + final long schemaVersionId = 9999; + final int protocol = 2; + + final Map attributes = new HashMap<>(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol)); + + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + + final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() + .schemaVersionId(schemaVersionId) + .build(); + + when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) + .thenReturn(recordSchema); + + final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema); + assertNotNull(retrievedSchema); + } + + @Test + public void testGetSchemaWithAllAttributes() throws IOException, SchemaNotFoundException { + final long schemaId = 123456; + final int version = 2; + final long schemaVersionId = 9999; + final int protocol = 2; + + final Map attributes = new HashMap<>(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol)); + + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + + // The schema version id should take precedence + final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() + .schemaVersionId(schemaVersionId) + .build(); + + when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) + .thenReturn(recordSchema); + + final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema); + assertNotNull(retrievedSchema); + } + @Test(expected = SchemaNotFoundException.class) - public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException { + public void testGetSchemaMissingAllAttributes() throws IOException, SchemaNotFoundException { final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema); } + + @Test(expected = SchemaNotFoundException.class) + public void testGetSchemaMissingProtocol() throws IOException, SchemaNotFoundException { + final long schemaId = 123456; + final int version = 2; + final long schemaVersionId = 9999; + + final Map attributes = new HashMap<>(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId)); + + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + schemaAccessStrategy.getSchema(attributes, null, recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException { + final long schemaId = 123456; + final int version = 2; + final long schemaVersionId = 9999; + + final Map attributes = new HashMap<>(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, "INVALID_PROTOCOL"); + + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + schemaAccessStrategy.getSchema(attributes, null, recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testGetSchemaNotFound() throws IOException, SchemaNotFoundException { + final long schemaId = 123456; + final int version = 2; + final long schemaVersionId = 9999; + final int protocol = 2; + + final Map attributes = new HashMap<>(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol)); + + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + + // The schema version id should take precedence + final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() + .schemaVersionId(schemaVersionId) + .build(); + + when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) + .thenReturn(null); + + schemaAccessStrategy.getSchema(attributes, null, recordSchema); + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java index ea3be57535..dd7e034d0b 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java @@ -31,29 +31,75 @@ import java.util.Map; public class TestHortonworksAttributeSchemaReferenceWriter { @Test - public void testValidateWithValidSchema() throws SchemaNotFoundException { + public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException { final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); - final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1); schemaAccessWriter.validateSchema(recordSchema); } @Test(expected = SchemaNotFoundException.class) - public void testValidateWithInvalidSchema() throws SchemaNotFoundException { + public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException { final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); - final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1); schemaAccessWriter.validateSchema(recordSchema); } @Test - public void testGetAttributesWithoutBranch() { + public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test + public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test + public void testGetAttributesWithProtocol1() { + final Integer protocolVersion = 1; final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); - final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion); final Map attributes = schemaAccessWriter.getAttributes(recordSchema); Assert.assertEquals(3, attributes.size()); @@ -64,16 +110,17 @@ public class TestHortonworksAttributeSchemaReferenceWriter { Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE)); - Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION), + Assert.assertEquals(String.valueOf(protocolVersion), attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE)); } @Test - public void testGetAttributesWithBranch() { + public void testGetAttributesWithProtocol1AndBranch() { + final Integer protocolVersion = 1; final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build(); final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); - final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion); final Map attributes = schemaAccessWriter.getAttributes(recordSchema); Assert.assertEquals(4, attributes.size()); @@ -84,10 +131,52 @@ public class TestHortonworksAttributeSchemaReferenceWriter { Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE)); - Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION), + Assert.assertEquals(String.valueOf(protocolVersion), attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE)); - Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE)); + Assert.assertEquals(schemaIdentifier.getBranch().get(), + attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE)); + } + + @Test + public void testGetAttributesWithProtocol2() { + final Integer protocolVersion = 2; + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion); + final Map attributes = schemaAccessWriter.getAttributes(recordSchema); + + Assert.assertEquals(2, attributes.size()); + + Assert.assertEquals(String.valueOf(protocolVersion), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE)); + + Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE)); + } + + @Test + public void testGetAttributesWithProtocol3() { + final Integer protocolVersion = 3; + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion); + final Map attributes = schemaAccessWriter.getAttributes(recordSchema); + + Assert.assertEquals(2, attributes.size()); + + Assert.assertEquals(String.valueOf(protocolVersion), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE)); + + Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE)); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidProtocolVersion() { + new HortonworksAttributeSchemaReferenceWriter(99); } private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java index b0589ea17b..524c8b2c5a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java @@ -18,6 +18,8 @@ package org.apache.nifi.schema.access; import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; import org.junit.Test; @@ -26,17 +28,24 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import static org.junit.Assert.assertEquals; public class TestHortonworksEncodedSchemaReferenceWriter { @Test - public void testHeader() throws IOException { - final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(); + public void testEncodeProtocolVersion1() throws IOException { + final long id = 48; + final int version = 2; + final int protocolVersion = 1; - final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build()); + final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion); + + final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), + SchemaIdentifier.builder().name("name").id(id).version(version).build()); final byte[] header; try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { @@ -45,11 +54,126 @@ public class TestHortonworksEncodedSchemaReferenceWriter { } try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) { - assertEquals(1, dis.read()); // verify 'protocol version' - assertEquals(48, dis.readLong()); // verify schema id - assertEquals(2, dis.readInt()); // verify schema version + assertEquals(protocolVersion, dis.read()); // verify 'protocol version' + assertEquals(id, dis.readLong()); // verify schema id + assertEquals(version, dis.readInt()); // verify schema version assertEquals(-1, dis.read()); // no more bytes } } + @Test + public void testEncodeProtocolVersion2() throws IOException { + final long schemaVersionId = 123; + final int protocolVersion = 2; + + final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion); + + final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), + SchemaIdentifier.builder().schemaVersionId(schemaVersionId).build()); + + final byte[] header; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + writer.writeHeader(schema, baos); + header = baos.toByteArray(); + } + + try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) { + assertEquals(protocolVersion, dis.read()); // verify 'protocol version' + assertEquals(schemaVersionId, dis.readLong()); // verify schema version id + assertEquals(-1, dis.read()); // no more bytes + } + } + + @Test + public void testEncodeProtocolVersion3() throws IOException { + final int schemaVersionId = 123; + final int protocolVersion = 3; + + final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion); + + final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), + SchemaIdentifier.builder().schemaVersionId((long)schemaVersionId).build()); + + final byte[] header; + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + writer.writeHeader(schema, baos); + header = baos.toByteArray(); + } + + try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) { + assertEquals(protocolVersion, dis.read()); // verify 'protocol version' + assertEquals(schemaVersionId, dis.readInt()); // verify schema version id + assertEquals(-1, dis.read()); // no more bytes + } + } + + @Test + public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test + public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test + public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3); + schemaAccessWriter.validateSchema(recordSchema); + } + + private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) { + final List fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + return new SimpleRecordSchema(fields, schemaIdentifier); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index c26c55ad2a..317c5e6c45 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -426,7 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .name(schemaName.get()) .branch(schemaBranchName.orElse(null)) .version(versionInfo.getVersion()) - .protocol(schemaIdentifier.getProtocol()) + .schemaVersionId(versionInfo.getId()) .build(); final Tuple tuple = new Tuple<>(resultSchemaIdentifier, schemaText); @@ -477,7 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .name(schemaName) .id(schemaId.getAsLong()) .version(version.getAsInt()) - .protocol(schemaIdentifier.getProtocol()) + .schemaVersionId(versionInfo.getId()) .build(); final Tuple tuple = new Tuple<>(resultSchemaIdentifier, schemaText); @@ -522,7 +522,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .id(versionInfo.getSchemaMetadataId()) .version(versionInfo.getVersion()) .schemaVersionId(schemaVersionId.getAsLong()) - .protocol(schemaIdentifier.getProtocol()) .build(); final Tuple tuple = new Tuple<>(resultSchemaIdentifier, schemaText); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 5730ee37d2..08c0887af9 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -148,7 +148,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat)); } else { - return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger()); + return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema, variables), out, encoderPool, getLogger()); } } catch (final SchemaNotFoundException e) { throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index 9326d8a228..ca934a7c9c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -17,13 +17,6 @@ package org.apache.nifi.csv; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - import org.apache.commons.csv.CSVFormat; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -37,6 +30,13 @@ import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + @Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"}) @CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written " + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values " @@ -92,7 +92,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R csvFormat = CSVUtils.createCSVFormat(context, variables); } - return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, + return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index 864574f9f7..21ff5c628c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -17,14 +17,6 @@ package org.apache.nifi.json; -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; - import org.apache.commons.compress.compressors.CompressorException; import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -49,6 +41,14 @@ import org.tukaani.xz.XZOutputStream; import org.xerial.snappy.SnappyFramedOutputStream; import org.xerial.snappy.SnappyOutputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + @Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"}) @CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet " + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.") @@ -210,7 +210,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements throw new IOException(e); } - return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping, + return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java index 87874980c4..6ad7006e4b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLRecordSetWriter.java @@ -203,7 +203,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue(); - return new WriteXMLResult(schema, getSchemaAccessWriter(schema), + return new WriteXMLResult(schema, getSchemaAccessWriter(schema, variables), out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null)); }