NIFI-7221 Support v2 and v3 protocol version for Hortonworks Schema Registry

- Update nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
- Addressing review feedback

This closes #4120.
This commit is contained in:
Bryan Bende 2020-03-05 15:52:08 -05:00
parent 1fe79021b5
commit 2feeb57159
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
17 changed files with 638 additions and 168 deletions

View File

@ -48,13 +48,8 @@ public interface SchemaIdentifier {
*/
Optional<String> getBranch();
/**
* @return the protocol used to get this schema identifier
*/
Integer getProtocol();
SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1);
SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null);
static Builder builder() {
return new StandardSchemaIdentifier.Builder();
@ -75,8 +70,6 @@ public interface SchemaIdentifier {
Builder branch(String branch);
Builder protocol(Integer protocol);
SchemaIdentifier build();
}

View File

@ -27,20 +27,14 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private final OptionalInt version;
private final OptionalLong schemaVersionId;
private final Optional<String> branch;
private final int protocol;
StandardSchemaIdentifier(final String name, final Long identifier, final Integer version,
final Long schemaVersionId, final String branch, final int protocol) {
final Long schemaVersionId, final String branch) {
this.name = Optional.ofNullable(name);
this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);
this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);
this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId);
this.branch = Optional.ofNullable(branch);
this.protocol = protocol;
if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) {
throw new IllegalStateException("Name or Identifier must be provided");
}
}
@Override
@ -68,11 +62,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return branch;
}
@Override
public Integer getProtocol() {
return protocol;
}
@Override
public int hashCode() {
return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode()
@ -104,8 +93,7 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
+ "identifier = " + identifier + ", "
+ "version = " + version + ", "
+ "schemaVersionId = " + schemaVersionId + ", "
+ "branch = " + branch + ", "
+ "protocol = " + protocol + " ]";
+ "branch = " + branch + " ]";
}
/**
@ -118,7 +106,6 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
private Long identifier;
private Integer version;
private Long schemaVersionId;
private Integer protocol;
@Override
public SchemaIdentifier.Builder name(final String name) {
@ -150,15 +137,9 @@ public class StandardSchemaIdentifier implements SchemaIdentifier {
return this;
}
@Override
public SchemaIdentifier.Builder protocol(final Integer protocol) {
this.protocol = protocol;
return this;
}
@Override
public SchemaIdentifier build() {
return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol);
return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch);
}
}
}

View File

@ -21,9 +21,12 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
@ -42,6 +45,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
@ -80,6 +84,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.identifiesControllerService(RecordSchemaCacheService.class)
.build();
static final PropertyDescriptor SCHEMA_PROTOCOL_VERSION = new Builder()
.name("schema-protocol-version")
.displayName("Schema Protocol Version")
.description("The protocol version to be used for Schema Write Strategies that require a protocol version, such as Hortonworks Schema Registry strategies. " +
"Valid protocol versions for Hortonworks Schema Registry are integer values 1, 2, or 3.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.build();
/**
* This constant is just a base spec for the actual PropertyDescriptor.
* As it can be overridden by subclasses with different AllowableValues and default value,
@ -97,9 +112,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA));
private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
private final Set<String> schemaWriteStrategiesRequiringProtocolVersion = new HashSet<>(Arrays.asList(
HWX_CONTENT_ENCODED_SCHEMA.getValue(), HWX_SCHEMA_REF_ATTRIBUTES.getValue()));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -112,6 +130,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.allowableValues(strategies)
.build());
properties.add(SCHEMA_CACHE);
properties.add(SCHEMA_PROTOCOL_VERSION);
properties.addAll(super.getSupportedPropertyDescriptors());
return properties;
@ -134,10 +153,18 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
public void storeSchemaWriteStrategy(final ConfigurationContext context) {
this.configurationContext = context;
// If Schema Protocol Version is specified without EL then we can create it up front, otherwise when
// EL is present we will re-create it later so we can re-evaluate the EL against the incoming variables
final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
if (strategy != null) {
final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService);
final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
if (!protocolVersionValue.isExpressionLanguagePresent()) {
final int protocolVersion = context.getProperty(SCHEMA_PROTOCOL_VERSION).asInteger();
this.schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
}
}
}
@ -146,7 +173,27 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return configurationContext;
}
protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema) throws SchemaNotFoundException {
protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema, final Map<String,String> variables) throws SchemaNotFoundException {
// If Schema Protocol Version is using expression language, then we reevaluate against the passed in variables
final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
if (protocolVersionValue.isExpressionLanguagePresent()) {
final int protocolVersion;
final String protocolVersionString = protocolVersionValue.evaluateAttributeExpressions(variables).getValue();
try {
protocolVersion = Integer.parseInt(protocolVersionString);
} catch (NumberFormatException nfe) {
throw new SchemaNotFoundException("Unable to create Schema Write Strategy because " + SCHEMA_PROTOCOL_VERSION.getDisplayName()
+ " must be a positive integer, but was '" + protocolVersionString + "'", nfe);
}
// Now recreate the SchemaAccessWriter since we may have a new value for Schema Protocol Version
final String strategy = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
if (strategy != null) {
final RecordSchemaCacheService recordSchemaCacheService = getConfigurationContext().getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
}
}
schemaAccessWriter.validateSchema(schema);
return schemaAccessWriter;
}
@ -164,8 +211,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return schemaAccessWriter;
}
private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final RecordSchemaCacheService recordSchemaCacheService) {
final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy);
private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final Integer protocolVersion, final RecordSchemaCacheService recordSchemaCacheService) {
final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, protocolVersion);
if (recordSchemaCacheService == null) {
return writer;
} else {
@ -173,15 +220,15 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
}
}
private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy) {
private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final Integer protocolVersion) {
if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
return new SchemaNameAsAttribute();
} else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
return new WriteAvroSchemaAttributeStrategy();
} else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
return new HortonworksEncodedSchemaReferenceWriter();
return new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
} else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
return new HortonworksAttributeSchemaReferenceWriter();
return new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
} else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
return new ConfluentSchemaRegistryWriter();
} else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
@ -221,6 +268,17 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
.build());
}
final String schemaWriteStrategy = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
final String protocolVersion = validationContext.getProperty(SCHEMA_PROTOCOL_VERSION).getValue();
if (schemaWriteStrategy != null && schemaWriteStrategiesRequiringProtocolVersion.contains(schemaWriteStrategy) && protocolVersion == null) {
results.add(new ValidationResult.Builder()
.subject(SCHEMA_PROTOCOL_VERSION.getDisplayName())
.valid(false)
.explanation("The configured Schema Write Strategy requires a Schema Protocol Version to be specified.")
.build());
}
return results;
}
}

View File

@ -17,6 +17,10 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
@ -24,10 +28,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields;
@ -37,7 +37,6 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
private final SchemaRegistry schemaRegistry;
static final int LATEST_PROTOCOL_VERSION = 3;
public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
this.schemaRegistry = schemaRegistry;
@ -55,44 +54,52 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
@Override
public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: "
+ SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
}
if (!isNumber(schemaProtocol)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Protocol Version number");
}
final int protocol = Integer.parseInt(schemaProtocol);
if (protocol > LATEST_PROTOCOL_VERSION) {
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format");
if (protocol < HortonworksProtocolVersions.MIN_VERSION || protocol > HortonworksProtocolVersions.MAX_VERSION) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be a value between "
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
}
SchemaIdentifier identifier;
if (!isNumber(schemaVersionId)) {
if (!isNumber(schemaIdentifier)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Identifier number");
}
if (!isNumber(schemaVersion)) {
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ schemaProtocol + "', which is not a valid Schema Version number");
}
switch (protocol) {
case 1:
final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
if (!isNumber(schemaIdentifier)) {
throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
+ schemaIdentifier + "', which is not a valid Schema Identifier and is required by Protocol Version " + protocol);
}
final long schemaId = Long.parseLong(schemaIdentifier);
final int version = Integer.parseInt(schemaVersion);
identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build();
} else {
final long svi = Long.parseLong(schemaVersionId);
identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build();
final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
if (!isNumber(schemaVersion)) {
throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
+ schemaVersion + "', which is not a valid Schema Version and is required by Protocol Version " + protocol);
}
final long schemaId = Long.parseLong(schemaIdentifier);
final int version = Integer.parseInt(schemaVersion);
identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
break;
case 2:
case 3:
final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
if (!isNumber(schemaVersionId)) {
throw new SchemaNotFoundException("Could not determine schema because " + SCHEMA_VERSION_ID_ATTRIBUTE + " has a value of '"
+ schemaVersionId + "', which is not a valid Schema Version Identifier and is required by Protocol Version " + protocol);
}
final long svi = Long.parseLong(schemaVersionId);
identifier = SchemaIdentifier.builder().schemaVersionId(svi).build();
break;
default:
throw new SchemaNotFoundException("Unknown Protocol Version: " + protocol);
}
final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);

View File

@ -17,21 +17,30 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
static final int LATEST_PROTOCOL_VERSION = 3;
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
private final int protocolVersion;
public HortonworksAttributeSchemaReferenceWriter(final int protocolVersion) {
this.protocolVersion = protocolVersion;
if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
}
}
@Override
public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
}
@ -41,21 +50,29 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
final Map<String, String> attributes = new HashMap<>(4);
final SchemaIdentifier id = schema.getIdentifier();
final Long schemaId = id.getIdentifier().getAsLong();
final Integer schemaVersion = id.getVersion().getAsInt();
switch (protocolVersion) {
case 1:
final Long schemaId = id.getIdentifier().getAsLong();
final Integer schemaVersion = id.getVersion().getAsInt();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
break;
case 2:
case 3:
final Long schemaVersionId = id.getSchemaVersionId().getAsLong();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
break;
default:
// Can't reach this point
throw new IllegalStateException("Unknown Protocol Verison: " + protocolVersion);
}
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol()));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocolVersion));
if (id.getBranch().isPresent()) {
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
}
if (id.getSchemaVersionId().isPresent()) {
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong()));
}
return attributes;
}
@ -63,19 +80,33 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier identifier = schema.getIdentifier();
if(!identifier.getSchemaVersionId().isPresent()) {
if (!identifier.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
}
if (!identifier.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
}
switch (protocolVersion) {
case 1:
if (!identifier.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Identifier " +
"is not known and is required for Protocol Version " + protocolVersion);
}
if (!identifier.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version " +
"is not known and is required for Protocol Version " + protocolVersion);
}
break;
case 2:
case 3:
if (!identifier.getSchemaVersionId().isPresent()) {
throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version Identifier " +
"is not known and is required for Protocol Version " + protocolVersion);
}
break;
default:
// Can't reach this point
throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
}
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
return requiredSchemaFields;
return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
}
}

View File

@ -17,6 +17,11 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@ -25,13 +30,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
private static final int LATEST_PROTOCOL_VERSION = 3;
private final Set<SchemaField> schemaFields;
private final SchemaRegistry schemaRegistry;
@ -42,6 +41,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
schemaFields = new HashSet<>();
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
schemaFields.add(SchemaField.SCHEMA_VERSION);
schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
}
@ -58,6 +58,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
final ByteBuffer bb = ByteBuffer.wrap(buffer);
final int protocolVersion = bb.get();
SchemaIdentifier schemaIdentifier;
switch(protocolVersion) {
@ -69,11 +70,11 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
final ByteBuffer bbv1 = ByteBuffer.wrap(buffer);
final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1);
final long schemaId = bbv1.getLong();
final int schemaVersion = bbv1.getInt();
schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build();
schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
case 2:
@ -84,10 +85,10 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
final ByteBuffer bbv2 = ByteBuffer.wrap(buffer);
final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2);
final long sviLong = bbv2.getLong();
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build();
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
case 3:
@ -98,15 +99,16 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
} catch (final IOException ioe) {
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
}
final ByteBuffer bbv3 = ByteBuffer.wrap(buffer);
final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3);
final int sviInt = bbv3.getInt();
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build();
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
default:
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format");
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. Expected Protocol Version to be a value between "
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION
+ ", but data was encoded with protocol version " + protocolVersion + ".");
}
}

View File

@ -17,20 +17,28 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private static final int LATEST_PROTOCOL_VERSION = 3;
private final int protocolVersion;
public HortonworksEncodedSchemaReferenceWriter(final int protocolVersion) {
this.protocolVersion = protocolVersion;
if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
}
}
@Override
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
@ -38,7 +46,7 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
switch(identifier.getProtocol()) {
switch(protocolVersion) {
case 1:
final Long id = identifier.getIdentifier().getAsLong();
final Integer version = identifier.getVersion().getAsInt();
@ -49,25 +57,23 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
out.write(bbv1.array());
return;
case 2:
final Long sviV2 = identifier.getIdentifier().getAsLong();
final Long sviV2 = identifier.getSchemaVersionId().getAsLong();
final ByteBuffer bbv2 = ByteBuffer.allocate(9);
bbv2.put((byte) 2);
bbv2.putLong(sviV2);
out.write(bbv2.array());
return;
case 3:
final Long sviV3 = identifier.getIdentifier().getAsLong();
final Long sviV3 = identifier.getSchemaVersionId().getAsLong();
final ByteBuffer bbv3 = ByteBuffer.allocate(5);
bbv3.put((byte) 3);
bbv3.putInt(sviV3.intValue());
out.write(bbv3.array());
return;
default:
throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version "
+ LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format");
// Can't reach this point
throw new IllegalStateException("Unknown Protocol Version: " + this.protocolVersion);
}
}
@Override
@ -79,19 +85,33 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
final SchemaIdentifier identifier = schema.getIdentifier();
if(!identifier.getSchemaVersionId().isPresent()) {
if (!identifier.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known");
}
if (!identifier.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known");
}
switch (protocolVersion) {
case 1:
if (!identifier.getIdentifier().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier " +
"is not known and is required for Protocol Version " + protocolVersion);
}
if (!identifier.getVersion().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version " +
"is not known and is required for Protocol Version " + protocolVersion);
}
break;
case 2:
case 3:
if (!identifier.getSchemaVersionId().isPresent()) {
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version Identifier " +
"is not known and is required for Protocol Version " + protocolVersion);
}
break;
default:
// Can't reach this point
throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
}
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
return requiredSchemaFields;
return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.schema.access;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Constants related to Protocol Versions for Hortonworks Schema Registry.
*/
public class HortonworksProtocolVersions {
/**
* The minimum valid protocol version.
*/
public static final int MIN_VERSION = 1;
/**
* The maximum valid protocol version.
*/
public static final int MAX_VERSION = 3;
/**
* Map from protocol version to the required schema fields for the given version.
*/
private static final Map<Integer, Set<SchemaField>> REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL;
static {
final Map<Integer,Set<SchemaField>> requiredFieldsByProtocol = new HashMap<>();
requiredFieldsByProtocol.put(1, EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION));
requiredFieldsByProtocol.put(2, EnumSet.of(SchemaField.SCHEMA_VERSION_ID));
requiredFieldsByProtocol.put(3, EnumSet.of(SchemaField.SCHEMA_VERSION_ID));
REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL = Collections.unmodifiableMap(requiredFieldsByProtocol);
}
public static Set<SchemaField> getRequiredSchemaFields(final Integer protocolVersion) {
return REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL.get(protocolVersion);
}
}

