NIFI-11807 Added ExtractRecordSchema Processor

This closes #7482

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-07-16 13:45:42 -04:00 committed by exceptionfactory
parent f5407fcbe8
commit f6a14bc475
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 274 additions and 0 deletions

View File

@ -559,6 +559,7 @@
<exclude>src/test/resources/TestExtractGrok/apache.log</exclude> <exclude>src/test/resources/TestExtractGrok/apache.log</exclude>
<exclude>src/test/resources/TestExtractGrok/patterns</exclude> <exclude>src/test/resources/TestExtractGrok/patterns</exclude>
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude> <exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
<exclude>src/test/resources/TestExtractRecordSchema/name_age_schema.avsc</exclude>
<exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude> <exclude>src/test/resources/TestForkRecord/input/complex-input-json.json</exclude>
<exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude> <exclude>src/test/resources/TestForkRecord/output/extract-transactions.json</exclude>
<exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude> <exclude>src/test/resources/TestForkRecord/output/split-address.json</exclude>

View File

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@SideEffectFree
@Tags({"record", "generic", "schema", "json", "csv", "avro", "freeform", "text", "xml"})
@WritesAttributes({
@WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader."),
@WritesAttribute(attribute = "avro.schema", description = "This attribute provides the schema extracted from the input FlowFile using the provided RecordReader."),
})
@CapabilityDescription("Extracts the record schema from the FlowFile using the supplied Record Reader and writes it to the `avro.schema` attribute.")
public class ExtractRecordSchema extends AbstractProcessor {
public static final String SCHEMA_ATTRIBUTE_NAME = "avro.schema";
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("record-reader")
.displayName("Record Reader")
.description("Specifies the Controller Service to use for reading incoming data")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
static final PropertyDescriptor SCHEMA_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("cache-size")
.displayName("Schema Cache Size")
.description("Specifies the number of schemas to cache. This value should reflect the expected number of different schemas "
+ "that may be in the incoming FlowFiles. This ensures more efficient retrieval of the schemas and thus the processor performance.")
.defaultValue("10")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.required(true)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles whose record schemas are successfully extracted will be routed to this relationship")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile's record schema cannot be extracted from the configured input format, "
+ "the FlowFile will be routed to this relationship")
.build();
static final List<PropertyDescriptor> properties = Arrays.asList(RECORD_READER, SCHEMA_CACHE_SIZE);
private LoadingCache<RecordSchema, String> avroSchemaTextCache;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return relationships;
}
@OnScheduled
public void setup(ProcessContext context) {
final int cacheSize = context.getProperty(SCHEMA_CACHE_SIZE).asInteger();
avroSchemaTextCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.build(schema -> AvroTypeUtil.extractAvroSchema(schema).toString());
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final Map<String, String> originalAttributes = flowFile.getAttributes();
final RecordSchema recordSchema;
try (final InputStream inputStream = session.read(flowFile);
final RecordReader reader = readerFactory.createRecordReader(originalAttributes, inputStream, flowFile.getSize(), getLogger())) {
recordSchema = reader.getSchema();
} catch (final Exception e) {
getLogger().error("Failed to process {}; will route to failure", flowFile, e);
// Since we are wrapping the exceptions above there should always be a cause
// but it's possible it might not have a message. This handles that by logging
// the name of the class thrown.
Throwable c = e.getCause();
if (c == null) {
session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
} else {
session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
}
session.transfer(flowFile, REL_FAILURE);
return;
}
session.putAttribute(flowFile, SCHEMA_ATTRIBUTE_NAME, avroSchemaTextCache.get(recordSchema));
session.transfer(flowFile, REL_SUCCESS);
}
}

View File

