diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java b/nifi/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java
index 76702f174b..d645d60d6e 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/events/EventReporter.java
@@ -16,12 +16,14 @@
*/
package org.apache.nifi.events;
+import java.io.Serializable;
+
import org.apache.nifi.reporting.Severity;
/**
* Implementations MUST be thread-safe
*/
-public interface EventReporter {
+public interface EventReporter extends Serializable {
void reportEvent(Severity severity, String category, String message);
}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java
new file mode 100644
index 0000000000..63c3d63c20
--- /dev/null
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/KeystoreType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.remote.client;
+
+import java.io.Serializable;
+
+public enum KeystoreType implements Serializable {
+ PKCS12,
+ JKS;
+}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
index 5c4ce55d20..78237b997c 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java
@@ -18,11 +18,17 @@ package org.apache.nifi.remote.client;
import java.io.Closeable;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.Serializable;
+import java.security.KeyStore;
+import java.security.SecureRandom;
import java.util.concurrent.TimeUnit;
+import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
@@ -143,6 +149,12 @@ public interface SiteToSiteClient extends Closeable {
private long penalizationNanos = TimeUnit.SECONDS.toNanos(3);
private long idleExpirationNanos = TimeUnit.SECONDS.toNanos(30L);
private SSLContext sslContext;
+ private String keystoreFilename;
+ private String keystorePass;
+ private KeystoreType keystoreType;
+ private String truststoreFilename;
+ private String truststorePass;
+ private KeystoreType truststoreType;
private EventReporter eventReporter;
private File peerPersistenceFile;
private boolean useCompression;
@@ -164,6 +176,12 @@ public interface SiteToSiteClient extends Closeable {
this.penalizationNanos = config.getPenalizationPeriod(TimeUnit.NANOSECONDS);
this.idleExpirationNanos = config.getIdleConnectionExpiration(TimeUnit.NANOSECONDS);
this.sslContext = config.getSslContext();
+ this.keystoreFilename = config.getKeystoreFilename();
+ this.keystorePass = config.getKeystorePassword();
+ this.keystoreType = config.getKeystoreType();
+ this.truststoreFilename = config.getTruststoreFilename();
+ this.truststorePass = config.getTruststorePassword();
+ this.truststoreType = config.getTruststoreType();
this.eventReporter = config.getEventReporter();
this.peerPersistenceFile = config.getPeerPersistenceFile();
this.useCompression = config.isUseCompression();
@@ -240,7 +258,12 @@ public interface SiteToSiteClient extends Closeable {
* secure. The remote instance of NiFi always determines whether or not
* Site-to-Site communications are secure (i.e., the client will always
* use secure or non-secure communications, depending on what the server
- * dictates).
+ * dictates). Note: The SSLContext provided by this method will be
+ * ignored if using a Serializable Configuration (see {@link #buildSerializableConfig()}).
+ * If a Serializable Configuration is required and communications are to be
+ * secure, the {@link #keystoreFilename(String)}, {@link #keystorePass(String)},
+ * {@link #keystoreType}, {@link #truststoreFilename}, {@link #truststorePass(String)},
+ * and {@link #truststoreType(KeystoreType)} methods must be used instead.
*
* @param sslContext the context
* @return the builder
@@ -250,6 +273,131 @@ public interface SiteToSiteClient extends Closeable {
return this;
}
+ /**
+ * @return the filename to use for the Keystore in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public String getKeystoreFilename() {
+ return keystoreFilename;
+ }
+
+ /**
+ * Sets the filename to use for the Keystore in order to communicate securely
+ * with the remote instance of NiFi
+ *
+ * @param keystoreFilename the filename to use for the Keystore in order to communicate securely
+ * with the remote instance of NiFi
+ * @return the builder
+ */
+ public Builder keystoreFilename(final String keystoreFilename) {
+ this.keystoreFilename = keystoreFilename;
+ return this;
+ }
+
+ /**
+ * @return the password to use for the Keystore in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public String getKeystorePass() {
+ return keystorePass;
+ }
+
+ /**
+ * Sets the password to use for the Keystore in order to communicate securely
+ * with the remote instance of NiFi
+ *
+ * @param keystorePass the password to use for the Keystore in order to communicate securely
+ * with the remote instance of NiFi
+ * @return the builder
+ */
+ public Builder keystorePass(final String keystorePass) {
+ this.keystorePass = keystorePass;
+ return this;
+ }
+
+ /**
+ * @return the type of Keystore to use in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public KeystoreType getKeystoreType() {
+ return keystoreType;
+ }
+
+ /**
+ * Sets the type of the Keystore to use in order to communicate securely
+ * with the remote instance of NiFi
+ *
+ * @param keystoreType the type of the Keystore to use in order to communicate securely
+ * with the remote instance of NiFi
+ * @return the builder
+ */
+ public Builder keystoreType(final KeystoreType keystoreType) {
+ this.keystoreType = keystoreType;
+ return this;
+ }
+
+ /**
+ * @return the filename to use for the Truststore in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public String getTruststoreFilename() {
+ return truststoreFilename;
+ }
+
+ /**
+ * Sets the filename to use for the Truststore in order to communicate securely
+ * with the remote instance of NiFi
+ *
+ * @param truststoreFilename the filename to use for the Truststore in order to communicate securely
+ * with the remote instance of NiFi
+ * @return the builder
+ */
+ public Builder truststoreFilename(final String truststoreFilename) {
+ this.truststoreFilename = truststoreFilename;
+ return this;
+ }
+
+ /**
+ * @return the password to use for the Truststore in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public String getTruststorePass() {
+ return truststorePass;
+ }
+
+ /**
+ * Sets the password to use for the Truststore in order to communicate securely
+ * with the remote instance of NiFi
+ *
+ * @param truststorePass the filename to use for the Truststore in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public Builder truststorePass(final String truststorePass) {
+ this.truststorePass = truststorePass;
+ return this;
+ }
+
+ /**
+ * @return the type of the Truststore to use in order to communicate securely
+ * with the remote instance of NiFi
+ */
+ public KeystoreType getTruststoreType() {
+ return truststoreType;
+ }
+
+ /**
+ * Sets the password type of the Truststore to use in order to communicate securely
+ * with the remote instance of NiFi
+ *
+ * @param truststoreType the type of the Truststore to use in order to communicate securely
+ * with the remote instance of NiFi
+ * @return the builder
+ */
+ public Builder truststoreType(final KeystoreType truststoreType) {
+ this.truststoreType = truststoreType;
+ return this;
+ }
+
/**
* Provides an EventReporter that can be used by the client in order to
* report any events that could be of interest when communicating with
@@ -365,8 +513,8 @@ public interface SiteToSiteClient extends Closeable {
* but does not create a SiteToSiteClient
*/
public SiteToSiteClientConfig buildConfig() {
- final SiteToSiteClientConfig config = new SiteToSiteClientConfig() {
- private static final long serialVersionUID = 1323119754841633818L;
+ return new SiteToSiteClientConfig() {
+ private static final long serialVersionUID = 1L;
@Override
public boolean isUseCompression() {
@@ -420,29 +568,57 @@ public interface SiteToSiteClient extends Closeable {
@Override
public long getPreferredBatchDuration(final TimeUnit timeUnit) {
- return timeUnit.convert(Builder.this.batchNanos, TimeUnit.NANOSECONDS);
+ return timeUnit.convert(batchNanos, TimeUnit.NANOSECONDS);
}
@Override
public long getPreferredBatchSize() {
- return Builder.this.batchSize;
+ return batchSize;
}
@Override
public int getPreferredBatchCount() {
- return Builder.this.batchCount;
+ return batchCount;
+ }
+
+ @Override
+ public String getKeystoreFilename() {
+ return keystoreFilename;
+ }
+
+ @Override
+ public String getKeystorePassword() {
+ return keystorePass;
+ }
+
+ @Override
+ public KeystoreType getKeystoreType() {
+ return keystoreType;
+ }
+
+ @Override
+ public String getTruststoreFilename() {
+ return truststoreFilename;
+ }
+
+ @Override
+ public String getTruststorePassword() {
+ return truststorePass;
+ }
+
+ @Override
+ public KeystoreType getTruststoreType() {
+ return truststoreType;
}
};
-
- return config;
}
/**
* @return a new SiteToSiteClient that can be used to send and receive
- * data with remote instances of NiFi
+ * data with remote instances of NiFi
*
* @throws IllegalStateException if either the url is not set or neither
- * the port name nor port identifier is set.
+ * the port name nor port identifier is set.
*/
public SiteToSiteClient build() {
if (url == null) {
@@ -450,7 +626,7 @@ public interface SiteToSiteClient extends Closeable {
}
if (portName == null && portIdentifier == null) {
- throw new IllegalStateException("Must specify either Port Name or Port Identifier to builder Site-to-Site client");
+ throw new IllegalStateException("Must specify either Port Name or Port Identifier to build Site-to-Site client");
}
return new SocketClient(buildConfig());
@@ -493,7 +669,58 @@ public interface SiteToSiteClient extends Closeable {
* @return the SSL Context that is configured for this builder
*/
public SSLContext getSslContext() {
- return sslContext;
+ if (sslContext != null) {
+ return sslContext;
+ }
+
+ final KeyManagerFactory keyManagerFactory;
+ if (keystoreFilename != null && keystorePass != null && keystoreType != null) {
+ try {
+ // prepare the keystore
+ final KeyStore keyStore = KeyStore.getInstance(getKeystoreType().name());
+ try (final InputStream keyStoreStream = new FileInputStream(new File(getKeystoreFilename()))) {
+ keyStore.load(keyStoreStream, getKeystorePass().toCharArray());
+ }
+ keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ keyManagerFactory.init(keyStore, getKeystorePass().toCharArray());
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to load Keystore", e);
+ }
+ } else {
+ keyManagerFactory = null;
+ }
+
+ final TrustManagerFactory trustManagerFactory;
+ if (truststoreFilename != null && truststorePass != null && truststoreType != null) {
+ try {
+ // prepare the truststore
+ final KeyStore trustStore = KeyStore.getInstance(getTruststoreType().name());
+ try (final InputStream trustStoreStream = new FileInputStream(new File(getTruststoreFilename()))) {
+ trustStore.load(trustStoreStream, getTruststorePass().toCharArray());
+ }
+ trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ trustManagerFactory.init(trustStore);
+ } catch (final Exception e) {
+ throw new RuntimeException("Failed to load Truststore", e);
+ }
+ } else {
+ trustManagerFactory = null;
+ }
+
+ if (keyManagerFactory != null || trustManagerFactory != null) {
+ try {
+ // initialize the ssl context
+ final SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), new SecureRandom());
+ sslContext.getDefaultSSLParameters().setNeedClientAuth(true);
+
+ return sslContext;
+ } catch (final Exception e) {
+ throw new RuntimeException("Created keystore and truststore but failed to initialize SSLContext");
+ }
+ } else {
+ return null;
+ }
}
/**
@@ -535,4 +762,10 @@ public interface SiteToSiteClient extends Closeable {
return portIdentifier;
}
}
+
+
+ public abstract class SerializableSiteToSiteClientConfig implements SiteToSiteClientConfig, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ }
}
diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
index c4b0d220e6..50a0d3c31e 100644
--- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
+++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClientConfig.java
@@ -57,9 +57,39 @@ public interface SiteToSiteClientConfig extends Serializable {
*/
SSLContext getSslContext();
+ /**
+ * @return the filename to use for the keystore, or null
if none is configured
+ */
+ String getKeystoreFilename();
+
+ /**
+ * @return the password to use for the keystore, or null
if none is configured
+ */
+ String getKeystorePassword();
+
+ /**
+ * @return the Type of the keystore, or null
if none is configured
+ */
+ KeystoreType getKeystoreType();
+
+ /**
+ * @return the filename to use for the truststore, or null
if none is configured
+ */
+ String getTruststoreFilename();
+
+ /**
+ * @return the password to use for the truststore, or null
if none is configured
+ */
+ String getTruststorePassword();
+
+ /**
+ * @return the type of the truststore, or null
if none is configured
+ */
+ KeystoreType getTruststoreType();
+
/**
* @return the file that is to be used for persisting the nodes of a remote
- * cluster, if any
+ * cluster, if any
*/
File getPeerPersistenceFile();
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 255a35a988..3d78b3a935 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -528,6 +528,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private static EventReporter createEventReporter(final BulletinRepository bulletinRepository) {
return new EventReporter() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index 61516d0d3a..bd934461e9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -160,6 +160,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final BulletinRepository bulletinRepository = flowController.getBulletinRepository();
eventReporter = new EventReporter() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 9eadec02d1..66fd303141 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -104,6 +104,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
this.scheduler = scheduler;
setYieldPeriod("100 millis");
eventReporter = new EventReporter() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 16f0312839..3737588d38 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -135,6 +135,8 @@ public class TestPersistentProvenanceRepository {
private EventReporter getEventReporter() {
return new EventReporter() {
+ private static final long serialVersionUID = 1L;
+
@Override
public void reportEvent(Severity severity, String category, String message) {
System.out.println(severity + " : " + category + " : " + message);