NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles

This closes #5908

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Peter Gyori 2022-03-25 18:36:43 +01:00 committed by exceptionfactory
parent 9566d3fa78
commit 6ade47ac4f
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 65 additions and 5 deletions

View File

@ -24,6 +24,8 @@ import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -40,17 +42,20 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
private final SSLSocketChannel sslSocketChannel;
private final RecordReaderFactory readerFactory;
private final SocketChannelRecordReaderDispatcher dispatcher;
private final SSLEngine sslEngine;
private RecordReader recordReader;
public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
final SSLSocketChannel sslSocketChannel,
final RecordReaderFactory readerFactory,
final SocketChannelRecordReaderDispatcher dispatcher) {
final SocketChannelRecordReaderDispatcher dispatcher,
final SSLEngine sslEngine) {
this.socketChannel = socketChannel;
this.sslSocketChannel = sslSocketChannel;
this.readerFactory = readerFactory;
this.dispatcher = dispatcher;
this.sslEngine = sslEngine;
}
@Override
@ -87,4 +92,7 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
dispatcher.connectionCompleted();
}
public SSLSession getSession() {
return sslEngine.getSession();
}
}

View File

@ -119,7 +119,7 @@ public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable
}
final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this);
socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this, sslEngine);
}
// queue the SocketChannelRecordReader for processing by the processor

View File

@ -25,6 +25,8 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -37,6 +39,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -62,6 +67,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.record.listen.SSLSocketChannelRecordReader;
import org.apache.nifi.record.listen.SocketChannelRecordReader;
import org.apache.nifi.record.listen.SocketChannelRecordReaderDispatcher;
import org.apache.nifi.security.util.ClientAuth;
@ -87,14 +93,26 @@ import org.apache.nifi.ssl.SSLContextService;
"If the read times out, or if any other error is encountered when reading, the connection will be closed, and any records " +
"read up to that point will be handled according to the configured Read Error Strategy (Discard or Transfer). In cases where " +
"clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of " +
"TCP Connections allowed, so that there is a task processing each connection.")
"TCP Connections allowed, so that there is a task processing each connection. " +
"The processor can be configured to use an SSL Context Service to only allow secure connections. " +
"When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " +
"issuer and subject are added to the outgoing FlowFiles as attributes. " +
"The processor does not perform authorization based on Distinguished Name values, but since these values " +
"are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.")
@WritesAttributes({
@WritesAttribute(attribute="tcp.sender", description="The host that sent the data."),
@WritesAttribute(attribute="tcp.port", description="The port that the processor accepted the connection on."),
@WritesAttribute(attribute="record.count", description="The number of records written to the flow file."),
@WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.")
@WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file."),
@WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
"Certificate Authority that issued the client's certificate " +
"is attached to the FlowFile."),
@WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " +
"client certificate's owner (subject) is attached to the FlowFile.")
})
public class ListenTCPRecord extends AbstractProcessor {
private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn";
private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn";
static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("port")
@ -199,7 +217,6 @@ public class ListenTCPRecord extends AbstractProcessor {
.description("Messages received successfully will be sent out this relationship.")
.build();
static final List<PropertyDescriptor> PROPERTIES;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
@ -427,6 +444,7 @@ public class ListenTCPRecord extends AbstractProcessor {
attributes.put("tcp.sender", sender);
attributes.put("tcp.port", String.valueOf(port));
attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
addClientCertificateAttributes(attributes, socketRecordReader);
flowFile = session.putAllAttributes(flowFile, attributes);
final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
@ -460,4 +478,23 @@ public class ListenTCPRecord extends AbstractProcessor {
private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) {
return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString();
}
private void addClientCertificateAttributes(final Map<String, String> attributes, final SocketChannelRecordReader socketRecordReader)
throws SSLPeerUnverifiedException {
if (socketRecordReader instanceof SSLSocketChannelRecordReader) {
SSLSocketChannelRecordReader sslSocketRecordReader = (SSLSocketChannelRecordReader) socketRecordReader;
SSLSession sslSession = sslSocketRecordReader.getSession();
try {
Certificate[] certificates = sslSession.getPeerCertificates();
if (certificates.length > 0) {
X509Certificate certificate = (X509Certificate) certificates[0];
attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, certificate.getSubjectDN().toString());
attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, certificate.getIssuerDN().toString());
}
} catch (SSLPeerUnverifiedException peerUnverifiedException) {
getLogger().debug("Remote Peer [{}] not verified: client certificates not provided",
socketRecordReader.getRemoteAddress(), peerUnverifiedException);
}
}
}
}

View File

@ -172,6 +172,21 @@ public class TestListenTCPRecord {
Assert.assertTrue(content.contains("This is a test " + 3));
}
@Test(timeout = TEST_TIMEOUT)
public void testRunSSLClientDNsAddedAsAttributes() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
enableSslContextService(keyStoreSslContext);
run(1, keyStoreSslContext);
final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
Assert.assertEquals(1, mockFlowFiles.size());
final MockFlowFile flowFile = mockFlowFiles.get(0);
flowFile.assertAttributeEquals("client.certificate.subject.dn", "CN=localhost");
flowFile.assertAttributeEquals("client.certificate.issuer.dn", "CN=localhost");
}
@Test(timeout = TEST_TIMEOUT)
public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException {
runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());