diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java index 6236d8e3d4..cf9a538c27 100644 --- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java +++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/util/CertificateUtils.java @@ -17,14 +17,21 @@ package org.apache.nifi.security.util; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.net.Socket; import java.net.URL; import java.security.KeyStore; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; import java.security.cert.CertificateParsingException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSocket; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +43,9 @@ public final class CertificateUtils { /** * Returns true if the given keystore can be loaded using the given keystore type and password. Returns false otherwise. * - * @param keystore the keystore to validate + * @param keystore the keystore to validate * @param keystoreType the type of the keystore - * @param password the password to access the keystore + * @param password the password to access the keystore * @return true if valid; false otherwise */ public static boolean isStoreValid(final URL keystore, final KeystoreType keystoreType, final char[] password) { @@ -137,6 +144,75 @@ public final class CertificateUtils { return result; } + public static String extractClientDNFromSSLSocket(Socket socket) throws CertificateException { + String dn = null; + if (socket instanceof SSLSocket) { + final SSLSocket sslSocket = (SSLSocket) socket; + try { + final Certificate[] certChains = sslSocket.getSession().getPeerCertificates(); + if (certChains != null && certChains.length > 0) { + X509Certificate x509Certificate = convertAbstractX509Certificate(certChains[0]); + dn = x509Certificate.getSubjectDN().getName().trim(); + } + } catch (SSLPeerUnverifiedException e) { + throw new CertificateException(e); + } + } + + return dn; + } + + /** + * Accepts a legacy {@link javax.security.cert.X509Certificate} and returns an {@link X509Certificate}. The {@code javax.*} package certificate classes are for legacy compatibility and should + * not be used for new development. + * + * @param legacyCertificate the {@code javax.security.cert.X509Certificate} + * @return a new {@code java.security.cert.X509Certificate} + * @throws CertificateException if there is an error generating the new certificate + */ + public static X509Certificate convertLegacyX509Certificate(javax.security.cert.X509Certificate legacyCertificate) throws CertificateException { + if (legacyCertificate == null) { + throw new IllegalArgumentException("The X.509 certificate cannot be null"); + } + + try { + return formX509Certificate(legacyCertificate.getEncoded()); + } catch (javax.security.cert.CertificateEncodingException e) { + throw new CertificateException(e); + } + } + + /** + * Accepts an abstract {@link java.security.cert.Certificate} and returns an {@link X509Certificate}. Because {@code sslSocket.getSession().getPeerCertificates()} returns an array of the + * abstract certificates, they must be translated to X.509 to replace the functionality of {@code sslSocket.getSession().getPeerCertificateChain()}. + * + * @param abstractCertificate the {@code java.security.cert.Certificate} + * @return a new {@code java.security.cert.X509Certificate} + * @throws CertificateException if there is an error generating the new certificate + */ + public static X509Certificate convertAbstractX509Certificate(java.security.cert.Certificate abstractCertificate) throws CertificateException { + if (abstractCertificate == null || !(abstractCertificate instanceof X509Certificate)) { + throw new IllegalArgumentException("The certificate cannot be null and must be an X.509 certificate"); + } + + try { + return formX509Certificate(abstractCertificate.getEncoded()); + } catch (java.security.cert.CertificateEncodingException e) { + throw new CertificateException(e); + } + } + + private static X509Certificate formX509Certificate(byte[] encodedCertificate) throws CertificateException { + try { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + ByteArrayInputStream bais = new ByteArrayInputStream(encodedCertificate); + return (X509Certificate) cf.generateCertificate(bais); + } catch (CertificateException e) { + logger.error("Error converting the certificate", e); + throw e; + } + } + private CertificateUtils() { } } diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 1990a22fdf..fa35f288be 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -31,6 +31,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; +import java.security.cert.CertificateException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -54,11 +55,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Pattern; - import javax.net.ssl.SSLContext; -import javax.security.cert.CertificateExpiredException; -import javax.security.cert.CertificateNotYetValidException; - import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PeerDescription; @@ -72,7 +69,6 @@ import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.exception.UnknownPortException; import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; @@ -236,12 +232,12 @@ public class EndpointConnectionPool { } } - public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException { return getEndpointConnection(direction, null); } public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) - throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + throws IOException { // // Attempt to get a connection state that already exists for this URL. // @@ -532,10 +528,10 @@ public class EndpointConnectionPool { private boolean isPenalized(final PeerStatus peerStatus) { final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription()); - return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis()); + return (expirationEnd != null && expirationEnd > System.currentTimeMillis()); } - private List createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { + private List createPeerStatusList(final TransferDirection direction) throws IOException { Set statuses = getPeerStatuses(); if (statuses == null) { refreshPeers(); @@ -576,7 +572,7 @@ public class EndpointConnectionPool { return cache.getStatuses(); } - private Set fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException { + private Set fetchRemotePeerStatuses() throws IOException { final String hostname = clusterUrl.getHost(); final Integer port = getSiteToSitePort(); if (port == null) { @@ -704,7 +700,7 @@ public class EndpointConnectionPool { try { commsSession.setUserDn(socketChannel.getDn()); - } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { + } catch (final CertificateException ex) { throw new IOException(ex); } } else { @@ -801,7 +797,7 @@ public class EndpointConnectionPool { connection.getSocketClientProtocol().shutdown(connection.getPeer()); } catch (final Exception e) { logger.debug("Failed to shut down {} using {} due to {}", - new Object[]{connection.getSocketClientProtocol(), connection.getPeer(), e}); + connection.getSocketClientProtocol(), connection.getPeer(), e); } terminate(connection); diff --git a/nifi-commons/nifi-utils/pom.xml b/nifi-commons/nifi-utils/pom.xml index 71854e5717..d108a6d10d 100644 --- a/nifi-commons/nifi-utils/pom.xml +++ b/nifi-commons/nifi-utils/pom.xml @@ -24,7 +24,12 @@ 1.0.0-SNAPSHOT jar + + + org.apache.nifi + nifi-security-utils + + diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java index 2209e3812f..7a09f5f1c6 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -25,22 +25,20 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.ClosedByInterruptException; import java.nio.channels.SocketChannel; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.concurrent.TimeUnit; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngineResult; import javax.net.ssl.SSLEngineResult.Status; import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLPeerUnverifiedException; -import javax.security.cert.CertificateExpiredException; -import javax.security.cert.CertificateNotYetValidException; -import javax.security.cert.X509Certificate; - import org.apache.nifi.remote.exception.TransmissionDisabledException; import org.apache.nifi.remote.io.socket.BufferStateManager; import org.apache.nifi.remote.io.socket.BufferStateManager.Direction; - +import org.apache.nifi.security.util.CertificateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +131,7 @@ public class SSLSocketChannel implements Closeable { return timeoutMillis; } - public void connect() throws SSLHandshakeException, IOException { + public void connect() throws IOException { try { channel.configureBlocking(false); if (!channel.isConnected()) { @@ -177,13 +175,13 @@ public class SSLSocketChannel implements Closeable { } } - public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException { - final X509Certificate[] certs = engine.getSession().getPeerCertificateChain(); + public String getDn() throws CertificateException, SSLPeerUnverifiedException { + final Certificate[] certs = engine.getSession().getPeerCertificates(); if (certs == null || certs.length == 0) { throw new SSLPeerUnverifiedException("No certificates found"); } - final X509Certificate cert = certs[0]; + final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]); cert.checkValidity(); return cert.getSubjectDN().getName().trim(); } @@ -230,7 +228,7 @@ public class SSLSocketChannel implements Closeable { final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); // Read handshake response from other side - logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData}); + logger.trace("{} Unwrapping: {} to {}", this, readableDataIn, appData); SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData); logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult); @@ -402,7 +400,7 @@ public class SSLSocketChannel implements Closeable { final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); try { SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer); - logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse}); + logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse); if (unwrapResponse.getStatus().equals(Status.CLOSED)) { // Drain the incoming TCP buffer final ByteBuffer discardBuffer = ByteBuffer.allocate(8192); @@ -486,7 +484,7 @@ public class SSLSocketChannel implements Closeable { final int bytesCopied = appDataRemaining - appDataBuffer.remaining(); logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space", - new Object[]{this, bytesToCopy, bytesCopied}); + this, bytesToCopy, bytesCopied); return bytesCopied; } return 0; @@ -555,7 +553,7 @@ public class SSLSocketChannel implements Closeable { SSLEngineResult unwrapResponse = null; final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer); - logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse}); + logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse); switch (unwrapResponse.getStatus()) { case BUFFER_OVERFLOW: diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java index 9ae6182da2..886553e517 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImpl.java @@ -18,10 +18,7 @@ package org.apache.nifi.cluster.protocol.impl; import java.io.IOException; import java.net.Socket; - -import javax.net.ssl.SSLSocket; -import javax.security.cert.X509Certificate; - +import java.security.cert.CertificateException; import org.apache.nifi.cluster.protocol.NodeProtocolSender; import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; @@ -38,6 +35,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.io.socket.SocketConfiguration; import org.apache.nifi.io.socket.SocketUtils; import org.apache.nifi.io.socket.multicast.DiscoverableService; +import org.apache.nifi.security.util.CertificateUtils; public class NodeProtocolSenderImpl implements NodeProtocolSender { @@ -46,7 +44,7 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender { private final ProtocolContext protocolContext; public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator, - final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext) { + final SocketConfiguration socketConfiguration, final ProtocolContext protocolContext) { if (clusterManagerProtocolServiceLocator == null) { throw new IllegalArgumentException("Protocol Service Locator may not be null."); } else if (socketConfiguration == null) { @@ -66,20 +64,7 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender { try { socket = createSocket(); - String ncmDn = null; - if (socket instanceof SSLSocket) { - final SSLSocket sslSocket = (SSLSocket) socket; - try { - final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain(); - if (certChains != null && certChains.length > 0) { - ncmDn = certChains[0].getSubjectDN().getName(); - } - } catch (final ProtocolException pe) { - throw pe; - } catch (final Exception e) { - throw new ProtocolException(e); - } - } + String ncmDn = getNCMDN(socket); try { // marshal message to output stream @@ -110,6 +95,14 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender { } } + private String getNCMDN(Socket socket) { + try { + return CertificateUtils.extractClientDNFromSSLSocket(socket); + } catch (CertificateException e) { + throw new ProtocolException(e); + } + } + @Override public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException { sendProtocolMessage(msg); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java index d48e0ee9e5..1345df318a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java @@ -19,15 +19,12 @@ package org.apache.nifi.cluster.protocol.impl; import java.io.IOException; import java.io.InputStream; import java.net.Socket; +import java.security.cert.CertificateException; import java.util.Collection; import java.util.Collections; import java.util.UUID; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; - -import javax.net.ssl.SSLSocket; -import javax.security.cert.X509Certificate; - import org.apache.nifi.cluster.protocol.ProtocolContext; import org.apache.nifi.cluster.protocol.ProtocolException; import org.apache.nifi.cluster.protocol.ProtocolHandler; @@ -41,8 +38,8 @@ import org.apache.nifi.io.socket.SocketListener; import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.util.StopWatch; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,20 +124,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi final String requestId = UUID.randomUUID().toString(); logger.info("Received request {} from {}", requestId, hostname); - String requestorDn = null; - if (socket instanceof SSLSocket) { - final SSLSocket sslSocket = (SSLSocket) socket; - try { - final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain(); - if (certChains != null && certChains.length > 0) { - requestorDn = certChains[0].getSubjectDN().getName(); - } - } catch (final ProtocolException pe) { - throw pe; - } catch (final Exception e) { - throw new ProtocolException(e); - } - } + String requestorDn = getRequestorDN(socket); // unmarshall message final ProtocolMessageUnmarshaller unmarshaller = protocolContext.createUnmarshaller(); @@ -186,19 +170,21 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi stopWatch.stop(); logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS)); - } catch (final IOException e) { + } catch (final IOException | ProtocolException e) { logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); if (bulletinRepository != null) { final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString())); bulletinRepository.addBulletin(bulletin); } - } catch (final ProtocolException e) { - logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e); - if (bulletinRepository != null) { - final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString())); - bulletinRepository.addBulletin(bulletin); - } + } + } + + private String getRequestorDN(Socket socket) { + try { + return CertificateUtils.extractClientDNFromSSLSocket(socket); + } catch (CertificateException e) { + throw new ProtocolException(e); } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 96ecdf5da5..cd87101e0c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -80,6 +80,10 @@ language governing permissions and limitations under the License. --> org.bouncycastle bcpg-jdk15on + + org.bouncycastle + bcpkix-jdk15on + commons-codec commons-codec diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java index 1be8dd941c..17b6550ef5 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/HandleHttpRequest.java @@ -22,6 +22,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URLDecoder; import java.security.Principal; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; @@ -36,14 +37,11 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; - -import javax.security.cert.X509Certificate; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -75,7 +73,6 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.ssl.SslContextFactory; - import com.sun.jersey.api.client.ClientResponse.Status; @InputRequirement(Requirement.INPUT_FORBIDDEN) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java index 760c069a2c..230790abcf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PostHTTP.java @@ -27,7 +27,9 @@ import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; +import java.security.cert.Certificate; import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -42,13 +44,10 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSession; -import javax.security.cert.X509Certificate; import javax.servlet.http.HttpServletResponse; - import org.apache.commons.io.IOUtils; import org.apache.http.Header; import org.apache.http.HttpException; @@ -105,6 +104,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.CertificateUtils; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; @@ -119,9 +119,8 @@ import org.apache.nifi.util.FlowFilePackagerV3; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.ObjectHolder; import org.apache.nifi.util.StopWatch; - -import com.sun.jersey.api.client.ClientResponse.Status; import org.apache.nifi.util.StringUtils; +import com.sun.jersey.api.client.ClientResponse.Status; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -250,14 +249,14 @@ public class PostHTTP extends AbstractProcessor { .addValidator(StandardValidators.PORT_VALIDATOR) .build(); public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder() - .name("Content-Type") - .description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. " - + "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE) - .required(true) - .expressionLanguageSupported(true) - .defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Content-Type") + .description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. " + + "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE) + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -334,9 +333,9 @@ public class PostHTTP extends AbstractProcessor { int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger(); boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet(); - if(compressionLevel == 0 && !sendAsFlowFile && !chunkedSet) { + if (compressionLevel == 0 && !sendAsFlowFile && !chunkedSet) { results.add(new ValidationResult.Builder().valid(false).subject(CHUNKED_ENCODING.getName()) - .explanation("if compression level is 0 and not sending as a FlowFile, then the \'"+CHUNKED_ENCODING.getName()+"\' property must be set").build()); + .explanation("if compression level is 0 and not sending as a FlowFile, then the \'" + CHUNKED_ENCODING.getName() + "\' property must be set").build()); } return results; @@ -504,13 +503,19 @@ public class PostHTTP extends AbstractProcessor { final SSLSession sslSession = conn.getSSLSession(); if (sslSession != null) { - final X509Certificate[] certChain = sslSession.getPeerCertificateChain(); + final Certificate[] certChain = sslSession.getPeerCertificates(); if (certChain == null || certChain.length == 0) { throw new SSLPeerUnverifiedException("No certificates found"); } - final X509Certificate cert = certChain[0]; - dnHolder.set(cert.getSubjectDN().getName().trim()); + try { + final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]); + dnHolder.set(cert.getSubjectDN().getName().trim()); + } catch (CertificateException e) { + final String msg = "Could not extract subject DN from SSL session peer certificate"; + logger.warn(msg); + throw new SSLPeerUnverifiedException(msg); + } } } }); @@ -637,7 +642,7 @@ public class PostHTTP extends AbstractProcessor { @Override public long getContentLength() { - if(compressionLevel == 0 && !sendAsFlowFile && !context.getProperty(CHUNKED_ENCODING).asBoolean() ) { + if (compressionLevel == 0 && !sendAsFlowFile && !context.getProperty(CHUNKED_ENCODING).asBoolean()) { return toSend.get(0).getSize(); } else { return -1; @@ -645,7 +650,7 @@ public class PostHTTP extends AbstractProcessor { } }; - if(context.getProperty(CHUNKED_ENCODING).isSet()) { + if (context.getProperty(CHUNKED_ENCODING).isSet()) { entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean()); } post.setEntity(entity); @@ -765,7 +770,7 @@ public class PostHTTP extends AbstractProcessor { for (FlowFile flowFile : toSend) { flowFile = session.penalize(flowFile); logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, " - + "since the destination is temporarily unavailable", + + "since the destination is temporarily unavailable", new Object[]{flowFile, url, responseCode, responseReason}); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/security/util/CertificateUtilsTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/security/util/CertificateUtilsTest.groovy new file mode 100644 index 0000000000..2be2e16373 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/security/util/CertificateUtilsTest.groovy @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.security.util + +import org.bouncycastle.asn1.x500.X500Name +import org.bouncycastle.asn1.x509.ExtendedKeyUsage +import org.bouncycastle.asn1.x509.KeyPurposeId +import org.bouncycastle.asn1.x509.KeyUsage +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo +import org.bouncycastle.asn1.x509.X509Extension +import org.bouncycastle.cert.X509CertificateHolder +import org.bouncycastle.cert.X509v3CertificateBuilder +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter +import org.bouncycastle.jce.provider.BouncyCastleProvider +import org.bouncycastle.operator.ContentSigner +import org.bouncycastle.operator.OperatorCreationException +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.security.InvalidKeyException +import java.security.KeyPair +import java.security.KeyPairGenerator +import java.security.NoSuchAlgorithmException +import java.security.NoSuchProviderException +import java.security.PrivateKey +import java.security.PublicKey +import java.security.Security +import java.security.SignatureException +import java.security.cert.Certificate +import java.security.cert.CertificateException +import java.security.cert.X509Certificate + +@RunWith(JUnit4.class) +class CertificateUtilsTest extends GroovyTestCase { + private static final Logger logger = LoggerFactory.getLogger(CertificateUtilsTest.class); + + private static final int KEY_SIZE = 2048; + + private static final long YESTERDAY = System.currentTimeMillis() - 24 * 60 * 60 * 1000; + private static final long ONE_YEAR_FROM_NOW = System.currentTimeMillis() + 365 * 24 * 60 * 60 * 1000; + private static final String SIGNATURE_ALGORITHM = "SHA256withRSA"; + private static final String PROVIDER = "BC"; + + private static final String SUBJECT_DN = "CN=NiFi Test Server,OU=Security,O=Apache,ST=CA,C=US"; + private static final String ISSUER_DN = "CN=NiFi Test CA,OU=Security,O=Apache,ST=CA,C=US"; + + @BeforeClass + static void setUpOnce() { + Security.addProvider(new BouncyCastleProvider()) + + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + void setUp() { + super.setUp() + + } + + @After + void tearDown() { + + } + + /** + * Generates a public/private RSA keypair using the default key size. + * + * @return the keypair + * @throws java.security.NoSuchAlgorithmException if the RSA algorithm is not available + */ + private static KeyPair generateKeyPair() throws NoSuchAlgorithmException { + KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); + keyPairGenerator.initialize(KEY_SIZE); + return keyPairGenerator.generateKeyPair(); + } + + /** + * Generates a signed certificate using an on-demand keypair. + * + * @param dn the DN + * @return the certificate + * @throws IOException + * @throws NoSuchAlgorithmException + * @throws java.security.cert.CertificateException + * @throws java.security.NoSuchProviderException + * @throws java.security.SignatureException + * @throws java.security.InvalidKeyException + * @throws OperatorCreationException + */ + private + static X509Certificate generateCertificate(String dn) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException { + KeyPair keyPair = generateKeyPair(); + return generateCertificate(dn, keyPair); + } + + /** + * Generates a signed certificate with a specific keypair. + * + * @param dn the DN + * @param keyPair the public key will be included in the certificate and the the private key is used to sign the certificate + * @return the certificate + * @throws IOException + * @throws NoSuchAlgorithmException + * @throws CertificateException + * @throws NoSuchProviderException + * @throws SignatureException + * @throws InvalidKeyException + * @throws OperatorCreationException + */ + private + static X509Certificate generateCertificate(String dn, KeyPair keyPair) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException { + PrivateKey privateKey = keyPair.getPrivate(); + ContentSigner sigGen = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).setProvider(PROVIDER).build(privateKey); + SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded()); + Date startDate = new Date(YESTERDAY); + Date endDate = new Date(ONE_YEAR_FROM_NOW); + + X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder( + new X500Name(dn), + BigInteger.valueOf(System.currentTimeMillis()), + startDate, endDate, + new X500Name(dn), + subPubKeyInfo); + + // Set certificate extensions + // (1) digitalSignature extension + certBuilder.addExtension(X509Extension.keyUsage, true, + new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyEncipherment | KeyUsage.dataEncipherment | KeyUsage.keyAgreement)); + + // (2) extendedKeyUsage extension + Vector ekUsages = new Vector<>(); + ekUsages.add(KeyPurposeId.id_kp_clientAuth); + ekUsages.add(KeyPurposeId.id_kp_serverAuth); + certBuilder.addExtension(X509Extension.extendedKeyUsage, false, new ExtendedKeyUsage(ekUsages)); + + // Sign the certificate + X509CertificateHolder certificateHolder = certBuilder.build(sigGen); + return new JcaX509CertificateConverter().setProvider(PROVIDER) + .getCertificate(certificateHolder); + } + + /** + * Generates a certificate signed by the issuer key. + * + * @param dn the subject DN + * @param issuerDn the issuer DN + * @param issuerKey the issuer private key + * @return the certificate + * @throws IOException + * @throws NoSuchAlgorithmException + * @throws CertificateException + * @throws NoSuchProviderException + * @throws SignatureException + * @throws InvalidKeyException + * @throws OperatorCreationException + */ + private + static X509Certificate generateIssuedCertificate(String dn, String issuerDn, PrivateKey issuerKey) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException { + KeyPair keyPair = generateKeyPair(); + return generateIssuedCertificate(dn, keyPair.getPublic(), issuerDn, issuerKey); + } + + /** + * Generates a certificate with a specific public key signed by the issuer key. + * + * @param dn the subject DN + * @param publicKey the subject public key + * @param issuerDn the issuer DN + * @param issuerKey the issuer private key + * @return the certificate + * @throws IOException + * @throws NoSuchAlgorithmException + * @throws CertificateException + * @throws NoSuchProviderException + * @throws SignatureException + * @throws InvalidKeyException + * @throws OperatorCreationException + */ + private + static X509Certificate generateIssuedCertificate(String dn, PublicKey publicKey, String issuerDn, PrivateKey issuerKey) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException { + ContentSigner sigGen = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).setProvider(PROVIDER).build(issuerKey); + SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(publicKey.getEncoded()); + Date startDate = new Date(YESTERDAY); + Date endDate = new Date(ONE_YEAR_FROM_NOW); + + X509v3CertificateBuilder v3CertGen = new X509v3CertificateBuilder( + new X500Name(issuerDn), + BigInteger.valueOf(System.currentTimeMillis()), + startDate, endDate, + new X500Name(dn), + subPubKeyInfo); + + X509CertificateHolder certificateHolder = v3CertGen.build(sigGen); + return new JcaX509CertificateConverter().setProvider(PROVIDER) + .getCertificate(certificateHolder); + } + + private static X509Certificate[] generateCertificateChain(String dn = SUBJECT_DN, String issuerDn = ISSUER_DN) { + final KeyPair issuerKeyPair = generateKeyPair(); + final PrivateKey issuerPrivateKey = issuerKeyPair.getPrivate(); + + final X509Certificate issuerCertificate = generateCertificate(issuerDn, issuerKeyPair); + final X509Certificate certificate = generateIssuedCertificate(dn, issuerDn, issuerPrivateKey); + [certificate, issuerCertificate] as X509Certificate[] + } + + private static javax.security.cert.X509Certificate generateLegacyCertificate(X509Certificate x509Certificate) { + return javax.security.cert.X509Certificate.getInstance(x509Certificate.getEncoded()) + } + + private static Certificate generateAbstractCertificate(X509Certificate x509Certificate) { + return x509Certificate as Certificate + } + + @Test + void testShouldConvertLegacyX509Certificate() { + // Arrange + final X509Certificate EXPECTED_NEW_CERTIFICATE = generateCertificate(SUBJECT_DN) + logger.info("Expected certificate: ${EXPECTED_NEW_CERTIFICATE.class.canonicalName} ${EXPECTED_NEW_CERTIFICATE.subjectDN.toString()} (${EXPECTED_NEW_CERTIFICATE.getSerialNumber()})") + + // Form the legacy certificate + final javax.security.cert.X509Certificate LEGACY_CERTIFICATE = generateLegacyCertificate(EXPECTED_NEW_CERTIFICATE) + logger.info("Legacy certificate: ${LEGACY_CERTIFICATE.class.canonicalName} ${LEGACY_CERTIFICATE.subjectDN.toString()} (${LEGACY_CERTIFICATE.getSerialNumber()})") + + // Act + X509Certificate convertedCertificate = CertificateUtils.convertLegacyX509Certificate(LEGACY_CERTIFICATE) + logger.info("Converted certificate: ${convertedCertificate.class.canonicalName} ${convertedCertificate.subjectDN.toString()} (${convertedCertificate.getSerialNumber()})") + + // Assert + assert convertedCertificate instanceof X509Certificate + assert convertedCertificate == EXPECTED_NEW_CERTIFICATE + } + + @Test + void testShouldConvertAbstractX509Certificate() { + // Arrange + final X509Certificate EXPECTED_NEW_CERTIFICATE = generateCertificate(SUBJECT_DN) + logger.info("Expected certificate: ${EXPECTED_NEW_CERTIFICATE.class.canonicalName} ${EXPECTED_NEW_CERTIFICATE.subjectDN.toString()} (${EXPECTED_NEW_CERTIFICATE.getSerialNumber()})") + + // Form the abstract certificate + final Certificate ABSTRACT_CERTIFICATE = generateAbstractCertificate(EXPECTED_NEW_CERTIFICATE) + logger.info("Abstract certificate: ${ABSTRACT_CERTIFICATE.class.canonicalName} (?)") + + // Act + X509Certificate convertedCertificate = CertificateUtils.convertAbstractX509Certificate(ABSTRACT_CERTIFICATE) + logger.info("Converted certificate: ${convertedCertificate.class.canonicalName} ${convertedCertificate.subjectDN.toString()} (${convertedCertificate.getSerialNumber()})") + + // Assert + assert convertedCertificate instanceof X509Certificate + assert convertedCertificate == EXPECTED_NEW_CERTIFICATE + } +}