NIFI-1753 Replaced usage of javax.security.cert.X509Certificate with java.security.cert.X509Certificate and resolved user-reported ClassCastException when handling client certificates during TLS mutual authentication.

Fixed nifi-utils pom.xml comment about additional dependencies. (+5 squashed commits)
Squashed commits:
[965b766] NIFI-1753 Removed temporary work-around of duplicate certificate conversion util method and added nifi-security-utils as dependency of nifi-utils.
[cd35f9b] NIFI-1753 Replaced legacy X.509 certificate declarations with new declarations in SSLSocketChannel and EndpointConnectionPool.
Temporary work-around of duplicate certificate conversion util method because nifi-utils cannot depend on nifi-security-utils.
[6420897] NIFI-1753 Replaced legacy X.509 certificate declarations with new declarations in PostHTTP.
[b9868ef] NIFI-1753 Added convenience method for extracting DN from peer certificate chain in SSL socket (canonical implementation to reduce code duplication and references to legacy certificate implementations).
Refactored logic retrieving legacy X.509 certificates with reference to convenience method in NodeProtocolSenderImpl.
Replaced logic retrieving legacy X.509 certificates with reference to convenience method in SocketProtocolListener.
Cleaned up exception handling in SocketProtocolListener.
Replaced legacy X.509 certificate declarations with new declarations in HandleHttpRequest (needs manual test).
[e2d1c35] NIFI-1753 Added convenience methods for converting legacy X.509 certificates and abstract certificates to correct X.509 format.
Added unit tests for certificate manipulation.
Replaced logic retrieving legacy X.509 certificates with new logic in NodeProtocolSenderImpl.
Added bcpkix (Bouncy Castle PKI implementation) dependency to nifi-standard-processors pom.

This closes #346.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Andy LoPresto 2016-04-11 20:11:45 -07:00
parent dfa27263d2
commit 378ccf53c2
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
10 changed files with 435 additions and 100 deletions

View File

