mirror of https://github.com/apache/nifi.git
NIFI-12261 Added Schema Reference Reader and Writer Services (#7991)
- Refactored Confluent Encoded handling to Schema Reference Reader and Schema Reference Writer implementations
- Removed Schema Protocol Version from Schema Registry Service associated with Hortonworks Schema Registry
parent 8b0abd45df
commit 3dcfc919bb
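As context for the diff below: both new services handle the Confluent Schema Registry wire-format header, a five-byte content prefix made of a zero magic byte followed by the schema identifier as a four-byte integer. A minimal standalone sketch of that encoding, using an illustrative schema id of 123456 that is not taken from this commit:

    import java.nio.ByteBuffer;

    class ConfluentHeaderSketch {
        public static void main(final String[] args) {
            final int schemaId = 123456; // example value only

            // Encode: magic byte 0 followed by the schema id as four bytes (big-endian)
            final ByteBuffer header = ByteBuffer.allocate(5);
            header.put((byte) 0);
            header.putInt(schemaId);

            // Decode: read the marker, then the schema id, as the new reader service does
            final ByteBuffer parsed = ByteBuffer.wrap(header.array());
            final byte magicByte = parsed.get();
            final int parsedId = parsed.getInt();
            System.out.printf("magic=%d schemaId=%d%n", magicByte, parsedId);
        }
    }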
@@ -0,0 +1,67 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.confluent.schemaregistry;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.stream.io.StreamUtils;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Set;

@Tags({"confluent", "schema", "registry", "kafka", "avro"})
@CapabilityDescription("Reads Schema Identifier according to Confluent encoding as a header consisting of a byte marker and an integer represented as four bytes")
public class ConfluentEncodedSchemaReferenceReader extends AbstractControllerService implements SchemaReferenceReader {
    private static final int HEADER_CAPACITY = 5;

    private static final byte MAGIC_BYTE = 0;

    private static final Set<SchemaField> SUPPLIED_SCHEMA_FIELDS = Set.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);

    @Override
    public SchemaIdentifier getSchemaIdentifier(final Map<String, String> variables, final InputStream contentStream) throws SchemaNotFoundException {
        final byte[] header = new byte[HEADER_CAPACITY];
        try {
            StreamUtils.fillBuffer(contentStream, header);
        } catch (final IOException e) {
            throw new SchemaNotFoundException("Failed to read header in first 5 bytes from stream", e);
        }

        final ByteBuffer headerBuffer = ByteBuffer.wrap(header);
        final byte firstByte = headerBuffer.get();
        if (MAGIC_BYTE == firstByte) {
            final int schemaId = headerBuffer.getInt();
            getLogger().debug("Confluent Schema Identifier found [{}]", schemaId);
            return SchemaIdentifier.builder().schemaVersionId((long) schemaId).build();
        } else {
            throw new SchemaNotFoundException("Confluent Schema encoding not found in first byte of content header");
        }
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return SUPPLIED_SCHEMA_FIELDS;
    }
}
@@ -0,0 +1,95 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.confluent.schemaregistry;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaReferenceWriter;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;

@Tags({"confluent", "schema", "registry", "kafka", "avro"})
@CapabilityDescription("Writes Schema Identifier according to Confluent encoding as a header consisting of a byte marker and an integer represented as four bytes")
public class ConfluentEncodedSchemaReferenceWriter extends AbstractControllerService implements SchemaReferenceWriter {
    private static final Set<SchemaField> REQUIRED_SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);

    private static final int HEADER_CAPACITY = 5;

    private static final byte MAGIC_BYTE = 0;

    /**
     * Write Record Schema Identifier according to Confluent wire format consisting of a magic byte followed by an integer
     *
     * @param recordSchema Record Schema to be written to the provided Output Stream
     * @param outputStream Output Stream to which the provided Record Schema should be written
     * @throws IOException Thrown on failure to write header
     */
    @Override
    public void writeHeader(final RecordSchema recordSchema, final OutputStream outputStream) throws IOException {
        final SchemaIdentifier schemaIdentifier = recordSchema.getIdentifier();
        final OptionalLong identifier = schemaIdentifier.getIdentifier();
        if (identifier.isPresent()) {
            final long id = identifier.getAsLong();
            final int schemaId = Math.toIntExact(id);

            final ByteBuffer header = ByteBuffer.allocate(HEADER_CAPACITY);
            header.put(MAGIC_BYTE);
            header.putInt(schemaId);

            outputStream.write(header.array());
        } else {
            throw new IOException("Identifier field not found in Schema Identifier");
        }
    }

    @Override
    public Map<String, String> getAttributes(final RecordSchema recordSchema) {
        return Collections.emptyMap();
    }

    @Override
    public void validateSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
        final SchemaIdentifier schemaIdentifier = recordSchema.getIdentifier();
        final OptionalLong identifier = schemaIdentifier.getIdentifier();
        if (identifier.isPresent()) {
            final OptionalInt version = schemaIdentifier.getVersion();
            if (version.isEmpty()) {
                throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Version is not known");
            }
        } else {
            throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Identifier is not known");
        }
    }

    @Override
    public Set<SchemaField> getRequiredSchemaFields() {
        return REQUIRED_SCHEMA_FIELDS;
    }
}
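The two services above are intended to be inverses of each other. A hedged round-trip sketch, modeled on the unit tests added later in this commit; the class name, service identifiers, schema id 123456, and version 2 are illustrative only, and the class is assumed to sit in the same package as the new tests:

    package org.apache.nifi.confluent.schemaregistry; // assumed placement alongside the new tests

    import org.apache.nifi.reporting.InitializationException;
    import org.apache.nifi.schema.access.SchemaNotFoundException;
    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.SchemaIdentifier;
    import org.apache.nifi.util.NoOpProcessor;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    class ConfluentEncodedRoundTripSketch {
        public static void main(final String[] args) throws InitializationException, IOException, SchemaNotFoundException {
            final ConfluentEncodedSchemaReferenceWriter referenceWriter = new ConfluentEncodedSchemaReferenceWriter();
            final ConfluentEncodedSchemaReferenceReader referenceReader = new ConfluentEncodedSchemaReferenceReader();

            // Services are initialized and enabled through a TestRunner, mirroring the new unit tests
            final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
            runner.addControllerService("reference-writer", referenceWriter);
            runner.addControllerService("reference-reader", referenceReader);
            runner.enableControllerService(referenceWriter);
            runner.enableControllerService(referenceReader);

            // Schema carrying both identifier and version, as validateSchema() requires
            final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
            final RecordSchema recordSchema = new SimpleRecordSchema(
                    List.of(new RecordField("firstField", RecordFieldType.STRING.getDataType())), schemaIdentifier);

            // Write the five-byte Confluent header, then read the schema id back from the same bytes
            final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            referenceWriter.validateSchema(recordSchema);
            referenceWriter.writeHeader(recordSchema, outputStream);

            final SchemaIdentifier found = referenceReader.getSchemaIdentifier(
                    Collections.emptyMap(), new ByteArrayInputStream(outputStream.toByteArray()));
            System.out.println(found.getSchemaVersionId()); // expected to hold 123456
        }
    }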
@@ -13,4 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.nifi.confluent.schemaregistry.ConfluentSchemaRegistry
org.apache.nifi.confluent.schemaregistry.ConfluentEncodedSchemaReferenceReader
org.apache.nifi.confluent.schemaregistry.ConfluentEncodedSchemaReferenceWriter
org.apache.nifi.confluent.schemaregistry.ConfluentSchemaRegistry
@@ -14,10 +14,15 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schema.access;
package org.apache.nifi.confluent.schemaregistry;

import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
@@ -25,57 +30,61 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.OptionalLong;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestConfluentSchemaRegistryStrategy extends AbstractSchemaAccessStrategyTest {
class ConfluentEncodedSchemaReferenceReaderTest {
    private static final String SERVICE_ID = ConfluentEncodedSchemaReferenceReader.class.getSimpleName();

    private static final byte MAGIC_BYTE = 0;

    private static final int INVALID_MAGIC_BYTE = 1;

    private static final int ID = 123456;

    private ConfluentEncodedSchemaReferenceReader reader;

    @BeforeEach
    void setRunner() throws InitializationException {
        reader = new ConfluentEncodedSchemaReferenceReader();

        final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
        runner.addControllerService(SERVICE_ID, reader);
        runner.enableControllerService(reader);
    }

    @Test
    public void testGetSchemaWithValidEncoding() throws IOException, SchemaNotFoundException {
        final SchemaAccessStrategy schemaAccessStrategy = new ConfluentSchemaRegistryStrategy(schemaRegistry);

        final int schemaId = 123456;

    public void testGetSchemaIdentifier() throws IOException, SchemaNotFoundException {
        try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
             final DataOutputStream out = new DataOutputStream(bytesOut)) {
            out.write(0);
            out.writeInt(schemaId);
            out.write(MAGIC_BYTE);
            out.writeInt(ID);
            out.flush();

            try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
                final SchemaIdentifier schemaIdentifier = reader.getSchemaIdentifier(Collections.emptyMap(), in);

                // the confluent strategy will read the id from the input stream
                final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
                        .schemaVersionId((long)schemaId)
                        .build();

                when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
                        .thenReturn(recordSchema);

                final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
                assertNotNull(retrievedSchema);
                final OptionalLong schemaVersionId = schemaIdentifier.getSchemaVersionId();
                assertTrue(schemaVersionId.isPresent());
                assertEquals(schemaVersionId.getAsLong(), ID);
            }
        }
    }

    @Test
    public void testGetSchemaWithInvalidEncoding() {
        final SchemaAccessStrategy schemaAccessStrategy = new ConfluentSchemaRegistryStrategy(schemaRegistry);

        final int schemaId = 123456;

    public void testGetSchemaIdentifierInvalid() {
        assertThrows(SchemaNotFoundException.class, () -> {
            try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
                 final DataOutputStream out = new DataOutputStream(bytesOut)) {
                out.write(1); // write an invalid magic byte
                out.writeInt(schemaId);
                out.write(INVALID_MAGIC_BYTE);
                out.writeInt(ID);
                out.flush();

                try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
                    schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
                    reader.getSchemaIdentifier(Collections.emptyMap(), in);
                }
            }
        });
@@ -14,13 +14,19 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schema.access;
package org.apache.nifi.confluent.schemaregistry;

import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.NoOpProcessor;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
@@ -29,50 +35,72 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestConfluentSchemaRegistryWriter {
class ConfluentEncodedSchemaReferenceWriterTest {
    private static final String SERVICE_ID = ConfluentEncodedSchemaReferenceWriter.class.getSimpleName();

    @Test
    public void testValidateValidSchema() throws SchemaNotFoundException {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
    private static final byte MAGIC_BYTE = 0;

        final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter();
        schemaAccessWriter.validateSchema(recordSchema);
    private static final int VERSION = 2;

    private static final long ID = 123456;

    private ConfluentEncodedSchemaReferenceWriter writer;

    @BeforeEach
    void setRunner() throws InitializationException {
        writer = new ConfluentEncodedSchemaReferenceWriter();

        final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
        runner.addControllerService(SERVICE_ID, writer);
        runner.enableControllerService(writer);
    }

    @Test
    public void testValidateInvalidSchema() {
    void testValidateValidSchema() {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(ID).version(VERSION).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        assertDoesNotThrow(() -> writer.validateSchema(recordSchema));
    }

    @Test
    void testValidateInvalidSchema() {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter();
        assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
        assertThrows(SchemaNotFoundException.class, () -> writer.validateSchema(recordSchema));
    }

    @Test
    public void testWriteHeader() throws IOException {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
    void testWriteHeader() throws IOException {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(ID).version(VERSION).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();

        final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter();
        schemaAccessWriter.writeHeader(recordSchema, out);
        writer.writeHeader(recordSchema, out);

        try (final ByteArrayInputStream bytesIn = new ByteArrayInputStream(out.toByteArray());
             final DataInputStream in = new DataInputStream(bytesIn)) {
            assertEquals(0, in.readByte());
            assertEquals((int) schemaIdentifier.getIdentifier().getAsLong(), in.readInt());
            assertEquals(MAGIC_BYTE, in.readByte());

            final OptionalLong identifier = schemaIdentifier.getIdentifier();
            assertTrue(identifier.isPresent());

            final long id = identifier.getAsLong();
            assertEquals(id, in.readInt());
        }
    }

    private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
        final List<RecordField> fields = new ArrayList<>();
        fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("firstField", RecordFieldType.STRING.getDataType()));
        return new SimpleRecordSchema(fields, schemaIdentifier);
    }
}
@@ -25,6 +25,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;

import java.util.ArrayList;
@@ -35,28 +36,24 @@ public class SchemaAccessUtils {

    public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property",
            "The name of the Schema to use is specified by the 'Schema Name' Property. The value of this property is used to lookup the Schema in the configured Schema Registry service.");

    public static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use 'Schema Text' Property",
            "The text of the Schema itself is specified by the 'Schema Text' Property. The value of this property must be a valid Avro Schema. "
                    + "If Expression Language is used, the value of the 'Schema Text' property must be valid after substituting the expressions.");
    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
                    + "found at https://github.com/hortonworks/registry");
    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
            "The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");

    public static final AllowableValue SCHEMA_REFERENCE_READER_PROPERTY = new AllowableValue("schema-reference-reader", "Schema Reference Reader",
            "The schema reference information will be provided through a configured Schema Reference Reader service implementation.");

    public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema",
            "The schema used to write records will be the same schema that was given to the Record when the Record was created.");
    public static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Content-Encoded Schema Reference",
            "The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single "
                    + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
                    + "This is based on version 3.2.x of the Confluent Schema Registry.");

    public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from Result");

    public static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
            .name("schema-access-strategy")
            .displayName("Schema Access Strategy")
            .description("Specifies how to obtain the schema that is to be used for interpreting the data.")
            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)
            .allowableValues(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, SCHEMA_REFERENCE_READER_PROPERTY)
            .defaultValue(SCHEMA_NAME_PROPERTY.getValue())
            .required(true)
            .build();
@@ -67,7 +64,7 @@ public class SchemaAccessUtils {
            .description("Specifies the Controller Service to use for the Schema Registry")
            .identifiesControllerService(SchemaRegistry.class)
            .required(false)
            .dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)
            .dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, SCHEMA_REFERENCE_READER_PROPERTY)
            .build();

    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
@@ -114,6 +111,15 @@ public class SchemaAccessUtils {
            .required(false)
            .build();

    public static final PropertyDescriptor SCHEMA_REFERENCE_READER = new PropertyDescriptor.Builder()
            .name("schema-reference-reader")
            .displayName("Schema Reference Reader")
            .description("Service implementation responsible for reading FlowFile attributes or content to determine the Schema Reference Identifier")
            .dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_REFERENCE_READER_PROPERTY)
            .identifiesControllerService(SchemaReferenceReader.class)
            .required(true)
            .build();

    public static Collection<ValidationResult> validateSchemaAccessStrategy(final ValidationContext validationContext, final String schemaAccessStrategyValue,
            final List<AllowableValue> schemaAccessStrategyValues) {

@@ -149,6 +155,24 @@ public class SchemaAccessUtils {
        return validationResults;
    }

    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
            final PropertyValue schemaName = context.getProperty(SCHEMA_NAME);
            final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
            final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION);
            return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion);
        } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
            return new InheritSchemaFromRecord();
        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
        } else if (allowableValue.equalsIgnoreCase(SCHEMA_REFERENCE_READER_PROPERTY.getValue())) {
            final SchemaReferenceReader schemaReferenceReader = context.getProperty(SCHEMA_REFERENCE_READER).asControllerService(SchemaReferenceReader.class);
            return new SchemaReferenceReaderSchemaAccessStrategy(schemaReferenceReader, schemaRegistry);
        }

        return null;
    }

    private static String getSchemaAccessStrategyName(final String schemaAccessValue, final List<AllowableValue> schemaAccessStrategyValues) {
        for (final AllowableValue allowableValue : schemaAccessStrategyValues) {
            if (allowableValue.getValue().equalsIgnoreCase(schemaAccessValue)) {
@@ -160,30 +184,7 @@ public class SchemaAccessUtils {
    }

    private static boolean isSchemaRegistryRequired(final String schemaAccessValue) {
        return HWX_CONTENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue) || SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue)
                || HWX_SCHEMA_REF_ATTRIBUTES.getValue().equalsIgnoreCase(schemaAccessValue) || CONFLUENT_ENCODED_SCHEMA.getValue().equalsIgnoreCase(schemaAccessValue);
        return SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue)
                || SCHEMA_REFERENCE_READER_PROPERTY.getValue().equalsIgnoreCase(schemaAccessValue);
    }


    public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
        if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
            final PropertyValue schemaName = context.getProperty(SCHEMA_NAME);
            final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME);
            final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION);
            return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion);
        } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
            return new InheritSchemaFromRecord();
        } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
            return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
        } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
            return new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
        } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
            return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
        } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
            return new ConfluentSchemaRegistryStrategy(schemaRegistry);
        }

        return null;
    }

}
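The SchemaAccessUtils changes above replace the Hortonworks and Confluent access strategies with a single Schema Reference Reader strategy backed by a controller service. A hedged wiring sketch of the new configuration path follows; JsonTreeReader is used purely as an example record reader and is not part of this commit, the service identifiers are illustrative, and the registry's connection properties and service enablement are omitted:

    package org.apache.nifi.confluent.schemaregistry; // assumed placement; purely illustrative

    import org.apache.nifi.json.JsonTreeReader; // example reader only, not part of this commit
    import org.apache.nifi.reporting.InitializationException;
    import org.apache.nifi.schema.access.SchemaAccessUtils;
    import org.apache.nifi.util.NoOpProcessor;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    class SchemaReferenceReaderWiringSketch {
        public static void main(final String[] args) throws InitializationException {
            final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);

            // New Schema Reference Reader service from this commit
            final ConfluentEncodedSchemaReferenceReader referenceReader = new ConfluentEncodedSchemaReferenceReader();
            runner.addControllerService("reference-reader", referenceReader);
            runner.enableControllerService(referenceReader);

            // Schema Registry used to resolve the identifier; connection properties and enablement are out of scope here
            final ConfluentSchemaRegistry schemaRegistry = new ConfluentSchemaRegistry();
            runner.addControllerService("schema-registry", schemaRegistry);

            // Example reader wired to the new strategy through the descriptors changed above
            final JsonTreeReader recordReader = new JsonTreeReader();
            runner.addControllerService("record-reader", recordReader);
            runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY.getValue());
            runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_REFERENCE_READER, "reference-reader");
            runner.setProperty(recordReader, SchemaAccessUtils.SCHEMA_REGISTRY, "schema-registry");
        }
    }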
@@ -52,7 +52,7 @@ public class JsonInferenceSchemaRegistryService extends SchemaRegistryService {
    @OnEnabled
    public void onEnabled(ConfigurationContext context) {
        this.storeSchemaAccessStrategy(context);
        this.schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
        this.schemaAccess = context.getProperty(getSchemaAccessStrategyDescriptor()).getValue();
    }

    @Override
@@ -21,31 +21,28 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNameAsAttribute;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.access.SchemaReferenceWriterSchemaAccessWriter;
import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.schemaregistry.services.SchemaReferenceWriter;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
@@ -57,23 +54,14 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
    public static final AllowableValue SCHEMA_NAME_ATTRIBUTE = new AllowableValue("schema-name", "Set 'schema.name' Attribute",
            "The FlowFile will be given an attribute named 'schema.name' and this attribute will indicate the name of the schema in the Schema Registry. Note that if"
                    + "the schema for a record is not obtained from a Schema Registry, then no attribute will be added.");

    public static final AllowableValue AVRO_SCHEMA_ATTRIBUTE = new AllowableValue("full-schema-attribute", "Set 'avro.schema' Attribute",
            "The FlowFile will be given an attribute named 'avro.schema' and this attribute will contain the Avro Schema that describes the records in the FlowFile. "
                    + "The contents of the FlowFile need not be Avro, but the text of the schema will be used.");
    public static final AllowableValue HWX_CONTENT_ENCODED_SCHEMA = new AllowableValue("hwx-content-encoded-schema", "HWX Content-Encoded Schema Reference",
            "The content of the FlowFile will contain a reference to a schema in the Schema Registry service. The reference is encoded as a single byte indicating the 'protocol version', "
                    + "followed by 8 bytes indicating the schema identifier, and finally 4 bytes indicating the schema version, as per the Hortonworks Schema Registry serializers and deserializers, "
                    + "as found at https://github.com/hortonworks/registry. "
                    + "This will be prepended to each FlowFile. Note that "
                    + "if the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
    public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
            "The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
                    + "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
    public static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Schema Registry Reference",
            "The content of the FlowFile will contain a reference to a schema in the Schema Registry service. The reference is encoded as a single "
                    + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
                    + "This will be prepended to each FlowFile. Note that if the schema for a record does not contain the necessary identifier and version, "
                    + "an Exception will be thrown when attempting to write the data. This is based on the encoding used by version 3.2.x of the Confluent Schema Registry.");

    public static final AllowableValue SCHEMA_REFERENCE_WRITER = new AllowableValue("schema-reference-writer", "Schema Reference Writer",
            "The schema reference information will be written through a configured Schema Reference Writer service implementation.");

    public static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile.");

    public static final PropertyDescriptor SCHEMA_CACHE = new Builder()
@@ -84,7 +72,6 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
            .identifiesControllerService(RecordSchemaCacheService.class)
            .build();


    /**
     * This constant is just a base spec for the actual PropertyDescriptor.
     * As it can be overridden by subclasses with different AllowableValues and default value,
@@ -96,30 +83,43 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
            .required(true)
            .build();

    static final PropertyDescriptor SCHEMA_PROTOCOL_VERSION = new Builder()
            .name("schema-protocol-version")
            .displayName("Schema Protocol Version")
            .description("The protocol version to be used for Schema Write Strategies that require a protocol version, such as Hortonworks Schema Registry strategies. " +
                    "Valid protocol versions for Hortonworks Schema Registry are integer values 1, 2, or 3.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .dependsOn(SCHEMA_WRITE_STRATEGY, HWX_CONTENT_ENCODED_SCHEMA, HWX_SCHEMA_REF_ATTRIBUTES)
            .defaultValue("1")
            .build();
    public static PropertyDescriptor SCHEMA_REFERENCE_WRITER_SERVICE = new Builder()
            .name("Schema Reference Writer")
            .description("Service implementation responsible for writing FlowFile attributes or content header with Schema reference information")
            .dependsOn(SCHEMA_WRITE_STRATEGY, SCHEMA_REFERENCE_WRITER)
            .required(true)
            .identifiesControllerService(SchemaReferenceWriter.class)
            .build();

    private static final String OBSOLETE_CONFLUENT_ENCODED_WRITE_STRATEGY = "confluent-encoded";

    private static final String OBSOLETE_SCHEMA_PROTOCOL_VERSION = "schema-protocol-version";

    private static final String CONFLUENT_ENCODED_SCHEMA_REFERENCE_WRITER = "org.apache.nifi.confluent.schemaregistry.ConfluentEncodedSchemaReferenceWriter";

    private volatile ConfigurationContext configurationContext;

    private volatile SchemaAccessWriter schemaAccessWriter;

    private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
            NO_SCHEMA, SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
    private final List<AllowableValue> schemaWriteStrategies = List.of(NO_SCHEMA, SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, SCHEMA_REFERENCE_WRITER);

    private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
            INHERIT_RECORD_SCHEMA, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY));
    private final List<AllowableValue> schemaAccessStrategies = List.of(INHERIT_RECORD_SCHEMA, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY);

    private final Set<String> schemaWriteStrategiesRequiringProtocolVersion = new HashSet<>(Arrays.asList(
            HWX_CONTENT_ENCODED_SCHEMA.getValue(), HWX_SCHEMA_REF_ATTRIBUTES.getValue()));
    @Override
    public void migrateProperties(final PropertyConfiguration propertyConfiguration) {
        propertyConfiguration.removeProperty(OBSOLETE_SCHEMA_PROTOCOL_VERSION);

        final Optional<String> schemaWriteStrategyFound = propertyConfiguration.getPropertyValue(SCHEMA_WRITE_STRATEGY);
        if (schemaWriteStrategyFound.isPresent()) {
            final String schemaWriteStrategy = schemaWriteStrategyFound.get();
            if (OBSOLETE_CONFLUENT_ENCODED_WRITE_STRATEGY.equals(schemaWriteStrategy)) {
                propertyConfiguration.setProperty(SCHEMA_WRITE_STRATEGY, SCHEMA_REFERENCE_WRITER.getValue());

                final String serviceId = propertyConfiguration.createControllerService(CONFLUENT_ENCODED_SCHEMA_REFERENCE_WRITER, Collections.emptyMap());
                propertyConfiguration.setProperty(SCHEMA_REFERENCE_WRITER_SERVICE, serviceId);
            }
        }
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -132,7 +132,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
                .allowableValues(strategies)
                .build());
        properties.add(SCHEMA_CACHE);
        properties.add(SCHEMA_PROTOCOL_VERSION);
        properties.add(SCHEMA_REFERENCE_WRITER_SERVICE);
        properties.addAll(super.getSupportedPropertyDescriptors());

        return properties;
@@ -155,19 +155,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
    public void storeSchemaWriteStrategy(final ConfigurationContext context) {
        this.configurationContext = context;

        // If Schema Protocol Version is specified without EL then we can create it up front, otherwise when
        // EL is present we will re-create it later so we can re-evaluate the EL against the incoming variables

        final String strategy = context.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
        if (strategy != null) {
            final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);

            final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
            if (!protocolVersionValue.isExpressionLanguagePresent()) {
                final int protocolVersion = context.getProperty(SCHEMA_PROTOCOL_VERSION).asInteger();
                this.schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
            }
        }
            final RecordSchemaCacheService recordSchemaCacheService = context.getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
            final SchemaReferenceWriter schemaReferenceWriter = context.getProperty(SCHEMA_REFERENCE_WRITER_SERVICE).asControllerService(SchemaReferenceWriter.class);
            this.schemaAccessWriter = createSchemaWriteStrategy(strategy, recordSchemaCacheService, schemaReferenceWriter);
    }

    @Override
@@ -176,45 +167,25 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
    }

    protected SchemaAccessWriter getSchemaAccessWriter(final RecordSchema schema, final Map<String,String> variables) throws SchemaNotFoundException {
        // If Schema Protocol Version is using expression language, then we reevaluate against the passed in variables
        final PropertyValue protocolVersionValue = getConfigurationContext().getProperty(SCHEMA_PROTOCOL_VERSION);
        if (protocolVersionValue.isExpressionLanguagePresent()) {
            final int protocolVersion;
            final String protocolVersionString = protocolVersionValue.evaluateAttributeExpressions(variables).getValue();
            try {
                protocolVersion = Integer.parseInt(protocolVersionString);
            } catch (NumberFormatException nfe) {
                throw new SchemaNotFoundException("Unable to create Schema Write Strategy because " + SCHEMA_PROTOCOL_VERSION.getDisplayName()
                        + " must be a positive integer, but was '" + protocolVersionString + "'", nfe);
            }

            // Now recreate the SchemaAccessWriter since we may have a new value for Schema Protocol Version
            final String strategy = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
            if (strategy != null) {
                final RecordSchemaCacheService recordSchemaCacheService = getConfigurationContext().getProperty(SCHEMA_CACHE).asControllerService(RecordSchemaCacheService.class);
                schemaAccessWriter = createSchemaWriteStrategy(strategy, protocolVersion, recordSchemaCacheService);
            }
        }

        schemaAccessWriter.validateSchema(schema);
        return schemaAccessWriter;
    }

    protected List<AllowableValue> getSchemaWriteStrategyValues() {
        return schemaWriteStrategyList;
        return schemaWriteStrategies;
    }

    @Override
    protected List<AllowableValue> getSchemaAccessStrategyValues() {
        return schemaAccessStrategyList;
        return schemaAccessStrategies;
    }

    protected SchemaAccessWriter getSchemaWriteStrategy() {
        return schemaAccessWriter;
    }

    private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final Integer protocolVersion, final RecordSchemaCacheService recordSchemaCacheService) {
        final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, protocolVersion);
    private SchemaAccessWriter createSchemaWriteStrategy(final String strategy, final RecordSchemaCacheService recordSchemaCacheService, final SchemaReferenceWriter schemaReferenceWriter) {
        final SchemaAccessWriter writer = createRawSchemaWriteStrategy(strategy, schemaReferenceWriter);
        if (recordSchemaCacheService == null) {
            return writer;
        } else {
@@ -222,17 +193,13 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
        }
    }

    private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final Integer protocolVersion) {
    private SchemaAccessWriter createRawSchemaWriteStrategy(final String strategy, final SchemaReferenceWriter schemaReferenceWriter) {
        if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
            return new SchemaNameAsAttribute();
        } else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
            return new WriteAvroSchemaAttributeStrategy();
        } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
            return new HortonworksEncodedSchemaReferenceWriter(protocolVersion);
        } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
            return new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
        } else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
            return new ConfluentSchemaRegistryWriter();
        } else if (strategy.equalsIgnoreCase(SCHEMA_REFERENCE_WRITER.getValue())) {
            return new SchemaReferenceWriterSchemaAccessWriter(schemaReferenceWriter);
        } else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) {
            return new NopSchemaAccessWriter();
        }
@@ -246,8 +213,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
            return EnumSet.noneOf(SchemaField.class);
        }

        final Set<SchemaField> requiredFields = writer.getRequiredSchemaFields();
        return requiredFields;
        return writer.getRequiredSchemaFields();
    }

    @Override
@@ -270,17 +236,6 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
                    .build());
        }

        final String schemaWriteStrategy = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
        final String protocolVersion = validationContext.getProperty(SCHEMA_PROTOCOL_VERSION).getValue();

        if (schemaWriteStrategy != null && schemaWriteStrategiesRequiringProtocolVersion.contains(schemaWriteStrategy) && protocolVersion == null) {
            results.add(new ValidationResult.Builder()
                    .subject(SCHEMA_PROTOCOL_VERSION.getDisplayName())
                    .valid(false)
                    .explanation("The configured Schema Write Strategy requires a Schema Protocol Version to be specified.")
                    .build());
        }

        return results;
    }
}
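With the Hortonworks and Confluent write strategies removed above, SchemaRegistryRecordSetWriter now hands header writing to the configured Schema Reference Writer through SchemaReferenceWriterSchemaAccessWriter. A hedged sketch of that delegation, wrapping the new Confluent service directly; the class name, schema id, and version are illustrative only:

    import org.apache.nifi.confluent.schemaregistry.ConfluentEncodedSchemaReferenceWriter;
    import org.apache.nifi.schema.access.SchemaAccessWriter;
    import org.apache.nifi.schema.access.SchemaNotFoundException;
    import org.apache.nifi.schema.access.SchemaReferenceWriterSchemaAccessWriter;
    import org.apache.nifi.serialization.SimpleRecordSchema;
    import org.apache.nifi.serialization.record.RecordField;
    import org.apache.nifi.serialization.record.RecordFieldType;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.SchemaIdentifier;

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.List;

    class SchemaReferenceWriterDelegationSketch {
        public static void main(final String[] args) throws IOException, SchemaNotFoundException {
            // The record set writer would normally obtain this service from its Schema Reference Writer property
            final ConfluentEncodedSchemaReferenceWriter referenceWriter = new ConfluentEncodedSchemaReferenceWriter();
            final SchemaAccessWriter accessWriter = new SchemaReferenceWriterSchemaAccessWriter(referenceWriter);

            final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
            final RecordSchema recordSchema = new SimpleRecordSchema(
                    List.of(new RecordField("firstField", RecordFieldType.STRING.getDataType())), schemaIdentifier);

            final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            accessWriter.validateSchema(recordSchema);
            accessWriter.writeHeader(recordSchema, outputStream);
            System.out.println(outputStream.size()); // expected: the five-byte Confluent header
        }
    }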
@@ -25,6 +25,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaField;
@@ -36,21 +37,20 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
@@ -58,28 +58,40 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;

public abstract class SchemaRegistryService extends AbstractControllerService {

    private volatile ConfigurationContext configurationContext;
    private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    private static final String OBSOLETE_CONFLUENT_ENCODED_STRATEGY = "confluent-encoded";

    private static final String CONFLUENT_ENCODED_SCHEMA_REFERENCE_READER = "org.apache.nifi.confluent.schemaregistry.ConfluentEncodedSchemaReferenceReader";

    private static final List<AllowableValue> strategies = List.of(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, SCHEMA_REFERENCE_READER_PROPERTY);

    protected volatile SchemaAccessStrategy schemaAccessStrategy;
    private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
            SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
    private volatile ConfigurationContext configurationContext;

    protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    }
    /**
     * Migrate from Confluent Encoded access strategy to Confluent Encoded Schema Reference Reader
     *
     * @param propertyConfiguration Current Property Configuration
     */
    @Override
    public void migrateProperties(final PropertyConfiguration propertyConfiguration) {
        final Optional<String> schemaAccessStrategyFound = propertyConfiguration.getPropertyValue(SCHEMA_ACCESS_STRATEGY);
        if (schemaAccessStrategyFound.isPresent()) {
            final String schemaAccessStrategy = schemaAccessStrategyFound.get();
            if (OBSOLETE_CONFLUENT_ENCODED_STRATEGY.equals(schemaAccessStrategy)) {
                propertyConfiguration.setProperty(SCHEMA_ACCESS_STRATEGY, SCHEMA_REFERENCE_READER_PROPERTY.getValue());

    protected PropertyDescriptor buildStrategyProperty(AllowableValue[] values) {
        return new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
                .allowableValues(values)
                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
                .build();
                final String serviceId = propertyConfiguration.createControllerService(CONFLUENT_ENCODED_SCHEMA_REFERENCE_READER, Collections.emptyMap());
                propertyConfiguration.setProperty(SCHEMA_REFERENCE_READER, serviceId);
            }
        }
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(2);
        final List<PropertyDescriptor> properties = new ArrayList<>();

        final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
        properties.add(buildStrategyProperty(strategies));
@@ -89,21 +101,18 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
        properties.add(SCHEMA_VERSION);
        properties.add(SCHEMA_BRANCH_NAME);
        properties.add(SCHEMA_TEXT);
        properties.add(SCHEMA_REFERENCE_READER);

        return properties;
    }

    protected AllowableValue getDefaultSchemaAccessStrategy() {
        return SCHEMA_NAME_PROPERTY;
    }

    @OnEnabled
    public void storeSchemaAccessStrategy(final ConfigurationContext context) {
        this.configurationContext = context;

        final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);

        final PropertyDescriptor descriptor = getSchemaAcessStrategyDescriptor();
        final PropertyDescriptor descriptor = getSchemaAccessStrategyDescriptor();
        final String schemaAccess = context.getProperty(descriptor).getValue();
        this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
    }
@@ -113,6 +122,22 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
        return configurationContext;
    }

    protected PropertyDescriptor getSchemaAccessStrategyDescriptor() {
        return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
    }

    protected PropertyDescriptor buildStrategyProperty(final AllowableValue[] values) {
        return new PropertyDescriptor.Builder()
                .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
                .allowableValues(values)
                .defaultValue(getDefaultSchemaAccessStrategy().getValue())
                .build();
    }

    protected AllowableValue getDefaultSchemaAccessStrategy() {
        return SCHEMA_NAME_PROPERTY;
    }

    protected SchemaAccessStrategy getSchemaAccessStrategy() {
        return schemaAccessStrategy;
    }
@@ -132,24 +157,23 @@ public abstract class SchemaRegistryService extends AbstractControllerService {

    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
        final String schemaAccessStrategy = validationContext.getProperty(getSchemaAccessStrategyDescriptor()).getValue();
        return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
    }

    protected List<AllowableValue> getSchemaAccessStrategyValues() {
        return strategyList;
        return strategies;
    }

    protected Set<SchemaField> getSuppliedSchemaFields(final ValidationContext validationContext) {
        final String accessStrategyValue = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
        final String accessStrategyValue = validationContext.getProperty(getSchemaAccessStrategyDescriptor()).getValue();
        final SchemaRegistry schemaRegistry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
        final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext);

        if (accessStrategy == null) {
            return EnumSet.noneOf(SchemaField.class);
        }
        final Set<SchemaField> suppliedFields = accessStrategy.getSuppliedSchemaFields();
        return suppliedFields;
        return accessStrategy.getSuppliedSchemaFields();
    }

    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) {
@@ -158,5 +182,4 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
        }
        return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
    }

}
@ -1,80 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
|
||||
private final Set<SchemaField> schemaFields;
|
||||
private final SchemaRegistry schemaRegistry;
|
||||
|
||||
public ConfluentSchemaRegistryStrategy(final SchemaRegistry schemaRegistry) {
|
||||
this.schemaRegistry = schemaRegistry;
|
||||
|
||||
schemaFields = new HashSet<>();
|
||||
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
|
||||
schemaFields.add(SchemaField.SCHEMA_VERSION);
|
||||
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
|
||||
final byte[] buffer = new byte[5];
|
||||
try {
|
||||
StreamUtils.fillBuffer(contentStream, buffer);
|
||||
} catch (final IOException ioe) {
|
||||
throw new SchemaNotFoundException("Could not read first 5 bytes from stream", ioe);
|
||||
}
|
||||
|
||||
// This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer
|
||||
// as it is provided at:
|
||||
// https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
|
||||
// The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes
|
||||
// representing the schema id.
|
||||
final ByteBuffer bb = ByteBuffer.wrap(buffer);
|
||||
final int magicByte = bb.get();
|
||||
if (magicByte != 0) {
|
||||
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. "
|
||||
+ "Expected stream to begin with a Magic Byte of 0 but first byte was " + magicByte);
|
||||
}
|
||||
|
||||
final int schemaId = bb.getInt();
|
||||
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
|
||||
.schemaVersionId(Long.valueOf(schemaId))
|
||||
.build();
|
||||
|
||||
return schemaRegistry.retrieveSchema(schemaIdentifier);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getSuppliedSchemaFields() {
|
||||
return schemaFields;
|
||||
}
|
||||
}
|
|
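For reference, a minimal sketch of decoding the Confluent wire-format header documented in the removed strategy above (a zero magic byte followed by a four-byte big-endian schema id). The class and method names here are illustrative and not part of this commit; only JDK streams are assumed.

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class ConfluentHeaderDecodeSketch {
    /** Reads the 5-byte Confluent header and returns the schema id that follows the magic byte. */
    static int readSchemaId(final InputStream in) throws IOException {
        final DataInputStream dataIn = new DataInputStream(in);
        final byte magicByte = dataIn.readByte(); // first byte of the header must be 0
        if (magicByte != 0) {
            throw new IOException("Expected Magic Byte 0 but found " + magicByte);
        }
        return dataIn.readInt(); // four-byte big-endian schema id
    }
}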
@ -1,77 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Map;
|
||||
import java.util.OptionalInt;
|
||||
import java.util.OptionalLong;
|
||||
import java.util.Set;
|
||||
|
||||
public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter {
|
||||
private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
|
||||
|
||||
@Override
|
||||
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
|
||||
final SchemaIdentifier identifier = schema.getIdentifier();
|
||||
final Long id = identifier.getIdentifier().getAsLong();
|
||||
|
||||
// This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer
|
||||
// as it is provided at:
|
||||
// http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
|
||||
// The format consists of the first byte always being 0, to indicate a 'magic byte' followed by 4 bytes
|
||||
// representing the schema id.
|
||||
final ByteBuffer bb = ByteBuffer.allocate(5);
|
||||
bb.put((byte) 0);
|
||||
bb.putInt(id.intValue());
|
||||
|
||||
out.write(bb.array());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getAttributes(final RecordSchema schema) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
|
||||
final SchemaIdentifier identifier = schema.getIdentifier();
|
||||
final OptionalLong identifierOption = identifier.getIdentifier();
|
||||
if (!identifierOption.isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Identifier is not known");
|
||||
}
|
||||
|
||||
final OptionalInt versionOption = identifier.getVersion();
|
||||
if (!versionOption.isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Confluent Schema Registry Reference because the Schema Version is not known");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getRequiredSchemaFields() {
|
||||
return requiredSchemaFields;
|
||||
}
|
||||
|
||||
}
|
|
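The removed writer above produces the matching header. A minimal encode sketch under the same assumptions (illustrative names, JDK streams only):

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class ConfluentHeaderEncodeSketch {
    /** Writes the 5-byte Confluent header: a zero magic byte followed by the schema id. */
    static void writeHeader(final int schemaId, final OutputStream out) throws IOException {
        final DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeByte(0);       // magic byte
        dataOut.writeInt(schemaId); // four-byte big-endian schema id
        dataOut.flush();
    }
}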
@ -1,137 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy {
|
||||
private final Set<SchemaField> schemaFields;
|
||||
|
||||
public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier";
|
||||
public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
|
||||
public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version";
|
||||
public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id";
|
||||
|
||||
private final SchemaRegistry schemaRegistry;
|
||||
|
||||
public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
|
||||
this.schemaRegistry = schemaRegistry;
|
||||
|
||||
schemaFields = new HashSet<>();
|
||||
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
|
||||
schemaFields.add(SchemaField.SCHEMA_VERSION);
|
||||
schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
|
||||
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
|
||||
}
|
||||
|
||||
public boolean isFlowFileRequired() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordSchema getSchema(Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
|
||||
final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
|
||||
if (!isNumber(schemaProtocol)) {
|
||||
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
|
||||
+ schemaProtocol + "', which is not a valid Protocol Version number");
|
||||
}
|
||||
|
||||
final int protocol = Integer.parseInt(schemaProtocol);
|
||||
if (protocol < HortonworksProtocolVersions.MIN_VERSION || protocol > HortonworksProtocolVersions.MAX_VERSION) {
|
||||
throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '"
|
||||
+ schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be a value between "
|
||||
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
|
||||
}
|
||||
|
||||
SchemaIdentifier identifier;
|
||||
|
||||
switch (protocol) {
|
||||
case 1:
|
||||
final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE);
|
||||
if (!isNumber(schemaIdentifier)) {
|
||||
throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_ID_ATTRIBUTE + " has a value of '"
|
||||
+ schemaIdentifier + "', which is not a valid Schema Identifier and is required by Protocol Version " + protocol);
|
||||
}
|
||||
|
||||
final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE);
|
||||
if (!isNumber(schemaVersion)) {
|
||||
throw new SchemaNotFoundException("Could not determine Schema because " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '"
|
||||
+ schemaVersion + "', which is not a valid Schema Version and is required by Protocol Version " + protocol);
|
||||
}
|
||||
|
||||
final long schemaId = Long.parseLong(schemaIdentifier);
|
||||
final int version = Integer.parseInt(schemaVersion);
|
||||
identifier = SchemaIdentifier.builder().id(schemaId).version(version).build();
|
||||
break;
|
||||
case 2:
|
||||
case 3:
|
||||
final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE);
|
||||
if (!isNumber(schemaVersionId)) {
|
||||
throw new SchemaNotFoundException("Could not determine schema because " + SCHEMA_VERSION_ID_ATTRIBUTE + " has a value of '"
|
||||
+ schemaVersionId + "', which is not a valid Schema Version Identifier and is required by Protocol Version " + protocol);
|
||||
}
|
||||
|
||||
final long svi = Long.parseLong(schemaVersionId);
|
||||
identifier = SchemaIdentifier.builder().schemaVersionId(svi).build();
|
||||
break;
|
||||
default:
|
||||
throw new SchemaNotFoundException("Unknown Protocol Version: " + protocol);
|
||||
}
|
||||
|
||||
final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
|
||||
if (schema == null) {
|
||||
throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + identifier.toString() + "'");
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
private static boolean isNumber(final String value) {
|
||||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final String trimmed = value.trim();
|
||||
if (trimmed.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int i = 0; i < trimmed.length(); i++) {
|
||||
final char c = trimmed.charAt(i);
|
||||
if (c > '9' || c < '0') {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getSuppliedSchemaFields() {
|
||||
return schemaFields;
|
||||
}
|
||||
}
|
|
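The removed attribute strategy above resolves a schema reference from FlowFile attributes according to the protocol version. A condensed sketch of that mapping, assuming the attribute values have already been validated as numeric; the class and method names are illustrative and not part of this commit:

import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.util.Map;

class HortonworksAttributeResolutionSketch {
    /** Builds a SchemaIdentifier from the schema reference attributes based on the protocol version. */
    static SchemaIdentifier resolve(final Map<String, String> attributes) {
        final int protocol = Integer.parseInt(attributes.get("schema.protocol.version"));
        if (protocol == 1) {
            // Protocol 1 references a schema by registry id and version number
            return SchemaIdentifier.builder()
                    .id(Long.parseLong(attributes.get("schema.identifier")))
                    .version(Integer.parseInt(attributes.get("schema.version")))
                    .build();
        }
        // Protocols 2 and 3 reference a schema by its schema version id only
        return SchemaIdentifier.builder()
                .schemaVersionId(Long.parseLong(attributes.get("schema.version.id")))
                .build();
    }
}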
@ -1,112 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter {
|
||||
|
||||
static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
|
||||
|
||||
private final int protocolVersion;
|
||||
|
||||
public HortonworksAttributeSchemaReferenceWriter(final int protocolVersion) {
|
||||
this.protocolVersion = protocolVersion;
|
||||
|
||||
if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
|
||||
throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
|
||||
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getAttributes(final RecordSchema schema) {
|
||||
final Map<String, String> attributes = new HashMap<>(4);
|
||||
final SchemaIdentifier id = schema.getIdentifier();
|
||||
|
||||
switch (protocolVersion) {
|
||||
case 1:
|
||||
final Long schemaId = id.getIdentifier().getAsLong();
|
||||
final Integer schemaVersion = id.getVersion().getAsInt();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion));
|
||||
break;
|
||||
case 2:
|
||||
case 3:
|
||||
final Long schemaVersionId = id.getSchemaVersionId().getAsLong();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
|
||||
break;
|
||||
default:
|
||||
// Can't reach this point
|
||||
throw new IllegalStateException("Unknown Protocol Verison: " + protocolVersion);
|
||||
}
|
||||
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocolVersion));
|
||||
|
||||
if (id.getBranch().isPresent()) {
|
||||
attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
|
||||
}
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
|
||||
final SchemaIdentifier identifier = schema.getIdentifier();
|
||||
|
||||
switch (protocolVersion) {
|
||||
case 1:
|
||||
if (!identifier.getIdentifier().isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Identifier " +
|
||||
"is not known and is required for Protocol Version " + protocolVersion);
|
||||
}
|
||||
if (!identifier.getVersion().isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version " +
|
||||
"is not known and is required for Protocol Version " + protocolVersion);
|
||||
}
|
||||
break;
|
||||
case 2:
|
||||
case 3:
|
||||
if (!identifier.getSchemaVersionId().isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Schema Reference attributes because the Schema Version Identifier " +
|
||||
"is not known and is required for Protocol Version " + protocolVersion);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// Can't reach this point
|
||||
throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getRequiredSchemaFields() {
|
||||
return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,119 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
import org.apache.nifi.stream.io.StreamUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy {
|
||||
|
||||
private final Set<SchemaField> schemaFields;
|
||||
private final SchemaRegistry schemaRegistry;
|
||||
|
||||
public HortonworksEncodedSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) {
|
||||
this.schemaRegistry = schemaRegistry;
|
||||
|
||||
schemaFields = new HashSet<>();
|
||||
schemaFields.add(SchemaField.SCHEMA_IDENTIFIER);
|
||||
schemaFields.add(SchemaField.SCHEMA_VERSION);
|
||||
schemaFields.add(SchemaField.SCHEMA_VERSION_ID);
|
||||
schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
|
||||
final byte[] buffer = new byte[1];
|
||||
try {
|
||||
StreamUtils.fillBuffer(contentStream, buffer);
|
||||
} catch (final IOException ioe) {
|
||||
throw new SchemaNotFoundException("Could not read first byte from stream", ioe);
|
||||
}
|
||||
|
||||
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
|
||||
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
|
||||
final ByteBuffer bb = ByteBuffer.wrap(buffer);
|
||||
final int protocolVersion = bb.get();
|
||||
|
||||
SchemaIdentifier schemaIdentifier;
|
||||
|
||||
switch(protocolVersion) {
|
||||
case 1:
|
||||
final byte[] bufferv1 = new byte[12];
|
||||
|
||||
try {
|
||||
StreamUtils.fillBuffer(contentStream, bufferv1);
|
||||
} catch (final IOException ioe) {
|
||||
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
|
||||
}
|
||||
final ByteBuffer bbv1 = ByteBuffer.wrap(bufferv1);
|
||||
|
||||
final long schemaId = bbv1.getLong();
|
||||
final int schemaVersion = bbv1.getInt();
|
||||
schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
|
||||
return schemaRegistry.retrieveSchema(schemaIdentifier);
|
||||
|
||||
case 2:
|
||||
final byte[] bufferv2 = new byte[8];
|
||||
|
||||
try {
|
||||
StreamUtils.fillBuffer(contentStream, bufferv2);
|
||||
} catch (final IOException ioe) {
|
||||
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
|
||||
}
|
||||
final ByteBuffer bbv2 = ByteBuffer.wrap(bufferv2);
|
||||
|
||||
final long sviLong = bbv2.getLong();
|
||||
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).build();
|
||||
return schemaRegistry.retrieveSchema(schemaIdentifier);
|
||||
|
||||
case 3:
|
||||
final byte[] bufferv3 = new byte[4];
|
||||
|
||||
try {
|
||||
StreamUtils.fillBuffer(contentStream, bufferv3);
|
||||
} catch (final IOException ioe) {
|
||||
throw new SchemaNotFoundException("Could not read bytes from stream", ioe);
|
||||
}
|
||||
final ByteBuffer bbv3 = ByteBuffer.wrap(bufferv3);
|
||||
|
||||
final int sviInt = bbv3.getInt();
|
||||
schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).build();
|
||||
return schemaRegistry.retrieveSchema(schemaIdentifier);
|
||||
|
||||
default:
|
||||
throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. Expected Protocol Version to be a value between "
|
||||
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION
|
||||
+ ", but data was encoded with protocol version " + protocolVersion + ".");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getSuppliedSchemaFields() {
|
||||
return schemaFields;
|
||||
}
|
||||
}
|
|
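The removed encoded strategy above documents three header layouts: protocol 1 is a version byte plus an 8-byte schema id and a 4-byte version number (13 bytes total), protocol 2 is a version byte plus an 8-byte schema version id (9 bytes), and protocol 3 is a version byte plus a 4-byte schema version id (5 bytes). A minimal decode sketch with illustrative names, assuming JDK streams and the NiFi SchemaIdentifier builder used in the diff:

import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class HortonworksHeaderDecodeSketch {
    /** Reads the protocol version byte and the protocol-specific schema reference that follows it. */
    static SchemaIdentifier readHeader(final InputStream in) throws IOException {
        final DataInputStream dataIn = new DataInputStream(in);
        final int protocolVersion = dataIn.readByte();
        switch (protocolVersion) {
            case 1: // 8-byte schema id plus 4-byte version number
                return SchemaIdentifier.builder().id(dataIn.readLong()).version(dataIn.readInt()).build();
            case 2: // 8-byte schema version id
                return SchemaIdentifier.builder().schemaVersionId(dataIn.readLong()).build();
            case 3: // 4-byte schema version id
                return SchemaIdentifier.builder().schemaVersionId((long) dataIn.readInt()).build();
            default:
                throw new IOException("Unsupported protocol version " + protocolVersion);
        }
    }
}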
@ -1,117 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter {
|
||||
|
||||
private final int protocolVersion;
|
||||
|
||||
public HortonworksEncodedSchemaReferenceWriter(final int protocolVersion) {
|
||||
this.protocolVersion = protocolVersion;
|
||||
|
||||
if (this.protocolVersion < HortonworksProtocolVersions.MIN_VERSION || this.protocolVersion > HortonworksProtocolVersions.MAX_VERSION) {
|
||||
throw new IllegalArgumentException("Unknown Protocol Version '" + this.protocolVersion + "'. Protocol Version must be a value between "
|
||||
+ HortonworksProtocolVersions.MIN_VERSION + " and " + HortonworksProtocolVersions.MAX_VERSION + ".");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
|
||||
final SchemaIdentifier identifier = schema.getIdentifier();
|
||||
|
||||
// This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer
|
||||
// See: https://registry-project.readthedocs.io/en/latest/serdes.html#
|
||||
switch(protocolVersion) {
|
||||
case 1:
|
||||
final Long id = identifier.getIdentifier().getAsLong();
|
||||
final Integer version = identifier.getVersion().getAsInt();
|
||||
final ByteBuffer bbv1 = ByteBuffer.allocate(13);
|
||||
bbv1.put((byte) 1);
|
||||
bbv1.putLong(id);
|
||||
bbv1.putInt(version);
|
||||
out.write(bbv1.array());
|
||||
return;
|
||||
case 2:
|
||||
final Long sviV2 = identifier.getSchemaVersionId().getAsLong();
|
||||
final ByteBuffer bbv2 = ByteBuffer.allocate(9);
|
||||
bbv2.put((byte) 2);
|
||||
bbv2.putLong(sviV2);
|
||||
out.write(bbv2.array());
|
||||
return;
|
||||
case 3:
|
||||
final Long sviV3 = identifier.getSchemaVersionId().getAsLong();
|
||||
final ByteBuffer bbv3 = ByteBuffer.allocate(5);
|
||||
bbv3.put((byte) 3);
|
||||
bbv3.putInt(sviV3.intValue());
|
||||
out.write(bbv3.array());
|
||||
return;
|
||||
default:
|
||||
// Can't reach this point
|
||||
throw new IllegalStateException("Unknown Protocol Version: " + this.protocolVersion);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getAttributes(final RecordSchema schema) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
|
||||
final SchemaIdentifier identifier = schema.getIdentifier();
|
||||
|
||||
switch (protocolVersion) {
|
||||
case 1:
|
||||
if (!identifier.getIdentifier().isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier " +
|
||||
"is not known and is required for Protocol Version " + protocolVersion);
|
||||
}
|
||||
if (!identifier.getVersion().isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version " +
|
||||
"is not known and is required for Protocol Version " + protocolVersion);
|
||||
}
|
||||
break;
|
||||
case 2:
|
||||
case 3:
|
||||
if (!identifier.getSchemaVersionId().isPresent()) {
|
||||
throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version Identifier " +
|
||||
"is not known and is required for Protocol Version " + protocolVersion);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// Can't reach this point
|
||||
throw new SchemaNotFoundException("Unknown Protocol Version: " + protocolVersion);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getRequiredSchemaFields() {
|
||||
return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
|
||||
}
|
||||
|
||||
}
|
|
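The removed writer above emits the matching headers. A minimal encode sketch for the protocol 3 layout only (a version byte of 3 followed by a 4-byte schema version id); the class and method names are illustrative:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class HortonworksHeaderEncodeSketch {
    /** Writes the protocol 3 header: a version byte of 3 followed by the 4-byte schema version id. */
    static void writeProtocol3Header(final int schemaVersionId, final OutputStream out) throws IOException {
        final DataOutputStream dataOut = new DataOutputStream(out);
        dataOut.writeByte(3);              // protocol version marker
        dataOut.writeInt(schemaVersionId); // four-byte schema version id
        dataOut.flush();
    }
}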
@ -1,56 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Constants related to Protocol Versions for Hortonworks Schema Registry.
|
||||
*/
|
||||
public class HortonworksProtocolVersions {
|
||||
|
||||
/**
|
||||
* The minimum valid protocol version.
|
||||
*/
|
||||
public static final int MIN_VERSION = 1;
|
||||
|
||||
/**
|
||||
* The maximum valid protocol version.
|
||||
*/
|
||||
public static final int MAX_VERSION = 3;
|
||||
|
||||
/**
|
||||
* Map from protocol version to the required schema fields for the given version.
|
||||
*/
|
||||
private static final Map<Integer, Set<SchemaField>> REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL;
|
||||
static {
|
||||
final Map<Integer,Set<SchemaField>> requiredFieldsByProtocol = new HashMap<>();
|
||||
requiredFieldsByProtocol.put(1, EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION));
|
||||
requiredFieldsByProtocol.put(2, EnumSet.of(SchemaField.SCHEMA_VERSION_ID));
|
||||
requiredFieldsByProtocol.put(3, EnumSet.of(SchemaField.SCHEMA_VERSION_ID));
|
||||
REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL = Collections.unmodifiableMap(requiredFieldsByProtocol);
|
||||
}
|
||||
|
||||
public static Set<SchemaField> getRequiredSchemaFields(final Integer protocolVersion) {
|
||||
return REQUIRED_SCHEMA_FIELDS_BY_PROTOCOL.get(protocolVersion);
|
||||
}
|
||||
|
||||
}
|
|
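A brief usage note for the removed constants class: the required schema fields can be looked up per protocol version. A small sketch with illustrative names, assuming the NiFi packages shown in the diff:

import org.apache.nifi.schema.access.HortonworksProtocolVersions;
import org.apache.nifi.schema.access.SchemaField;

import java.util.Set;

class ProtocolFieldLookupSketch {
    /** Returns the schema fields required by the given protocol version. */
    static Set<SchemaField> requiredFields(final int protocolVersion) {
        // Protocol 1 requires SCHEMA_IDENTIFIER and SCHEMA_VERSION; protocols 2 and 3 require SCHEMA_VERSION_ID
        return HortonworksProtocolVersions.getRequiredSchemaFields(protocolVersion);
    }
}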
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Schema Access Strategy that delegates to the configured Schema Reference Reader
|
||||
*/
|
||||
class SchemaReferenceReaderSchemaAccessStrategy implements SchemaAccessStrategy {
|
||||
private final SchemaReferenceReader schemaReferenceReader;
|
||||
|
||||
private final SchemaRegistry schemaRegistry;
|
||||
|
||||
private final Set<SchemaField> suppliedSchemaFields;
|
||||
|
||||
SchemaReferenceReaderSchemaAccessStrategy(final SchemaReferenceReader schemaReferenceReader, final SchemaRegistry schemaRegistry) {
|
||||
this.schemaReferenceReader = Objects.requireNonNull(schemaReferenceReader, "Schema Reference Reader required");
|
||||
this.schemaRegistry = Objects.requireNonNull(schemaRegistry, "Schema Registry required");
|
||||
|
||||
final Set<SchemaField> configuredSchemaFields = new LinkedHashSet<>(schemaRegistry.getSuppliedSchemaFields());
|
||||
configuredSchemaFields.addAll(schemaReferenceReader.getSuppliedSchemaFields());
|
||||
this.suppliedSchemaFields = Collections.unmodifiableSet(configuredSchemaFields);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Schema based on provided variables and content information
|
||||
*
|
||||
* @param variables Map of variables for Schema Identifier resolution; can be null or empty
|
||||
* @param contentStream Stream of FlowFile content that may contain encoded Schema Identifier references
|
||||
* @param inputSchema Record Schema from input content or null when not provided
|
||||
* @return Record Schema
|
||||
* @throws SchemaNotFoundException Thrown on failure to resolve Schema Identifier information
|
||||
* @throws IOException Thrown on failure to read input content stream
|
||||
*/
|
||||
@Override
|
||||
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema inputSchema) throws SchemaNotFoundException, IOException {
|
||||
final Map<String, String> schemaVariables = variables == null ? Collections.emptyMap() : variables;
|
||||
|
||||
final SchemaIdentifier schemaIdentifier = schemaReferenceReader.getSchemaIdentifier(schemaVariables, contentStream);
|
||||
if (schemaIdentifier == null) {
|
||||
throw new IllegalArgumentException("Schema Identifier not supplied from Schema Reference Provider");
|
||||
}
|
||||
|
||||
return schemaRegistry.retrieveSchema(schemaIdentifier);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get supplied Record Schema Fields consisting of a combination of fields from the Schema Registry and Schema Reference Provider services
|
||||
*
|
||||
* @return Record Schema Fields
|
||||
*/
|
||||
@Override
|
||||
public Set<SchemaField> getSuppliedSchemaFields() {
|
||||
return suppliedSchemaFields;
|
||||
}
|
||||
}
|
|
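The new strategy above composes two services: the Schema Reference Reader parses the identifier from variables or content, and the Schema Registry resolves it. A minimal sketch of that delegation, assuming the NiFi service interfaces shown in this commit; the wrapper class and method names are illustrative:

import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

class SchemaReferenceDelegationSketch {
    /** Mirrors the two-step delegation: the reader parses the reference, the registry resolves it. */
    static RecordSchema resolve(final SchemaReferenceReader reader, final SchemaRegistry registry,
                                final Map<String, String> variables, final InputStream content)
            throws SchemaNotFoundException, IOException {
        final SchemaIdentifier identifier = reader.getSchemaIdentifier(variables, content);
        return registry.retrieveSchema(identifier);
    }
}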
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.schemaregistry.services.SchemaReferenceWriter;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Schema Access Writer that delegates to the configured Schema Reference Writer
|
||||
*/
|
||||
public class SchemaReferenceWriterSchemaAccessWriter implements SchemaAccessWriter {
|
||||
private final SchemaReferenceWriter schemaReferenceWriter;
|
||||
|
||||
public SchemaReferenceWriterSchemaAccessWriter(final SchemaReferenceWriter schemaReferenceWriter) {
|
||||
this.schemaReferenceWriter = Objects.requireNonNull(schemaReferenceWriter, "Schema Reference Writer required");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeHeader(final RecordSchema schema, final OutputStream outputStream) throws IOException {
|
||||
schemaReferenceWriter.writeHeader(schema, outputStream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getAttributes(final RecordSchema schema) {
|
||||
return schemaReferenceWriter.getAttributes(schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
|
||||
schemaReferenceWriter.validateSchema(schema);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<SchemaField> getRequiredSchemaFields() {
|
||||
return schemaReferenceWriter.getRequiredSchemaFields();
|
||||
}
|
||||
}
|
|
@ -1,172 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {
|
||||
|
||||
@Test
|
||||
public void testGetSchemaWithValidSchemaIdVersionAndProtocol() throws IOException, SchemaNotFoundException {
|
||||
final long schemaId = 123456;
|
||||
final int version = 2;
|
||||
final int protocol = 1;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
|
||||
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
|
||||
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
|
||||
.id(schemaId)
|
||||
.version(version)
|
||||
.build();
|
||||
|
||||
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
|
||||
.thenReturn(recordSchema);
|
||||
|
||||
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
|
||||
assertNotNull(retrievedSchema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSchemaWithValidSchemaVersionIdAndProtocol() throws IOException, SchemaNotFoundException {
|
||||
final long schemaVersionId = 9999;
|
||||
final int protocol = 2;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
|
||||
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
|
||||
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
|
||||
.schemaVersionId(schemaVersionId)
|
||||
.build();
|
||||
|
||||
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
|
||||
.thenReturn(recordSchema);
|
||||
|
||||
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
|
||||
assertNotNull(retrievedSchema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSchemaWithAllAttributes() throws IOException, SchemaNotFoundException {
|
||||
final long schemaId = 123456;
|
||||
final int version = 2;
|
||||
final long schemaVersionId = 9999;
|
||||
final int protocol = 2;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
|
||||
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
|
||||
// The schema version id should take precedence
|
||||
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
|
||||
.schemaVersionId(schemaVersionId)
|
||||
.build();
|
||||
|
||||
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
|
||||
.thenReturn(recordSchema);
|
||||
|
||||
final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema);
|
||||
assertNotNull(retrievedSchema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSchemaMissingAllAttributes() {
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
assertThrows(SchemaNotFoundException.class,
|
||||
() -> schemaAccessStrategy.getSchema(Collections.emptyMap(), null, recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSchemaMissingProtocol() {
|
||||
final long schemaId = 123456;
|
||||
final int version = 2;
|
||||
final long schemaVersionId = 9999;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
|
||||
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessStrategy.getSchema(attributes, null, recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSchemaWithInvalidProtocol() {
|
||||
final long schemaId = 123456;
|
||||
final int version = 2;
|
||||
final long schemaVersionId = 9999;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, "INVALID_PROTOCOL");
|
||||
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessStrategy.getSchema(attributes, null, recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetSchemaNotFound() throws IOException, SchemaNotFoundException {
|
||||
final long schemaId = 123456;
|
||||
final int version = 2;
|
||||
final long schemaVersionId = 9999;
|
||||
final int protocol = 2;
|
||||
|
||||
final Map<String,String> attributes = new HashMap<>();
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(schemaVersionId));
|
||||
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol));
|
||||
|
||||
final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
|
||||
|
||||
// The schema version id should take precedence
|
||||
final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
|
||||
.schemaVersionId(schemaVersionId)
|
||||
.build();
|
||||
|
||||
when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
|
||||
.thenReturn(null);
|
||||
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessStrategy.getSchema(attributes, null, recordSchema));
|
||||
}
|
||||
}
|
|
@ -1,190 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schema.access;
|
||||
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class TestHortonworksAttributeSchemaReferenceWriter {
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
|
||||
schemaAccessWriter.validateSchema(recordSchema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol1AndMissingSchemaId() {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol1AndMissingSchemaName() {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(1);
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
|
||||
schemaAccessWriter.validateSchema(recordSchema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol2AndMissingSchemaVersionId() {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(2);
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
|
||||
schemaAccessWriter.validateSchema(recordSchema);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateWithProtocol3AndMissingSchemaVersionId() {
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(3);
|
||||
assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAttributesWithProtocol1() {
|
||||
final Integer protocolVersion = 1;
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
|
||||
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
|
||||
|
||||
assertEquals(3, attributes.size());
|
||||
|
||||
assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE));
|
||||
|
||||
assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
|
||||
|
||||
assertEquals(String.valueOf(protocolVersion),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAttributesWithProtocol1AndBranch() {
|
||||
final Integer protocolVersion = 1;
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
|
||||
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
|
||||
|
||||
assertEquals(4, attributes.size());
|
||||
|
||||
assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE));
|
||||
|
||||
assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
|
||||
|
||||
assertEquals(String.valueOf(protocolVersion),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
|
||||
|
||||
assertEquals(schemaIdentifier.getBranch().get(),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAttributesWithProtocol2() {
|
||||
final Integer protocolVersion = 2;
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
|
||||
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
|
||||
|
||||
assertEquals(2, attributes.size());
|
||||
|
||||
assertEquals(String.valueOf(protocolVersion),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
|
||||
|
||||
assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetAttributesWithProtocol3() {
|
||||
final Integer protocolVersion = 3;
|
||||
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
|
||||
final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
|
||||
|
||||
final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(protocolVersion);
|
||||
final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema);
|
||||
|
||||
assertEquals(2, attributes.size());
|
||||
|
||||
assertEquals(String.valueOf(protocolVersion),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
|
||||
|
||||
assertEquals(String.valueOf(schemaIdentifier.getSchemaVersionId().getAsLong()),
|
||||
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidProtocolVersion() {
|
||||
assertThrows(IllegalArgumentException.class, () -> new HortonworksAttributeSchemaReferenceWriter(99));
|
||||
}
|
||||
|
||||
private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
|
||||
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
|
||||
return new SimpleRecordSchema(fields, schemaIdentifier);
|
||||
}
|
||||
}
@ -1,88 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schema.access;

import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.when;

public class TestHortonworksEncodedSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest {

    @Test
    public void testGetSchemaWithValidEncoding() throws IOException, SchemaNotFoundException {
        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);

        final int protocol = 1;
        final long schemaId = 123456;
        final int version = 2;

        try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
             final DataOutputStream out = new DataOutputStream(bytesOut)) {
            out.write(protocol);
            out.writeLong(schemaId);
            out.writeInt(version);
            out.flush();

            try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {

                // the Hortonworks encoded strategy reads the protocol version, schema id, and schema version from the content header
                final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder()
                        .id(schemaId)
                        .version(version)
                        .build();

                when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier))))
                        .thenReturn(recordSchema);

                final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
                assertNotNull(retrievedSchema);
            }
        }
    }

    @Test
    public void testGetSchemaWithInvalidProtocol() throws IOException {
        final SchemaAccessStrategy schemaAccessStrategy = new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);

        final int protocol = 0; // use an invalid protocol
        final long schemaId = 123456;
        final int version = 2;

        try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
             final DataOutputStream out = new DataOutputStream(bytesOut)) {
            out.write(protocol);
            out.writeLong(schemaId);
            out.writeInt(version);
            out.flush();

            try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) {
                assertThrows(SchemaNotFoundException.class, () -> schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema));
            }
        }
    }
}
@ -1,180 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.schema.access;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class TestHortonworksEncodedSchemaReferenceWriter {

    @Test
    public void testEncodeProtocolVersion1() throws IOException {
        final long id = 48;
        final int version = 2;
        final int protocolVersion = 1;

        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);

        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
                SchemaIdentifier.builder().name("name").id(id).version(version).build());

        final byte[] header;
        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            writer.writeHeader(schema, baos);
            header = baos.toByteArray();
        }

        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
            assertEquals(id, dis.readLong()); // verify schema id
            assertEquals(version, dis.readInt()); // verify schema version
            assertEquals(-1, dis.read()); // no more bytes
        }
    }

    @Test
    public void testEncodeProtocolVersion2() throws IOException {
        final long schemaVersionId = 123;
        final int protocolVersion = 2;

        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);

        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
                SchemaIdentifier.builder().schemaVersionId(schemaVersionId).build());

        final byte[] header;
        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            writer.writeHeader(schema, baos);
            header = baos.toByteArray();
        }

        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
            assertEquals(schemaVersionId, dis.readLong()); // verify schema version id
            assertEquals(-1, dis.read()); // no more bytes
        }
    }

    @Test
    public void testEncodeProtocolVersion3() throws IOException {
        final int schemaVersionId = 123;
        final int protocolVersion = 3;

        final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(protocolVersion);

        final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(),
                SchemaIdentifier.builder().schemaVersionId((long) schemaVersionId).build());

        final byte[] header;
        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            writer.writeHeader(schema, baos);
            header = baos.toByteArray();
        }

        try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(header))) {
            assertEquals(protocolVersion, dis.read()); // verify 'protocol version'
            assertEquals(schemaVersionId, dis.readInt()); // verify schema version id
            assertEquals(-1, dis.read()); // no more bytes
        }
    }

    @Test
    public void testValidateWithProtocol1AndValidSchema() throws SchemaNotFoundException {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
        schemaAccessWriter.validateSchema(recordSchema);
    }

    @Test
    public void testValidateWithProtocol1AndMissingSchemaId() {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
        assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
    }

    @Test
    public void testValidateWithProtocol1AndMissingSchemaName() {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(1);
        assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
    }

    @Test
    public void testValidateWithProtocol2AndValidSchema() throws SchemaNotFoundException {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
        schemaAccessWriter.validateSchema(recordSchema);
    }

    @Test
    public void testValidateWithProtocol2AndMissingSchemaVersionId() {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(2);
        assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
    }

    @Test
    public void testValidateWithProtocol3AndValidSchema() throws SchemaNotFoundException {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(9999L).build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
        schemaAccessWriter.validateSchema(recordSchema);
    }

    @Test
    public void testValidateWithProtocol3AndMissingSchemaVersionId() {
        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build();
        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);

        final SchemaAccessWriter schemaAccessWriter = new HortonworksEncodedSchemaReferenceWriter(3);
        assertThrows(SchemaNotFoundException.class, () -> schemaAccessWriter.validateSchema(recordSchema));
    }

    private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) {
        final List<RecordField> fields = new ArrayList<>();
        fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
        fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
        return new SimpleRecordSchema(fields, schemaIdentifier);
    }
}
@ -95,7 +95,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac

    @Override
    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
        final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue();
        final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAccessStrategyDescriptor()).getValue();
        if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
            return new AvroReaderWithEmbeddedSchema(in);
        } else {
@ -176,7 +176,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac

        this.recordSchemaFromGrok = createRecordSchema(groks);

        final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
        final String schemaAccess = context.getProperty(getSchemaAccessStrategyDescriptor()).getValue();
        if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
            this.recordSchema = recordSchemaFromGrok;
        } else {
@ -51,11 +51,9 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REFERENCE_READER_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import static org.apache.nifi.schema.inference.SchemaInferenceUtil.INFER_SCHEMA;
import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
@ -108,7 +106,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .defaultValue(SchemaApplicationStrategy.SELECTED_PART.getValue())
            .dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name())
            .dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)
            .dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, SCHEMA_REFERENCE_READER_PROPERTY)
            .allowableValues(SchemaApplicationStrategy.class)
            .build();

@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schemaregistry.services;

import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;

/**
 * Service interface responsible for reading Schema Identifier reference information based on attributes or encoded content information
 */
public interface SchemaReferenceReader extends ControllerService {
    /**
     * Get Schema Identifier
     *
     * @param variables Map of variables for Schema Identifier resolution, which can be null or empty
     * @param contentStream Stream of FlowFile content that may contain encoded Schema Identifier references
     * @return Schema Identifier
     * @throws IOException Thrown on failure to read input content stream
     * @throws SchemaNotFoundException Thrown on failure to find expected Schema Identifier information
     */
    SchemaIdentifier getSchemaIdentifier(Map<String, String> variables, InputStream contentStream) throws IOException, SchemaNotFoundException;

    /**
     * Get supplied Record Schema Fields for validation against the set of Record Schema Fields required for writing schema information
     *
     * @return Supplied Record Schema Fields
     */
    Set<SchemaField> getSuppliedSchemaFields();
}
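A minimal sketch may help show how the SchemaReferenceReader contract is meant to be implemented. The class below is not part of this commit; the class name AttributeSchemaReferenceReader and the schema.name variable are illustrative assumptions for an attribute-driven reader that ignores the content stream.

package org.apache.nifi.schemaregistry.services;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.InputStream;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical example implementation: resolves the Schema Identifier from a
 * "schema.name" variable and ignores the content stream entirely
 */
public class AttributeSchemaReferenceReader extends AbstractControllerService implements SchemaReferenceReader {

    @Override
    public SchemaIdentifier getSchemaIdentifier(final Map<String, String> variables, final InputStream contentStream) throws SchemaNotFoundException {
        final String schemaName = variables == null ? null : variables.get("schema.name");
        if (schemaName == null || schemaName.isEmpty()) {
            throw new SchemaNotFoundException("Required variable [schema.name] not found");
        }
        return SchemaIdentifier.builder().name(schemaName).build();
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        // Only the schema name is supplied; registry-backed readers would also supply identifier or version fields
        return Set.of(SchemaField.SCHEMA_NAME);
    }
}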
@ -0,0 +1,63 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schemaregistry.services;

import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.Set;

/**
 * Service interface responsible for writing Schema Identifier reference information to attributes or encoded content information
 */
public interface SchemaReferenceWriter extends ControllerService {
    /**
     * Write Record Schema to the provided OutputStream as header information when required
     *
     * @param recordSchema Record Schema to be written to the provided Output Stream
     * @param outputStream Output Stream to which the provided Record Schema should be written
     * @throws IOException Thrown on failure to write Record Schema to the Output Stream
     */
    void writeHeader(RecordSchema recordSchema, OutputStream outputStream) throws IOException;

    /**
     * Get Attributes containing Record Schema content or references based on service implementation requirements
     *
     * @param recordSchema Record Schema from which attribute information is derived
     * @return Attributes containing Record Schema information or empty when not populated
     */
    Map<String, String> getAttributes(RecordSchema recordSchema);

    /**
     * Validate provided Record Schema for required information or throw SchemaNotFoundException indicating missing fields
     *
     * @param recordSchema Record Schema to be validated
     * @throws SchemaNotFoundException Thrown when the provided Record Schema is missing required information
     */
    void validateSchema(RecordSchema recordSchema) throws SchemaNotFoundException;

    /**
     * Get required Record Schema Fields for writing schema information
     *
     * @return Required Record Schema Fields
     */
    Set<SchemaField> getRequiredSchemaFields();
}
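For the writer side, a hypothetical attribute-based implementation of SchemaReferenceWriter could look like the sketch below; it writes no content header and publishes the schema name as an attribute. The class name AttributeSchemaReferenceWriter and the schema.name attribute are assumptions for illustration and do not appear in this commit.

package org.apache.nifi.schemaregistry.services;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.io.OutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
 * Hypothetical example implementation: publishes the schema name as a FlowFile
 * attribute and does not write any content header
 */
public class AttributeSchemaReferenceWriter extends AbstractControllerService implements SchemaReferenceWriter {

    @Override
    public void writeHeader(final RecordSchema recordSchema, final OutputStream outputStream) {
        // No content header required for attribute-based references
    }

    @Override
    public Map<String, String> getAttributes(final RecordSchema recordSchema) {
        final Optional<String> schemaName = recordSchema.getIdentifier().getName();
        return schemaName.map(name -> Map.of("schema.name", name)).orElse(Map.of());
    }

    @Override
    public void validateSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
        final SchemaIdentifier identifier = recordSchema.getIdentifier();
        if (identifier.getName().isEmpty()) {
            throw new SchemaNotFoundException("Schema Name not found in Record Schema");
        }
    }

    @Override
    public Set<SchemaField> getRequiredSchemaFields() {
        // Only the schema name is required for this hypothetical writer
        return Set.of(SchemaField.SCHEMA_NAME);
    }
}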