NIFI-762: Allow user to set keystore and truststore properties instead of setting sslcontext

This commit is contained in:
Mark Payne 2015-07-13 14:17:42 -04:00
parent 7f6f404baa
commit 8bd20510ee
8 changed files with 311 additions and 14 deletions

View File

@ -16,12 +16,14 @@
*/
package org.apache.nifi.events;
import java.io.Serializable;
import org.apache.nifi.reporting.Severity;
/**
* Implementations MUST be thread-safe
*/
public interface EventReporter {
public interface EventReporter extends Serializable {
void reportEvent(Severity severity, String category, String message);
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.remote.client;
import java.io.Serializable;
public enum KeystoreType implements Serializable {
PKCS12,
JKS;
}

View File

@ -18,11 +18,17 @@ package org.apache.nifi.remote.client;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.security.KeyStore;
import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
@ -143,6 +149,12 @@ public interface SiteToSiteClient extends Closeable {
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
private SSLContext sslContext;
private String keystoreFilename;
private String keystorePass;
private KeystoreType keystoreType;
private String truststoreFilename;
private String truststorePass;
private KeystoreType truststoreType;
private EventReporter eventReporter;
private File peerPersistenceFile;
private boolean useCompression;
@ -164,6 +176,12 @@ public interface SiteToSiteClient extends Closeable {
this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
this.sslContext = config.getSslContext();
this.keystoreFilename = config.getKeystoreFilename();
this.keystorePass = config.getKeystorePassword();
this.keystoreType = config.getKeystoreType();
this.truststoreFilename = config.getTruststoreFilename();
this.truststorePass = config.getTruststorePassword();
this.truststoreType = config.getTruststoreType();
this.eventReporter = config.getEventReporter();
this.peerPersistenceFile = config.getPeerPersistenceFile();
this.useCompression = config.isUseCompression();
@ -240,7 +258,12 @@ public interface SiteToSiteClient extends Closeable {
* secure. The remote instance of NiFi always determines whether or not
* Site-to-Site communications are secure (i.e., the client will always
* use secure or non-secure communications, depending on what the server
* dictates).
* dictates). <b>Note:</b> The SSLContext provided by this method will be
* ignored if using a Serializable Configuration (see {@link #buildSerializableConfig()}).
* If a Serializable Configuration is required and communications are to be
* secure, the {@link #keystoreFilename(String)}, {@link #keystorePass(String)},
* {@link #keystoreType}, {@link #truststoreFilename}, {@link #truststorePass(String)},
* and {@link #truststoreType(KeystoreType)} methods must be used instead.
*
* @param sslContext the context
* @return the builder
@ -250,6 +273,131 @@ public interface SiteToSiteClient extends Closeable {
return this;
}
/**
* @return the filename to use for the Keystore in order to communicate securely
* with the remote instance of NiFi
*/
public String getKeystoreFilename() {
return keystoreFilename;
}
/**
* Sets the filename to use for the Keystore in order to communicate securely
* with the remote instance of NiFi
*
* @param keystoreFilename the filename to use for the Keystore in order to communicate securely
* with the remote instance of NiFi
* @return the builder
*/
public Builder keystoreFilename(final String keystoreFilename) {
this.keystoreFilename = keystoreFilename;
return this;
}
/**
* @return the password to use for the Keystore in order to communicate securely
* with the remote instance of NiFi
*/
public String getKeystorePass() {
return keystorePass;
}
/**
* Sets the password to use for the Keystore in order to communicate securely
* with the remote instance of NiFi
*
* @param keystorePass the password to use for the Keystore in order to communicate securely
* with the remote instance of NiFi
* @return the builder
*/
public Builder keystorePass(final String keystorePass) {
this.keystorePass = keystorePass;
return this;
}
/**
* @return the type of Keystore to use in order to communicate securely
* with the remote instance of NiFi
*/
public KeystoreType getKeystoreType() {
return keystoreType;
}
/**
* Sets the type of the Keystore to use in order to communicate securely
* with the remote instance of NiFi
*
* @param keystoreType the type of the Keystore to use in order to communicate securely
* with the remote instance of NiFi
* @return the builder
*/
public Builder keystoreType(final KeystoreType keystoreType) {
this.keystoreType = keystoreType;
return this;
}
/**
* @return the filename to use for the Truststore in order to communicate securely
* with the remote instance of NiFi
*/
public String getTruststoreFilename() {
return truststoreFilename;
}
/**
* Sets the filename to use for the Truststore in order to communicate securely
* with the remote instance of NiFi
*
* @param truststoreFilename the filename to use for the Truststore in order to communicate securely
* with the remote instance of NiFi
* @return the builder
*/
public Builder truststoreFilename(final String truststoreFilename) {
this.truststoreFilename = truststoreFilename;
return this;
}
/**
* @return the password to use for the Truststore in order to communicate securely
* with the remote instance of NiFi
*/
public String getTruststorePass() {
return truststorePass;
}
/**
* Sets the password to use for the Truststore in order to communicate securely
* with the remote instance of NiFi
*
* @param truststorePass the filename to use for the Truststore in order to communicate securely
* with the remote instance of NiFi
*/
public Builder truststorePass(final String truststorePass) {
this.truststorePass = truststorePass;
return this;
}
/**
* @return the type of the Truststore to use in order to communicate securely
* with the remote instance of NiFi
*/
public KeystoreType getTruststoreType() {
return truststoreType;
}
/**
* Sets the password type of the Truststore to use in order to communicate securely
* with the remote instance of NiFi
*
* @param truststoreType the type of the Truststore to use in order to communicate securely
* with the remote instance of NiFi
* @return the builder
*/
public Builder truststoreType(final KeystoreType truststoreType) {
this.truststoreType = truststoreType;
return this;
}
/**
* Provides an EventReporter that can be used by the client in order to
* report any events that could be of interest when communicating with
@ -365,8 +513,8 @@ public interface SiteToSiteClient extends Closeable {
* but does not create a SiteToSiteClient
*/
public SiteToSiteClientConfig buildConfig() {
final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
private static final long serialVersionUID = 1323119754841633818L;
return new SiteToSiteClientConfig() {
private static final long serialVersionUID = 1L;
@Override
public boolean isUseCompression() {
@ -420,29 +568,57 @@ public interface SiteToSiteClient extends Closeable {
@Override
public long getPreferredBatchDuration(final TimeUnit timeUnit) {
return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS);
}
@Override
public long getPreferredBatchSize() {
return Builder.this.batchSize;
return batchSize;
}
@Override
public int getPreferredBatchCount() {
return Builder.this.batchCount;
return batchCount;
}
@Override
public String getKeystoreFilename() {
return keystoreFilename;
}
@Override
public String getKeystorePassword() {
return keystorePass;
}
@Override
public KeystoreType getKeystoreType() {
return keystoreType;
}
@Override
public String getTruststoreFilename() {
return truststoreFilename;
}
@Override
public String getTruststorePassword() {
return truststorePass;
}
@Override
public KeystoreType getTruststoreType() {
return truststoreType;
}
};
return config;
}
/**
* @return a new SiteToSiteClient that can be used to send and receive
* data with remote instances of NiFi
* data with remote instances of NiFi
*
* @throws IllegalStateException if either the url is not set or neither
* the port name nor port identifier is set.
* the port name nor port identifier is set.
*/
public SiteToSiteClient build() {
if (url == null) {
@ -450,7 +626,7 @@ public interface SiteToSiteClient extends Closeable {
}
if (portName == null && portIdentifier == null) {
throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client");
}
return new SocketClient(buildConfig());
@ -493,7 +669,58 @@ public interface SiteToSiteClient extends Closeable {
* @return the SSL Context that is configured for this builder
*/
public SSLContext getSslContext() {
return sslContext;
if (sslContext != null) {
return sslContext;
}
final KeyManagerFactory keyManagerFactory;
if (keystoreFilename != null && keystorePass != null && keystoreType != null) {
try {
// prepare the keystore
final KeyStore keyStore = KeyStore.getInstance(getKeystoreType().name());
try (final InputStream keyStoreStream = new FileInputStream(new File(getKeystoreFilename()))) {
keyStore.load(keyStoreStream, getKeystorePass().toCharArray());
}
keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, getKeystorePass().toCharArray());
} catch (final Exception e) {
throw new RuntimeException("Failed to load Keystore", e);
}
} else {
keyManagerFactory = null;
}
final TrustManagerFactory trustManagerFactory;
if (truststoreFilename != null && truststorePass != null && truststoreType != null) {
try {
// prepare the truststore
final KeyStore trustStore = KeyStore.getInstance(getTruststoreType().name());
try (final InputStream trustStoreStream = new FileInputStream(new File(getTruststoreFilename()))) {
trustStore.load(trustStoreStream, getTruststorePass().toCharArray());
}
trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(trustStore);
} catch (final Exception e) {
throw new RuntimeException("Failed to load Truststore", e);
}
} else {
trustManagerFactory = null;
}
if (keyManagerFactory != null || trustManagerFactory != null) {
try {
// initialize the ssl context
final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
return sslContext;
} catch (final Exception e) {
throw new RuntimeException("Created keystore and truststore but failed to initialize SSLContext");
}
} else {
return null;
}
}
/**
@ -535,4 +762,10 @@ public interface SiteToSiteClient extends Closeable {
return portIdentifier;
}
}
public abstract class SerializableSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable {
private static final long serialVersionUID = 1L;
}
}

View File

@ -57,9 +57,39 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
SSLContext getSslContext();
/**
* @return the filename to use for the keystore, or <code>null</code> if none is configured
*/
String getKeystoreFilename();
/**
* @return the password to use for the keystore, or <code>null</code> if none is configured
*/
String getKeystorePassword();
/**
* @return the Type of the keystore, or <code>null</code> if none is configured
*/
KeystoreType getKeystoreType();
/**
* @return the filename to use for the truststore, or <code>null</code> if none is configured
*/
String getTruststoreFilename();
/**
* @return the password to use for the truststore, or <code>null</code> if none is configured
*/
String getTruststorePassword();
/**
* @return the type of the truststore, or <code>null</code> if none is configured
*/
KeystoreType getTruststoreType();
/**
* @return the file that is to be used for persisting the nodes of a remote
* cluster, if any
* cluster, if any
*/
File getPeerPersistenceFile();

View File

@ -528,6 +528,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
return new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);

View File

@ -160,6 +160,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();

View File

@ -104,6 +104,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
this.scheduler = scheduler;
setYieldPeriod("100 millis");
eventReporter = new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();

View File

@ -135,6 +135,8 @@ public class TestPersistentProvenanceRepository {
private EventReporter getEventReporter() {
return new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(Severity severity, String category, String message) {
System.out.println(severity + " : " + category + " : " + message);