NIFI-12451 This closes #8104. Removed Bootstrap Notification Services

- Refactored Security Property handling and bootstrap certificate generation
- Removed unnecessary dependencies from nifi-bootstrap
- Removed unnecessary and unused settings from bootstrap.conf

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2023-12-01 18:08:15 -06:00 committed by Joseph Witt
parent 283abadf97
commit 456cf7d88d
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
44 changed files with 596 additions and 3041 deletions

View File

@ -144,6 +144,18 @@ language governing permissions and limitations under the License. -->
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flow-encryptor</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-single-user-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId> <artifactId>nifi-api</artifactId>

View File

@ -28,9 +28,12 @@
<include>*:slf4j-api</include> <include>*:slf4j-api</include>
<include>*:logback-classic</include> <include>*:logback-classic</include>
<include>*:logback-core</include> <include>*:logback-core</include>
<include>*:commons-lang3</include>
<include>*:nifi-api</include> <include>*:nifi-api</include>
<include>*:nifi-property-protection-api</include> <include>*:nifi-property-protection-api</include>
<include>*:nifi-per-process-group-logging</include> <include>*:nifi-per-process-group-logging</include>
<include>*:nifi-single-user-utils</include>
<include>*:nifi-flow-encryptor</include>
</includes> </includes>
</dependencySet> </dependencySet>

View File

@ -37,6 +37,8 @@
<exclude>*:nifi-property-protection-factory</exclude> <exclude>*:nifi-property-protection-factory</exclude>
<exclude>*:nifi-resources</exclude> <exclude>*:nifi-resources</exclude>
<exclude>*:nifi-docs</exclude> <exclude>*:nifi-docs</exclude>
<exclude>*:nifi-single-user-utils</exclude>
<exclude>*:nifi-flow-encryptor</exclude>
<!-- exclude AspectJ library from lib included in the aspectj subdir --> <!-- exclude AspectJ library from lib included in the aspectj subdir -->
<exclude>org.aspectj:aspectjweaver</exclude> <exclude>org.aspectj:aspectjweaver</exclude>
</excludes> </excludes>

View File

@ -37,91 +37,13 @@ language governing permissions and limitations under the License. -->
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId> <artifactId>nifi-security-cert-builder</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-xml-processing</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flow-encryptor</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-single-user-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<!-- Jakarta Mail 1 required for compatibility with Jakarta Activation 1 used with JAXB 2 -->
<dependency>
<groupId>jakarta.mail</groupId>
<artifactId>jakarta.mail-api</artifactId>
<version>1.6.7</version>
</dependency>
<dependency>
<groupId>com.sun.mail</groupId>
<artifactId>jakarta.mail</artifactId>
<version>1.6.7</version>
</dependency>
<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<version>1.2.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.sun.activation</groupId>
<artifactId>jakarta.activation</artifactId>
<version>1.2.2</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-expression-language</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<!-- This needs to be here because it is relied upon by the nifi-runtime which starts NiFi. It uses this bootstrap module
and the libs that get laid down in lib/bootstrap to create a child classloader with these bits in it -->
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties-loader</artifactId> <artifactId>nifi-properties-loader</artifactId>
<version>2.0.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -25,8 +25,6 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.util.Arrays; import java.util.Arrays;
import org.apache.nifi.bootstrap.exception.InvalidCommandException;
public class BootstrapCodec { public class BootstrapCodec {
private final RunNiFi runner; private final RunNiFi runner;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.bootstrap.exception; package org.apache.nifi.bootstrap;
public class InvalidCommandException extends Exception { public class InvalidCommandException extends Exception {

View File

@ -1,435 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
import org.apache.nifi.bootstrap.notification.NotificationService;
import org.apache.nifi.bootstrap.notification.NotificationType;
import org.apache.nifi.bootstrap.notification.NotificationValidationContext;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceContext;
import org.apache.nifi.components.resource.StandardResourceContext;
import org.apache.nifi.components.resource.StandardResourceReferenceFactory;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.xml.processing.parsers.StandardDocumentProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class NotificationServiceManager {
private static final Logger logger = LoggerFactory.getLogger(NotificationServiceManager.class);
private final Map<String, ConfiguredNotificationService> servicesById = new HashMap<>();
private final Map<NotificationType, List<ConfiguredNotificationService>> servicesByNotificationType = new HashMap<>();
private final ScheduledExecutorService notificationExecutor;
private int maxAttempts = 5;
public NotificationServiceManager(){
notificationExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("Notification Service Dispatcher");
t.setDaemon(true);
return t;
}
});
}
public void setMaxNotificationAttempts(final int maxAttempts) {
this.maxAttempts = maxAttempts;
}
/**
* Loads the Notification Services from the given XML configuration file.
*
* File is expected to have the following format:
*
* <pre>
* &lt;services&gt;
* &lt;service&gt;
* &lt;id&gt;service-identifier&lt;/id&gt;
* &lt;class&gt;org.apache.nifi.MyNotificationService&lt;/class&gt;
* &lt;property name="My First Property"&gt;Property Value&lt;/property&gt;
* &lt;/service&gt;
* &lt;service&gt;
* &lt;id&gt;other-service&lt;/id&gt;
* &lt;class&gt;org.apache.nifi.MyOtherNotificationService&lt;/class&gt;
* &lt;property name="Another Property"&gt;Property Value 2&lt;/property&gt;
* &lt;/service&gt;
* ...
* &lt;service&gt;
* &lt;id&gt;service-identifier-2&lt;/id&gt;
* &lt;class&gt;org.apache.nifi.FinalNotificationService&lt;/class&gt;
* &lt;property name="Yet Another Property"&gt;3rd Prop Value&lt;/property&gt;
* &lt;/service&gt;
* &lt;/services&gt;
* </pre>
*
* Note that as long as the file can be interpreted properly, a misconfigured service will result in a warning
* or error being logged and the service will be unavailable but will not prevent the rest of the services from loading.
*
* @param servicesFile the XML file to load services from.
* @throws IOException if unable to read from the given file
*/
public void loadNotificationServices(final File servicesFile) throws IOException {
final Map<String, ConfiguredNotificationService> serviceMap = new HashMap<>();
try (final InputStream fis = new FileInputStream(servicesFile);
final InputStream in = new BufferedInputStream(fis)) {
final StandardDocumentProvider documentProvider = new StandardDocumentProvider();
final Document doc = documentProvider.parse(in);
final List<Element> serviceElements = getChildElementsByTagName(doc.getDocumentElement(), "service");
logger.debug("Found {} service elements", serviceElements.size());
for (final Element serviceElement : serviceElements) {
final ConfiguredNotificationService config = createService(serviceElement);
final NotificationService service = config.getService();
if (service == null) {
continue; // reason will have already been logged, so just move on.
}
final String id = service.getIdentifier();
if (serviceMap.containsKey(id)) {
logger.error("Found two different Notification Services configured with the same ID: '{}'. Loaded the first service.", id);
continue;
}
// Check if the service is valid; if not, warn now so that users know this before they fail to receive notifications
final ValidationContext validationContext = new NotificationValidationContext(buildNotificationContext(config));
final Collection<ValidationResult> validationResults = service.validate(validationContext);
final List<String> invalidReasons = new ArrayList<>();
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
invalidReasons.add(result.toString());
}
}
if (!invalidReasons.isEmpty()) {
logger.warn("Configured Notification Service {} is not valid for the following reasons: {}", service, invalidReasons);
}
serviceMap.put(id, config);
}
}
logger.info("Successfully loaded the following {} services: {}", serviceMap.size(), serviceMap.keySet());
servicesById.clear();
servicesById.putAll(serviceMap);
}
public void notify(final NotificationType type, final String subject, final String message) {
final List<ConfiguredNotificationService> configs = servicesByNotificationType.get(type);
if (configs == null || configs.isEmpty()) {
return;
}
for (final ConfiguredNotificationService config : configs) {
final NotificationService service = config.getService();
final AtomicInteger attemptCount = new AtomicInteger(0);
notificationExecutor.submit(new Runnable() {
@Override
public void run() {
// Check if the service is valid; if not, warn now so that users know this before they fail to receive notifications
final ValidationContext validationContext = new NotificationValidationContext(buildNotificationContext(config));
final Collection<ValidationResult> validationResults = service.validate(validationContext);
final List<String> invalidReasons = new ArrayList<>();
for (final ValidationResult result : validationResults) {
if (!result.isValid()) {
invalidReasons.add(result.toString());
}
}
// If the service is valid, attempt to send the notification
boolean failure = false;
if (invalidReasons.isEmpty()) {
final NotificationContext context = buildNotificationContext(config);
try {
service.notify(context, type, subject, message);
logger.info("Successfully sent notification of type {} to {}", type, service);
} catch (final Throwable t) { // keep running even if a Throwable is caught because we need to ensure that we are able to restart NiFi
logger.error("Failed to send notification of type {} to {} with Subject {} due to {}. Will ",
type, service == null ? "Unknown Notification Service" : service.toString(), subject, t.toString());
logger.error("", t);
failure = true;
}
} else {
logger.warn("Notification Service {} is not valid for the following reasons: {}", service, invalidReasons);
failure = true;
}
final int attempts = attemptCount.incrementAndGet();
if (failure) {
if (attempts < maxAttempts) {
logger.info("After failing to send notification to {} {} times, will attempt again in 1 minute", service, attempts);
notificationExecutor.schedule(this, 1, TimeUnit.MINUTES);
} else {
logger.info("After failing to send notification of type {} to {} {} times, will no longer attempt to send notification", type, service, attempts);
}
}
}
});
if (NotificationType.NIFI_STOPPED.equals(type)) {
// If we are stopping NiFi, we want to block until we've tried to send the notification at least once before
// we return. We do this because the executor used to run the task marks the threads as daemon, and on shutdown
// we don't want to return before the notifier has had a chance to perform its task.
while (attemptCount.get() == 0) {
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
}
}
}
}
}
private NotificationContext buildNotificationContext(final ConfiguredNotificationService config) {
return new NotificationContext() {
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
final PropertyDescriptor fullPropDescriptor = config.getService().getPropertyDescriptor(descriptor.getName());
if (fullPropDescriptor == null) {
return null;
}
String configuredValue = config.getProperties().get(fullPropDescriptor.getName());
if (configuredValue == null) {
configuredValue = fullPropDescriptor.getDefaultValue();
}
final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), descriptor);
return new StandardPropertyValue(resourceContext, configuredValue, null, ParameterLookup.EMPTY);
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
final Map<PropertyDescriptor, String> props = new HashMap<>();
final Map<String, String> configuredProps = config.getProperties();
final NotificationService service = config.getService();
final List<PropertyDescriptor> configuredPropertyDescriptors = new ArrayList<>(service.getPropertyDescriptors());
// This is needed to capture all dynamic properties
configuredProps.forEach((key, value) -> {
PropertyDescriptor propertyDescriptor = config.service.getPropertyDescriptor(key);
props.put(config.service.getPropertyDescriptor(key), value);
configuredPropertyDescriptors.remove(propertyDescriptor);
});
for (final PropertyDescriptor descriptor : configuredPropertyDescriptors) {
props.put(descriptor, descriptor.getDefaultValue());
}
return props;
}
};
}
/**
* Registers the service that has the given identifier to respond to notifications of the given type
*
* @param type the type of notification to register the service for
* @param serviceId the identifier of the service
*/
public void registerNotificationService(final NotificationType type, final String serviceId) {
final ConfiguredNotificationService service = servicesById.get(serviceId);
if (service == null) {
throw new IllegalArgumentException("No Notification Service exists with ID " + serviceId);
}
List<ConfiguredNotificationService> services = servicesByNotificationType.get(type);
if (services == null) {
services = new ArrayList<>();
servicesByNotificationType.put(type, services);
}
services.add(service);
}
/**
* Creates a Notification Service and initializes it. Then returns the service and its configured properties
*
* @param serviceElement the XML element from which to build the Notification Service
* @return a Tuple with the NotificationService as the key and the configured properties as the value, or <code>null</code> if
* unable to create the service
*/
private ConfiguredNotificationService createService(final Element serviceElement) {
final Element idElement = getChild(serviceElement, "id");
if (idElement == null) {
logger.error("Found configuration for Notification Service with no 'id' element; this service cannot be referenced so it will not be loaded");
return null;
}
final String serviceId = idElement.getTextContent().trim();
logger.debug("Loading Notification Service with ID {}", serviceId);
final Element classElement = getChild(serviceElement, "class");
if (classElement == null) {
logger.error("Found configuration for Notification Service with no 'class' element; Service ID is '{}'. This service annot be loaded", serviceId);
return null;
}
final String className = classElement.getTextContent().trim();
final Class<?> clazz;
try {
clazz = Class.forName(className);
} catch (final Exception e) {
logger.error("Found configuration for Notification Service with ID '{}' and Class '{}' but could not load class.", serviceId, className);
logger.error("", e);
return null;
}
if (!NotificationService.class.isAssignableFrom(clazz)) {
logger.error("Found configuration for Notification Service with ID '{}' and Class '{}' but class is not a Notification Service.", serviceId, className);
return null;
}
final Object serviceObject;
try {
serviceObject = clazz.getDeclaredConstructor().newInstance();
} catch (final Exception e) {
logger.error("Found configuration for Notification Service with ID '{}' and Class '{}' but could not instantiate Notification Service.", serviceId, className);
logger.error("", e);
return null;
}
final Map<String, String> propertyValues = new HashMap<>();
final List<Element> propertyElements = getChildElementsByTagName(serviceElement, "property");
for (final Element propertyElement : propertyElements) {
final String propName = propertyElement.getAttribute("name");
if (propName == null || propName.trim().isEmpty()) {
logger.warn("Found configuration for Notification Service with ID '{}' that has property value configured but no name for the property.", serviceId);
continue;
}
final String propValue = propertyElement.getTextContent().trim();
propertyValues.put(propName, propValue);
}
final NotificationService service = (NotificationService) serviceObject;
try {
service.initialize(new NotificationInitializationContext() {
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
final String propName = descriptor.getName();
String value = propertyValues.get(propName);
if (value == null) {
value = descriptor.getDefaultValue();
}
final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), descriptor);
return new StandardPropertyValue(resourceContext, value, null, ParameterLookup.EMPTY);
}
@Override
public Map<String,String> getAllProperties() {
return Collections.unmodifiableMap(propertyValues);
}
@Override
public String getIdentifier() {
return serviceId;
}
});
} catch (final Exception e) {
logger.error("Failed to load Notification Service with ID '{}'", serviceId);
logger.error("", e);
}
return new ConfiguredNotificationService(service, propertyValues);
}
public static Element getChild(final Element element, final String tagName) {
final List<Element> children = getChildElementsByTagName(element, tagName);
if (children.isEmpty()) {
return null;
}
if (children.size() > 1) {
return null;
}
return children.get(0);
}
public static List<Element> getChildElementsByTagName(final Element element, final String tagName) {
final List<Element> matches = new ArrayList<>();
final NodeList nodeList = element.getChildNodes();
for (int i = 0; i < nodeList.getLength(); i++) {
final Node node = nodeList.item(i);
if (!(node instanceof Element)) {
continue;
}
final Element child = (Element) nodeList.item(i);
if (child.getNodeName().equals(tagName)) {
matches.add(child);
}
}
return matches;
}
private static class ConfiguredNotificationService {
private final NotificationService service;
private final Map<String, String> properties;
public ConfiguredNotificationService(final NotificationService service, final Map<String, String> properties) {
this.service = service;
this.properties = properties;
}
public NotificationService getService() {
return service;
}
public Map<String, String> getProperties() {
return properties;
}
}
}

