diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 8c00f9f64a..d7c206d37b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -559,6 +559,7 @@
src/test/resources/TestExtractGrok/apache.log
src/test/resources/TestExtractGrok/patterns
src/test/resources/TestExtractGrok/simple_text.log
+ src/test/resources/TestExtractRecordSchema/name_age_schema.avsc
src/test/resources/TestForkRecord/input/complex-input-json.json
src/test/resources/TestForkRecord/output/extract-transactions.json
src/test/resources/TestForkRecord/output/split-address.json
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractRecordSchema.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractRecordSchema.java
new file mode 100644
index 0000000000..8c7c670816
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExtractRecordSchema.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@SideEffectFree
+@Tags({"record", "generic", "schema", "json", "csv", "avro", "freeform", "text", "xml"})
+@WritesAttributes({
+ @WritesAttribute(attribute = "record.error.message", description = "This attribute provides on failure the error message encountered by the Reader."),
+ @WritesAttribute(attribute = "avro.schema", description = "This attribute provides the schema extracted from the input FlowFile using the provided RecordReader."),
+})
+@CapabilityDescription("Extracts the record schema from the FlowFile using the supplied Record Reader and writes it to the `avro.schema` attribute.")
+public class ExtractRecordSchema extends AbstractProcessor {
+
+    public static final String SCHEMA_ATTRIBUTE_NAME = "avro.schema";
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for reading incoming data")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor SCHEMA_CACHE_SIZE = new PropertyDescriptor.Builder()
+            .name("cache-size")
+            .displayName("Schema Cache Size")
+            .description("Specifies the number of schemas to cache. This value should reflect the expected number of different schemas "
+                    + "that may be in the incoming FlowFiles. This ensures more efficient retrieval of the schemas and thus the processor performance.")
+            .defaultValue("10")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles whose record schemas are successfully extracted will be routed to this relationship")
+            .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile's record schema cannot be extracted from the configured input format, "
+                    + "the FlowFile will be routed to this relationship")
+            .build();
+
+    static final List<PropertyDescriptor> properties = Arrays.asList(RECORD_READER, SCHEMA_CACHE_SIZE);
+
+    // Written once in setup() (@OnScheduled), read by onTrigger threads; volatile for safe publication.
+    private volatile LoadingCache<RecordSchema, String> avroSchemaTextCache;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        final int cacheSize = context.getProperty(SCHEMA_CACHE_SIZE).asInteger();
+        // Cache the Avro schema text per RecordSchema so the serialization is not recomputed per FlowFile.
+        avroSchemaTextCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .build(schema -> AvroTypeUtil.extractAvroSchema(schema).toString());
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final Map<String, String> originalAttributes = flowFile.getAttributes();
+        final RecordSchema recordSchema;
+        try (final InputStream inputStream = session.read(flowFile);
+             final RecordReader reader = readerFactory.createRecordReader(originalAttributes, inputStream, flowFile.getSize(), getLogger())) {
+            recordSchema = reader.getSchema();
+        } catch (final Exception e) {
+            getLogger().error("Failed to process {}; will route to failure", flowFile, e);
+            // Since we are wrapping the exceptions above there should always be a cause
+            // but it's possible it might not have a message. This handles that by logging
+            // the name of the class thrown.
+            final Throwable c = e.getCause();
+            if (c == null) {
+                flowFile = session.putAttribute(flowFile, "record.error.message", e.getClass().getCanonicalName() + " Thrown");
+            } else {
+                flowFile = session.putAttribute(flowFile, "record.error.message", (c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : c.getClass().getCanonicalName() + " Thrown");
+            }
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        // putAttribute returns the updated (immutable) FlowFile reference; transfer that, not the stale one.
+        flowFile = session.putAttribute(flowFile, SCHEMA_ATTRIBUTE_NAME, avroSchemaTextCache.get(recordSchema));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
index 39e598f880..e3dda03afe 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java
@@ -69,8 +69,11 @@ import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.security.Principal;
+import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@@ -769,6 +772,38 @@ public class HandleHttpRequest extends AbstractProcessor {
return session.putAllAttributes(flowFile, attributes);
}
+    private void putCertificateAttributes(final X509Certificate certificate, final Map<String, String> attributes) {
+        putAttribute(attributes, "http.subject.dn", certificate.getSubjectX500Principal().getName());
+        putAttribute(attributes, "http.issuer.dn", certificate.getIssuerX500Principal().getName());
+
+        try {
+            final Collection<List<?>> subjectAlternativeNames = certificate.getSubjectAlternativeNames();
+
+            if (subjectAlternativeNames != null) {
+                int subjectAlternativeNameIndex = 0;
+                for (final List<?> subjectAlternativeTypeName : subjectAlternativeNames) {
+                    final String nameTypeAttributeKey = String.format("http.client.certificate.sans.%d.nameType", subjectAlternativeNameIndex);
+                    final String nameType = subjectAlternativeTypeName.get(0).toString();
+                    putAttribute(attributes, nameTypeAttributeKey, nameType);
+
+                    // Fix: write under the ".name" key (the original reused nameTypeAttributeKey, clobbering it)
+                    // and advance the index per SAN entry (the original never incremented it).
+                    final String nameAttributeKey = String.format("http.client.certificate.sans.%d.name", subjectAlternativeNameIndex++);
+                    final Object name = subjectAlternativeTypeName.get(1);
+                    final String serializedName;
+                    if (name instanceof byte[]) {
+                        // Non-string SAN values arrive as DER-encoded bytes; store them Base64-encoded.
+                        final byte[] encodedName = (byte[]) name;
+                        serializedName = Base64.getEncoder().encodeToString(encodedName);
+                    } else {
+                        serializedName = name.toString();
+                    }
+                    putAttribute(attributes, nameAttributeKey, serializedName);
+                }
+            }
+        } catch (final CertificateParsingException e) {
+            getLogger().warn("Read X.509 Client Certificate Subject Alternative Names failed", e);
+        }
+    }
+
private void forwardFlowFile(final ProcessSession session, final long start, final HttpServletRequest request, final FlowFile flowFile) {
final long receiveMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
final String subjectDn = flowFile.getAttribute(HTTPUtils.HTTP_SSL_CERT);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index eb01b54c9f..38ef7d0267 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -39,6 +39,7 @@ org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.ExecuteSQLRecord
org.apache.nifi.processors.standard.ExecuteStreamCommand
org.apache.nifi.processors.standard.ExtractGrok
+org.apache.nifi.processors.standard.ExtractRecordSchema
org.apache.nifi.processors.standard.ExtractText
org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.FetchFile
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractRecordSchema.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractRecordSchema.java
new file mode 100644
index 0000000000..9e2e64db72
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExtractRecordSchema.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Test;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Map;
+
+public class TestExtractRecordSchema {
+
+    final static Path NAME_AGE_SCHEMA_PATH = Paths.get("src/test/resources/TestExtractRecordSchema/name_age_schema.avsc");
+
+    @Test
+    public void testSuccessfulExtraction() throws Exception {
+        final MockRecordParser readerService = new MockRecordParser();
+        final TestRunner runner = TestRunners.newTestRunner(ExtractRecordSchema.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.setProperty(ExtractRecordSchema.RECORD_READER, "reader");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExtractRecordSchema.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractRecordSchema.REL_SUCCESS).get(0);
+
+        // Decode with an explicit charset so the assertion does not depend on the platform default encoding.
+        final String expectedAttributeValue = new String(Files.readAllBytes(NAME_AGE_SCHEMA_PATH), StandardCharsets.UTF_8);
+        out.assertAttributeEquals(ExtractRecordSchema.SCHEMA_ATTRIBUTE_NAME, expectedAttributeValue);
+    }
+
+    @Test
+    public void testNoSchema() throws Exception {
+        final MockRecordParser readerService = new MockRecordParserSchemaNotFound();
+        final TestRunner runner = TestRunners.newTestRunner(ExtractRecordSchema.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.setProperty(ExtractRecordSchema.RECORD_READER, "reader");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ExtractRecordSchema.REL_FAILURE, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractRecordSchema.REL_FAILURE).get(0);
+
+        out.assertAttributeEquals("record.error.message", "org.apache.nifi.schema.access.SchemaNotFoundException Thrown");
+    }
+
+    // Reader stub whose createRecordReader always fails, driving the processor's failure path.
+    private static class MockRecordParserSchemaNotFound extends MockRecordParser {
+        @Override
+        public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws SchemaNotFoundException {
+            throw new SchemaNotFoundException("test");
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractRecordSchema/name_age_schema.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractRecordSchema/name_age_schema.avsc
new file mode 100644
index 0000000000..c349193e84
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestExtractRecordSchema/name_age_schema.avsc
@@ -0,0 +1 @@
+{"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":[{"name":"name","type":["string","null"]},{"name":"age","type":["int","null"]}]}
\ No newline at end of file