View File

@ -42,7 +42,11 @@ public class AbstractSchemaAccessStrategyTest {
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
.name("person").branch("master").version(1).id(1L).build();
.name("person")
.branch("master")
.version(1)
.id(1L)
.build();
this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier);
}

View File

@ -32,7 +32,7 @@ import static org.mockito.Mockito.when;
public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
@Test
public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException {
public void testGetSchemaWithValidSchemaIdVersionAndProtocol() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final int protocol = 1;
@ -56,9 +56,115 @@ public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSch
assertNotNull(retrievedSchema);
}
@Test
public void testGetSchemaWithValidSchemaVersionIdAndProtocol() throws IOException, SchemaNotFoundException {
final long schemaVersionId = 9999;
final int protocol = 2;
final Map<String,String> attributes = new HashMap<>();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.schemaVersionId(schemaVersionId)
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test
public void testGetSchemaWithAllAttributes() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final long schemaVersionId = 9999;
final int protocol = 2;
final Map<String,String> attributes = new HashMap<>();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
// The schema version id should take precedence
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.schemaVersionId(schemaVersionId)
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(recordSchema);
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
assertNotNull(retrievedSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException {
public void testGetSchemaMissingAllAttributes() throws IOException, SchemaNotFoundException {
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaMissingProtocol() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final long schemaVersionId = 9999;
final Map<String,String> attributes = new HashMap<>();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
schemaAccessStrategy.getSchema(attributes, null, recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final long schemaVersionId = 9999;
final Map<String,String> attributes = new HashMap<>();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, "INVALID_PROTOCOL");
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
schemaAccessStrategy.getSchema(attributes, null, recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testGetSchemaNotFound() throws IOException, SchemaNotFoundException {
final long schemaId = 123456;
final int version = 2;
final long schemaVersionId = 9999;
final int protocol = 2;
final Map<String,String> attributes = new HashMap<>();
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
// The schema version id should take precedence
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
.schemaVersionId(schemaVersionId)
.build();
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
.thenReturn(null);
schemaAccessStrategy.getSchema(attributes, null, recordSchema);
}
}

View File

@ -31,29 +31,75 @@ import java.util.Map;
public class TestHortonworksAttributeSchemaReferenceWriter {
@Test
public void testValidateWithValidSchema() throws SchemaNotFoundException {
public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithInvalidSchema() throws SchemaNotFoundException {
public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testGetAttributesWithoutBranch() {
public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testGetAttributesWithProtocol1() {
final Integer protocolVersion = 1;
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(3, attributes.size());
@ -64,16 +110,17 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
Assert.assertEquals(String.valueOf(protocolVersion),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
}
@Test
public void testGetAttributesWithBranch() {
public void testGetAttributesWithProtocol1AndBranch() {
final Integer protocolVersion = 1;
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter();
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(4, attributes.size());
@ -84,10 +131,52 @@ public class TestHortonworksAttributeSchemaReferenceWriter {
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
Assert.assertEquals(String.valueOf(protocolVersion),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
Assert.assertEquals(schemaIdentifier.getBranch().get(),
attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
}
@Test
public void testGetAttributesWithProtocol2() {
final Integer protocolVersion = 2;
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(2, attributes.size());
Assert.assertEquals(String.valueOf(protocolVersion),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
}
@Test
public void testGetAttributesWithProtocol3() {
final Integer protocolVersion = 3;
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
Assert.assertEquals(2, attributes.size());
Assert.assertEquals(String.valueOf(protocolVersion),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
Assert.assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidProtocolVersion() {
new HortonworksAttributeSchemaReferenceWriter(99);
}
private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {

View File

@ -18,6 +18,8 @@
package org.apache.nifi.schema.access;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Test;
@ -26,17 +28,24 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class TestHortonworksEncodedSchemaReferenceWriter {
@Test
public void testHeader() throws IOException {
final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter();
public void testEncodeProtocolVersion1() throws IOException {
final long id = 48;
final int version = 2;
final int protocolVersion = 1;
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build());
final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
SchemaIdentifier.builder().name("name").id(id).version(version).build());
final byte[] header;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
@ -45,11 +54,126 @@ public class TestHortonworksEncodedSchemaReferenceWriter {
}
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
assertEquals(1, dis.read()); // verify 'protocol version'
assertEquals(48, dis.readLong()); // verify schema id
assertEquals(2, dis.readInt()); // verify schema version
assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
assertEquals(id, dis.readLong()); // verify schema id
assertEquals(version, dis.readInt()); // verify schema version
assertEquals(-1, dis.read()); // no more bytes
}
}
@Test
public void testEncodeProtocolVersion2() throws IOException {
final long schemaVersionId = 123;
final int protocolVersion = 2;
final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
SchemaIdentifier.builder().schemaVersionId(schemaVersionId).build());
final byte[] header;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.writeHeader(schema, baos);
header = baos.toByteArray();
}
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
assertEquals(schemaVersionId, dis.readLong()); // verify schema version id
assertEquals(-1, dis.read()); // no more bytes
}
}
@Test
public void testEncodeProtocolVersion3() throws IOException {
final int schemaVersionId = 123;
final int protocolVersion = 3;
final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
SchemaIdentifier.builder().schemaVersionId((long)schemaVersionId).build());
final byte[] header;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
writer.writeHeader(schema, baos);
header = baos.toByteArray();
}
try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
assertEquals(schemaVersionId, dis.readInt()); // verify schema version id
assertEquals(-1, dis.read()); // no more bytes
}
}
@Test
public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol1AndMissingSchemaId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol1AndMissingSchemaName() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol2AndMissingSchemaVersionId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test
public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
schemaAccessWriter.validateSchema(recordSchema);
}
@Test(expected = SchemaNotFoundException.class)
public void testValidateWithProtocol3AndMissingSchemaVersionId() throws SchemaNotFoundException {
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
schemaAccessWriter.validateSchema(recordSchema);
}
private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(fields, schemaIdentifier);
}
}

View File

@ -426,7 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName.get())
.branch(schemaBranchName.orElse(null))
.version(versionInfo.getVersion())
.protocol(schemaIdentifier.getProtocol())
.schemaVersionId(versionInfo.getId())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@ -477,7 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.name(schemaName)
.id(schemaId.getAsLong())
.version(version.getAsInt())
.protocol(schemaIdentifier.getProtocol())
.schemaVersionId(versionInfo.getId())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);
@ -522,7 +522,6 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.id(versionInfo.getSchemaMetadataId())
.version(versionInfo.getVersion())
.schemaVersionId(schemaVersionId.getAsLong())
.protocol(schemaIdentifier.getProtocol())
.build();
final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText);

View File

@ -148,7 +148,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat));
} else {
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger());
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema, variables), out, encoderPool, getLogger());
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);