View File

@ -16,11 +16,10 @@
*/ */
package org.apache.nifi.bootstrap; package org.apache.nifi.bootstrap;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.bootstrap.notification.NotificationType;
import org.apache.nifi.bootstrap.process.RuntimeValidatorExecutor; import org.apache.nifi.bootstrap.process.RuntimeValidatorExecutor;
import org.apache.nifi.bootstrap.property.ApplicationPropertyHandler;
import org.apache.nifi.bootstrap.property.SecurityApplicationPropertyHandler;
import org.apache.nifi.bootstrap.util.DumpFileValidator; import org.apache.nifi.bootstrap.util.DumpFileValidator;
import org.apache.nifi.bootstrap.util.SecureNiFiConfigUtil;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -39,7 +38,6 @@ import java.io.OutputStream;
import java.io.Reader; import java.io.Reader;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -52,7 +50,6 @@ import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions; import java.nio.file.attribute.PosixFilePermissions;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -65,7 +62,6 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -100,13 +96,6 @@ public class RunNiFi {
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds"; public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20"; public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
public static final String NOTIFICATION_SERVICES_FILE_PROP = "notification.services.file";
public static final String NOTIFICATION_ATTEMPTS_PROP = "notification.max.attempts";
public static final String NIFI_START_NOTIFICATION_SERVICE_IDS_PROP = "nifi.start.notification.services";
public static final String NIFI_STOP_NOTIFICATION_SERVICE_IDS_PROP = "nifi.stop.notification.services";
public static final String NIFI_DEAD_NOTIFICATION_SERVICE_IDS_PROP = "nifi.dead.notification.services";
public static final String NIFI_PID_DIR_PROP = "org.apache.nifi.bootstrap.config.pid.dir"; public static final String NIFI_PID_DIR_PROP = "org.apache.nifi.bootstrap.config.pid.dir";
public static final String NIFI_PID_FILE_NAME = "nifi.pid"; public static final String NIFI_PID_FILE_NAME = "nifi.pid";
@ -154,23 +143,17 @@ public class RunNiFi {
private final ExecutorService loggingExecutor; private final ExecutorService loggingExecutor;
private final RuntimeValidatorExecutor runtimeValidatorExecutor; private final RuntimeValidatorExecutor runtimeValidatorExecutor;
private volatile Set<Future<?>> loggingFutures = new HashSet<>(2); private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
private final NotificationServiceManager serviceManager;
public RunNiFi(final File bootstrapConfigFile) throws IOException { public RunNiFi(final File bootstrapConfigFile) throws IOException {
this.bootstrapConfigFile = bootstrapConfigFile; this.bootstrapConfigFile = bootstrapConfigFile;
loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() { loggingExecutor = Executors.newFixedThreadPool(2, runnable -> {
@Override final Thread t = Executors.defaultThreadFactory().newThread(runnable);
public Thread newThread(final Runnable runnable) { t.setDaemon(true);
final Thread t = Executors.defaultThreadFactory().newThread(runnable); t.setName("NiFi logging handler");
t.setDaemon(true); return t;
t.setName("NiFi logging handler");
return t;
}
}); });
serviceManager = loadServices();
runtimeValidatorExecutor = new RuntimeValidatorExecutor(); runtimeValidatorExecutor = new RuntimeValidatorExecutor();
} }
@ -209,8 +192,6 @@ public class RunNiFi {
if (cmd.equalsIgnoreCase("dump")) { if (cmd.equalsIgnoreCase("dump")) {
if (args.length > 1) { if (args.length > 1) {
dumpFile = new File(args[1]); dumpFile = new File(args[1]);
} else {
dumpFile = null;
} }
} else if (cmd.equalsIgnoreCase("diagnostics")) { } else if (cmd.equalsIgnoreCase("diagnostics")) {
if (args.length > 2) { if (args.length > 2) {
@ -219,14 +200,9 @@ public class RunNiFi {
} else if (args.length > 1) { } else if (args.length > 1) {
if (args[1].equalsIgnoreCase("--verbose")) { if (args[1].equalsIgnoreCase("--verbose")) {
verbose = true; verbose = true;
dumpFile = null;
} else { } else {
verbose = false;
dumpFile = new File(args[1]); dumpFile = new File(args[1]);
} }
} else {
dumpFile = null;
verbose = false;
} }
} else if (cmd.equalsIgnoreCase("status-history")) { } else if (cmd.equalsIgnoreCase("status-history")) {
if (args.length < 2) { if (args.length < 2) {
@ -348,91 +324,9 @@ public class RunNiFi {
configFilename = DEFAULT_CONFIG_FILE; configFilename = DEFAULT_CONFIG_FILE;
} }
final File configFile = new File(configFilename); return new File(configFilename);
return configFile;
} }
private NotificationServiceManager loadServices() throws IOException {
final File bootstrapConfFile = this.bootstrapConfigFile;
final Properties properties = new Properties();
try (final FileInputStream fis = new FileInputStream(bootstrapConfFile)) {
properties.load(fis);
}
final NotificationServiceManager manager = new NotificationServiceManager();
final String attemptProp = properties.getProperty(NOTIFICATION_ATTEMPTS_PROP);
if (attemptProp != null) {
try {
final int maxAttempts = Integer.parseInt(attemptProp.trim());
if (maxAttempts >= 0) {
manager.setMaxNotificationAttempts(maxAttempts);
}
} catch (final NumberFormatException nfe) {
defaultLogger.error("Maximum number of attempts to send notification email is set to an invalid value of {}; will use default value", attemptProp);
}
}
final String notificationServicesXmlFilename = properties.getProperty(NOTIFICATION_SERVICES_FILE_PROP);
if (notificationServicesXmlFilename == null) {
defaultLogger.info("No Bootstrap Notification Services configured.");
return manager;
}
final File xmlFile = new File(notificationServicesXmlFilename);
final File servicesFile;
if (xmlFile.isAbsolute()) {
servicesFile = xmlFile;
} else {
final File confDir = bootstrapConfigFile.getParentFile();
final File nifiHome = confDir.getParentFile();
servicesFile = new File(nifiHome, notificationServicesXmlFilename);
}
if (!servicesFile.exists()) {
defaultLogger.error("Bootstrap Notification Services file configured as " + servicesFile + " but could not find file; will not load notification services");
return manager;
}
try {
manager.loadNotificationServices(servicesFile);
} catch (final Exception e) {
defaultLogger.error("Bootstrap Notification Services file configured as " + servicesFile + " but failed to load notification services", e);
}
registerNotificationServices(manager, NotificationType.NIFI_STARTED, properties.getProperty(NIFI_START_NOTIFICATION_SERVICE_IDS_PROP));
registerNotificationServices(manager, NotificationType.NIFI_STOPPED, properties.getProperty(NIFI_STOP_NOTIFICATION_SERVICE_IDS_PROP));
registerNotificationServices(manager, NotificationType.NIFI_DIED, properties.getProperty(NIFI_DEAD_NOTIFICATION_SERVICE_IDS_PROP));
return manager;
}
private void registerNotificationServices(final NotificationServiceManager manager, final NotificationType type, final String serviceIds) {
if (serviceIds == null) {
defaultLogger.info("Registered no Notification Services for Notification Type {}", type);
return;
}
int registered = 0;
for (final String id : serviceIds.split(",")) {
final String trimmed = id.trim();
if (trimmed.isEmpty()) {
continue;
}
try {
manager.registerNotificationService(type, trimmed);
registered++;
} catch (final Exception e) {
defaultLogger.warn("Failed to register Notification Service with ID {} for Notifications of type {} due to {}", trimmed, type, e.toString());
defaultLogger.error("", e);
}
}
defaultLogger.info("Registered {} Notification Services for Notification Type {}", registered, type);
}
protected File getBootstrapFile(final Logger logger, String directory, String defaultDirectory, String fileName) throws IOException { protected File getBootstrapFile(final Logger logger, String directory, String defaultDirectory, String fileName) throws IOException {
final File confDir = bootstrapConfigFile.getParentFile(); final File confDir = bootstrapConfigFile.getParentFile();
@ -491,7 +385,7 @@ public class RunNiFi {
private synchronized void savePidProperties(final Properties pidProperties, final Logger logger) throws IOException { private synchronized void savePidProperties(final Properties pidProperties, final Logger logger) throws IOException {
final String pid = pidProperties.getProperty(PID_KEY); final String pid = pidProperties.getProperty(PID_KEY);
if (!StringUtils.isBlank(pid)) { if (pid != null && !pid.isBlank()) {
writePidFile(pid, logger); writePidFile(pid, logger);
} }
@ -866,18 +760,6 @@ public class RunNiFi {
socketOut.flush(); socketOut.flush();
} }
public void notifyStop() {
final String hostname = getHostname();
final String now = Instant.now().toString();
String user = System.getProperty("user.name");
if (user == null || user.trim().isEmpty()) {
user = "Unknown User";
}
serviceManager.notify(NotificationType.NIFI_STOPPED, "NiFi Stopped on Host " + hostname,
"Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user);
}
public Integer decommission() throws IOException { public Integer decommission() throws IOException {
final Logger logger = cmdLogger; final Logger logger = cmdLogger;
final Integer port = getCurrentPort(logger); final Integer port = getCurrentPort(logger);
@ -986,7 +868,6 @@ public class RunNiFi {
+ "the process should be killed manually.", port, ioe.toString()); + "the process should be killed manually.", port, ioe.toString());
} else { } else {
logger.error("Failed to send shutdown command to port {} due to {}. Will kill the NiFi Process with PID {}.", port, ioe, pid); logger.error("Failed to send shutdown command to port {} due to {}. Will kill the NiFi Process with PID {}.", port, ioe, pid);
notifyStop();
killProcessTree(pid, logger); killProcessTree(pid, logger);
if (statusFile.exists() && !statusFile.delete()) { if (statusFile.exists() && !statusFile.delete()) {
logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile); logger.error("Failed to delete status file {}; this file should be cleaned up manually", statusFile);
@ -1023,7 +904,6 @@ public class RunNiFi {
gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE); gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
} }
notifyStop();
final long startWait = System.nanoTime(); final long startWait = System.nanoTime();
while (isProcessRunning(pid, logger)) { while (isProcessRunning(pid, logger)) {
logger.info("NiFi PID [{}] shutdown in progress...", pid); logger.info("NiFi PID [{}] shutdown in progress...", pid);
@ -1095,20 +975,6 @@ public class RunNiFi {
} }
} }
private String getHostname() {
String hostname = "Unknown Host";
String ip = "Unknown IP Address";
try {
final InetAddress localhost = InetAddress.getLocalHost();
hostname = localhost.getHostName();
ip = localhost.getHostAddress();
} catch (final Exception e) {
defaultLogger.warn("Failed to obtain hostname for notification due to:", e);
}
return hostname + " (" + ip + ")";
}
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
public void start(final boolean monitor) throws IOException { public void start(final boolean monitor) throws IOException {
final Integer port = getCurrentPort(cmdLogger); final Integer port = getCurrentPort(cmdLogger);
@ -1181,10 +1047,10 @@ public class RunNiFi {
if (key.startsWith("java.arg")) { if (key.startsWith("java.arg")) {
javaAdditionalArgs.add(value); javaAdditionalArgs.add(value);
if (value.startsWith("-Xms")) { if (value.startsWith("-Xms")) {
minimumHeapSize = StringUtils.substringAfter(value, "-Xms"); minimumHeapSize = value.replace("-Xms", "");
} }
if (value.startsWith("-Xmx")) { if (value.startsWith("-Xmx")) {
maximumHeapSize = StringUtils.substringAfter(value, "-Xmx"); maximumHeapSize = value.replace("-Xmx", "");
} }
} }
} }
@ -1240,8 +1106,10 @@ public class RunNiFi {
} }
try { try {
SecureNiFiConfigUtil.configureSecureNiFiProperties(nifiPropsFilename, cmdLogger); final ApplicationPropertyHandler propertyHandler = new SecurityApplicationPropertyHandler(cmdLogger);
} catch (IOException | RuntimeException e) { final Path applicationPropertiesLocation = Paths.get(nifiPropsFilename);
propertyHandler.handleProperties(applicationPropertiesLocation);
} catch (final RuntimeException e) {
cmdLogger.error("Self-Signed Certificate Generation Failed", e); cmdLogger.error("Self-Signed Certificate Generation Failed", e);
} }
@ -1315,14 +1183,6 @@ public class RunNiFi {
shutdownHook = new ShutdownHook(process, nifiPid, this, secretKey, gracefulShutdownSeconds, loggingExecutor); shutdownHook = new ShutdownHook(process, nifiPid, this, secretKey, gracefulShutdownSeconds, loggingExecutor);
final String hostname = getHostname();
String now = Instant.now().toString();
String user = System.getProperty("user.name");
if (user == null || user.trim().isEmpty()) {
user = "Unknown User";
}
serviceManager.notify(NotificationType.NIFI_STARTED, "NiFi Started on Host " + hostname, "Hello,\n\nApache NiFi has been started on host " + hostname + " at " + now + " by user " + user);
if (monitor) { if (monitor) {
final Runtime runtime = Runtime.getRuntime(); final Runtime runtime = Runtime.getRuntime();
runtime.addShutdownHook(shutdownHook); runtime.addShutdownHook(shutdownHook);
@ -1374,7 +1234,6 @@ public class RunNiFi {
handleLogging(process); handleLogging(process);
nifiPid = process.pid(); nifiPid = process.pid();
now = Instant.now().toString();
pidProperties.setProperty(PID_KEY, String.valueOf(nifiPid)); pidProperties.setProperty(PID_KEY, String.valueOf(nifiPid));
savePidProperties(pidProperties, defaultLogger); savePidProperties(pidProperties, defaultLogger);
cmdLogger.info("Application Process [{}] launched", nifiPid); cmdLogger.info("Application Process [{}] launched", nifiPid);
@ -1386,17 +1245,8 @@ public class RunNiFi {
if (started) { if (started) {
cmdLogger.info("Application Process [{}] started", nifiPid); cmdLogger.info("Application Process [{}] started", nifiPid);
// We are expected to restart nifi, so send a notification that it died. If we are not restarting nifi,
// then this means that we are intentionally stopping the service.
serviceManager.notify(NotificationType.NIFI_DIED, "NiFi Died on Host " + hostname,
"Hello,\n\nIt appears that Apache NiFi has died on host " + hostname + " at " + now + "; automatically restarting NiFi");
} else { } else {
defaultLogger.error("Application Process [{}] not started", nifiPid); defaultLogger.error("Application Process [{}] not started", nifiPid);
// We are expected to restart nifi, so send a notification that it died. If we are not restarting nifi,
// then this means that we are intentionally stopping the service.
serviceManager.notify(NotificationType.NIFI_DIED, "NiFi Died on Host " + hostname,
"Hello,\n\nIt appears that Apache NiFi has died on host " + hostname + " at " + now +
". Attempted to restart NiFi but the services does not appear to have restarted!");
} }
} else { } else {
return; return;
@ -1478,7 +1328,7 @@ public class RunNiFi {
} }
private boolean isSensitiveKeyPresent(Map<String, String> props) { private boolean isSensitiveKeyPresent(Map<String, String> props) {
return props.containsKey(NIFI_BOOTSTRAP_SENSITIVE_KEY) && !StringUtils.isBlank(props.get(NIFI_BOOTSTRAP_SENSITIVE_KEY)); return props.containsKey(NIFI_BOOTSTRAP_SENSITIVE_KEY) && !props.get(NIFI_BOOTSTRAP_SENSITIVE_KEY).isBlank();
} }
private void handleLogging(final Process process) { private void handleLogging(final Process process) {

View File

@ -67,7 +67,6 @@ public class ShutdownHook extends Thread {
} }
} }
runner.notifyStop();
System.out.printf("NiFi PID [%d] shutdown in progress...%n", pid); System.out.printf("NiFi PID [%d] shutdown in progress...%n", pid);
final long startWait = System.nanoTime(); final long startWait = System.nanoTime();
while (RunNiFi.isAlive(nifiProcess)) { while (RunNiFi.isAlive(nifiProcess)) {

View File

@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification;
import org.apache.nifi.components.AbstractConfigurableComponent;
public abstract class AbstractNotificationService extends AbstractConfigurableComponent implements NotificationService {
private String identifier; // effectively final
@Override
public final void initialize(final NotificationInitializationContext context) {
this.identifier = context.getIdentifier();
init(context);
}
protected void init(final NotificationInitializationContext context) {
}
@Override
public String getIdentifier() {
return identifier;
}
}

View File

@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification;
import java.util.Map;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
public interface NotificationContext {
/**
* Returns the configured value for the given PropertyDescriptor. Note that the implementation
* of PropertyValue will throw an Exception if calling {@link PropertyValue#asControllerService()} or
* {@link PropertyValue#asControllerService(Class)}, as Controller Services are not allowed to be
* referenced by Notification Services
*
* @param descriptor the property whose value should be returned
* @return the configured value for the given PropertyDescriptor, or the default
* value for the PropertyDescriptor if no value has been configured
*/
PropertyValue getProperty(PropertyDescriptor descriptor);
/**
* @return a Map of all PropertyDescriptors to their configured values. This
* Map may or may not be modifiable, but modifying its values will not
* change the values of the processor's properties
*/
Map<PropertyDescriptor, String> getProperties();
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification;
public class NotificationFailedException extends Exception {
private static final long serialVersionUID = 1L;
public NotificationFailedException(final String message) {
super(message);
}
public NotificationFailedException(final String message, final Throwable t) {
super(message, t);
}
public NotificationFailedException(final Throwable t) {
super(t);
}
}

View File

@ -1,57 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification;
import org.apache.nifi.components.ConfigurableComponent;
/**
* <p>
* A NotificationService is simple mechanism that the Bootstrap can use to notify
* interested parties when some event takes place, such as NiFi being started, stopped,
* or restarted because the process died.
* </p>
*
* <p>
* <b>Note:</b> This feature was introduced in version 0.3.0 of NiFi and is likely to undergo
* significant refactorings. As such, at this time it is NOT considered a public API and may well
* change from version to version until the API has stabilized. At that point, it will become a public
* API.
* </p>
*
* @since 0.3.0
*/
public interface NotificationService extends ConfigurableComponent {
/**
* Provides the NotificationService with access to objects that may be of use
* throughout the life of the service
*
* @param context of initialization
*/
void initialize(NotificationInitializationContext context);
/**
* Notifies the configured recipients of some event
*
* @param context the context that is relevant for this notification
* @param notificationType the notification type
* @param subject the subject of the message
* @param message the message to be provided to recipients
*/
void notify(NotificationContext context, NotificationType notificationType, String subject, String message) throws NotificationFailedException;
}

View File

@ -1,23 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification;
public enum NotificationType {
NIFI_STARTED,
NIFI_STOPPED,
NIFI_DIED;
}

View File

@ -1,148 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification;
import org.apache.nifi.components.resource.ResourceContext;
import org.apache.nifi.components.resource.StandardResourceContext;
import org.apache.nifi.components.resource.StandardResourceReferenceFactory;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class NotificationValidationContext implements ValidationContext {
private final NotificationContext context;
private final Map<String, Boolean> expressionLanguageSupported;
public NotificationValidationContext(final NotificationContext processContext) {
this.context = processContext;
final Map<PropertyDescriptor, String> properties = processContext.getProperties();
expressionLanguageSupported = new HashMap<>(properties.size());
for (final PropertyDescriptor descriptor : properties.keySet()) {
expressionLanguageSupported.put(descriptor.getName(), descriptor.isExpressionLanguageSupported());
}
}
@Override
public PropertyValue newPropertyValue(final String rawValue) {
final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), null);
return new StandardPropertyValue(resourceContext, rawValue, null, ParameterLookup.EMPTY);
}
@Override
public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
return new StandardExpressionLanguageCompiler(ParameterLookup.EMPTY);
}
@Override
public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) {
throw new UnsupportedOperationException();
}
@Override
public PropertyValue getProperty(final PropertyDescriptor property) {
return context.getProperty(property);
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return context.getProperties();
}
@Override
public Map<String, String> getAllProperties() {
final Map<String,String> propValueMap = new LinkedHashMap<>();
for (final Map.Entry<PropertyDescriptor, String> entry : getProperties().entrySet()) {
propValueMap.put(entry.getKey().getName(), entry.getValue());
}
return propValueMap;
}
@Override
public String getAnnotationData() {
throw new UnsupportedOperationException();
}
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return null;
}
@Override
public boolean isValidationRequired(final ControllerService service) {
return true;
}
@Override
public boolean isExpressionLanguagePresent(final String value) {
if (value == null) {
return false;
}
final List<Range> elRanges = Query.extractExpressionRanges(value);
return (elRanges != null && !elRanges.isEmpty());
}
@Override
public boolean isExpressionLanguageSupported(final String propertyName) {
final Boolean supported = expressionLanguageSupported.get(propertyName);
return Boolean.TRUE.equals(supported);
}
@Override
public String getProcessGroupIdentifier() {
return null;
}
@Override
public Collection<String> getReferencedParameters(final String propertyName) {
return Collections.emptyList();
}
@Override
public boolean isParameterDefined(final String parameterName) {
return false;
}
@Override
public boolean isParameterSet(final String parameterName) {
return false;
}
@Override
public boolean isDependencySatisfied(final PropertyDescriptor propertyDescriptor, final Function<String, PropertyDescriptor> propertyDescriptorLookup) {
return true;
}
}

View File

@ -1,289 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification.email;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
import javax.mail.Authenticator;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.PasswordAuthentication;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.Message.RecipientType;
import javax.mail.internet.AddressException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import org.apache.nifi.bootstrap.notification.AbstractNotificationService;
import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationFailedException;
import org.apache.nifi.bootstrap.notification.NotificationType;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
public class EmailNotificationService extends AbstractNotificationService {
public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
.name("SMTP Hostname")
.description("The hostname of the SMTP Server that is used to send Email Notifications")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
.name("SMTP Port")
.description("The Port used for SMTP communications")
.required(true)
.defaultValue("25")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
.name("SMTP Username")
.description("Username for the SMTP account")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.build();
public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
.name("SMTP Password")
.description("Password for the SMTP account")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(false)
.sensitive(true)
.build();
public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder()
.name("SMTP Auth")
.description("Flag indicating whether authentication should be used")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("true")
.build();
public static final PropertyDescriptor SMTP_TLS = new PropertyDescriptor.Builder()
.name("SMTP TLS")
.description("Flag indicating whether TLS should be enabled")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();
public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new PropertyDescriptor.Builder()
.name("SMTP Socket Factory")
.description("Socket Factory to use for SMTP Connection")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("javax.net.ssl.SSLSocketFactory")
.build();
public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder()
.name("SMTP X-Mailer Header")
.description("X-Mailer used in the header of the outgoing email")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("NiFi")
.build();
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
.name("Content Type")
.description("Mime Type used to interpret the contents of the email, such as text/plain or text/html")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("text/plain")
.build();
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
.name("From")
.description("Specifies the Email address to use as the sender")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor TO = new PropertyDescriptor.Builder()
.name("To")
.description("The recipients to include in the To-Line of the email")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CC = new PropertyDescriptor.Builder()
.name("CC")
.description("The recipients to include in the CC-Line of the email")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder()
.name("BCC")
.description("The recipients to include in the BCC-Line of the email")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
/**
* Mapping of the mail properties to the NiFi PropertyDescriptors that will be evaluated at runtime
*/
private static final Map<String, PropertyDescriptor> propertyToContext = new HashMap<>();
static {
propertyToContext.put("mail.smtp.host", SMTP_HOSTNAME);
propertyToContext.put("mail.smtp.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT);
propertyToContext.put("mail.smtp.socketFactory.class", SMTP_SOCKET_FACTORY);
propertyToContext.put("mail.smtp.auth", SMTP_AUTH);
propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS);
propertyToContext.put("mail.smtp.user", SMTP_USERNAME);
propertyToContext.put("mail.smtp.password", SMTP_PASSWORD);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SMTP_HOSTNAME);
properties.add(SMTP_PORT);
properties.add(SMTP_USERNAME);
properties.add(SMTP_PASSWORD);
properties.add(SMTP_AUTH);
properties.add(SMTP_TLS);
properties.add(SMTP_SOCKET_FACTORY);
properties.add(HEADER_XMAILER);
properties.add(CONTENT_TYPE);
properties.add(FROM);
properties.add(TO);
properties.add(CC);
properties.add(BCC);
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final String to = context.getProperty(TO).getValue();
final String cc = context.getProperty(CC).getValue();
final String bcc = context.getProperty(BCC).getValue();
if (to == null && cc == null && bcc == null) {
errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build());
}
return errors;
}
@Override
public void notify(final NotificationContext context, final NotificationType notificationType, final String subject, final String messageText) throws NotificationFailedException {
final Properties properties = getMailProperties(context);
final Session mailSession = createMailSession(properties);
final Message message = new MimeMessage(mailSession);
try {
message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions().getValue())[0]);
final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions().getValue());
message.setRecipients(RecipientType.TO, toAddresses);
final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions().getValue());
message.setRecipients(RecipientType.CC, ccAddresses);
final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions().getValue());
message.setRecipients(RecipientType.BCC, bccAddresses);
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions().getValue());
message.setSubject(subject);
final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
message.setContent(messageText, contentType);
message.setSentDate(new Date());
Transport.send(message);
} catch (final ProcessException | MessagingException e) {
throw new NotificationFailedException("Failed to send E-mail Notification", e);
}
}
/**
* Creates an array of 0 or more InternetAddresses for the given String
*
* @param val the String to parse for InternetAddresses
* @return an array of 0 or more InetAddresses
* @throws AddressException if val contains an invalid address
*/
private static InternetAddress[] toInetAddresses(final String val) throws AddressException {
if (val == null) {
return new InternetAddress[0];
}
return InternetAddress.parse(val);
}
/**
* Uses the mapping of javax.mail properties to NiFi PropertyDescriptors to build the required Properties object to be used for sending this email
*
* @param context context
* @return mail properties
*/
private Properties getMailProperties(final NotificationContext context) {
final Properties properties = new Properties();
for (Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) {
// Evaluate the property descriptor against the flow file
String property = entry.getKey();
String propValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions().getValue();
// Nullable values are not allowed, so filter out
if (null != propValue) {
properties.setProperty(property, propValue);
}
}
return properties;
}
/**
* Based on the input properties, determine whether an authenticate or unauthenticated session should be used. If authenticated, creates a Password Authenticator for use in sending the email.
*
* @param properties mail properties
* @return session
*/
private Session createMailSession(final Properties properties) {
String authValue = properties.getProperty("mail.smtp.auth");
final boolean auth = Boolean.parseBoolean(authValue);
/*
* Conditionally create a password authenticator if the 'auth' parameter is set.
*/
return auth ? Session.getInstance(properties, new Authenticator() {
@Override
public PasswordAuthentication getPasswordAuthentication() {
String username = properties.getProperty("mail.smtp.user"), password = properties.getProperty("mail.smtp.password");
return new PasswordAuthentication(username, password);
}
}) : Session.getInstance(properties); // without auth
}
}

View File

@ -1,270 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.notification.http;
import okhttp3.Call;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.nifi.bootstrap.notification.AbstractNotificationService;
import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationFailedException;
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
import org.apache.nifi.bootstrap.notification.NotificationType;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.KeystoreType;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class HttpNotificationService extends AbstractNotificationService {
public static final String NOTIFICATION_TYPE_KEY = "notification.type";
public static final String NOTIFICATION_SUBJECT_KEY = "notification.subject";
public static final PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
.name("URL")
.description("The URL to send the notification to.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_CONNECTION_TIMEOUT = new PropertyDescriptor.Builder()
.name("Connection timeout")
.description("Max wait time for connection to remote service.")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10s")
.build();
public static final PropertyDescriptor PROP_WRITE_TIMEOUT = new PropertyDescriptor.Builder()
.name("Write timeout")
.description("Max wait time for remote service to read the request sent.")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("10s")
.build();
public static final PropertyDescriptor PROP_TRUSTSTORE = new PropertyDescriptor.Builder()
.name("Truststore Filename")
.description("The fully-qualified filename of the Truststore")
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
.sensitive(false)
.build();
public static final PropertyDescriptor PROP_TRUSTSTORE_TYPE = new PropertyDescriptor.Builder()
.name("Truststore Type")
.description("The Type of the Truststore")
.allowableValues(KeystoreType.values())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor PROP_TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
.name("Truststore Password")
.description("The password for the Truststore")
.defaultValue(null)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor PROP_KEYSTORE = new PropertyDescriptor.Builder()
.name("Keystore Filename")
.description("The fully-qualified filename of the Keystore")
.defaultValue(null)
.identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
.sensitive(false)
.build();
public static final PropertyDescriptor PROP_KEYSTORE_TYPE = new PropertyDescriptor.Builder()
.name("Keystore Type")
.description("The Type of the Keystore")
.allowableValues(KeystoreType.values())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(false)
.build();
public static final PropertyDescriptor PROP_KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
.name("Keystore Password")
.defaultValue(null)
.description("The password for the Keystore")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor PROP_KEY_PASSWORD = new PropertyDescriptor.Builder()
.name("Key Password")
.displayName("Key Password")
.description("The password for the key. If this is not specified, but the Keystore Filename, Password, and Type are specified, "
+ "then the Keystore Password will be assumed to be the same as the Key Password.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
public static final PropertyDescriptor SSL_ALGORITHM = new PropertyDescriptor.Builder()
.name("SSL Protocol")
.defaultValue("TLS")
.allowableValues("SSL", "TLS")
.description("The algorithm to use for this SSL context.")
.sensitive(false)
.build();
private final AtomicReference<OkHttpClient> httpClientReference = new AtomicReference<>();
private final AtomicReference<String> urlReference = new AtomicReference<>();
private static final List<PropertyDescriptor> supportedProperties;
static {
supportedProperties = new ArrayList<>();
supportedProperties.add(PROP_URL);
supportedProperties.add(PROP_CONNECTION_TIMEOUT);
supportedProperties.add(PROP_WRITE_TIMEOUT);
supportedProperties.add(PROP_TRUSTSTORE);
supportedProperties.add(PROP_TRUSTSTORE_PASSWORD);
supportedProperties.add(PROP_TRUSTSTORE_TYPE);
supportedProperties.add(PROP_KEYSTORE);
supportedProperties.add(PROP_KEYSTORE_PASSWORD);
supportedProperties.add(PROP_KEYSTORE_TYPE);
supportedProperties.add(PROP_KEY_PASSWORD);
supportedProperties.add(SSL_ALGORITHM);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return supportedProperties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.required(false)
.name(propertyDescriptorName)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
}
@Override
protected void init(final NotificationInitializationContext context) {
final String url = context.getProperty(PROP_URL).evaluateAttributeExpressions().getValue();
if (url == null || url.isEmpty()) {
throw new IllegalArgumentException("Property, \"" + PROP_URL.getDisplayName() + "\", for the URL to POST notifications to must be set.");
}
urlReference.set(url);
httpClientReference.set(null);
final OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder();
Long connectTimeout = context.getProperty(PROP_CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
Long writeTimeout = context.getProperty(PROP_WRITE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
// Set timeouts
okHttpClientBuilder.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS);
okHttpClientBuilder.writeTimeout(writeTimeout, TimeUnit.MILLISECONDS);
// check if the keystore is set and add the factory if so
if (url.toLowerCase().startsWith("https")) {
try {
final TlsConfiguration tlsConfiguration = createTlsConfigurationFromContext(context);
final X509TrustManager x509TrustManager = SslContextFactory.getX509TrustManager(tlsConfiguration);
if (x509TrustManager == null) {
throw new IllegalStateException("Unable to get X.509 Trust Manager for HTTP Notification Service configured for TLS");
}
final TrustManager[] trustManagers = new TrustManager[] { x509TrustManager };
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration, trustManagers);
if (sslContext == null) {
throw new IllegalStateException("Unable to get SSL Context for HTTP Notification Service configured for TLS");
}
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
okHttpClientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
httpClientReference.set(okHttpClientBuilder.build());
}
private static TlsConfiguration createTlsConfigurationFromContext(NotificationInitializationContext context) {
String keystorePath = context.getProperty(HttpNotificationService.PROP_KEYSTORE).getValue();
String keystorePassword = context.getProperty(HttpNotificationService.PROP_KEYSTORE_PASSWORD).getValue();
String keyPassword = context.getProperty(HttpNotificationService.PROP_KEY_PASSWORD).getValue();
String keystoreType = context.getProperty(HttpNotificationService.PROP_KEYSTORE_TYPE).getValue();
String truststorePath = context.getProperty(HttpNotificationService.PROP_TRUSTSTORE).getValue();
String truststorePassword = context.getProperty(HttpNotificationService.PROP_TRUSTSTORE_PASSWORD).getValue();
String truststoreType = context.getProperty(HttpNotificationService.PROP_TRUSTSTORE_TYPE).getValue();
String protocol = context.getProperty(HttpNotificationService.SSL_ALGORITHM).getValue();
return new StandardTlsConfiguration(keystorePath, keystorePassword, keyPassword, keystoreType, truststorePath, truststorePassword, truststoreType, protocol);
}
@Override
public void notify(NotificationContext context, NotificationType notificationType, String subject, String message) throws NotificationFailedException {
try {
final RequestBody requestBody = RequestBody.create(message, MediaType.parse("text/plain"));
Request.Builder requestBuilder = new Request.Builder()
.post(requestBody)
.url(urlReference.get());
Map<PropertyDescriptor, String> configuredProperties = context.getProperties();
for (PropertyDescriptor propertyDescriptor : configuredProperties.keySet()) {
if (propertyDescriptor.isDynamic()) {
String propertyValue = context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue();
requestBuilder = requestBuilder.addHeader(propertyDescriptor.getDisplayName(), propertyValue);
}
}
final Request request = requestBuilder
.addHeader(NOTIFICATION_SUBJECT_KEY, subject)
.addHeader(NOTIFICATION_TYPE_KEY, notificationType.name())
.build();
final OkHttpClient httpClient = httpClientReference.get();
final Call call = httpClient.newCall(request);
try (final Response response = call.execute()) {
if (!response.isSuccessful()) {
throw new NotificationFailedException("Failed to send Http Notification. Received an unsuccessful status code response '" + response.code() + "'. The message was '" +
response.message() + "'");
}
}
} catch (NotificationFailedException e) {
throw e;
} catch (Exception e) {
throw new NotificationFailedException("Failed to send Http Notification", e);
}
}
}

View File

@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.bootstrap.notification; package org.apache.nifi.bootstrap.property;
import java.nio.file.Path;
import org.apache.nifi.context.PropertyContext; /**
* Abstraction for evaluating and updating application properties prior to startup
public interface NotificationInitializationContext extends PropertyContext { */
public interface ApplicationPropertyHandler {
/** /**
* @return the identifier for the NotificationService * Handle Application Properties based on provided path location
*
* @param applicationPropertiesLocation Path to Application Properties
*/ */
String getIdentifier(); void handleProperties(Path applicationPropertiesLocation);
} }

View File

@ -0,0 +1,324 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.property;
import org.apache.nifi.security.cert.builder.StandardCertificateBuilder;
import org.slf4j.Logger;
import javax.security.auth.x500.X500Principal;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.SecureRandom;
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HexFormat;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
/**
* Standard implementation for application security generates Key Pair and Certificate when not configured
*/
public class SecurityApplicationPropertyHandler implements ApplicationPropertyHandler {
protected static final String ENTRY_ALIAS = "generated";
protected static final X500Principal CERTIFICATE_ISSUER = new X500Principal("CN=localhost");
private static final String DIGEST_ALGORITHM = "SHA-256";
private static final String KEY_ALGORITHM = "RSA";
private static final int KEY_SIZE = 4096;
private static final String LOCALHOST = "localhost";
private static final Duration CERTIFICATE_VALIDITY_PERIOD = Duration.ofDays(60);
private static final int RANDOM_BYTE_LENGTH = 16;
private static final String PROPERTY_SEPARATOR = "=";
private final Logger logger;
public SecurityApplicationPropertyHandler(final Logger logger) {
this.logger = Objects.requireNonNull(logger, "Logger required");
}
@Override
public void handleProperties(final Path applicationPropertiesLocation) {
Objects.requireNonNull(applicationPropertiesLocation);
final Properties applicationProperties = loadProperties(applicationPropertiesLocation);
if (isCertificateGenerationRequired(applicationProperties)) {
processApplicationProperties(applicationProperties);
writePasswordProperties(applicationProperties, applicationPropertiesLocation);
}
}
private void processApplicationProperties(final Properties applicationProperties) {
final KeyPair keyPair = generateKeyPair();
final Collection<String> subjectAlternativeNames = getSubjectAlternativeNames(applicationProperties);
final X509Certificate certificate = new StandardCertificateBuilder(keyPair, CERTIFICATE_ISSUER, CERTIFICATE_VALIDITY_PERIOD)
.setDnsSubjectAlternativeNames(subjectAlternativeNames)
.build();
final String certificateDigestEncoded = getDigest(certificate);
logger.info("Generated Self-Signed Certificate Expiration: {}", LocalDate.now().plusDays(CERTIFICATE_VALIDITY_PERIOD.toDays()));
logger.info("Generated Self-Signed Certificate SHA-256: {}", certificateDigestEncoded);
final PrivateKey privateKey = keyPair.getPrivate();
writeKeyStore(applicationProperties, certificate, privateKey);
writeTrustStore(applicationProperties, certificate);
}
private void writeTrustStore(final Properties applicationProperties, final X509Certificate certificate) {
final String storeType = applicationProperties.getProperty(SecurityProperty.TRUSTSTORE_TYPE.getName());
final KeyStore trustStore = newKeyStore(storeType);
try {
trustStore.load(null, null);
trustStore.setCertificateEntry(ENTRY_ALIAS, certificate);
} catch (final GeneralSecurityException|IOException e) {
throw new IllegalStateException("Trust Store creation failed", e);
}
final Path outputLocation = Paths.get(applicationProperties.getProperty(SecurityProperty.TRUSTSTORE.getName()));
try (final OutputStream outputStream = Files.newOutputStream(outputLocation)) {
final String truststorePasswd = generatePassword();
trustStore.store(outputStream, truststorePasswd.toCharArray());
applicationProperties.setProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName(), truststorePasswd);
} catch (final GeneralSecurityException|IOException e) {
throw new IllegalStateException("Trust Store storage failed", e);
}
}
private void writeKeyStore(final Properties applicationProperties, final X509Certificate certificate, final PrivateKey privateKey) {
final String keystorePasswd = generatePassword();
final char[] password = keystorePasswd.toCharArray();
final String storeType = applicationProperties.getProperty(SecurityProperty.KEYSTORE_TYPE.getName());
final KeyStore keyStore = newKeyStore(storeType);
try {
keyStore.load(null, null);
final X509Certificate[] certificates = new X509Certificate[]{ certificate };
keyStore.setKeyEntry(ENTRY_ALIAS, privateKey, password, certificates);
} catch (final GeneralSecurityException|IOException e) {
throw new IllegalStateException("Key Store creation failed", e);
}
final Path outputLocation = Paths.get(applicationProperties.getProperty(SecurityProperty.KEYSTORE.getName()));
try (final OutputStream outputStream = Files.newOutputStream(outputLocation)) {
keyStore.store(outputStream, password);
applicationProperties.setProperty(SecurityProperty.KEYSTORE_PASSWD.getName(), keystorePasswd);
applicationProperties.setProperty(SecurityProperty.KEY_PASSWD.getName(), keystorePasswd);
} catch (final GeneralSecurityException|IOException e) {
throw new IllegalStateException("Key Store storage failed", e);
}
}
private void writePasswordProperties(final Properties applicationProperties, final Path applicationPropertiesLocation) {
final ByteArrayOutputStream propertiesOutputStream = new ByteArrayOutputStream();
try (
final BufferedReader reader = Files.newBufferedReader(applicationPropertiesLocation);
final PrintWriter writer = new PrintWriter(propertiesOutputStream)
) {
String line = reader.readLine();
while (line != null) {
if (line.startsWith(SecurityProperty.KEYSTORE_PASSWD.getName())) {
writeProperty(writer, SecurityProperty.KEYSTORE_PASSWD, applicationProperties);
} else if (line.startsWith(SecurityProperty.KEY_PASSWD.getName())) {
writeProperty(writer, SecurityProperty.KEY_PASSWD, applicationProperties);
} else if (line.startsWith(SecurityProperty.TRUSTSTORE_PASSWD.getName())) {
writeProperty(writer, SecurityProperty.TRUSTSTORE_PASSWD, applicationProperties);
} else {
writer.println(line);
}
line = reader.readLine();
}
} catch (final IOException e) {
throw new IllegalStateException("Read Application Properties failed", e);
}
final byte[] updatedProperties = propertiesOutputStream.toByteArray();
try (final OutputStream outputStream = Files.newOutputStream(applicationPropertiesLocation)) {
outputStream.write(updatedProperties);
} catch (final IOException e) {
throw new IllegalStateException("Write Application Properties failed", e);
}
}
private void writeProperty(final PrintWriter writer, final SecurityProperty securityProperty, final Properties applicationProperties) {
writer.print(securityProperty.getName());
writer.print(PROPERTY_SEPARATOR);
final String propertyValue = applicationProperties.getProperty(securityProperty.getName());
writer.println(propertyValue);
}
private KeyStore newKeyStore(final String storeType) {
try {
return KeyStore.getInstance(storeType);
} catch (final KeyStoreException e) {
throw new IllegalStateException("Key Store Type [%s] instantiation failed".formatted(storeType), e);
}
}
private boolean isCertificateGenerationRequired(final Properties applicationProperties) {
final boolean required;
final String keystoreLocation = applicationProperties.getProperty(SecurityProperty.KEYSTORE.getName());
final String truststoreLocation = applicationProperties.getProperty(SecurityProperty.TRUSTSTORE.getName());
if (isBlank(applicationProperties.getProperty(SecurityProperty.HTTPS_PORT.getName()))) {
required = false;
} else if (isBlank(keystoreLocation)) {
required = false;
} else if (isBlank(truststoreLocation)) {
required = false;
} else if (isBlank(applicationProperties.getProperty(SecurityProperty.KEYSTORE_PASSWD.getName()))
&& isBlank(applicationProperties.getProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName()))) {
final Path keystorePath = Paths.get(keystoreLocation);
final Path truststorePath = Paths.get(truststoreLocation);
required = Files.notExists(keystorePath) && Files.notExists(truststorePath);
} else {
required = false;
}
return required;
}
private Collection<String> getSubjectAlternativeNames(final Properties applicationProperties) {
try {
final InetAddress localHost = InetAddress.getLocalHost();
final String localHostName = localHost.getHostName();
final List<String> subjectAlternativeNames = new ArrayList<>();
subjectAlternativeNames.add(LOCALHOST);
subjectAlternativeNames.add(localHostName);
final String proxyHost = applicationProperties.getProperty(SecurityProperty.WEB_PROXY_HOST.getName());
if (!isBlank(proxyHost)) {
subjectAlternativeNames.add(proxyHost);
}
return subjectAlternativeNames;
} catch (final UnknownHostException e) {
return Collections.emptyList();
}
}
private KeyPair generateKeyPair() {
final KeyPairGenerator keyPairGenerator;
try {
keyPairGenerator = KeyPairGenerator.getInstance(KEY_ALGORITHM);
} catch (final NoSuchAlgorithmException e) {
throw new IllegalStateException("Key Pair Algorithm not supported [%s]".formatted(KEY_ALGORITHM), e);
}
keyPairGenerator.initialize(KEY_SIZE);
return keyPairGenerator.generateKeyPair();
}
protected String generatePassword() {
final SecureRandom secureRandom = new SecureRandom();
final byte[] bytes = new byte[RANDOM_BYTE_LENGTH];
secureRandom.nextBytes(bytes);
return HexFormat.of().formatHex(bytes);
}
private static String getDigest(final X509Certificate certificate) {
try {
final MessageDigest messageDigest = MessageDigest.getInstance(DIGEST_ALGORITHM);
final byte[] certificateEncoded = certificate.getEncoded();
final byte[] digest = messageDigest.digest(certificateEncoded);
return HexFormat.of().formatHex(digest).toUpperCase();
} catch (final NoSuchAlgorithmException e) {
throw new IllegalStateException("Message Digest Algorithm not found", e);
} catch (final CertificateEncodingException e) {
throw new IllegalArgumentException("Certificate encoding processing failed", e);
}
}
private Properties loadProperties(final Path applicationPropertiesLocation) {
try (final InputStream inputStream = Files.newInputStream(applicationPropertiesLocation)) {
final Properties properties = new Properties();
properties.load(inputStream);
return properties;
} catch (final IOException e) {
throw new IllegalStateException("Reading Application Properties failed [%s]".formatted(applicationPropertiesLocation), e);
}
}
private boolean isBlank(final String propertyValue) {
return propertyValue == null || propertyValue.isBlank();
}
protected enum SecurityProperty {
HTTPS_PORT("nifi.web.https.port"),
WEB_PROXY_HOST("nifi.web.proxy.host"),
KEYSTORE("nifi.security.keystore"),
KEYSTORE_TYPE("nifi.security.keystoreType"),
KEYSTORE_PASSWD("nifi.security.keystorePasswd"),
KEY_PASSWD("nifi.security.keyPasswd"),
TRUSTSTORE("nifi.security.truststore"),
TRUSTSTORE_TYPE("nifi.security.truststoreType"),
TRUSTSTORE_PASSWD("nifi.security.truststorePasswd");
private final String name;
SecurityProperty(final String name) {
this.name = name;
}
public String getName() {
return name;
}
}
}

View File

@ -1,228 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.util;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.bouncycastle.util.IPAddress;
import org.slf4j.Logger;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
public class SecureNiFiConfigUtil {
private static final int CERT_DURATION_DAYS = 60;
private static final String LOCALHOST_NAME = "localhost";
private static final String PROPERTY_VALUE_PATTERN = "%s=%s";
private SecureNiFiConfigUtil() {
}
private static boolean fileExists(String filename) {
return StringUtils.isNotEmpty(filename) && Paths.get(filename).toFile().exists();
}
/**
* Returns true only if nifi.web.https.port is set, nifi.security.keystore and nifi.security.truststore
* are both set, and both nifi.security.keystorePasswd and nifi.security.truststorePassword are NOT set.
*
* This would indicate that the user intends to auto-generate a keystore and truststore, rather than
* using their existing kestore and truststore.
* @param nifiProperties The nifi properties
* @return HTTPS Security Configured status
*/
private static boolean isHttpsSecurityConfiguredWithEmptyPasswords(final Properties nifiProperties) {
if (StringUtils.isEmpty(nifiProperties.getProperty(NiFiProperties.WEB_HTTPS_PORT, StringUtils.EMPTY))) {
return false;
}
String keystorePath = nifiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE, StringUtils.EMPTY);
String truststorePath = nifiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE, StringUtils.EMPTY);
if (StringUtils.isEmpty(keystorePath) || StringUtils.isEmpty(truststorePath)) {
return false;
}
String keystorePassword = nifiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD, StringUtils.EMPTY);
String truststorePassword = nifiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, StringUtils.EMPTY);
if (StringUtils.isNotEmpty(keystorePassword) || StringUtils.isNotEmpty(truststorePassword)) {
return false;
}
return true;
}
/**
* If HTTPS is enabled (nifi.web.https.port is set), but the keystore file specified in nifi.security.keystore
* does not exist, this will generate a key pair and self-signed certificate, generate the associated keystore
* and truststore and write them to disk under the configured filepaths, generate a secure random keystore password
* and truststore password, and write these to the nifi.properties file.
* @param nifiPropertiesFilename The filename of the nifi.properties file
* @param cmdLogger The bootstrap logger
* @throws IOException can be thrown when writing keystores to disk
* @throws RuntimeException indicates a security exception while generating keystores
*/
public static void configureSecureNiFiProperties(final String nifiPropertiesFilename, final Logger cmdLogger) throws IOException, RuntimeException {
final File propertiesFile = new File(nifiPropertiesFilename);
final Properties nifiProperties = loadProperties(propertiesFile);
if (!isHttpsSecurityConfiguredWithEmptyPasswords(nifiProperties)) {
cmdLogger.debug("Skipping Apache Nifi certificate generation.");
return;
}
String keystorePath = nifiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE, StringUtils.EMPTY);
String truststorePath = nifiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE, StringUtils.EMPTY);
boolean keystoreExists = fileExists(keystorePath);
boolean truststoreExists = fileExists(truststorePath);
if (!keystoreExists && !truststoreExists) {
TlsConfiguration tlsConfiguration;
cmdLogger.info("Generating Self-Signed Certificate: Expires on {}", LocalDate.now().plus(CERT_DURATION_DAYS, ChronoUnit.DAYS));
try {
String[] subjectAlternativeNames = getSubjectAlternativeNames(nifiProperties, cmdLogger);
tlsConfiguration = KeyStoreUtils.createTlsConfigAndNewKeystoreTruststore(StandardTlsConfiguration
.fromNiFiProperties(nifiProperties), CERT_DURATION_DAYS, subjectAlternativeNames);
final KeyStore keyStore = KeyStoreUtils.loadKeyStore(tlsConfiguration.getKeystorePath(),
tlsConfiguration.getKeystorePassword().toCharArray(), tlsConfiguration.getKeystoreType().getType());
final Enumeration<String> aliases = keyStore.aliases();
while (aliases.hasMoreElements()) {
final String alias = aliases.nextElement();
final Certificate certificate = keyStore.getCertificate(alias);
if (certificate != null) {
final String sha256 = DigestUtils.sha256Hex(certificate.getEncoded());
cmdLogger.info("Generated Self-Signed Certificate SHA-256: {}", sha256.toUpperCase(Locale.ROOT));
}
}
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
// Move over the new stores from temp dir
Files.move(Paths.get(tlsConfiguration.getKeystorePath()), Paths.get(keystorePath),
StandardCopyOption.REPLACE_EXISTING);
Files.move(Paths.get(tlsConfiguration.getTruststorePath()), Paths.get(truststorePath),
StandardCopyOption.REPLACE_EXISTING);
updateProperties(propertiesFile, tlsConfiguration);
cmdLogger.debug("Generated Keystore [{}] Truststore [{}]", keystorePath, truststorePath);
} else if (!keystoreExists && truststoreExists) {
cmdLogger.warn("Truststore file {} already exists. Apache NiFi will not generate keystore and truststore separately.",
truststorePath);
} else if (keystoreExists && !truststoreExists) {
cmdLogger.warn("Keystore file {} already exists. Apache NiFi will not generate keystore and truststore separately.",
keystorePath);
}
}
/**
* Attempts to add some reasonable guesses at desired SAN values that can be added to the generated
* certificate.
* @param nifiProperties The nifi.properties
* @return A Pair with IP SANs on the left and DNS SANs on the right
*/
private static String[] getSubjectAlternativeNames(Properties nifiProperties, Logger cmdLogger) {
Set<String> dnsSubjectAlternativeNames = new HashSet<>();
try {
dnsSubjectAlternativeNames.add(InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
cmdLogger.debug("Could not add localhost hostname as certificate SAN", e);
}
addSubjectAlternativeName(nifiProperties, NiFiProperties.REMOTE_INPUT_HOST, dnsSubjectAlternativeNames);
addSubjectAlternativeName(nifiProperties, NiFiProperties.WEB_HTTPS_HOST, dnsSubjectAlternativeNames);
addSubjectAlternativeName(nifiProperties, NiFiProperties.WEB_PROXY_HOST, dnsSubjectAlternativeNames);
addSubjectAlternativeName(nifiProperties, NiFiProperties.LOAD_BALANCE_HOST, dnsSubjectAlternativeNames);
// Not necessary to add as a SAN
dnsSubjectAlternativeNames.remove(LOCALHOST_NAME);
return dnsSubjectAlternativeNames.toArray(new String[dnsSubjectAlternativeNames.size()]);
}
private static void addSubjectAlternativeName(Properties nifiProperties, String propertyName,
Set<String> dnsSubjectAlternativeNames) {
String hostValue = nifiProperties.getProperty(propertyName, StringUtils.EMPTY);
if (!hostValue.isEmpty()) {
if (!IPAddress.isValid(hostValue)) {
dnsSubjectAlternativeNames.add(hostValue);
}
}
}
private static String getPropertyLine(String name, String value) {
return String.format(PROPERTY_VALUE_PATTERN, name, value);
}
private static void updateProperties(final File propertiesFile, final TlsConfiguration tlsConfiguration) throws IOException {
final Path propertiesFilePath = propertiesFile.toPath();
final List<String> lines = Files.readAllLines(propertiesFilePath);
final List<String> updatedLines = lines.stream().map(line -> {
if (line.startsWith(NiFiProperties.SECURITY_KEYSTORE_PASSWD)) {
return getPropertyLine(NiFiProperties.SECURITY_KEYSTORE_PASSWD, tlsConfiguration.getKeystorePassword());
} else if (line.startsWith(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD)) {
return getPropertyLine(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, tlsConfiguration.getTruststorePassword());
} else if (line.startsWith(NiFiProperties.SECURITY_KEY_PASSWD)) {
return getPropertyLine(NiFiProperties.SECURITY_KEY_PASSWD, tlsConfiguration.getKeystorePassword());
} else if (line.startsWith(NiFiProperties.SECURITY_KEYSTORE_TYPE)) {
return getPropertyLine(NiFiProperties.SECURITY_KEYSTORE_TYPE, tlsConfiguration.getKeystoreType().getType());
} else if (line.startsWith(NiFiProperties.SECURITY_TRUSTSTORE_TYPE)) {
return getPropertyLine(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, tlsConfiguration.getTruststoreType().getType());
} else {
return line;
}
}).collect(Collectors.toList());
Files.write(propertiesFilePath, updatedLines);
}
private static Properties loadProperties(final File propertiesFile) {
final Properties properties = new Properties();
try (final FileReader reader = new FileReader(propertiesFile)) {
properties.load(reader);
} catch (final IOException e) {
final String message = String.format("Failed to read NiFi Properties [%s]", propertiesFile);
throw new UncheckedIOException(message, e);
}
return properties;
}
}