@ -69,8 +69,11 @@ import java.io.UnsupportedEncodingException;
import java.net.URI; import java.net.URI;
import java.net.URLDecoder; import java.net.URLDecoder;
import java.security.Principal; import java.security.Principal;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate; import java.security.cert.X509Certificate;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.HashMap; import java.util.HashMap;
@ -769,6 +772,38 @@ public class HandleHttpRequest extends AbstractProcessor {
return session.putAllAttributes(flowFile, attributes); return session.putAllAttributes(flowFile, attributes);
} }
private void putCertificateAttributes(final X509Certificate certificate, final Map<String, String> attributes) {
putAttribute(attributes, "http.subject.dn", certificate.getSubjectX500Principal().getName());
putAttribute(attributes, "http.issuer.dn", certificate.getIssuerX500Principal().getName());
try {
final Collection<List<?>> subjectAlternativeNames = certificate.getSubjectAlternativeNames();
if (subjectAlternativeNames != null) {
int subjectAlternativeNameIndex = 0;
for (final List<?> subjectAlternativeTypeName : subjectAlternativeNames) {
final String nameTypeAttributeKey = String.format("http.client.certificate.sans.%d.nameType", subjectAlternativeNameIndex);
final String nameType = subjectAlternativeTypeName.get(0).toString();
putAttribute(attributes, nameTypeAttributeKey, nameType);
final String nameAttributeKey = String.format("http.client.certificate.sans.%d.name", subjectAlternativeNameIndex);
final Object name = subjectAlternativeTypeName.get(1);
final String serializedName;
if (name instanceof byte[]) {
final byte[] encodedName = (byte[]) name;
serializedName = Base64.getEncoder().encodeToString(encodedName);
} else {
serializedName = name.toString();
}
putAttribute(attributes, nameTypeAttributeKey, serializedName);
}
}
} catch (final CertificateParsingException e) {
getLogger().info("Read X.509 Client Certificate Subject Alternative Names failed", e);
}
}
private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) { private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) {
final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT); final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);

View File

@ -39,6 +39,7 @@ org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.ExecuteSQLRecord org.apache.nifi.processors.standard.ExecuteSQLRecord
org.apache.nifi.processors.standard.ExecuteStreamCommand org.apache.nifi.processors.standard.ExecuteStreamCommand
org.apache.nifi.processors.standard.ExtractGrok org.apache.nifi.processors.standard.ExtractGrok
org.apache.nifi.processors.standard.ExtractRecordSchema
org.apache.nifi.processors.standard.ExtractText org.apache.nifi.processors.standard.ExtractText
org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.FetchFile org.apache.nifi.processors.standard.FetchFile

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
public class TestExtractRecordSchema {
final static Path NAME_AGE_SCHEMA_PATH = Paths.get("src/test/resources/TestExtractRecordSchema/name_age_schema.avsc");
@Test
public void testSuccessfulExtraction() throws Exception {
final MockRecordParser readerService = new MockRecordParser();
final TestRunner runner = TestRunners.newTestRunner(ExtractRecordSchema.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.setProperty(ExtractRecordSchema.RECORD_READER, "reader");
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
readerService.addRecord("John Doe", 48);
readerService.addRecord("Jane Doe", 47);
readerService.addRecord("Jimmy Doe", 14);
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(ExtractRecordSchema.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractRecordSchema.REL_SUCCESS).get(0);
final String expectedAttributeValue = new String(Files.readAllBytes(NAME_AGE_SCHEMA_PATH));
out.assertAttributeEquals(ExtractRecordSchema.SCHEMA_ATTRIBUTE_NAME, expectedAttributeValue);
}
@Test
public void testNoSchema() throws Exception {
final MockRecordParser readerService = new MockRecordParserSchemaNotFound();
final TestRunner runner = TestRunners.newTestRunner(ExtractRecordSchema.class);
runner.addControllerService("reader", readerService);
runner.enableControllerService(readerService);
runner.setProperty(ExtractRecordSchema.RECORD_READER, "reader");
runner.enqueue("");
runner.run();
runner.assertAllFlowFilesTransferred(ExtractRecordSchema.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractRecordSchema.REL_FAILURE).get(0);
out.assertAttributeEquals("record.error.message", "org.apache.nifi.schema.access.SchemaNotFoundException Thrown");
}
private static class MockRecordParserSchemaNotFound extends MockRecordParser {
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws SchemaNotFoundException {
throw new SchemaNotFoundException("test");
}
}
}

View File

@ -0,0 +1 @@
{"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":[{"name":"name","type":["string","null"]},{"name":"age","type":["int","null"]}]}