View File

@ -17,13 +17,6 @@
package org.apache.nifi.csv;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.csv.CSVFormat;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -37,6 +30,13 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
+ "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
@ -92,7 +92,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R
csvFormat = CSVUtils.createCSVFormat(context, variables);
}
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out,
return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema, variables), out,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader, charSet);
}
}

View File

@ -17,14 +17,6 @@
package org.apache.nifi.json;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -49,6 +41,14 @@ import org.tukaani.xz.XZOutputStream;
import org.xerial.snappy.SnappyFramedOutputStream;
import org.xerial.snappy.SnappyOutputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
@CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
+ "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
@ -210,7 +210,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
throw new IOException(e);
}
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema, variables), compressionOut, prettyPrint, nullSuppression, outputGrouping,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
}

View File

@ -203,7 +203,7 @@ public class XMLRecordSetWriter extends DateTimeTextRecordSetWriter implements R
final String charSet = getConfigurationContext().getProperty(CHARACTER_SET).getValue();
return new WriteXMLResult(schema, getSchemaAccessWriter(schema),
return new WriteXMLResult(schema, getSchemaAccessWriter(schema, variables),
out, prettyPrint, nullSuppressionEnum, arrayWrappingEnum, arrayTagName, rootTagName, recordTagName, charSet,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
}