View File

@ -1,112 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.email;
import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationFailedException;
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
import org.apache.nifi.bootstrap.notification.NotificationType;
import org.apache.nifi.bootstrap.notification.email.EmailNotificationService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class EmailNotificationServiceTest {
private static final String SUBJECT = "Subject";
private static final String MESSAGE = "Message";
private static final String LOCALHOST_ADDRESS = "127.0.0.1";
private static final String ADDRESS = "username@localhost.localdomain";
@Test
public void testNotifyMessagingException() {
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
final EmailNotificationService service = getNotificationService(properties);
assertThrows(NotificationFailedException.class, () -> service.notify(getNotificationContext(), NotificationType.NIFI_STARTED, SUBJECT, MESSAGE));
}
private EmailNotificationService getNotificationService(final Map<PropertyDescriptor, PropertyValue> properties) {
final EmailNotificationService service = new EmailNotificationService();
final NotificationInitializationContext context = new NotificationInitializationContext() {
@Override
public String getIdentifier() {
return NotificationInitializationContext.class.getName();
}
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
final PropertyValue propertyValue = properties.get(descriptor);
return propertyValue == null ? new MockPropertyValue(descriptor.getDefaultValue()) : propertyValue;
}
@Override
public Map<String, String> getAllProperties() {
final Map<String, String> allProperties = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : properties.entrySet()) {
allProperties.put(entry.getKey().getName(), entry.getValue().getValue());
}
return allProperties;
}
};
service.initialize(context);
return service;
}
private NotificationContext getNotificationContext() {
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
return new NotificationContext() {
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
final PropertyValue propertyValue = properties.get(descriptor);
return propertyValue == null ? new MockPropertyValue(descriptor.getDefaultValue()) : propertyValue;
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
final Map<PropertyDescriptor, String> propertyValues = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : properties.entrySet()) {
propertyValues.put(entry.getKey(), entry.getValue().getValue());
}
return propertyValues;
}
};
}
private Map<PropertyDescriptor, PropertyValue> getProperties() {
final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>();
properties.put(EmailNotificationService.SMTP_HOSTNAME, new MockPropertyValue(LOCALHOST_ADDRESS));
properties.put(EmailNotificationService.SMTP_PORT, new MockPropertyValue("0"));
properties.put(EmailNotificationService.SMTP_AUTH, new MockPropertyValue(Boolean.FALSE.toString()));
properties.put(EmailNotificationService.FROM, new MockPropertyValue(ADDRESS));
properties.put(EmailNotificationService.TO, new MockPropertyValue(ADDRESS));
return properties;
}
}