@ -17,14 +17,21 @@
package org.apache.nifi.security.util;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.URL;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocket;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,9 +43,9 @@ public final class CertificateUtils {
/**
* Returns true if the given keystore can be loaded using the given keystore type and password. Returns false otherwise.
*
* @param keystore the keystore to validate
* @param keystore the keystore to validate
* @param keystoreType the type of the keystore
* @param password the password to access the keystore
* @param password the password to access the keystore
* @return true if valid; false otherwise
*/
public static boolean isStoreValid(final URL keystore, final KeystoreType keystoreType, final char[] password) {
@ -137,6 +144,75 @@ public final class CertificateUtils {
return result;
}
public static String extractClientDNFromSSLSocket(Socket socket) throws CertificateException {
String dn = null;
if (socket instanceof SSLSocket) {
final SSLSocket sslSocket = (SSLSocket) socket;
try {
final Certificate[] certChains = sslSocket.getSession().getPeerCertificates();
if (certChains != null && certChains.length > 0) {
X509Certificate x509Certificate = convertAbstractX509Certificate(certChains[0]);
dn = x509Certificate.getSubjectDN().getName().trim();
}
} catch (SSLPeerUnverifiedException e) {
throw new CertificateException(e);
}
}
return dn;
}
/**
* Accepts a legacy {@link javax.security.cert.X509Certificate} and returns an {@link X509Certificate}. The {@code javax.*} package certificate classes are for legacy compatibility and should
* not be used for new development.
*
* @param legacyCertificate the {@code javax.security.cert.X509Certificate}
* @return a new {@code java.security.cert.X509Certificate}
* @throws CertificateException if there is an error generating the new certificate
*/
public static X509Certificate convertLegacyX509Certificate(javax.security.cert.X509Certificate legacyCertificate) throws CertificateException {
if (legacyCertificate == null) {
throw new IllegalArgumentException("The X.509 certificate cannot be null");
}
try {
return formX509Certificate(legacyCertificate.getEncoded());
} catch (javax.security.cert.CertificateEncodingException e) {
throw new CertificateException(e);
}
}
/**
* Accepts an abstract {@link java.security.cert.Certificate} and returns an {@link X509Certificate}. Because {@code sslSocket.getSession().getPeerCertificates()} returns an array of the
* abstract certificates, they must be translated to X.509 to replace the functionality of {@code sslSocket.getSession().getPeerCertificateChain()}.
*
* @param abstractCertificate the {@code java.security.cert.Certificate}
* @return a new {@code java.security.cert.X509Certificate}
* @throws CertificateException if there is an error generating the new certificate
*/
public static X509Certificate convertAbstractX509Certificate(java.security.cert.Certificate abstractCertificate) throws CertificateException {
if (abstractCertificate == null || !(abstractCertificate instanceof X509Certificate)) {
throw new IllegalArgumentException("The certificate cannot be null and must be an X.509 certificate");
}
try {
return formX509Certificate(abstractCertificate.getEncoded());
} catch (java.security.cert.CertificateEncodingException e) {
throw new CertificateException(e);
}
}
private static X509Certificate formX509Certificate(byte[] encodedCertificate) throws CertificateException {
try {
CertificateFactory cf = CertificateFactory.getInstance("X.509");
ByteArrayInputStream bais = new ByteArrayInputStream(encodedCertificate);
return (X509Certificate) cf.generateCertificate(bais);
} catch (CertificateException e) {
logger.error("Error converting the certificate", e);
throw e;
}
}
private CertificateUtils() {
}
}

View File

@ -31,6 +31,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -54,11 +55,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.security.cert.CertificateExpiredException;
import javax.security.cert.CertificateNotYetValidException;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
@ -72,7 +69,6 @@ import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.PortNotRunningException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.exception.UnknownPortException;
import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession;
@ -236,12 +232,12 @@ public class EndpointConnectionPool {
}
}
public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException {
return getEndpointConnection(direction, null);
}
public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config)
throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException {
throws IOException {
//
// Attempt to get a connection state that already exists for this URL.
//
@ -532,10 +528,10 @@ public class EndpointConnectionPool {
private boolean isPenalized(final PeerStatus peerStatus) {
final Long expirationEnd = peerTimeoutExpirations.get(peerStatus.getPeerDescription());
return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis());
return (expirationEnd != null && expirationEnd > System.currentTimeMillis());
}
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException {
Set<PeerStatus> statuses = getPeerStatuses();
if (statuses == null) {
refreshPeers();
@ -576,7 +572,7 @@ public class EndpointConnectionPool {
return cache.getStatuses();
}
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException, HandshakeException, UnknownPortException, PortNotRunningException {
private Set<PeerStatus> fetchRemotePeerStatuses() throws IOException {
final String hostname = clusterUrl.getHost();
final Integer port = getSiteToSitePort();
if (port == null) {
@ -704,7 +700,7 @@ public class EndpointConnectionPool {
try {
commsSession.setUserDn(socketChannel.getDn());
} catch (final CertificateNotYetValidException | CertificateExpiredException ex) {
} catch (final CertificateException ex) {
throw new IOException(ex);
}
} else {
@ -801,7 +797,7 @@ public class EndpointConnectionPool {
connection.getSocketClientProtocol().shutdown(connection.getPeer());
} catch (final Exception e) {
logger.debug("Failed to shut down {} using {} due to {}",
new Object[]{connection.getSocketClientProtocol(), connection.getPeer(), e});
connection.getSocketClientProtocol(), connection.getPeer(), e);
}
terminate(connection);

View File

@ -24,7 +24,12 @@
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<!--
This project intentionally has no additional dependencies beyond that pulled in by the parent. It is a general purpose utility library
and should keep its surface/tension minimal.
This project intentionally minimizes dependencies beyond that pulled in by the parent. It is a general purpose utility library and should keep its surface/tension minimal.
-->
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -25,22 +25,20 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.CertificateExpiredException;
import javax.security.cert.CertificateNotYetValidException;
import javax.security.cert.X509Certificate;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.io.socket.BufferStateManager;
import org.apache.nifi.remote.io.socket.BufferStateManager.Direction;
import org.apache.nifi.security.util.CertificateUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -133,7 +131,7 @@ public class SSLSocketChannel implements Closeable {
return timeoutMillis;
}
public void connect() throws SSLHandshakeException, IOException {
public void connect() throws IOException {
try {
channel.configureBlocking(false);
if (!channel.isConnected()) {
@ -177,13 +175,13 @@ public class SSLSocketChannel implements Closeable {
}
}
public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException {
final X509Certificate[] certs = engine.getSession().getPeerCertificateChain();
public String getDn() throws CertificateException, SSLPeerUnverifiedException {
final Certificate[] certs = engine.getSession().getPeerCertificates();
if (certs == null || certs.length == 0) {
throw new SSLPeerUnverifiedException("No certificates found");
}
final X509Certificate cert = certs[0];
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certs[0]);
cert.checkValidity();
return cert.getSubjectDN().getName().trim();
}
@ -230,7 +228,7 @@ public class SSLSocketChannel implements Closeable {
final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
// Read handshake response from other side
logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData});
logger.trace("{} Unwrapping: {} to {}", this, readableDataIn, appData);
SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData);
logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult);
@ -402,7 +400,7 @@ public class SSLSocketChannel implements Closeable {
final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
try {
SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse);
if (unwrapResponse.getStatus().equals(Status.CLOSED)) {
// Drain the incoming TCP buffer
final ByteBuffer discardBuffer = ByteBuffer.allocate(8192);
@ -486,7 +484,7 @@ public class SSLSocketChannel implements Closeable {
final int bytesCopied = appDataRemaining - appDataBuffer.remaining();
logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space",
new Object[]{this, bytesToCopy, bytesCopied});
this, bytesToCopy, bytesCopied);
return bytesCopied;
}
return 0;
@ -555,7 +553,7 @@ public class SSLSocketChannel implements Closeable {
SSLEngineResult unwrapResponse = null;
final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize());
unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer);
logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse});
logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", this, handshaking, unwrapResponse);
switch (unwrapResponse.getStatus()) {
case BUFFER_OVERFLOW:

View File

@ -18,10 +18,7 @@ package org.apache.nifi.cluster.protocol.impl;
import java.io.IOException;
import java.net.Socket;
import javax.net.ssl.SSLSocket;
import javax.security.cert.X509Certificate;
import java.security.cert.CertificateException;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
@ -38,6 +35,7 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.security.util.CertificateUtils;
public class NodeProtocolSenderImpl implements NodeProtocolSender {
@ -46,7 +44,7 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
private final ProtocolContext<ProtocolMessage> protocolContext;
public NodeProtocolSenderImpl(final ClusterServiceLocator clusterManagerProtocolServiceLocator,
final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
if (clusterManagerProtocolServiceLocator == null) {
throw new IllegalArgumentException("Protocol Service Locator may not be null.");
} else if (socketConfiguration == null) {
@ -66,20 +64,7 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
try {
socket = createSocket();
String ncmDn = null;
if (socket instanceof SSLSocket) {
final SSLSocket sslSocket = (SSLSocket) socket;
try {
final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
if (certChains != null && certChains.length > 0) {
ncmDn = certChains[0].getSubjectDN().getName();
}
} catch (final ProtocolException pe) {
throw pe;
} catch (final Exception e) {
throw new ProtocolException(e);
}
}
String ncmDn = getNCMDN(socket);
try {
// marshal message to output stream
@ -110,6 +95,14 @@ public class NodeProtocolSenderImpl implements NodeProtocolSender {
}
}
private String getNCMDN(Socket socket) {
try {
return CertificateUtils.extractClientDNFromSSLSocket(socket);
} catch (CertificateException e) {
throw new ProtocolException(e);
}
}
@Override
public void heartbeat(final HeartbeatMessage msg) throws ProtocolException, UnknownServiceAddressException {
sendProtocolMessage(msg);

View File

@ -19,15 +19,12 @@ package org.apache.nifi.cluster.protocol.impl;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.security.cert.CertificateException;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLSocket;
import javax.security.cert.X509Certificate;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
@ -41,8 +38,8 @@ import org.apache.nifi.io.socket.SocketListener;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -127,20 +124,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
final String requestId = UUID.randomUUID().toString();
logger.info("Received request {} from {}", requestId, hostname);
String requestorDn = null;
if (socket instanceof SSLSocket) {
final SSLSocket sslSocket = (SSLSocket) socket;
try {
final X509Certificate[] certChains = sslSocket.getSession().getPeerCertificateChain();
if (certChains != null && certChains.length > 0) {
requestorDn = certChains[0].getSubjectDN().getName();
}
} catch (final ProtocolException pe) {
throw pe;
} catch (final Exception e) {
throw new ProtocolException(e);
}
}
String requestorDn = getRequestorDN(socket);
// unmarshall message
final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
@ -186,19 +170,21 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
stopWatch.stop();
logger.info("Finished processing request {} (type={}, length={} bytes) in {} millis", requestId, request.getType(), receivedMessage.length, stopWatch.getDuration(TimeUnit.MILLISECONDS));
} catch (final IOException e) {
} catch (final IOException | ProtocolException e) {
logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
if (bulletinRepository != null) {
final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
bulletinRepository.addBulletin(bulletin);
}
} catch (final ProtocolException e) {
logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
if (bulletinRepository != null) {
final Bulletin bulletin = BulletinFactory.createBulletin("Clustering", "WARNING", String.format("Failed to process protocol message from %s due to: %s", hostname, e.toString()));
bulletinRepository.addBulletin(bulletin);
}
}
}
private String getRequestorDN(Socket socket) {
try {
return CertificateUtils.extractClientDNFromSSLSocket(socket);
} catch (CertificateException e) {
throw new ProtocolException(e);
}
}
}

View File

@ -80,6 +80,10 @@ language governing permissions and limitations under the License. -->
<groupId>org.bouncycastle</groupId>
<artifactId>bcpg-jdk15on</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>

View File

@ -22,6 +22,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
@ -36,14 +37,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.security.cert.X509Certificate;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -75,7 +73,6 @@ import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import com.sun.jersey.api.client.ClientResponse.Status;
@InputRequirement(Requirement.INPUT_FORBIDDEN)

View File

@ -27,7 +27,9 @@ import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -42,13 +44,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpException;
@ -105,6 +104,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
@ -119,9 +119,8 @@ import org.apache.nifi.util.FlowFilePackagerV3;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.StopWatch;
import com.sun.jersey.api.client.ClientResponse.Status;
import org.apache.nifi.util.StringUtils;
import com.sun.jersey.api.client.ClientResponse.Status;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -250,14 +249,14 @@ public class PostHTTP extends AbstractProcessor {
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content-Type")
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE)
.required(true)
.expressionLanguageSupported(true)
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Content-Type")
.description("The Content-Type to specify for the content of the FlowFile being POSTed if " + SEND_AS_FLOWFILE.getName() + " is false. "
+ "In the case of an empty value after evaluating an expression language expression, Content-Type defaults to " + DEFAULT_CONTENT_TYPE)
.required(true)
.expressionLanguageSupported(true)
.defaultValue("${" + CoreAttributes.MIME_TYPE.key() + "}")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -334,9 +333,9 @@ public class PostHTTP extends AbstractProcessor {
int compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
boolean chunkedSet = context.getProperty(CHUNKED_ENCODING).isSet();
if(compressionLevel == 0 && !sendAsFlowFile && !chunkedSet) {
if (compressionLevel == 0 && !sendAsFlowFile && !chunkedSet) {
results.add(new ValidationResult.Builder().valid(false).subject(CHUNKED_ENCODING.getName())
.explanation("if compression level is 0 and not sending as a FlowFile, then the \'"+CHUNKED_ENCODING.getName()+"\' property must be set").build());
.explanation("if compression level is 0 and not sending as a FlowFile, then the \'" + CHUNKED_ENCODING.getName() + "\' property must be set").build());
}
return results;
@ -504,13 +503,19 @@ public class PostHTTP extends AbstractProcessor {
final SSLSession sslSession = conn.getSSLSession();
if (sslSession != null) {
final X509Certificate[] certChain = sslSession.getPeerCertificateChain();
final Certificate[] certChain = sslSession.getPeerCertificates();
if (certChain == null || certChain.length == 0) {
throw new SSLPeerUnverifiedException("No certificates found");
}
final X509Certificate cert = certChain[0];
dnHolder.set(cert.getSubjectDN().getName().trim());
try {
final X509Certificate cert = CertificateUtils.convertAbstractX509Certificate(certChain[0]);
dnHolder.set(cert.getSubjectDN().getName().trim());
} catch (CertificateException e) {
final String msg = "Could not extract subject DN from SSL session peer certificate";
logger.warn(msg);
throw new SSLPeerUnverifiedException(msg);
}
}
}
});
@ -637,7 +642,7 @@ public class PostHTTP extends AbstractProcessor {
@Override
public long getContentLength() {
if(compressionLevel == 0 && !sendAsFlowFile && !context.getProperty(CHUNKED_ENCODING).asBoolean() ) {
if (compressionLevel == 0 && !sendAsFlowFile && !context.getProperty(CHUNKED_ENCODING).asBoolean()) {
return toSend.get(0).getSize();
} else {
return -1;
@ -645,7 +650,7 @@ public class PostHTTP extends AbstractProcessor {
}
};
if(context.getProperty(CHUNKED_ENCODING).isSet()) {
if (context.getProperty(CHUNKED_ENCODING).isSet()) {
entity.setChunked(context.getProperty(CHUNKED_ENCODING).asBoolean());
}
post.setEntity(entity);
@ -765,7 +770,7 @@ public class PostHTTP extends AbstractProcessor {
for (FlowFile flowFile : toSend) {
flowFile = session.penalize(flowFile);
logger.error("Failed to Post {} to {}: response code was {}:{}; will yield processing, "
+ "since the destination is temporarily unavailable",
+ "since the destination is temporarily unavailable",
new Object[]{flowFile, url, responseCode, responseReason});
session.transfer(flowFile, REL_FAILURE);
}

View File

@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.security.util
import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.asn1.x509.ExtendedKeyUsage
import org.bouncycastle.asn1.x509.KeyPurposeId
import org.bouncycastle.asn1.x509.KeyUsage
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo
import org.bouncycastle.asn1.x509.X509Extension
import org.bouncycastle.cert.X509CertificateHolder
import org.bouncycastle.cert.X509v3CertificateBuilder
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.bouncycastle.operator.ContentSigner
import org.bouncycastle.operator.OperatorCreationException
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.security.InvalidKeyException
import java.security.KeyPair
import java.security.KeyPairGenerator
import java.security.NoSuchAlgorithmException
import java.security.NoSuchProviderException
import java.security.PrivateKey
import java.security.PublicKey
import java.security.Security
import java.security.SignatureException
import java.security.cert.Certificate
import java.security.cert.CertificateException
import java.security.cert.X509Certificate
@RunWith(JUnit4.class)
class CertificateUtilsTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(CertificateUtilsTest.class);
private static final int KEY_SIZE = 2048;
private static final long YESTERDAY = System.currentTimeMillis() - 24 * 60 * 60 * 1000;
private static final long ONE_YEAR_FROM_NOW = System.currentTimeMillis() + 365 * 24 * 60 * 60 * 1000;
private static final String SIGNATURE_ALGORITHM = "SHA256withRSA";
private static final String PROVIDER = "BC";
private static final String SUBJECT_DN = "CN=NiFi Test Server,OU=Security,O=Apache,ST=CA,C=US";
private static final String ISSUER_DN = "CN=NiFi Test CA,OU=Security,O=Apache,ST=CA,C=US";
@BeforeClass
static void setUpOnce() {
Security.addProvider(new BouncyCastleProvider())
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
void setUp() {
super.setUp()
}
@After
void tearDown() {
}
/**
* Generates a public/private RSA keypair using the default key size.
*
* @return the keypair
* @throws java.security.NoSuchAlgorithmException if the RSA algorithm is not available
*/
private static KeyPair generateKeyPair() throws NoSuchAlgorithmException {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
keyPairGenerator.initialize(KEY_SIZE);
return keyPairGenerator.generateKeyPair();
}
/**
* Generates a signed certificate using an on-demand keypair.
*
* @param dn the DN
* @return the certificate
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws java.security.cert.CertificateException
* @throws java.security.NoSuchProviderException
* @throws java.security.SignatureException
* @throws java.security.InvalidKeyException
* @throws OperatorCreationException
*/
private
static X509Certificate generateCertificate(String dn) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException {
KeyPair keyPair = generateKeyPair();
return generateCertificate(dn, keyPair);
}
/**
* Generates a signed certificate with a specific keypair.
*
* @param dn the DN
* @param keyPair the public key will be included in the certificate and the the private key is used to sign the certificate
* @return the certificate
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws CertificateException
* @throws NoSuchProviderException
* @throws SignatureException
* @throws InvalidKeyException
* @throws OperatorCreationException
*/
private
static X509Certificate generateCertificate(String dn, KeyPair keyPair) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException {
PrivateKey privateKey = keyPair.getPrivate();
ContentSigner sigGen = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).setProvider(PROVIDER).build(privateKey);
SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded());
Date startDate = new Date(YESTERDAY);
Date endDate = new Date(ONE_YEAR_FROM_NOW);
X509v3CertificateBuilder certBuilder = new X509v3CertificateBuilder(
new X500Name(dn),
BigInteger.valueOf(System.currentTimeMillis()),
startDate, endDate,
new X500Name(dn),
subPubKeyInfo);
// Set certificate extensions
// (1) digitalSignature extension
certBuilder.addExtension(X509Extension.keyUsage, true,
new KeyUsage(KeyUsage.digitalSignature | KeyUsage.keyEncipherment | KeyUsage.dataEncipherment | KeyUsage.keyAgreement));
// (2) extendedKeyUsage extension
Vector<KeyPurposeId> ekUsages = new Vector<>();
ekUsages.add(KeyPurposeId.id_kp_clientAuth);
ekUsages.add(KeyPurposeId.id_kp_serverAuth);
certBuilder.addExtension(X509Extension.extendedKeyUsage, false, new ExtendedKeyUsage(ekUsages));
// Sign the certificate
X509CertificateHolder certificateHolder = certBuilder.build(sigGen);
return new JcaX509CertificateConverter().setProvider(PROVIDER)
.getCertificate(certificateHolder);
}
/**
* Generates a certificate signed by the issuer key.
*
* @param dn the subject DN
* @param issuerDn the issuer DN
* @param issuerKey the issuer private key
* @return the certificate
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws CertificateException
* @throws NoSuchProviderException
* @throws SignatureException
* @throws InvalidKeyException
* @throws OperatorCreationException
*/
private
static X509Certificate generateIssuedCertificate(String dn, String issuerDn, PrivateKey issuerKey) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException {
KeyPair keyPair = generateKeyPair();
return generateIssuedCertificate(dn, keyPair.getPublic(), issuerDn, issuerKey);
}
/**
* Generates a certificate with a specific public key signed by the issuer key.
*
* @param dn the subject DN
* @param publicKey the subject public key
* @param issuerDn the issuer DN
* @param issuerKey the issuer private key
* @return the certificate
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws CertificateException
* @throws NoSuchProviderException
* @throws SignatureException
* @throws InvalidKeyException
* @throws OperatorCreationException
*/
private
static X509Certificate generateIssuedCertificate(String dn, PublicKey publicKey, String issuerDn, PrivateKey issuerKey) throws IOException, NoSuchAlgorithmException, CertificateException, NoSuchProviderException, SignatureException, InvalidKeyException, OperatorCreationException {
ContentSigner sigGen = new JcaContentSignerBuilder(SIGNATURE_ALGORITHM).setProvider(PROVIDER).build(issuerKey);
SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(publicKey.getEncoded());
Date startDate = new Date(YESTERDAY);
Date endDate = new Date(ONE_YEAR_FROM_NOW);
X509v3CertificateBuilder v3CertGen = new X509v3CertificateBuilder(
new X500Name(issuerDn),
BigInteger.valueOf(System.currentTimeMillis()),
startDate, endDate,
new X500Name(dn),
subPubKeyInfo);
X509CertificateHolder certificateHolder = v3CertGen.build(sigGen);
return new JcaX509CertificateConverter().setProvider(PROVIDER)
.getCertificate(certificateHolder);
}
private static X509Certificate[] generateCertificateChain(String dn = SUBJECT_DN, String issuerDn = ISSUER_DN) {
final KeyPair issuerKeyPair = generateKeyPair();
final PrivateKey issuerPrivateKey = issuerKeyPair.getPrivate();
final X509Certificate issuerCertificate = generateCertificate(issuerDn, issuerKeyPair);
final X509Certificate certificate = generateIssuedCertificate(dn, issuerDn, issuerPrivateKey);
[certificate, issuerCertificate] as X509Certificate[]
}
private static javax.security.cert.X509Certificate generateLegacyCertificate(X509Certificate x509Certificate) {
return javax.security.cert.X509Certificate.getInstance(x509Certificate.getEncoded())
}
private static Certificate generateAbstractCertificate(X509Certificate x509Certificate) {
return x509Certificate as Certificate
}
@Test
void testShouldConvertLegacyX509Certificate() {
// Arrange
final X509Certificate EXPECTED_NEW_CERTIFICATE = generateCertificate(SUBJECT_DN)
logger.info("Expected certificate: ${EXPECTED_NEW_CERTIFICATE.class.canonicalName} ${EXPECTED_NEW_CERTIFICATE.subjectDN.toString()} (${EXPECTED_NEW_CERTIFICATE.getSerialNumber()})")
// Form the legacy certificate
final javax.security.cert.X509Certificate LEGACY_CERTIFICATE = generateLegacyCertificate(EXPECTED_NEW_CERTIFICATE)
logger.info("Legacy certificate: ${LEGACY_CERTIFICATE.class.canonicalName} ${LEGACY_CERTIFICATE.subjectDN.toString()} (${LEGACY_CERTIFICATE.getSerialNumber()})")
// Act
X509Certificate convertedCertificate = CertificateUtils.convertLegacyX509Certificate(LEGACY_CERTIFICATE)
logger.info("Converted certificate: ${convertedCertificate.class.canonicalName} ${convertedCertificate.subjectDN.toString()} (${convertedCertificate.getSerialNumber()})")
// Assert
assert convertedCertificate instanceof X509Certificate
assert convertedCertificate == EXPECTED_NEW_CERTIFICATE
}
@Test
void testShouldConvertAbstractX509Certificate() {
// Arrange
final X509Certificate EXPECTED_NEW_CERTIFICATE = generateCertificate(SUBJECT_DN)
logger.info("Expected certificate: ${EXPECTED_NEW_CERTIFICATE.class.canonicalName} ${EXPECTED_NEW_CERTIFICATE.subjectDN.toString()} (${EXPECTED_NEW_CERTIFICATE.getSerialNumber()})")
// Form the abstract certificate
final Certificate ABSTRACT_CERTIFICATE = generateAbstractCertificate(EXPECTED_NEW_CERTIFICATE)
logger.info("Abstract certificate: ${ABSTRACT_CERTIFICATE.class.canonicalName} (?)")
// Act
X509Certificate convertedCertificate = CertificateUtils.convertAbstractX509Certificate(ABSTRACT_CERTIFICATE)
logger.info("Converted certificate: ${convertedCertificate.class.canonicalName} ${convertedCertificate.subjectDN.toString()} (${convertedCertificate.getSerialNumber()})")
// Assert
assert convertedCertificate instanceof X509Certificate
assert convertedCertificate == EXPECTED_NEW_CERTIFICATE
}
}