NIFI-13578 Add Schema Branch Name and Schema Version in ValidateRecord (#9108)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
Authored by Juldrixx on 2024-08-31 17:54:26 +02:00; committed via GitHub.
parent ba13c5d48a
commit f92f6f1def
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 421 additions and 149 deletions

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
public record Triple<A, B, C>(A first, B second, C third) {
}

View File

@ -22,8 +22,8 @@ import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.Triple;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Optional;
import java.util.OptionalInt;
@ -33,40 +33,53 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MockSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private final ConcurrentMap<String, RecordSchema> schemaNameMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Triple<String, String, Integer>, RecordSchema> schemaNameMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Tuple<Long, Integer>, RecordSchema> schemaIdVersionMap = new ConcurrentHashMap<>();
public void addSchema(final String name, final RecordSchema schema) {
schemaNameMap.put(name, schema);
addSchema(name, null, null, schema);
}
RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
public void addSchema(final String name, final String branch, final RecordSchema schema) {
addSchema(name, branch, null, schema);
}
public void addSchema(final String name, final Integer version, final RecordSchema schema) {
addSchema(name, null, version, schema);
}
public void addSchema(final String name, final String branch, final Integer version, final RecordSchema schema) {
schemaNameMap.put(new Triple<>(name, branch, version), schema);
}
RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
final Optional<String> schemaName = schemaIdentifier.getName();
if (!schemaName.isPresent()) {
if (schemaName.isEmpty()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}
return schemaNameMap.get(schemaName.get());
final String schemaBranch = schemaIdentifier.getBranch().orElse(null);
final Integer schemaVersion = schemaIdentifier.getVersion().isPresent() ? schemaIdentifier.getVersion().getAsInt() : null;
return schemaNameMap.get(new Triple<>(schemaName.get(), schemaBranch, schemaVersion));
}
private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
final OptionalLong schemaId = schemaIdentifier.getIdentifier();
if (!schemaId.isPresent()) {
if (schemaId.isEmpty()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present");
}
final OptionalInt version = schemaIdentifier.getVersion();
if (!version.isPresent()) {
if (version.isEmpty()) {
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present");
}
final Tuple<Long, Integer> tuple = new Tuple<>(schemaId.getAsLong(), version.getAsInt());
final RecordSchema schema = schemaIdVersionMap.get(tuple);
return schema;
return schemaIdVersionMap.get(tuple);
}
@Override
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException {
public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
if (schemaIdentifier.getName().isPresent()) {
return retrieveSchemaByName(schemaIdentifier);
} else {

View File

@ -28,7 +28,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.avro.AvroSchemaValidator;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
@ -37,6 +36,7 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -74,6 +74,14 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -89,10 +97,6 @@ import java.util.Set;
})
public class ValidateRecord extends AbstractProcessor {
static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name-property", "Use Schema Name Property",
"The schema to validate the data against is determined by looking at the 'Schema Name' Property and looking up the schema in the configured Schema Registry");
static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use Schema Text Property",
"The schema to validate the data against is determined by looking at the 'Schema Text' Property and parsing the schema as an Avro schema");
static final AllowableValue READER_SCHEMA = new AllowableValue("reader-schema", "Use Reader's Schema",
"The schema to validate the data against is determined by asking the configured Record Reader for its schema");
@ -131,31 +135,6 @@ public class ValidateRecord extends AbstractProcessor {
.defaultValue(READER_SCHEMA.getValue())
.required(true)
.build();
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
.displayName("Schema Registry")
.description("Specifies the Controller Service to use for the Schema Registry. This is necessary only if the Schema Access Strategy is set to \"Use 'Schema Name' Property\".")
.identifiesControllerService(SchemaRegistry.class)
.required(false)
.build();
static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
.name("schema-name")
.displayName("Schema Name")
.description("Specifies the name of the schema to lookup in the Schema Registry property")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${schema.name}")
.required(false)
.build();
static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
.name("schema-text")
.displayName("Schema Text")
.description("The text of an Avro-formatted Schema")
.addValidator(new AvroSchemaValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.defaultValue("${avro.schema}")
.required(false)
.build();
static final PropertyDescriptor ALLOW_EXTRA_FIELDS = new PropertyDescriptor.Builder()
.name("allow-extra-fields")
.displayName("Allow Extra Fields")
@ -219,6 +198,8 @@ public class ValidateRecord extends AbstractProcessor {
SCHEMA_ACCESS_STRATEGY,
SCHEMA_REGISTRY,
SCHEMA_NAME,
SCHEMA_BRANCH_NAME,
SCHEMA_VERSION,
SCHEMA_TEXT,
ALLOW_EXTRA_FIELDS,
STRICT_TYPE_CHECKING,
@ -256,6 +237,17 @@ public class ValidateRecord extends AbstractProcessor {
return RELATIONSHIPS;
}
@Override
public void migrateProperties(final PropertyConfiguration config) {
if (config.isPropertySet(SCHEMA_ACCESS_STRATEGY)) {
config.getPropertyValue(SCHEMA_ACCESS_STRATEGY).ifPresent(value -> {
if (value.equals("schema-name-property")) {
config.setProperty(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY.getValue());
}
});
}
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final String schemaAccessStrategy = validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
@ -482,8 +474,7 @@ public class ValidateRecord extends AbstractProcessor {
final Integer maxValidationDetailsLength = context.getProperty(MAX_VALIDATION_DETAILS_LENGTH).evaluateAttributeExpressions(flowFile).asInteger();
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@ -536,7 +527,9 @@ public class ValidateRecord extends AbstractProcessor {
} else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build();
final String schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME).evaluateAttributeExpressions(flowFile).getValue();
final Integer schemaVersion = context.getProperty(SCHEMA_VERSION).evaluateAttributeExpressions(flowFile).asInteger();
final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).branch(schemaBranchName).version(schemaVersion).build();
return schemaRegistry.retrieveSchema(schemaIdentifier);
} else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();

View File

@ -17,23 +17,25 @@
package org.apache.nifi.processors.standard;
import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
import org.apache.nifi.avro.AvroRecordReader;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.DateTimeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.MockSchemaRegistry;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -48,13 +50,19 @@ import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -87,12 +95,12 @@ public class TestValidateRecord {
runner.enqueue(content);
runner.run();
runner.assertAllFlowFilesTransferred(ValidateRecord.REL_VALID, 1);
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0).assertContentEquals(content);
runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst().assertContentEquals(content);
}
@Test
public void testWriteFailureRoutesToFaliure() throws InitializationException {
public void testWriteFailureRoutesToFailure() throws InitializationException {
final CSVReader csvReader = new CSVReader();
runner.addControllerService("reader", csvReader);
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
@ -114,13 +122,13 @@ public class TestValidateRecord {
}
@Test
public void testAppropriateServiceUsedForInvalidRecords() throws InitializationException, UnsupportedEncodingException, IOException {
final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8");
public void testAppropriateServiceUsedForInvalidRecords() throws InitializationException, IOException {
final String schema = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"));
final CSVReader csvReader = new CSVReader();
runner.addControllerService("reader", csvReader);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
runner.setProperty(csvReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(csvReader, SCHEMA_TEXT, schema);
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false");
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
@ -139,9 +147,11 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
final String content = "1, John Doe\n"
+ "2, Jane Doe\n"
+ "Three, Jack Doe\n";
final String content = """
1, John Doe
2, Jane Doe
Three, Jack Doe
""";
runner.enqueue(content);
runner.run();
@ -150,24 +160,26 @@ public class TestValidateRecord {
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
validFlowFile.assertAttributeEquals("record.count", "2");
validFlowFile.assertContentEquals("valid\n"
+ "1,John Doe\n"
+ "2,Jane Doe\n");
validFlowFile.assertContentEquals("""
valid
1,John Doe
2,Jane Doe
""");
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).getFirst();
invalidFlowFile.assertAttributeEquals("record.count", "1");
invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
}
@Test
public void testStrictTypeCheck() throws InitializationException, IOException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8");
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc"));
final CSVReader csvReader = new CSVReader();
runner.addControllerService("reader", csvReader);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.setProperty(csvReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
@ -185,18 +197,20 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
// The validationSchema expects 'id' to be int, but CSVReader reads it as 'string'
// with strict type check, the type difference is not allowed.
final String content = "id, firstName, lastName\n"
+ "1, John, Doe\n"
+ "2, Jane, Doe\n"
+ "Three, Jack, Doe\n";
final String content = """
id, firstName, lastName
1, John, Doe
2, Jane, Doe
Three, Jack, Doe
""";
runner.enqueue(content);
runner.run();
@ -205,22 +219,24 @@ public class TestValidateRecord {
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).getFirst();
invalidFlowFile.assertAttributeEquals("record.count", "3");
final String expectedInvalidContents = "invalid\n"
+ "\"1\",\"John\",\"Doe\"\n"
+ "\"2\",\"Jane\",\"Doe\"\n"
+ "\"Three\",\"Jack\",\"Doe\"\n";
final String expectedInvalidContents = """
invalid
"1","John","Doe"
"2","Jane","Doe"
"Three","Jack","Doe"
""";
invalidFlowFile.assertContentEquals(expectedInvalidContents);
}
@Test
public void testNonStrictTypeCheckWithAvroWriter() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8");
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc"));
final CSVReader csvReader = new CSVReader();
runner.addControllerService("reader", csvReader);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.setProperty(csvReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
@ -238,18 +254,20 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
// The validationSchema expects 'id' to be int, but CSVReader reads it as 'string'
// with non-strict type check, the type difference should be accepted, and results should be written as 'int'.
final String content = "id, firstName, lastName\n"
+ "1, John, Doe\n"
+ "2, Jane, Doe\n"
+ "Three, Jack, Doe\n";
final String content = """
id, firstName, lastName
1, John, Doe
2, Jane, Doe
Three, Jack, Doe
""";
runner.enqueue(content);
runner.run();
@ -260,13 +278,13 @@ public class TestValidateRecord {
final AvroReader avroReader = new AvroReader();
runner.addControllerService("avroReader", avroReader);
runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(avroReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.enableControllerService(avroReader);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
final byte[] validFlowFileBytes = validFlowFile.toByteArray();
try (
final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFileBytes);
final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, validFlowFileBytes.length, runner.getLogger());
final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, validFlowFileBytes.length, runner.getLogger())
) {
final RecordSchema resultSchema = recordReader.getSchema();
assertEquals(3, resultSchema.getFieldCount());
@ -289,10 +307,12 @@ public class TestValidateRecord {
assertEquals("Doe", record.getValue("lastName"));
}
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).getFirst();
invalidFlowFile.assertAttributeEquals("record.count", "1");
final String expectedInvalidContents = "invalid\n"
+ "\"Three\",\"Jack\",\"Doe\"\n";
final String expectedInvalidContents = """
invalid
"Three","Jack","Doe"
""";
invalidFlowFile.assertContentEquals(expectedInvalidContents);
}
@ -300,7 +320,7 @@ public class TestValidateRecord {
* This test case demonstrates the limitation on JsonRecordSetWriter type-coercing when strict type check is disabled.
* Since WriteJsonResult.writeRawRecord doesn't use record schema,
* type coercing does not happen with JsonWriter even if strict type check is disabled.
*
* <p>
* E.g. When an input "1" as string is given, and output field schema is int:
* <ul>
* <li>Expected result: "id": 1 (without quote)</li>
@ -309,11 +329,11 @@ public class TestValidateRecord {
*/
@Test
public void testNonStrictTypeCheckWithJsonWriter() throws InitializationException, IOException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc")), "UTF-8");
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc"));
final CSVReader csvReader = new CSVReader();
runner.addControllerService("reader", csvReader);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.setProperty(csvReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "csv-header-derived");
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "true");
runner.setProperty(csvReader, CSVUtils.IGNORE_CSV_HEADER, "true");
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
@ -331,18 +351,20 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
// The validationSchema expects 'id' to be int, but CSVReader reads it as 'string'
// with non-strict type check, the type difference should be accepted, and results should be written as 'int'.
final String content = "id, firstName, lastName\n"
+ "1, John, Doe\n"
+ "2, Jane, Doe\n"
+ "Three, Jack, Doe\n";
final String content = """
id, firstName, lastName
1, John, Doe
2, Jane, Doe
Three, Jack, Doe
""";
runner.enqueue(content);
runner.run();
@ -354,7 +376,7 @@ public class TestValidateRecord {
/*
TODO: JsonRecordSetWriter does not coerce value. Should we fix this??
*/
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
validFlowFile.assertAttributeEquals("record.count", "2");
final String expectedValidContents = "[" +
"{\"id\":\"1\",\"firstName\":\"John\",\"lastName\":\"Doe\"}," +
@ -362,21 +384,23 @@ public class TestValidateRecord {
"]";
validFlowFile.assertContentEquals(expectedValidContents);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).getFirst();
invalidFlowFile.assertAttributeEquals("record.count", "1");
final String expectedInvalidContents = "invalid\n"
+ "\"Three\",\"Jack\",\"Doe\"\n";
final String expectedInvalidContents = """
invalid
"Three","Jack","Doe"
""";
invalidFlowFile.assertContentEquals(expectedInvalidContents);
}
@Test
public void testValidateNestedMap() throws InitializationException, IOException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/nested-map-schema.avsc")), StandardCharsets.UTF_8);
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestValidateRecord/nested-map-schema.avsc"));
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
runner.setProperty(jsonReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SCHEMA_TEXT, validateSchema);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
@ -390,14 +414,15 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
// Both records should be valid if strict type checking is off
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "false");
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/nested-map-input.json"));
final Path nestedMapINputpath = Paths.get("src/test/resources/TestValidateRecord/nested-map-input.json");
runner.enqueue(nestedMapINputpath);
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
@ -407,7 +432,7 @@ public class TestValidateRecord {
// The second record should be invalid if strict type checking is on
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/nested-map-input.json"));
runner.enqueue(nestedMapINputpath);
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
@ -417,12 +442,12 @@ public class TestValidateRecord {
@Test
public void testValidateMissingRequiredArray() throws InitializationException, IOException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/missing-array.avsc")), StandardCharsets.UTF_8);
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestValidateRecord/missing-array.avsc"));
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
runner.setProperty(jsonReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SCHEMA_TEXT, validateSchema);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
@ -436,8 +461,8 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "true");
@ -454,12 +479,12 @@ public class TestValidateRecord {
@Test
public void testValidateMissingRequiredArrayWithDefault() throws InitializationException, IOException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/missing-array-with-default.avsc")), StandardCharsets.UTF_8);
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestValidateRecord/missing-array-with-default.avsc"));
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
runner.setProperty(jsonReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SCHEMA_TEXT, validateSchema);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
@ -473,8 +498,8 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "true");
@ -492,12 +517,12 @@ public class TestValidateRecord {
@Test
public void testValidateJsonTimestamp() throws IOException, InitializationException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/timestamp.avsc")), StandardCharsets.UTF_8);
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestValidateRecord/timestamp.avsc"));
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
runner.setProperty(jsonReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SCHEMA_TEXT, validateSchema);
runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss");
runner.enableControllerService(jsonReader);
@ -509,17 +534,18 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
final Path timestampPath = Paths.get("src/test/resources/TestValidateRecord/timestamp.json");
runner.enqueue(timestampPath);
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
validFlowFile.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
// Test with a timestamp that has an invalid format.
@ -527,39 +553,38 @@ public class TestValidateRecord {
runner.disableControllerService(jsonReader);
runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
runner.enqueue(timestampPath);
runner.enableControllerService(jsonReader);
runner.run();
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).getFirst();
invalidFlowFile.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
// Test with an Inferred Schema.
runner.disableControllerService(jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
runner.setProperty(jsonReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss");
runner.enableControllerService(jsonReader);
runner.clearTransferState();
runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
runner.enqueue(timestampPath);
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
final MockFlowFile validFlowFileInferredSchema = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFileInferredSchema = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
validFlowFileInferredSchema.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
}
@Test
public void testValidateMaps() throws IOException, InitializationException, MalformedRecordException {
final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/int-maps-schema.avsc")), StandardCharsets.UTF_8);
final String validateSchema = Files.readString(Paths.get("src/test/resources/TestValidateRecord/int-maps-schema.avsc"));
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
runner.setProperty(jsonReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SCHEMA_TEXT, validateSchema);
runner.enableControllerService(jsonReader);
final AvroRecordSetWriter avroWriter = new AvroRecordSetWriter();
@ -568,8 +593,8 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.RECORD_READER, "reader");
runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(SCHEMA_TEXT, validateSchema);
runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer");
runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
@ -577,7 +602,7 @@ public class TestValidateRecord {
runner.run();
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
byte[] source = validFlowFile.toByteArray();
@ -594,13 +619,13 @@ public class TestValidateRecord {
}
@Test
public void testValidationsDetailsAttributeForInvalidRecords() throws InitializationException, UnsupportedEncodingException, IOException {
final String schema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc")), "UTF-8");
public void testValidationsDetailsAttributeForInvalidRecords() throws InitializationException, IOException {
final String schema = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"));
final CSVReader csvReader = new CSVReader();
runner.addControllerService("reader", csvReader);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(csvReader, SchemaAccessUtils.SCHEMA_TEXT, schema);
runner.setProperty(csvReader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
runner.setProperty(csvReader, SCHEMA_TEXT, schema);
runner.setProperty(csvReader, CSVUtils.FIRST_LINE_IS_HEADER, "false");
runner.setProperty(csvReader, CSVUtils.QUOTE_MODE, CSVUtils.QUOTE_MINIMAL.getValue());
runner.setProperty(csvReader, CSVUtils.TRAILING_DELIMITER, "false");
@ -621,9 +646,11 @@ public class TestValidateRecord {
runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");
final String content = "1, John Doe\n"
+ "2, Jane Doe\n"
+ "Three, Jack Doe\n";
final String content = """
1, John Doe
2, Jane Doe
Three, Jack Doe
""";
runner.enqueue(content);
runner.run();
@ -631,7 +658,7 @@ public class TestValidateRecord {
runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).getFirst();
invalidFlowFile.assertAttributeEquals("record.count", "1");
invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
invalidFlowFile.assertAttributeExists("valDetails");
@ -641,11 +668,10 @@ public class TestValidateRecord {
@Test
public void testValidationForNullElementArrayAndMap() throws Exception {
AvroReader avroReader = new AvroReader();
final AvroReader avroReader = new AvroReader();
runner.addControllerService("reader", avroReader);
runner.enableControllerService(avroReader);
final MockRecordWriter validWriter = new MockRecordWriter("valid", false);
runner.addControllerService("writer", validWriter);
runner.enableControllerService(validWriter);
@ -668,9 +694,228 @@ public class TestValidateRecord {
runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).getFirst();
validFlowFile.assertAttributeEquals("record.count", "1");
validFlowFile.assertContentEquals("valid\n[text, null],{key=null}\n");
}
@Test
public void testSchemaNameAccess() throws Exception {
    final String schemaText = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"));

    // Register the person schema in the mock registry under the bare name "record"
    // (no branch, no version) so the processor can resolve it by Schema Name alone.
    final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
    schemaRegistry.addSchema("record", AvroTypeUtil.createSchema(new Schema.Parser().parse(schemaText)));
    runner.addControllerService("registry", schemaRegistry);
    runner.enableControllerService(schemaRegistry);

    // Reader and writer resolve their schemas directly from the schema text property.
    final CSVReader reader = new CSVReader();
    runner.addControllerService("reader", reader);
    runner.setProperty(reader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(reader, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(reader);

    final CSVRecordSetWriter writer = new CSVRecordSetWriter();
    runner.addControllerService("writer", writer);
    runner.setProperty(writer, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(writer, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(writer);

    final MockRecordWriter badRecordWriter = new MockRecordWriter("invalid", true);
    runner.addControllerService("invalid-writer", badRecordWriter);
    runner.enableControllerService(badRecordWriter);

    // The processor itself uses the Schema Name strategy against the registry.
    runner.setProperty(ValidateRecord.RECORD_READER, "reader");
    runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
    runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
    runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY);
    runner.setProperty(SCHEMA_REGISTRY, "registry");
    runner.setProperty(SCHEMA_NAME, "record");
    runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
    runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
    runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");

    // Two rows with numeric ids plus one row whose id ("Three") violates the schema.
    runner.enqueue("1, John Doe\n2, Jane Doe\nThree, Jack Doe\n");
    runner.run();

    runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
    runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);

    final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
    invalidFlowFile.assertAttributeEquals("record.count", "1");
    invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
    invalidFlowFile.assertAttributeExists("valDetails");
    invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; "
            + "The following 1 fields had values whose type did not match the schema: [/id]");
}
@Test
public void testSchemaNameAccessWithBranch() throws Exception {
    final String schemaText = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"));

    // Register the schema under name "record" on branch "branch" so the processor
    // must supply both the Schema Name and the Schema Branch Name to resolve it.
    final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
    schemaRegistry.addSchema("record", "branch", AvroTypeUtil.createSchema(new Schema.Parser().parse(schemaText)));
    runner.addControllerService("registry", schemaRegistry);
    runner.enableControllerService(schemaRegistry);

    // Reader and writer bypass the registry and use the schema text directly.
    final CSVReader reader = new CSVReader();
    runner.addControllerService("reader", reader);
    runner.setProperty(reader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(reader, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(reader);

    final CSVRecordSetWriter writer = new CSVRecordSetWriter();
    runner.addControllerService("writer", writer);
    runner.setProperty(writer, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(writer, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(writer);

    final MockRecordWriter badRecordWriter = new MockRecordWriter("invalid", true);
    runner.addControllerService("invalid-writer", badRecordWriter);
    runner.enableControllerService(badRecordWriter);

    // Processor configuration: Schema Name strategy plus an explicit branch name.
    runner.setProperty(ValidateRecord.RECORD_READER, "reader");
    runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
    runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
    runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY);
    runner.setProperty(SCHEMA_REGISTRY, "registry");
    runner.setProperty(SCHEMA_NAME, "record");
    runner.setProperty(SCHEMA_BRANCH_NAME, "branch");
    runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
    runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
    runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");

    // Last row's id ("Three") is not an int, so exactly one record is invalid.
    runner.enqueue("1, John Doe\n2, Jane Doe\nThree, Jack Doe\n");
    runner.run();

    runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
    runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);

    final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
    invalidFlowFile.assertAttributeEquals("record.count", "1");
    invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
    invalidFlowFile.assertAttributeExists("valDetails");
    invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; "
            + "The following 1 fields had values whose type did not match the schema: [/id]");
}
@Test
public void testSchemaNameAccessWithVersion() throws Exception {
    final String schemaText = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"));

    // Register the schema under name "record" at version 1 so the processor must
    // supply both the Schema Name and the Schema Version to resolve it.
    final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
    schemaRegistry.addSchema("record", 1, AvroTypeUtil.createSchema(new Schema.Parser().parse(schemaText)));
    runner.addControllerService("registry", schemaRegistry);
    runner.enableControllerService(schemaRegistry);

    // Reader and writer bypass the registry and use the schema text directly.
    final CSVReader reader = new CSVReader();
    runner.addControllerService("reader", reader);
    runner.setProperty(reader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(reader, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(reader);

    final CSVRecordSetWriter writer = new CSVRecordSetWriter();
    runner.addControllerService("writer", writer);
    runner.setProperty(writer, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(writer, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(writer);

    final MockRecordWriter badRecordWriter = new MockRecordWriter("invalid", true);
    runner.addControllerService("invalid-writer", badRecordWriter);
    runner.enableControllerService(badRecordWriter);

    // Processor configuration: Schema Name strategy plus an explicit schema version.
    runner.setProperty(ValidateRecord.RECORD_READER, "reader");
    runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
    runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
    runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY);
    runner.setProperty(SCHEMA_REGISTRY, "registry");
    runner.setProperty(SCHEMA_NAME, "record");
    runner.setProperty(SCHEMA_VERSION, "1");
    runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
    runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
    runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");

    // Last row's id ("Three") is not an int, so exactly one record is invalid.
    runner.enqueue("1, John Doe\n2, Jane Doe\nThree, Jack Doe\n");
    runner.run();

    runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
    runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);

    final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
    invalidFlowFile.assertAttributeEquals("record.count", "1");
    invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
    invalidFlowFile.assertAttributeExists("valDetails");
    invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; "
            + "The following 1 fields had values whose type did not match the schema: [/id]");
}
@Test
public void testSchemaNameAccessWithBranchAndVersion() throws Exception {
    final String schemaText = Files.readString(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"));

    // Register the schema under name "record", branch "branch", version 1 — the
    // processor must match all three identifiers to resolve it from the registry.
    final MockSchemaRegistry schemaRegistry = new MockSchemaRegistry();
    schemaRegistry.addSchema("record", "branch", 1, AvroTypeUtil.createSchema(new Schema.Parser().parse(schemaText)));
    runner.addControllerService("registry", schemaRegistry);
    runner.enableControllerService(schemaRegistry);

    // Reader and writer bypass the registry and use the schema text directly.
    final CSVReader reader = new CSVReader();
    runner.addControllerService("reader", reader);
    runner.setProperty(reader, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(reader, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(reader);

    final CSVRecordSetWriter writer = new CSVRecordSetWriter();
    runner.addControllerService("writer", writer);
    runner.setProperty(writer, ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_TEXT_PROPERTY);
    runner.setProperty(writer, SCHEMA_TEXT, schemaText);
    runner.enableControllerService(writer);

    final MockRecordWriter badRecordWriter = new MockRecordWriter("invalid", true);
    runner.addControllerService("invalid-writer", badRecordWriter);
    runner.enableControllerService(badRecordWriter);

    // Processor configuration: Schema Name strategy with branch name and version.
    runner.setProperty(ValidateRecord.RECORD_READER, "reader");
    runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
    runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "invalid-writer");
    runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY);
    runner.setProperty(SCHEMA_REGISTRY, "registry");
    runner.setProperty(SCHEMA_NAME, "record");
    runner.setProperty(SCHEMA_BRANCH_NAME, "branch");
    runner.setProperty(SCHEMA_VERSION, "1");
    runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
    runner.setProperty(ValidateRecord.MAX_VALIDATION_DETAILS_LENGTH, "150");
    runner.setProperty(ValidateRecord.VALIDATION_DETAILS_ATTRIBUTE_NAME, "valDetails");

    // Last row's id ("Three") is not an int, so exactly one record is invalid.
    runner.enqueue("1, John Doe\n2, Jane Doe\nThree, Jack Doe\n");
    runner.run();

    runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
    runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);

    final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
    invalidFlowFile.assertAttributeEquals("record.count", "1");
    invalidFlowFile.assertContentEquals("invalid\n\"Three\",\"Jack Doe\"\n");
    invalidFlowFile.assertAttributeExists("valDetails");
    invalidFlowFile.assertAttributeEquals("valDetails", "Records in this FlowFile were invalid for the following reasons: ; "
            + "The following 1 fields had values whose type did not match the schema: [/id]");
}
}