View File

@ -1,237 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.http;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okio.Buffer;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.bootstrap.notification.NotificationContext;
import org.apache.nifi.bootstrap.notification.NotificationFailedException;
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
import org.apache.nifi.bootstrap.notification.NotificationType;
import org.apache.nifi.bootstrap.notification.http.HttpNotificationService;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_URL;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_CONNECTION_TIMEOUT;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_WRITE_TIMEOUT;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_KEYSTORE;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_KEYSTORE_PASSWORD;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_KEYSTORE_TYPE;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_KEY_PASSWORD;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_TRUSTSTORE;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_TRUSTSTORE_PASSWORD;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.PROP_TRUSTSTORE_TYPE;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.NOTIFICATION_SUBJECT_KEY;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.NOTIFICATION_TYPE_KEY;
import static org.apache.nifi.bootstrap.notification.http.HttpNotificationService.SSL_ALGORITHM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class HttpNotificationServiceTest {
private static final long REQUEST_TIMEOUT = 2;
private static final String SUBJECT = "Subject";
private static final String MESSAGE = "Message";
private static final String LOCALHOST = "localhost";
private static final String BASE_PATH = "/";
private static final String TIMEOUT = "10s";
private MockWebServer mockWebServer;
@BeforeEach
public void startServer() {
mockWebServer = new MockWebServer();
}
@AfterEach
public void shutdownServer() throws IOException {
mockWebServer.shutdown();
}
@Test
public void testStartNotification() throws InterruptedException, NotificationFailedException {
enqueueResponseCode(200);
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
final HttpNotificationService service = getHttpNotificationService(properties);
service.notify(getNotificationContext(), NotificationType.NIFI_STARTED, SUBJECT, MESSAGE);
assertRequestMatches(NotificationType.NIFI_STARTED);
}
@Test
public void testStopNotification() throws InterruptedException, NotificationFailedException {
enqueueResponseCode(200);
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
final HttpNotificationService service = getHttpNotificationService(properties);
service.notify(getNotificationContext(), NotificationType.NIFI_STOPPED, SUBJECT, MESSAGE);
assertRequestMatches(NotificationType.NIFI_STOPPED);
}
@Test
public void testDiedNotification() throws InterruptedException, NotificationFailedException {
enqueueResponseCode(200);
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
final HttpNotificationService service = getHttpNotificationService(properties);
service.notify(getNotificationContext(), NotificationType.NIFI_DIED, SUBJECT, MESSAGE);
assertRequestMatches(NotificationType.NIFI_DIED);
}
@Test
public void testStartNotificationFailure() throws InterruptedException {
enqueueResponseCode(500);
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
final HttpNotificationService service = getHttpNotificationService(properties);
assertThrows(NotificationFailedException.class, () -> service.notify(getNotificationContext(), NotificationType.NIFI_STARTED, SUBJECT, MESSAGE));
assertRequestMatches(NotificationType.NIFI_STARTED);
}
@Test
public void testStartNotificationHttps() throws GeneralSecurityException, NotificationFailedException, InterruptedException {
final TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
mockWebServer.useHttps(sslSocketFactory, false);
enqueueResponseCode(200);
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
properties.put(PROP_KEYSTORE, createPropertyValue(tlsConfiguration.getKeystorePath()));
properties.put(PROP_KEYSTORE_PASSWORD, createPropertyValue(tlsConfiguration.getKeystorePassword()));
properties.put(PROP_KEY_PASSWORD, createPropertyValue(tlsConfiguration.getKeyPassword()));
properties.put(PROP_KEYSTORE_TYPE, createPropertyValue(tlsConfiguration.getKeystoreType().getType()));
properties.put(PROP_TRUSTSTORE, createPropertyValue(tlsConfiguration.getTruststorePath()));
properties.put(PROP_TRUSTSTORE_PASSWORD, createPropertyValue(tlsConfiguration.getTruststorePassword()));
properties.put(PROP_TRUSTSTORE_TYPE, createPropertyValue(tlsConfiguration.getTruststoreType().getType()));
properties.put(SSL_ALGORITHM, createPropertyValue(tlsConfiguration.getProtocol()));
final HttpNotificationService service = getHttpNotificationService(properties);
service.notify(getNotificationContext(), NotificationType.NIFI_STARTED, SUBJECT, MESSAGE);
assertRequestMatches(NotificationType.NIFI_STARTED);
}
private void assertRequestMatches(final NotificationType notificationType) throws InterruptedException {
final RecordedRequest recordedRequest = mockWebServer.takeRequest(REQUEST_TIMEOUT, TimeUnit.SECONDS);
assertNotNull(recordedRequest);
assertEquals(notificationType.name(), recordedRequest.getHeader(NOTIFICATION_TYPE_KEY));
assertEquals(SUBJECT, recordedRequest.getHeader(NOTIFICATION_SUBJECT_KEY));
final Buffer bodyBuffer = recordedRequest.getBody();
final String bodyString = new String(bodyBuffer.readByteArray(), UTF_8);
assertEquals(MESSAGE, bodyString);
}
private void enqueueResponseCode(final int responseCode) {
mockWebServer.enqueue(new MockResponse().setResponseCode(responseCode));
}
private HttpNotificationService getHttpNotificationService(final Map<PropertyDescriptor, PropertyValue> properties) {
final HttpNotificationService service = new HttpNotificationService();
final NotificationInitializationContext context = new NotificationInitializationContext() {
@Override
public String getIdentifier() {
return NotificationInitializationContext.class.getName();
}
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
return properties.get(descriptor);
}
@Override
public Map<String, String> getAllProperties() {
final Map<String, String> allProperties = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : properties.entrySet()) {
allProperties.put(entry.getKey().getName(), entry.getValue().getValue());
}
return allProperties;
}
};
service.initialize(context);
return service;
}
private Map<PropertyDescriptor, PropertyValue> getProperties() {
final Map<PropertyDescriptor, PropertyValue> properties = new HashMap<>();
// Setting localhost is necessary to avoid hostname verification failures on Windows
final String url = mockWebServer.url(BASE_PATH).newBuilder().host(LOCALHOST).build().toString();
properties.put(PROP_URL, createPropertyValue(url));
properties.put(PROP_CONNECTION_TIMEOUT, createPropertyValue(TIMEOUT));
properties.put(PROP_WRITE_TIMEOUT, createPropertyValue(TIMEOUT));
return properties;
}
private PropertyValue createPropertyValue(final String value) {
return new StandardPropertyValue(value, null, null);
}
private NotificationContext getNotificationContext() {
final Map<PropertyDescriptor, PropertyValue> properties = getProperties();
return new NotificationContext() {
@Override
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
return properties.get(descriptor);
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
final Map<PropertyDescriptor, String> propertyValues = new HashMap<>();
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : properties.entrySet()) {
propertyValues.put(entry.getKey(), entry.getValue().getValue());
}
return propertyValues;
}
};
}
}

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.property;
import org.apache.nifi.bootstrap.property.SecurityApplicationPropertyHandler.SecurityProperty;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
class SecurityApplicationPropertyHandlerTest {
private static final String EMPTY = "";
private static final String PROPERTIES_FILE_NAME = "nifi.properties";
private static final String PORT = "8443";
private static final String STORE_TYPE = "PKCS12";
private static final String KEYSTORE_FILE = "keystore.p12";
private static final String TRUSTSTORE_FILE = "truststore.p12";
private static final Logger logger = LoggerFactory.getLogger(SecurityApplicationPropertyHandlerTest.class);
@TempDir
private Path tempDir;
private SecurityApplicationPropertyHandler handler;
@BeforeEach
void setHandler() {
handler = new SecurityApplicationPropertyHandler(logger);
}
@Test
void testHandlePropertiesSuccess() throws IOException, GeneralSecurityException {
final Properties sourceProperties = getSourceProperties();
final Path propertiesPath = writeProperties(sourceProperties);
final Path propertiesLocation = tempDir.resolve(propertiesPath.getFileName());
Files.copy(propertiesPath, propertiesLocation);
try {
handler.handleProperties(propertiesLocation);
final Properties properties = loadProperties(propertiesLocation);
assertStoreCreated(properties, SecurityProperty.KEYSTORE);
assertStoreCreated(properties, SecurityProperty.TRUSTSTORE);
assertNotEquals(EMPTY, properties.getProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName()));
assertNotEquals(EMPTY, properties.getProperty(SecurityProperty.KEYSTORE_PASSWD.getName()));
assertNotEquals(EMPTY, properties.getProperty(SecurityProperty.KEY_PASSWD.getName()));
// Run again to evaluate handling of existing store files
handler.handleProperties(propertiesLocation);
final Properties reloadedProperties = loadProperties(propertiesLocation);
assertEquals(properties.getProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName()), reloadedProperties.getProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName()));
assertEquals(properties.getProperty(SecurityProperty.KEYSTORE_PASSWD.getName()), reloadedProperties.getProperty(SecurityProperty.KEYSTORE_PASSWD.getName()));
assertEquals(properties.getProperty(SecurityProperty.KEY_PASSWD.getName()), reloadedProperties.getProperty(SecurityProperty.KEY_PASSWD.getName()));
assertKeyStoreReadable(reloadedProperties);
} finally {
deleteStores(propertiesLocation);
}
}
private Properties getSourceProperties() {
final Properties sourceProperties = new Properties();
sourceProperties.setProperty(SecurityProperty.HTTPS_PORT.getName(), PORT);
final Path keystorePath = tempDir.resolve(KEYSTORE_FILE);
sourceProperties.setProperty(SecurityProperty.KEYSTORE.getName(), keystorePath.toAbsolutePath().toString());
sourceProperties.setProperty(SecurityProperty.KEYSTORE_TYPE.getName(), STORE_TYPE);
sourceProperties.setProperty(SecurityProperty.KEYSTORE_PASSWD.getName(), EMPTY);
sourceProperties.setProperty(SecurityProperty.KEY_PASSWD.getName(), EMPTY);
final Path truststorePath = tempDir.resolve(TRUSTSTORE_FILE);
sourceProperties.setProperty(SecurityProperty.TRUSTSTORE.getName(), truststorePath.toAbsolutePath().toString());
sourceProperties.setProperty(SecurityProperty.TRUSTSTORE_TYPE.getName(), STORE_TYPE);
sourceProperties.setProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName(), EMPTY);
return sourceProperties;
}
@Test
void testHandlePropertiesHttpsNotConfigured() throws IOException {
final Properties sourceProperties = new Properties();
sourceProperties.setProperty(SecurityProperty.KEYSTORE.getName(), EMPTY);
sourceProperties.setProperty(SecurityProperty.KEYSTORE_PASSWD.getName(), EMPTY);
sourceProperties.setProperty(SecurityProperty.TRUSTSTORE.getName(), EMPTY);
sourceProperties.setProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName(), EMPTY);
final Path propertiesPath = writeProperties(sourceProperties);
final Path propertiesLocation = tempDir.resolve(propertiesPath.getFileName());
Files.copy(propertiesPath, propertiesLocation);
handler.handleProperties(propertiesLocation);
final Properties properties = loadProperties(propertiesLocation);
final String keystorePasswd = properties.getProperty(SecurityProperty.KEYSTORE_PASSWD.getName());
assertTrue(keystorePasswd.isBlank());
final String truststorePasswd = properties.getProperty(SecurityProperty.TRUSTSTORE_PASSWD.getName());
assertTrue(truststorePasswd.isBlank());
}
private void assertKeyStoreReadable(final Properties properties) throws GeneralSecurityException, IOException {
final String storeLocation = properties.getProperty(SecurityProperty.KEYSTORE.getName());
final String storePasswd = properties.getProperty(SecurityProperty.KEYSTORE_PASSWD.getName());
final Path storePath = Paths.get(storeLocation);
final KeyStore keyStore = KeyStore.getInstance(STORE_TYPE);
try (InputStream inputStream = Files.newInputStream(storePath)) {
keyStore.load(inputStream, storePasswd.toCharArray());
}
final X509Certificate certificate = (X509Certificate) keyStore.getCertificate(SecurityApplicationPropertyHandler.ENTRY_ALIAS);
assertNotNull(certificate);
assertEquals(SecurityApplicationPropertyHandler.CERTIFICATE_ISSUER, certificate.getIssuerX500Principal());
assertEquals(SecurityApplicationPropertyHandler.CERTIFICATE_ISSUER, certificate.getSubjectX500Principal());
}
private void assertStoreCreated(final Properties properties, final SecurityProperty securityProperty) {
final String location = properties.getProperty(securityProperty.getName());
final Path path = Paths.get(location);
assertTrue(Files.exists(path));
}
private void deleteStores(final Path propertiesLocation) throws IOException {
final Properties properties = loadProperties(propertiesLocation);
deleteStore(properties, SecurityProperty.KEYSTORE);
deleteStore(properties, SecurityProperty.TRUSTSTORE);
}
private void deleteStore(final Properties properties, final SecurityProperty securityProperty) throws IOException {
final String location = properties.getProperty(securityProperty.getName());
if (location != null && !location.isBlank()) {
final Path path = Paths.get(location);
if (Files.exists(path)) {
Files.delete(path);
}
}
}
private Path writeProperties(final Properties properties) throws IOException {
final Path propertiesPath = tempDir.resolve(PROPERTIES_FILE_NAME);
try (OutputStream outputStream = Files.newOutputStream(propertiesPath)) {
properties.store(outputStream, null);
}
return propertiesPath;
}
private Properties loadProperties(final Path propertiesLocation) throws IOException {
try (final InputStream inputStream = Files.newInputStream(propertiesLocation)) {
final Properties properties = new Properties();
properties.load(inputStream);
return properties;
}
}
}

View File

@ -1,234 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.util;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.properties.NiFiPropertiesLoader;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StringUtils;
import org.bouncycastle.asn1.x509.GeneralName;
import org.bouncycastle.util.IPAddress;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestSecureNiFiConfigUtil {
public static final String TEST_RESOURCE_DIR = "src/test/resources/";
private final Logger logger = LoggerFactory.getLogger("org.apache.nifi.bootstrap.util.TestSecureNiFiConfigUtil");
private static final String PROPERTIES_PREFIX = "nifi-properties";
private static final boolean EXPECT_STORES_TO_EXIST = true;
private Path nifiPropertiesFile;
private Path keystorePath;
private Path truststorePath;
private final Path existingKeystorePath = getTestFilePath("existing-keystore.p12");
private final Path existingTruststorePath = getTestFilePath("existing-truststore.p12");
private NiFiProperties configureSecureNiFiProperties(Path testPropertiesFile) throws IOException {
Files.copy(testPropertiesFile, nifiPropertiesFile, StandardCopyOption.REPLACE_EXISTING);
SecureNiFiConfigUtil.configureSecureNiFiProperties(nifiPropertiesFile.toString(), logger);
return new NiFiPropertiesLoader().load(nifiPropertiesFile.toFile());
}
private static Path getPathFromClasspath(String filename) {
try {
return Paths.get(TestSecureNiFiConfigUtil.class.getClassLoader().getResource(filename).toURI());
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
private static Path getTestFilePath(String filename) {
return Paths.get(TEST_RESOURCE_DIR + filename);
}
private static String getFileHash(Path filepath) throws IOException {
return DigestUtils.sha256Hex(Files.readAllBytes(filepath));
}
@BeforeEach
public void init() throws IOException {
nifiPropertiesFile = Files.createTempFile(PROPERTIES_PREFIX, ".properties");
TlsConfiguration tlsConfig = new TemporaryKeyStoreBuilder().build();
Files.move(Paths.get(tlsConfig.getKeystorePath()), existingKeystorePath, StandardCopyOption.REPLACE_EXISTING);
Files.move(Paths.get(tlsConfig.getTruststorePath()), existingTruststorePath, StandardCopyOption.REPLACE_EXISTING);
}
@AfterEach
public void cleanUp() throws IOException {
deleteIfExists(nifiPropertiesFile);
deleteIfExists(keystorePath);
deleteIfExists(truststorePath);
deleteIfExists(existingKeystorePath);
deleteIfExists(existingTruststorePath);
}
private static void deleteIfExists(Path path) throws IOException {
if (path != null && StringUtils.isNotEmpty(path.toString())) {
Files.deleteIfExists(path);
}
}
private void runTestWithExpectedSuccess(String testPropertiesFile, List<String> expectedSANs) throws IOException, GeneralSecurityException {
Path testPropertiesFilePath = getPathFromClasspath(testPropertiesFile);
NiFiProperties niFiProperties = this.configureSecureNiFiProperties(testPropertiesFilePath);
keystorePath = Paths.get(niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE));
assertTrue(keystorePath.toFile().exists());
assertFalse(niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD).isEmpty());
assertEquals("PKCS12", niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE));
char[] keyPassword = niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD).toCharArray();
KeyStore keyStore = KeyStoreUtils.loadKeyStore(keystorePath.toString(),
niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD).toCharArray(),
niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE));
String alias = keyStore.aliases().nextElement();
assertTrue(keyStore.isKeyEntry(alias));
Key key = keyStore.getKey(alias, keyPassword);
assertNotNull(key);
truststorePath = Paths.get(niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE));
assertTrue(truststorePath.toFile().exists());
assertFalse(niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD).isEmpty());
assertEquals("PKCS12", niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE));
KeyStore trustStore = KeyStoreUtils.loadKeyStore(truststorePath.toString(),
niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD).toCharArray(),
niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE));
String trustAlias = trustStore.aliases().nextElement();
assertTrue(trustStore.isCertificateEntry(trustAlias));
Certificate certificate = trustStore.getCertificate(trustAlias);
certificate.verify(certificate.getPublicKey());
if (!expectedSANs.isEmpty()) {
Collection<List<?>> sans = ((X509Certificate)certificate).getSubjectAlternativeNames();
Set<String> foundSands = new HashSet<>();
for(List<?> list : sans) {
String san = (String) list.get(1);
if (IPAddress.isValid(san)) {
assertEquals(GeneralName.iPAddress, list.get(0));
} else {
assertEquals(GeneralName.dNSName, list.get(0));
}
foundSands.add((String) list.get(1));
}
for(String expectedSAN : expectedSANs) {
assertTrue(foundSands.contains(expectedSAN));
}
}
}
private void runTestWithNoExpectedUpdates(String testPropertiesFile, boolean expectBothStoresToExist) throws IOException {
this.runTestWithNoExpectedUpdates(testPropertiesFile, expectBothStoresToExist, expectBothStoresToExist);
}
private void runTestWithNoExpectedUpdates(String testPropertiesFile, boolean expectKeystoreToExist, boolean expectTruststoreToExist)
throws IOException {
Path testPropertiesFilePath = getPathFromClasspath(testPropertiesFile);
NiFiProperties niFiProperties = this.configureSecureNiFiProperties(testPropertiesFilePath);
keystorePath = Paths.get(niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE));
truststorePath = Paths.get(niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE));
assertEquals(expectKeystoreToExist, Paths.get(niFiProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE)).toFile().exists());
assertEquals(expectTruststoreToExist, Paths.get(niFiProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE)).toFile().exists());
// Show that nifi.properties was not updated
assertEquals(getFileHash(nifiPropertiesFile), getFileHash(testPropertiesFilePath));
}
@Test
public void testSuccessfulConfiguration() throws IOException, GeneralSecurityException {
runTestWithExpectedSuccess("nifi.properties.success", Collections.emptyList());
}
@Test
public void testSuccessfulDNSSANs() throws IOException, GeneralSecurityException {
runTestWithExpectedSuccess("nifi.properties.dns-sans",
Arrays.asList("test-host", "remote-host", "proxy-host", "cluster-host"));
}
@Test
public void testNoHttps() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.no-https", !EXPECT_STORES_TO_EXIST);
}
@Test
public void testNoKeystores() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.no-keystores", !EXPECT_STORES_TO_EXIST);
}
@Test
public void testTruststorePasswordSet() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.truststore-password", !EXPECT_STORES_TO_EXIST);
}
@Test
public void testKeystorePasswordSet() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.keystore-password", !EXPECT_STORES_TO_EXIST);
}
@Test
public void test_keystoreAndTruststoreAlreadyExist() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.stores-exist", EXPECT_STORES_TO_EXIST);
}
@Test
public void testNoKeystoresTypes() throws IOException, GeneralSecurityException {
runTestWithExpectedSuccess("nifi.properties.no-keystore-types", Collections.emptyList());
}
@Test
public void testFailure_onlyTruststoreExists() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.only-truststore", !EXPECT_STORES_TO_EXIST, EXPECT_STORES_TO_EXIST);
}
@Test
public void testFailure_onlyKeystoreExists() throws IOException {
runTestWithNoExpectedUpdates("nifi.properties.only-keystore", EXPECT_STORES_TO_EXIST, !EXPECT_STORES_TO_EXIST);
}
}

View File

@ -1,38 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
nifi.remote.input.host=remote-host
nifi.web.proxy.host=proxy-host
nifi.cluster.load.balance.host=cluster-host
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=test-host
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,38 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
nifi.remote.input.host=1.2.3.4
nifi.web.proxy.host=1.2.3.5
nifi.cluster.load.balance.host=1.2.3.6
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=1.2.3.4
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=12345
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=8080
nifi.web.https.host=
nifi.web.https.port=
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/existing-keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/existing-truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/existing-keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/existing-truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=
nifi.security.user.authorizer=

View File

@ -1,34 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Core Properties #
nifi.flow.configuration.file=./target/flow.json.gz
# Removing most properties for testing...
# web properties #
nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=
nifi.web.https.port=8443
nifi.security.keystore=./src/test/resources/keystore.p12
nifi.security.keystoreType=PKCS12
nifi.security.keystorePasswd=
nifi.security.keyPasswd=
nifi.security.truststore=./src/test/resources/truststore.p12
nifi.security.truststoreType=PKCS12
nifi.security.truststorePasswd=12345
nifi.security.user.authorizer=

View File

@ -43,14 +43,5 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId> <artifactId>nifi-property-utils</artifactId>
</dependency> </dependency>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -20,7 +20,6 @@ import org.apache.nifi.util.security.MessageDigestUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.xml.bind.DatatypeConverter;
import java.io.File; import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.net.MalformedURLException; import java.net.MalformedURLException;
@ -31,6 +30,7 @@ import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HexFormat;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -151,7 +151,7 @@ public class ClassLoaderUtils {
formattedUrls.append(classloaderIsolationKey); formattedUrls.append(classloaderIsolationKey);
final byte[] formattedUrlsBinary = formattedUrls.toString().getBytes(StandardCharsets.UTF_8); final byte[] formattedUrlsBinary = formattedUrls.toString().getBytes(StandardCharsets.UTF_8);
return DatatypeConverter.printHexBinary(MessageDigestUtils.getDigest(formattedUrlsBinary)); return HexFormat.of().formatHex(MessageDigestUtils.getDigest(formattedUrlsBinary));
} }
private static long getLastModified(String url) { private static long getLastModified(String url) {

View File

@ -26,8 +26,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -541,7 +539,7 @@ public class TestStandardValidators {
ValidationResult vr = val.validate("foo", "invalid", vc); ValidationResult vr = val.validate("foo", "invalid", vc);
assertFalse(vr.isValid()); assertFalse(vr.isValid());
assertThat(vr.getExplanation(), containsString("Failed to evaluate the Attribute Expression Language")); assertTrue(vr.getExplanation().contains("Failed to evaluate the Attribute Expression Language"));
} }
@Test @Test

View File

@ -2974,20 +2974,6 @@ properties for minimum and maximum Java Heap size, the garbage collector to use,
|`nifi.bootstrap.sensitive.key`|The root key (in hexadecimal format) for encrypted sensitive configuration values. When NiFi is started, this root key is used to decrypt sensitive values from the _nifi.properties_ file into memory for later use. |`nifi.bootstrap.sensitive.key`|The root key (in hexadecimal format) for encrypted sensitive configuration values. When NiFi is started, this root key is used to decrypt sensitive values from the _nifi.properties_ file into memory for later use.
The Encrypt-Config Tool can be used to specify the root key, encrypt sensitive values in _nifi.properties_ and update _bootstrap.conf_. See the <<toolkit-guide.adoc#encrypt_config_tool,NiFi Toolkit Guide>> for an example. The Encrypt-Config Tool can be used to specify the root key, encrypt sensitive values in _nifi.properties_ and update _bootstrap.conf_. See the <<toolkit-guide.adoc#encrypt_config_tool,NiFi Toolkit Guide>> for an example.
|`notification.services.file`|When NiFi is started, or stopped, or when the Bootstrap detects that NiFi has died, the Bootstrap is able to send notifications of these events
to interested parties. This is configured by specifying an XML file that defines which notification services can be used. More about this
file can be found in the <<notification_services>> section.
|`notification.max.attempts`|If a notification service is configured but is unable to perform its function, it will try again up to a maximum number of attempts. This property
configures what that maximum number of attempts is. The default value is `5`.
|`nifi.start.notification.services`|This property is a comma-separated list of Notification Service identifiers that correspond to the Notification Services
defined in the `notification.services.file` property. The services with the specified identifiers will be used to notify their
configured recipients whenever NiFi is started.
|`nifi.stop.notification.services`|This property is a comma-separated list of Notification Service identifiers that correspond to the Notification Services
defined in the `notification.services.file` property. The services with the specified identifiers will be used to notify their
configured recipients whenever NiFi is stopped.
|`nifi.died.notification.services`|This property is a comma-separated list of Notification Service identifiers that correspond to the Notification Services
defined in the `notification.services.file` property. The services with the specified identifiers will be used to notify their
configured recipients if the bootstrap determines that NiFi has unexpectedly died.
|`nifi.diagnostics.on.shutdown.enabled`|(true or false) This property decides whether to run NiFi diagnostics before shutting down. The default value is `false`. |`nifi.diagnostics.on.shutdown.enabled`|(true or false) This property decides whether to run NiFi diagnostics before shutting down. The default value is `false`.
|`nifi.diagnostics.on.shutdown.verbose`|(true or false) This property decides whether to run NiFi diagnostics in verbose mode. The default value is `false`. |`nifi.diagnostics.on.shutdown.verbose`|(true or false) This property decides whether to run NiFi diagnostics in verbose mode. The default value is `false`.
|`nifi.diagnostics.on.shutdown.directory`|This property specifies the location of the NiFi diagnostics directory. The default value is `./diagnostics`. |`nifi.diagnostics.on.shutdown.directory`|This property specifies the location of the NiFi diagnostics directory. The default value is `./diagnostics`.
@ -2996,118 +2982,6 @@ configured recipients whenever NiFi is stopped.
|`nifi.bootstrap.listen.port`|This property defines the port used to listen for communications from NiFi. If this property is missing, empty, or `0`, a random ephemeral port is used. |`nifi.bootstrap.listen.port`|This property defines the port used to listen for communications from NiFi. If this property is missing, empty, or `0`, a random ephemeral port is used.
|==== |====
[[notification_services]]
== Notification Services
When the NiFi bootstrap starts or stops NiFi, or detects that it has died unexpectedly, it is able to notify configured recipients. Currently,
the only mechanisms supplied are to send an e-mail or HTTP POST notification. The notification services configuration file
is an XML file where the notification capabilities are configured.
The default location of the XML file is _conf/bootstrap-notification-services.xml_, but this value can be changed in the _conf/bootstrap.conf_ file.
The syntax of the XML file is as follows:
....
<services>
<!-- any number of service elements can be defined. -->
<service>
<id>some-identifier</id>
<!-- The fully-qualified class name of the Notification Service. -->
<class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
<!-- Any number of properties can be set using this syntax.
The properties available depend on the Notification Service. -->
<property name="Property Name 1">Property Value</property>
<property name="Another Property Name">Property Value 2</property>
</service>
</services>
....
Once the desired services have been configured, they can then be referenced in the _bootstrap.conf_ file.
=== Email Notification Service +
The first Notifier is to send emails and the implementation is `org.apache.nifi.bootstrap.notification.email.EmailNotificationService`.
It has the following properties available:
|====
|*Property*|*Required*|*Description*
|`SMTP Hostname`|true|The hostname of the SMTP Server that is used to send Email Notifications
|`SMTP Port`|true|The Port used for SMTP communications
|`SMTP Username`|true|Username for the SMTP account
|`SMTP Password`||Password for the SMTP account
|`SMTP Auth`||Flag indicating whether authentication should be used
|`SMTP TLS`||Flag indicating whether TLS should be enabled
|`SMTP Socket Factory`||`javax.net.ssl.SSLSocketFactory`
|`SMTP X-Mailer Header`||X-Mailer used in the header of the outgoing email
|`Content Type`||Mime Type used to interpret the contents of the email, such as `text/plain` or `text/html`
|`From`|true|Specifies the Email address to use as the sender. Otherwise, a "friendly name" can be used as the From address, but the value
must be enclosed in double-quotes.
|`To`||The recipients to include in the To-Line of the email
|`CC`||The recipients to include in the CC-Line of the email
|`BCC`||The recipients to include in the BCC-Line of the email
|====
In addition to the properties above that are marked as required, at least one of the `To`, `CC`, or `BCC` properties
must be set.
A complete example of configuring the Email service would look like the following:
....
<service>
<id>email-notification</id>
<class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
<property name="SMTP Hostname">smtp.gmail.com</property>
<property name="SMTP Port">587</property>
<property name="SMTP Username">username@gmail.com</property>
<property name="SMTP Password">super-secret-password</property>
<property name="SMTP TLS">true</property>
<property name="From">"NiFi Service Notifier"</property>
<property name="To">username@gmail.com</property>
</service>
....
=== HTTP Notification Service +
The second Notifier is to send HTTP POST requests and the implementation is `org.apache.nifi.bootstrap.notification.http.HttpNotificationService`.
It has the following properties available:
|====
|*Property*|*Required*|*Description*
|`URL`|true|The URL to send the notification to. Expression language is supported.
|`Connection timeout`||Max wait time for connection to remote service. Expression language is supported. This defaults to `10s`.
|`Write timeout`||Max wait time for remote service to read the request sent. Expression language is supported. This defaults to `10s`.
|`Truststore Filename`||The fully-qualified filename of the Truststore
|`Truststore Type`||The Type of the Truststore. Either `JKS` or `PKCS12`
|`Truststore Password`||The password for the Truststore
|`Keystore Filename`||The fully-qualified filename of the Keystore
|`Keystore Type`||The Type of the Keystore. Either `JKS` or `PKCS12`
|`Keystore Password`||The password for the Keystore
|`Key Password`||The password for the key. If this is not specified, but the Keystore Filename, Password, and Type are specified, then the Key Password will be assumed to be the same as the Keystore Password.
|`SSL Protocol`||The algorithm to use for this SSL context. This can either be `SSL` or `TLS`.
|====
In addition to the properties above, dynamic properties can be added. They will be added as headers to the HTTP request. Expression language is supported.
The notification message is in the body of the POST request. The type of notification is in the header "notification.type" and the subject uses the header "notification.subject".
A complete example of configuring the HTTP service could look like the following:
....
<service>
<id>http-notification</id>
<class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
<property name="URL">https://testServer.com:8080/</property>
<property name="Truststore Filename">localhost-ts.jks</property>
<property name="Truststore Type">JKS</property>
<property name="Truststore Password">localtest<property>
<property name="Keystore Filename">localhost-ts.jks</property>
<property name="Keystore Type">JKS</property>
<property name="Keystore Password">localtest</property>
<property name="notification.timestamp">${now()}</property>
</service>
....
[[proxy_configuration]] [[proxy_configuration]]
== Proxy Configuration == Proxy Configuration
When running Apache NiFi behind a proxy there are a couple of key items to be aware of during deployment. When running Apache NiFi behind a proxy there are a couple of key items to be aware of during deployment.
@ -4378,7 +4252,6 @@ Use the following table to guide the update of configuration files located in `<
If you are using the `file-provider` authorizer, ensure that you copy the _users.xml_ and _authorizations.xml_ files from the existing to the new NiFi. If you are using the `file-provider` authorizer, ensure that you copy the _users.xml_ and _authorizations.xml_ files from the existing to the new NiFi.
Configuration best practices recommend creating a separate location outside of the NiFi base directory for storing such configuration files, for example: `/opt/nifi/configuration-resources/`. If you are storing these files in a separate directory, you do not need to move them. Instead, ensure that the new NiFi is pointing to the same files. Configuration best practices recommend creating a separate location outside of the NiFi base directory for storing such configuration files, for example: `/opt/nifi/configuration-resources/`. If you are storing these files in a separate directory, you do not need to move them. Instead, ensure that the new NiFi is pointing to the same files.
|_bootstrap-notification-services.xml_ | Use the existing NiFi _bootstrap-notification-services.xml_ file to update properties in the new NiFi.
|_bootstrap.conf_ | Use the existing NiFi _bootstrap.conf_ file to update properties in the new NiFi. |_bootstrap.conf_ | Use the existing NiFi _bootstrap.conf_ file to update properties in the new NiFi.
|_flow.json.gz_ | If you retained the default location for storing flows (`<installation-directory>/conf/`), copy _flow.json.gz_ from the existing to the new NiFi base install `conf` directory. If you stored flows to an external location via _nifi.properties_, update the property `nifi.flow.configuration.file` to point there. |_flow.json.gz_ | If you retained the default location for storing flows (`<installation-directory>/conf/`), copy _flow.json.gz_ from the existing to the new NiFi base install `conf` directory. If you stored flows to an external location via _nifi.properties_, update the property `nifi.flow.configuration.file` to point there.

View File

@ -66,12 +66,12 @@ import org.apache.nifi.util.StringUtils;
import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService; import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration; import org.apache.nifi.accumulo.data.AccumuloRecordConfiguration;
import javax.xml.bind.DatatypeConverter;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.HexFormat;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -577,7 +577,7 @@ public class PutAccumuloRecord extends BaseAccumuloProcessor {
final String delim = config.getFieldDelimiter(); final String delim = config.getFieldDelimiter();
if (!StringUtils.isEmpty(delim)) { if (!StringUtils.isEmpty(delim)) {
if (config.getEncodeDelimiter()) { if (config.getEncodeDelimiter()) {
byte [] asHex = DatatypeConverter.parseHexBinary(delim); byte [] asHex = HexFormat.of().parseHex(delim);
cq.append(asHex, 0, asHex.length); cq.append(asHex, 0, asHex.length);
}else{ }else{
cq.append(delim.getBytes(), 0, delim.length()); cq.append(delim.getBytes(), 0, delim.length());

View File

@ -25,12 +25,12 @@ import java.time.ZoneOffset;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.List; import java.util.List;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.crypto.Mac; import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec; import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPost;
@ -95,8 +95,8 @@ public abstract class AbstractAzureLogAnalyticsReportingTask extends AbstractRep
String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength, String signature = String.format("POST\n%d\napplication/json\nx-ms-date:%s\n/api/logs", contentLength,
rfc1123Date); rfc1123Date);
Mac mac = Mac.getInstance(HMAC_SHA256_ALG); Mac mac = Mac.getInstance(HMAC_SHA256_ALG);
mac.init(new SecretKeySpec(DatatypeConverter.parseBase64Binary(key), HMAC_SHA256_ALG)); mac.init(new SecretKeySpec(Base64.getDecoder().decode(key), HMAC_SHA256_ALG));
String hmac = DatatypeConverter.printBase64Binary(mac.doFinal(signature.getBytes(UTF8))); String hmac = Base64.getEncoder().encodeToString(mac.doFinal(signature.getBytes(UTF8)));
return String.format("SharedKey %s:%s", workspaceId, hmac); return String.format("SharedKey %s:%s", workspaceId, hmac);
} catch (NoSuchAlgorithmException | InvalidKeyException e) { } catch (NoSuchAlgorithmException | InvalidKeyException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -47,6 +47,10 @@
<groupId>com.fasterxml.jackson.core</groupId> <groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId> <artifactId>jackson-databind</artifactId>
</dependency> </dependency>
<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId> <artifactId>nifi-distributed-cache-client-service-api</artifactId>

View File

@ -1,53 +0,0 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<services>
<!-- This file is used to define how interested parties are notified when events in NiFi's lifecycle occur. -->
<!-- The format of this file is:
<services>
<service>
<id>service-identifier</id>
<class>org.apache.nifi.notifications.DesiredNotificationService</class>
<property name="property name">property value</property>
<property name="another property">another property value</property>
</service>
</services>
This file can contain 0 to many different service definitions.
The id can then be referenced from the bootstrap.conf file in order to configure the notification service
to be used when particular lifecycle events occur.
-->
<!--
<service>
<id>email-notification</id>
<class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
<property name="SMTP Hostname"></property>
<property name="SMTP Port"></property>
<property name="SMTP Username"></property>
<property name="SMTP Password"></property>
<property name="SMTP TLS"></property>
<property name="From"></property>
<property name="To"></property>
</service>
-->
<!--
<service>
<id>http-notification</id>
<class>org.apache.nifi.bootstrap.notification.http.HttpNotificationService</class>
<property name="URL"></property>
</service>
-->
</services>

View File

@ -31,9 +31,6 @@ conf.dir=./conf
# How long to wait after telling NiFi to shutdown before explicitly killing the Process # How long to wait after telling NiFi to shutdown before explicitly killing the Process
graceful.shutdown.seconds=20 graceful.shutdown.seconds=20
# Disable JSR 199 so that we can use JSP's without running a JDK
java.arg.1=-Dorg.apache.jasper.compiler.disablejsr199=true
# JVM memory settings # JVM memory settings
java.arg.2=-Xms${nifi.jvm.heap.init} java.arg.2=-Xms${nifi.jvm.heap.init}
java.arg.3=-Xmx${nifi.jvm.heap.max} java.arg.3=-Xmx${nifi.jvm.heap.max}
@ -45,8 +42,24 @@ java.arg.3=-Xmx${nifi.jvm.heap.max}
java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
#Set headless mode by default # Enable Headless mode to avoid HeadlessException with Java AWT libraries
java.arg.14=-Djava.awt.headless=true java.arg.headless=-Djava.awt.headless=true
# Configure Apache Curator connection logging for Apache ZooKeeper to avoid excessive ERROR messages
java.arg.curatorLogOnlyFirstConnectionIssue=-Dcurator-log-only-first-connection-issue-as-error-level=true
# Requires JAAS to use only the provided JAAS configuration to authenticate a Subject, without using any "fallback" methods (such as prompting for username/password)
# Please see https://docs.oracle.com/en/java/javase/17/security/single-sign-using-kerberos-java1.html, section "EXCEPTIONS TO THE MODEL"
java.arg.securityAuthUseSubjectCredsOnly=-Djavax.security.auth.useSubjectCredsOnly=true
# The following options configure a Java Agent to handle native library loading.
# It is needed when a custom jar (eg. JDBC driver) has been configured on a component in the flow and this custom jar depends on a native library
# and tries to load it by its absolute path (java.lang.System.load(String filename) method call).
# Use this Java Agent only if you get "Native Library ... already loaded in another classloader" errors otherwise!
#java.arg.18=-javaagent:./lib/aspectj/aspectjweaver-${aspectj.version}.jar
#java.arg.19=-Daj.weaving.loadersToSkip=sun.misc.Launcher$AppClassLoader,jdk.internal.loader.ClassLoaders$AppClassLoader,org.eclipse.jetty.webapp.WebAppClassLoader,\
# org.apache.jasper.servlet.JasperLoader,org.jvnet.hk2.internal.DelegatingClassLoader,org.apache.nifi.nar.NarClassLoader
# End of Java Agent config for native library loading.
# Root key in hexadecimal format for encrypted sensitive configuration values # Root key in hexadecimal format for encrypted sensitive configuration values
nifi.bootstrap.sensitive.key= nifi.bootstrap.sensitive.key=
@ -65,55 +78,5 @@ nifi.bootstrap.sensitive.key=
# GCP KMS Sensitive Property Providers # GCP KMS Sensitive Property Providers
#nifi.bootstrap.protection.gcp.kms.conf=./conf/bootstrap-gcp.conf #nifi.bootstrap.protection.gcp.kms.conf=./conf/bootstrap-gcp.conf
# Sets the provider of SecureRandom to /dev/urandom to prevent blocking on VMs
java.arg.15=-Djava.security.egd=file:/dev/urandom
# Requires JAAS to use only the provided JAAS configuration to authenticate a Subject, without using any "fallback" methods (such as prompting for username/password)
# Please see https://docs.oracle.com/en/java/javase/17/security/single-sign-using-kerberos-java1.html, section "EXCEPTIONS TO THE MODEL"
java.arg.16=-Djavax.security.auth.useSubjectCredsOnly=true
# Zookeeper 3.5 now includes an Admin Server that starts on port 8080, since NiFi is already using that port disable by default.
# Please see https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_adminserver_config for configuration options.
java.arg.17=-Dzookeeper.admin.enableServer=false
# The following options configure a Java Agent to handle native library loading.
# It is needed when a custom jar (eg. JDBC driver) has been configured on a component in the flow and this custom jar depends on a native library
# and tries to load it by its absolute path (java.lang.System.load(String filename) method call).
# Use this Java Agent only if you get "Native Library ... already loaded in another classloader" errors otherwise!
#java.arg.18=-javaagent:./lib/aspectj/aspectjweaver-${aspectj.version}.jar
#java.arg.19=-Daj.weaving.loadersToSkip=sun.misc.Launcher$AppClassLoader,jdk.internal.loader.ClassLoaders$AppClassLoader,org.eclipse.jetty.webapp.WebAppClassLoader,\
# org.apache.jasper.servlet.JasperLoader,org.jvnet.hk2.internal.DelegatingClassLoader,org.apache.nifi.nar.NarClassLoader
# End of Java Agent config for native library loading.
# The following entry is needed in Java 21 because some libraries invoke
# reflective calls that Java no longer considers allowed by default.
# https://docs.oracle.com/en/java/javase/16/migrate/migrating-jdk-8-later-jdk-releases.html#GUID-7BB28E4D-99B3-4078-BDC4-FC24180CE82B
# This may need to be modified if additional reflective access is needed by certain libraries
# This is only known to be needed for the Hive3 processors as of now.
java.arg.20=--add-opens=java.base/java.net=ALL-UNNAMED
###
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
###
# XML File that contains the definitions of the notification services
notification.services.file=./conf/bootstrap-notification-services.xml
# In the case that we are unable to send a notification for an event, how many times should we retry?
notification.max.attempts=5
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
#nifi.start.notification.services=email-notification
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
#nifi.stop.notification.services=email-notification
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
#nifi.dead.notification.services=email-notification
# The first curator connection issue is logged as ERROR, for example when NiFi cannot connect to one of the Zookeeper nodes.
# Additional connection issues are logged as DEBUG until the connection is restored.
java.arg.curator.supress.excessive.logs=-Dcurator-log-only-first-connection-issue-as-error-level=true
# Port used to listen for communications from NiFi. If this property is missing, empty, or 0, a random ephemeral port is used. # Port used to listen for communications from NiFi. If this property is missing, empty, or 0, a random ephemeral port is used.
nifi.bootstrap.listen.port=0 nifi.bootstrap.listen.port=0