mirror of https://github.com/apache/nifi.git
NIFI-948: Provide capability to register notifications for nifi lifecycle events
This commit is contained in:
parent
4a7d1fe618
commit
992e841027
|
@ -51,6 +51,7 @@
|
|||
<include>nifi-bootstrap</include>
|
||||
<include>slf4j-api</include>
|
||||
<include>logback-classic</include>
|
||||
<include>nifi-api</include>
|
||||
</includes>
|
||||
</dependencySet>
|
||||
|
||||
|
|
|
@ -1,32 +1,46 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-bootstrap</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
language governing permissions and limitations under the License. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi</artifactId>
|
||||
<version>0.3.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-bootstrap</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.mail</groupId>
|
||||
<artifactId>mail</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-expression-language</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,430 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
|
||||
import org.apache.nifi.bootstrap.notification.NotificationContext;
|
||||
import org.apache.nifi.bootstrap.notification.NotificationInitializationContext;
|
||||
import org.apache.nifi.bootstrap.notification.NotificationService;
|
||||
import org.apache.nifi.bootstrap.notification.NotificationType;
|
||||
import org.apache.nifi.bootstrap.notification.NotificationValidationContext;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
import org.xml.sax.InputSource;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
public class NotificationServiceManager {
|
||||
private static final Logger logger = LoggerFactory.getLogger(NotificationServiceManager.class);
|
||||
private final Map<String, ConfiguredNotificationService> servicesById = new HashMap<>();
|
||||
private final Map<NotificationType, List<ConfiguredNotificationService>> servicesByNotificationType = new HashMap<>();
|
||||
|
||||
private final ScheduledExecutorService notificationExecutor;
|
||||
private int maxAttempts = 5;
|
||||
|
||||
public NotificationServiceManager() {
|
||||
notificationExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(final Runnable r) {
|
||||
final Thread t = Executors.defaultThreadFactory().newThread(r);
|
||||
t.setName("Notification Service Dispatcher");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void setMaxNotificationAttempts(final int maxAttempts) {
|
||||
this.maxAttempts = maxAttempts;
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads the Notification Services from the given XML configuration file.
|
||||
*
|
||||
* File is expected to have the following format:
|
||||
*
|
||||
* <pre>
|
||||
* <services>
|
||||
* <service>
|
||||
* <id>service-identifier</id>
|
||||
* <class>org.apache.nifi.MyNotificationService</class>
|
||||
* <property name="My First Property">Property Value</property>
|
||||
* </service>
|
||||
* <service>
|
||||
* <id>other-service</id>
|
||||
* <class>org.apache.nifi.MyOtherNotificationService</class>
|
||||
* <property name="Another Property">Property Value 2</property>
|
||||
* </service>
|
||||
* ...
|
||||
* <service>
|
||||
* <id>service-identifier-2</id>
|
||||
* <class>org.apache.nifi.FinalNotificationService</class>
|
||||
* <property name="Yet Another Property">3rd Prop Value</property>
|
||||
* </service>
|
||||
* </services>
|
||||
* </pre>
|
||||
*
|
||||
* Note that as long as the file can be interpreted properly, a misconfigured service will result in a warning
|
||||
* or error being logged and the service will be unavailable but will not prevent the rest of the services from loading.
|
||||
*
|
||||
* @param servicesFile the XML file to load services from.
|
||||
* @throws IOException if unable to read from the given file
|
||||
* @throws ParserConfigurationException if unable to parse the given file as XML properly
|
||||
* @throws SAXException if unable to parse the given file properly
|
||||
*/
|
||||
public void loadNotificationServices(final File servicesFile) throws IOException, ParserConfigurationException, SAXException {
|
||||
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
|
||||
docBuilderFactory.setNamespaceAware(false);
|
||||
final DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
|
||||
|
||||
final Map<String, ConfiguredNotificationService> serviceMap = new HashMap<>();
|
||||
try (final InputStream fis = new FileInputStream(servicesFile);
|
||||
final InputStream in = new BufferedInputStream(fis)) {
|
||||
|
||||
final Document doc = docBuilder.parse(new InputSource(in));
|
||||
final List<Element> serviceElements = getChildElementsByTagName(doc.getDocumentElement(), "service");
|
||||
logger.debug("Found {} service elements", serviceElements.size());
|
||||
|
||||
for (final Element serviceElement : serviceElements) {
|
||||
final ConfiguredNotificationService config = createService(serviceElement);
|
||||
final NotificationService service = config.getService();
|
||||
|
||||
if (service == null) {
|
||||
continue; // reason will have already been logged, so just move on.
|
||||
}
|
||||
|
||||
final String id = service.getIdentifier();
|
||||
if (serviceMap.containsKey(id)) {
|
||||
logger.error("Found two different Notification Services configured with the same ID: '{}'. Loaded the first service.", id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if the service is valid; if not, warn now so that users know this before they fail to receive notifications
|
||||
final ValidationContext validationContext = new NotificationValidationContext(buildNotificationContext(config));
|
||||
final Collection<ValidationResult> validationResults = service.validate(validationContext);
|
||||
final List<String> invalidReasons = new ArrayList<>();
|
||||
|
||||
for (final ValidationResult result : validationResults) {
|
||||
if (!result.isValid()) {
|
||||
invalidReasons.add(result.toString());
|
||||
}
|
||||
}
|
||||
|
||||
if (!invalidReasons.isEmpty()) {
|
||||
logger.warn("Configured Notification Service {} is not valid for the following reasons: {}", service, invalidReasons);
|
||||
}
|
||||
|
||||
serviceMap.put(id, config);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("Successfully loaded the following {} services: {}", serviceMap.size(), serviceMap.keySet());
|
||||
|
||||
servicesById.clear();
|
||||
servicesById.putAll(serviceMap);
|
||||
}
|
||||
|
||||
public void notify(final NotificationType type, final String subject, final String message) {
|
||||
final List<ConfiguredNotificationService> configs = servicesByNotificationType.get(type);
|
||||
if (configs == null || configs.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (final ConfiguredNotificationService config : configs) {
|
||||
final NotificationService service = config.getService();
|
||||
final AtomicInteger attemptCount = new AtomicInteger(0);
|
||||
|
||||
notificationExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Check if the service is valid; if not, warn now so that users know this before they fail to receive notifications
|
||||
final ValidationContext validationContext = new NotificationValidationContext(buildNotificationContext(config));
|
||||
final Collection<ValidationResult> validationResults = service.validate(validationContext);
|
||||
final List<String> invalidReasons = new ArrayList<>();
|
||||
|
||||
for (final ValidationResult result : validationResults) {
|
||||
if (!result.isValid()) {
|
||||
invalidReasons.add(result.toString());
|
||||
}
|
||||
}
|
||||
|
||||
// If the service is valid, attempt to send the notification
|
||||
boolean failure = false;
|
||||
if (invalidReasons.isEmpty()) {
|
||||
final NotificationContext context = buildNotificationContext(config);
|
||||
try {
|
||||
service.notify(context, subject, message);
|
||||
logger.info("Successfully sent notification of type {} to {}", type, service);
|
||||
} catch (final Throwable t) { // keep running even if a Throwable is caught because we need to ensure that we are able to restart NiFi
|
||||
logger.error("Failed to send notification of type {} to {} with Subject {} due to {}. Will ",
|
||||
type, service == null ? "Unknown Notification Service" : service.toString(), subject, t.toString());
|
||||
logger.error("", t);
|
||||
failure = true;
|
||||
}
|
||||
} else {
|
||||
logger.warn("Notification Service {} is not valid for the following reasons: {}", service, invalidReasons);
|
||||
failure = true;
|
||||
}
|
||||
|
||||
final int attempts = attemptCount.incrementAndGet();
|
||||
if (failure) {
|
||||
if (attempts < maxAttempts) {
|
||||
logger.info("After failing to send notification to {} {} times, will attempt again in 1 minute", service, attempts);
|
||||
notificationExecutor.schedule(this, 1, TimeUnit.MINUTES);
|
||||
} else {
|
||||
logger.info("After failing to send notification of type {} to {} {} times, will no longer attempt to send notification", type, service, attempts);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (NotificationType.NIFI_STOPPED.equals(type)) {
|
||||
// If we are stopping NiFi, we want to block until we've tried to send the notification at least once before
|
||||
// we return. We do this because the executor used to run the task marks the threads as daemon, and on shutdown
|
||||
// we don't want to return before the notifier has had a chance to perform its task.
|
||||
while (attemptCount.get() == 0) {
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private NotificationContext buildNotificationContext(final ConfiguredNotificationService config) {
|
||||
return new NotificationContext() {
|
||||
@Override
|
||||
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
|
||||
final PropertyDescriptor fullPropDescriptor = config.getService().getPropertyDescriptor(descriptor.getName());
|
||||
if (fullPropDescriptor == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
String configuredValue = config.getProperties().get(fullPropDescriptor.getName());
|
||||
if (configuredValue == null) {
|
||||
configuredValue = fullPropDescriptor.getDefaultValue();
|
||||
}
|
||||
|
||||
return new NotificationServicePropertyValue(configuredValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<PropertyDescriptor, String> getProperties() {
|
||||
final Map<PropertyDescriptor, String> props = new HashMap<>();
|
||||
final Map<String, String> configuredProps = config.getProperties();
|
||||
|
||||
final NotificationService service = config.getService();
|
||||
for (final PropertyDescriptor descriptor : service.getPropertyDescriptors()) {
|
||||
final String configuredValue = configuredProps.get(descriptor.getName());
|
||||
if (configuredValue == null) {
|
||||
props.put(descriptor, descriptor.getDefaultValue());
|
||||
} else {
|
||||
props.put(descriptor, configuredValue);
|
||||
}
|
||||
}
|
||||
|
||||
return props;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the service that has the given identifier to respond to notifications of the given type
|
||||
*
|
||||
* @param type the type of notification to register the service for
|
||||
* @param serviceId the identifier of the service
|
||||
*/
|
||||
public void registerNotificationService(final NotificationType type, final String serviceId) {
|
||||
final ConfiguredNotificationService service = servicesById.get(serviceId);
|
||||
if (service == null) {
|
||||
throw new IllegalArgumentException("No Notification Service exists with ID " + serviceId);
|
||||
}
|
||||
|
||||
List<ConfiguredNotificationService> services = servicesByNotificationType.get(type);
|
||||
if (services == null) {
|
||||
services = new ArrayList<>();
|
||||
servicesByNotificationType.put(type, services);
|
||||
}
|
||||
|
||||
services.add(service);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Notification Service and initializes it. Then returns the service and its configured properties
|
||||
*
|
||||
* @param serviceElement the XML element from which to build the Notification Service
|
||||
* @return a Tuple with the NotificationService as the key and the configured properties as the value, or <code>null</code> if
|
||||
* unable to create the service
|
||||
*/
|
||||
private ConfiguredNotificationService createService(final Element serviceElement) {
|
||||
final Element idElement = getChild(serviceElement, "id");
|
||||
if (idElement == null) {
|
||||
logger.error("Found configuration for Notification Service with no 'id' element; this service cannot be referenced so it will not be loaded");
|
||||
return null;
|
||||
}
|
||||
|
||||
final String serviceId = idElement.getTextContent().trim();
|
||||
logger.debug("Loading Notification Service with ID {}", serviceId);
|
||||
|
||||
final Element classElement = getChild(serviceElement, "class");
|
||||
if (classElement == null) {
|
||||
logger.error("Found configuration for Notification Service with no 'class' element; Service ID is '{}'. This service annot be loaded", serviceId);
|
||||
return null;
|
||||
}
|
||||
|
||||
final String className = classElement.getTextContent().trim();
|
||||
final Class<?> clazz;
|
||||
try {
|
||||
clazz = Class.forName(className);
|
||||
} catch (final Exception e) {
|
||||
logger.error("Found configuration for Notification Service with ID '{}' and Class '{}' but could not load class.", serviceId, className);
|
||||
logger.error("", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (!NotificationService.class.isAssignableFrom(clazz)) {
|
||||
logger.error("Found configuration for Notification Service with ID '{}' and Class '{}' but class is not a Notification Service.", serviceId, className);
|
||||
return null;
|
||||
}
|
||||
|
||||
final Object serviceObject;
|
||||
try {
|
||||
serviceObject = clazz.newInstance();
|
||||
} catch (final Exception e) {
|
||||
logger.error("Found configuration for Notification Service with ID '{}' and Class '{}' but could not instantiate Notification Service.", serviceId, className);
|
||||
logger.error("", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, String> propertyValues = new HashMap<>();
|
||||
final List<Element> propertyElements = getChildElementsByTagName(serviceElement, "property");
|
||||
for (final Element propertyElement : propertyElements) {
|
||||
final String propName = propertyElement.getAttribute("name");
|
||||
if (propName == null || propName.trim().isEmpty()) {
|
||||
logger.warn("Found configuration for Notification Service with ID '{}' that has property value configured but no name for the property.", serviceId);
|
||||
continue;
|
||||
}
|
||||
|
||||
final String propValue = propertyElement.getTextContent().trim();
|
||||
propertyValues.put(propName, propValue);
|
||||
}
|
||||
|
||||
final NotificationService service = (NotificationService) serviceObject;
|
||||
|
||||
try {
|
||||
service.initialize(new NotificationInitializationContext() {
|
||||
@Override
|
||||
public PropertyValue getProperty(final PropertyDescriptor descriptor) {
|
||||
final String propName = descriptor.getName();
|
||||
String value = propertyValues.get(propName);
|
||||
if (value == null) {
|
||||
value = descriptor.getDefaultValue();
|
||||
}
|
||||
|
||||
return new NotificationServicePropertyValue(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return serviceId;
|
||||
}
|
||||
});
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to load Notification Service with ID '{}'", serviceId);
|
||||
logger.error("", e);
|
||||
}
|
||||
|
||||
return new ConfiguredNotificationService(service, propertyValues);
|
||||
}
|
||||
|
||||
public static Element getChild(final Element element, final String tagName) {
|
||||
final List<Element> children = getChildElementsByTagName(element, tagName);
|
||||
if (children.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (children.size() > 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return children.get(0);
|
||||
}
|
||||
|
||||
public static List<Element> getChildElementsByTagName(final Element element, final String tagName) {
|
||||
final List<Element> matches = new ArrayList<>();
|
||||
final NodeList nodeList = element.getChildNodes();
|
||||
for (int i = 0; i < nodeList.getLength(); i++) {
|
||||
final Node node = nodeList.item(i);
|
||||
if (!(node instanceof Element)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
final Element child = (Element) nodeList.item(i);
|
||||
if (child.getNodeName().equals(tagName)) {
|
||||
matches.add(child);
|
||||
}
|
||||
}
|
||||
|
||||
return matches;
|
||||
}
|
||||
|
||||
private static class ConfiguredNotificationService {
|
||||
private final NotificationService service;
|
||||
private final Map<String, String> properties;
|
||||
|
||||
public ConfiguredNotificationService(final NotificationService service, final Map<String, String> properties) {
|
||||
this.service = service;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public NotificationService getService() {
|
||||
return service;
|
||||
}
|
||||
|
||||
public Map<String, String> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.nifi.attribute.expression.language.PreparedQuery;
|
||||
import org.apache.nifi.attribute.expression.language.Query;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.expression.AttributeValueDecorator;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
|
||||
public class NotificationServicePropertyValue implements PropertyValue {
|
||||
private final String rawValue;
|
||||
private final PreparedQuery preparedQuery;
|
||||
|
||||
public NotificationServicePropertyValue(final String rawValue) {
|
||||
this.rawValue = rawValue;
|
||||
this.preparedQuery = Query.prepare(rawValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getValue() {
|
||||
return rawValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer asInteger() {
|
||||
return (rawValue == null) ? null : Integer.parseInt(rawValue.trim());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long asLong() {
|
||||
return (rawValue == null) ? null : Long.parseLong(rawValue.trim());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean asBoolean() {
|
||||
return (rawValue == null) ? null : Boolean.parseBoolean(rawValue.trim());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Float asFloat() {
|
||||
return (rawValue == null) ? null : Float.parseFloat(rawValue.trim());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double asDouble() {
|
||||
return (rawValue == null) ? null : Double.parseDouble(rawValue.trim());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long asTimePeriod(final TimeUnit timeUnit) {
|
||||
return (rawValue == null) ? null : FormatUtils.getTimeDuration(rawValue.trim(), timeUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double asDataSize(final DataUnit dataUnit) {
|
||||
return rawValue == null ? null : DataUnit.parseDataSize(rawValue.trim(), dataUnit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PropertyValue evaluateAttributeExpressions() throws ProcessException {
|
||||
return evaluateAttributeExpressions((AttributeValueDecorator) null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PropertyValue evaluateAttributeExpressions(final AttributeValueDecorator decorator) throws ProcessException {
|
||||
return new NotificationServicePropertyValue(preparedQuery.evaluateExpressions((FlowFile) null, decorator));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile) throws ProcessException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PropertyValue evaluateAttributeExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return rawValue;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerService asControllerService() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends ControllerService> T asControllerService(final Class<T> serviceType) throws IllegalArgumentException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSet() {
|
||||
return rawValue != null;
|
||||
}
|
||||
|
||||
}
|
|
@ -28,11 +28,13 @@ import java.io.InputStreamReader;
|
|||
import java.io.OutputStream;
|
||||
import java.io.Reader;
|
||||
import java.lang.reflect.Field;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.attribute.PosixFilePermission;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
@ -50,6 +52,7 @@ import java.util.concurrent.locks.Condition;
|
|||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.nifi.bootstrap.notification.NotificationType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -78,6 +81,14 @@ public class RunNiFi {
|
|||
public static final String GRACEFUL_SHUTDOWN_PROP = "graceful.shutdown.seconds";
|
||||
public static final String DEFAULT_GRACEFUL_SHUTDOWN_VALUE = "20";
|
||||
|
||||
public static final String NOTIFICATION_SERVICES_FILE_PROP = "notification.services.file";
|
||||
public static final String DEFAULT_NOTIFICATION_SERVICES_FILE = "./conf/bootstrap-notification-services.xml";
|
||||
public static final String NOTIFICATION_ATTEMPTS_PROP = "notification.max.attempts";
|
||||
|
||||
public static final String NIFI_START_NOTIFICATION_SERVICE_IDS_PROP = "nifi.start.notification.services";
|
||||
public static final String NIFI_STOP_NOTIFICATION_SERVICE_IDS_PROP = "nifi.stop.notification.services";
|
||||
public static final String NIFI_DEAD_NOTIFICATION_SERVICE_IDS_PROP = "nifi.dead.notification.services";
|
||||
|
||||
public static final String RUN_AS_PROP = "run.as";
|
||||
|
||||
public static final int MAX_RESTART_ATTEMPTS = 5;
|
||||
|
@ -108,8 +119,9 @@ public class RunNiFi {
|
|||
|
||||
private final ExecutorService loggingExecutor;
|
||||
private volatile Set<Future<?>> loggingFutures = new HashSet<>(2);
|
||||
private final NotificationServiceManager serviceManager;
|
||||
|
||||
public RunNiFi(final File bootstrapConfigFile, final boolean verbose) {
|
||||
public RunNiFi(final File bootstrapConfigFile, final boolean verbose) throws IOException {
|
||||
this.bootstrapConfigFile = bootstrapConfigFile;
|
||||
|
||||
loggingExecutor = Executors.newFixedThreadPool(2, new ThreadFactory() {
|
||||
|
@ -121,6 +133,8 @@ public class RunNiFi {
|
|||
return t;
|
||||
}
|
||||
});
|
||||
|
||||
serviceManager = loadServices();
|
||||
}
|
||||
|
||||
private static void printUsage() {
|
||||
|
@ -178,23 +192,7 @@ public class RunNiFi {
|
|||
return;
|
||||
}
|
||||
|
||||
String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file");
|
||||
|
||||
if (configFilename == null) {
|
||||
final String nifiHome = System.getenv("NIFI_HOME");
|
||||
if (nifiHome != null) {
|
||||
final File nifiHomeFile = new File(nifiHome.trim());
|
||||
final File configFile = new File(nifiHomeFile, DEFAULT_CONFIG_FILE);
|
||||
configFilename = configFile.getAbsolutePath();
|
||||
}
|
||||
}
|
||||
|
||||
if (configFilename == null) {
|
||||
configFilename = DEFAULT_CONFIG_FILE;
|
||||
}
|
||||
|
||||
final File configFile = new File(configFilename);
|
||||
|
||||
final File configFile = getBootstrapConfFile();
|
||||
final RunNiFi runNiFi = new RunNiFi(configFile, verbose);
|
||||
|
||||
switch (cmd.toLowerCase()) {
|
||||
|
@ -220,6 +218,106 @@ public class RunNiFi {
|
|||
}
|
||||
}
|
||||
|
||||
private static File getBootstrapConfFile() {
|
||||
String configFilename = System.getProperty("org.apache.nifi.bootstrap.config.file");
|
||||
|
||||
if (configFilename == null) {
|
||||
final String nifiHome = System.getenv("NIFI_HOME");
|
||||
if (nifiHome != null) {
|
||||
final File nifiHomeFile = new File(nifiHome.trim());
|
||||
final File configFile = new File(nifiHomeFile, DEFAULT_CONFIG_FILE);
|
||||
configFilename = configFile.getAbsolutePath();
|
||||
}
|
||||
}
|
||||
|
||||
if (configFilename == null) {
|
||||
configFilename = DEFAULT_CONFIG_FILE;
|
||||
}
|
||||
|
||||
final File configFile = new File(configFilename);
|
||||
return configFile;
|
||||
}
|
||||
|
||||
private NotificationServiceManager loadServices() throws IOException {
|
||||
final File bootstrapConfFile = getBootstrapConfFile();
|
||||
final Properties properties = new Properties();
|
||||
try (final FileInputStream fis = new FileInputStream(bootstrapConfFile)) {
|
||||
properties.load(fis);
|
||||
}
|
||||
|
||||
final NotificationServiceManager manager = new NotificationServiceManager();
|
||||
final String attemptProp = properties.getProperty(NOTIFICATION_ATTEMPTS_PROP);
|
||||
if (attemptProp != null) {
|
||||
try {
|
||||
final int maxAttempts = Integer.parseInt(attemptProp.trim());
|
||||
if (maxAttempts >= 0) {
|
||||
manager.setMaxNotificationAttempts(maxAttempts);
|
||||
}
|
||||
} catch (final NumberFormatException nfe) {
|
||||
defaultLogger.error("Maximum number of attempts to send notification email is set to an invalid value of {}; will use default value", attemptProp);
|
||||
}
|
||||
}
|
||||
|
||||
final String notificationServicesXmlFilename = properties.getProperty(NOTIFICATION_SERVICES_FILE_PROP);
|
||||
if (notificationServicesXmlFilename == null) {
|
||||
defaultLogger.info("No Bootstrap Notification Services configured.");
|
||||
return manager;
|
||||
}
|
||||
|
||||
final File xmlFile = new File(notificationServicesXmlFilename);
|
||||
final File servicesFile;
|
||||
|
||||
if (xmlFile.isAbsolute()) {
|
||||
servicesFile = xmlFile;
|
||||
} else {
|
||||
final File confDir = bootstrapConfigFile.getParentFile();
|
||||
final File nifiHome = confDir.getParentFile();
|
||||
servicesFile = new File(nifiHome, notificationServicesXmlFilename);
|
||||
}
|
||||
|
||||
if (!servicesFile.exists()) {
|
||||
defaultLogger.error("Bootstrap Notification Services file configured as " + servicesFile + " but could not find file; will not load notification services");
|
||||
return manager;
|
||||
}
|
||||
|
||||
try {
|
||||
manager.loadNotificationServices(servicesFile);
|
||||
} catch (final Exception e) {
|
||||
defaultLogger.error("Bootstrap Notification Services file configured as " + servicesFile + " but failed to load notification services", e);
|
||||
}
|
||||
|
||||
registerNotificationServices(manager, NotificationType.NIFI_STARTED, properties.getProperty(NIFI_START_NOTIFICATION_SERVICE_IDS_PROP));
|
||||
registerNotificationServices(manager, NotificationType.NIFI_STOPPED, properties.getProperty(NIFI_STOP_NOTIFICATION_SERVICE_IDS_PROP));
|
||||
registerNotificationServices(manager, NotificationType.NIFI_DIED, properties.getProperty(NIFI_DEAD_NOTIFICATION_SERVICE_IDS_PROP));
|
||||
|
||||
return manager;
|
||||
}
|
||||
|
||||
private void registerNotificationServices(final NotificationServiceManager manager, final NotificationType type, final String serviceIds) {
|
||||
if (serviceIds == null) {
|
||||
defaultLogger.info("Registered no Notification Services for Notification Type {}", type);
|
||||
return;
|
||||
}
|
||||
|
||||
int registered = 0;
|
||||
for (final String id : serviceIds.split(",")) {
|
||||
final String trimmed = id.trim();
|
||||
if (trimmed.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
manager.registerNotificationService(type, trimmed);
|
||||
registered++;
|
||||
} catch (final Exception e) {
|
||||
defaultLogger.warn("Failed to register Notification Service with ID {} for Notifications of type {} due to {}", trimmed, type, e.toString());
|
||||
defaultLogger.error("", e);
|
||||
}
|
||||
}
|
||||
|
||||
defaultLogger.info("Registered {} Notification Services for Notification Type {}", registered, type);
|
||||
}
|
||||
|
||||
File getStatusFile() {
|
||||
return getStatusFile(defaultLogger);
|
||||
}
|
||||
|
@ -500,6 +598,19 @@ public class RunNiFi {
|
|||
}
|
||||
}
|
||||
|
||||
public void notifyStop() {
|
||||
final String hostname = getHostname();
|
||||
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
|
||||
final String now = sdf.format(System.currentTimeMillis());
|
||||
String user = System.getProperty("user.name");
|
||||
if (user == null || user.trim().isEmpty()) {
|
||||
user = "Unknown User";
|
||||
}
|
||||
|
||||
serviceManager.notify(NotificationType.NIFI_STOPPED, "NiFi Stopped on Host " + hostname,
|
||||
"Hello,\n\nApache NiFi has been told to initiate a shutdown on host " + hostname + " at " + now + " by user " + user);
|
||||
}
|
||||
|
||||
public void stop() throws IOException {
|
||||
final Logger logger = cmdLogger;
|
||||
final Integer port = getCurrentPort(logger);
|
||||
|
@ -558,6 +669,7 @@ public class RunNiFi {
|
|||
gracefulShutdownSeconds = Integer.parseInt(DEFAULT_GRACEFUL_SHUTDOWN_VALUE);
|
||||
}
|
||||
|
||||
notifyStop();
|
||||
final long startWait = System.nanoTime();
|
||||
while (isProcessRunning(pid, logger)) {
|
||||
logger.info("Waiting for Apache NiFi to finish shutting down...");
|
||||
|
@ -636,6 +748,20 @@ public class RunNiFi {
|
|||
}
|
||||
}
|
||||
|
||||
private String getHostname() {
|
||||
String hostname = "Unknown Host";
|
||||
String ip = "Unknown IP Address";
|
||||
try {
|
||||
final InetAddress localhost = InetAddress.getLocalHost();
|
||||
hostname = localhost.getHostName();
|
||||
ip = localhost.getHostAddress();
|
||||
} catch (final Exception e) {
|
||||
defaultLogger.warn("Failed to obtain hostname for notification due to:", e);
|
||||
}
|
||||
|
||||
return hostname + " (" + ip + ")";
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public void start() throws IOException, InterruptedException {
|
||||
final Integer port = getCurrentPort(cmdLogger);
|
||||
|
@ -808,6 +934,15 @@ public class RunNiFi {
|
|||
final Runtime runtime = Runtime.getRuntime();
|
||||
runtime.addShutdownHook(shutdownHook);
|
||||
|
||||
final String hostname = getHostname();
|
||||
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");
|
||||
String now = sdf.format(System.currentTimeMillis());
|
||||
String user = System.getProperty("user.name");
|
||||
if (user == null || user.trim().isEmpty()) {
|
||||
user = "Unknown User";
|
||||
}
|
||||
serviceManager.notify(NotificationType.NIFI_STARTED, "NiFi Started on Host " + hostname, "Hello,\n\nApache NiFi has been started on host " + hostname + " at " + now + " by user " + user);
|
||||
|
||||
while (true) {
|
||||
final boolean alive = isAlive(process);
|
||||
|
||||
|
@ -823,6 +958,7 @@ public class RunNiFi {
|
|||
// happens when already shutting down
|
||||
}
|
||||
|
||||
now = sdf.format(System.currentTimeMillis());
|
||||
if (autoRestartNiFi) {
|
||||
final File statusFile = getStatusFile(defaultLogger);
|
||||
if (!statusFile.exists()) {
|
||||
|
@ -863,8 +999,17 @@ public class RunNiFi {
|
|||
|
||||
if (started) {
|
||||
defaultLogger.info("Successfully started Apache NiFi{}", (pid == null ? "" : " with PID " + pid));
|
||||
// We are expected to restart nifi, so send a notification that it died. If we are not restarting nifi,
|
||||
// then this means that we are intentionally stopping the service.
|
||||
serviceManager.notify(NotificationType.NIFI_DIED, "NiFi Died on Host " + hostname,
|
||||
"Hello,\n\nIt appears that Apache NiFi has died on host " + hostname + " at " + now + "; automatically restarting NiFi");
|
||||
} else {
|
||||
defaultLogger.error("Apache NiFi does not appear to have started");
|
||||
// We are expected to restart nifi, so send a notification that it died. If we are not restarting nifi,
|
||||
// then this means that we are intentionally stopping the service.
|
||||
serviceManager.notify(NotificationType.NIFI_DIED, "NiFi Died on Host " + hostname,
|
||||
"Hello,\n\nIt appears that Apache NiFi has died on host " + hostname + " at " + now +
|
||||
". Attempted to restart NiFi but the services does not appear to have restarted!");
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
|
|
|
@ -65,6 +65,7 @@ public class ShutdownHook extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
runner.notifyStop();
|
||||
System.out.println("Waiting for Apache NiFi to finish shutting down...");
|
||||
final long startWait = System.nanoTime();
|
||||
while (RunNiFi.isAlive(nifiProcess)) {
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
import org.apache.nifi.components.AbstractConfigurableComponent;
|
||||
|
||||
public abstract class AbstractNotificationService extends AbstractConfigurableComponent implements NotificationService {
|
||||
private String identifier; // effectively final
|
||||
|
||||
@Override
|
||||
public final void initialize(final NotificationInitializationContext context) {
|
||||
this.identifier = context.getIdentifier();
|
||||
init(context);
|
||||
}
|
||||
|
||||
protected void init(final NotificationInitializationContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIdentifier() {
|
||||
return identifier;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
|
||||
public interface NotificationContext {
|
||||
|
||||
/**
|
||||
* Returns the configured value for the given PropertyDescriptor. Note that the implementation
|
||||
* of PropertyValue will throw an Exception if calling {@link PropertyValue#asControllerService()} or
|
||||
* {@link PropertyValue#asControllerService(Class)}, as Controller Services are not allowed to be
|
||||
* referenced by Notification Services
|
||||
*
|
||||
* @param descriptor the property whose value should be returned
|
||||
* @return the configured value for the given PropertyDescriptor, or the default
|
||||
* value for the PropertyDescriptor if no value has been configured
|
||||
*/
|
||||
PropertyValue getProperty(PropertyDescriptor descriptor);
|
||||
|
||||
/**
|
||||
* @return a Map of all PropertyDescriptors to their configured values. This
|
||||
* Map may or may not be modifiable, but modifying its values will not
|
||||
* change the values of the processor's properties
|
||||
*/
|
||||
Map<PropertyDescriptor, String> getProperties();
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
public class NotificationFailedException extends Exception {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public NotificationFailedException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public NotificationFailedException(final String message, final Throwable t) {
|
||||
super(message, t);
|
||||
}
|
||||
|
||||
public NotificationFailedException(final Throwable t) {
|
||||
super(t);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
|
||||
public interface NotificationInitializationContext {
|
||||
|
||||
/**
|
||||
* Returns the configured value for the given PropertyDescriptor
|
||||
*
|
||||
* @param descriptor the property to fetch the value for
|
||||
* @return the configured value for the given PropertyDescriptor, or the default value for the PropertyDescriptor
|
||||
* if no value has been configured.
|
||||
*/
|
||||
PropertyValue getProperty(PropertyDescriptor descriptor);
|
||||
|
||||
/**
|
||||
* @return the identifier for the NotificationService
|
||||
*/
|
||||
String getIdentifier();
|
||||
|
||||
}
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
import org.apache.nifi.components.ConfigurableComponent;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* A NotificationService is simple mechanism that the Bootstrap can use to notify
|
||||
* interested parties when some event takes place, such as NiFi being started, stopped,
|
||||
* or restarted because the process died.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* <b>Note:</b> This feature was introduced in version 0.3.0 of NiFi and is likely to undergo
|
||||
* significant refactorings. As such, at this time it is NOT considered a public API and may well
|
||||
* change from version to version until the API has stabilized. At that point, it will become a public
|
||||
* API.
|
||||
* </p>
|
||||
*
|
||||
* @since 0.3.0
|
||||
*/
|
||||
public interface NotificationService extends ConfigurableComponent {
|
||||
|
||||
/**
|
||||
* Provides the NotificatoinService with access to objects that may be of use
|
||||
* throughout the life of the service
|
||||
*
|
||||
* @param context of initialization
|
||||
*/
|
||||
void initialize(NotificationInitializationContext context);
|
||||
|
||||
/**
|
||||
* Notifies the configured recipients of some event
|
||||
*
|
||||
* @param context the context that is relevant for this notification
|
||||
* @param subject the subject of the message
|
||||
* @param message the message to be provided to recipients
|
||||
*/
|
||||
void notify(NotificationContext context, String subject, String message) throws NotificationFailedException;
|
||||
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
public enum NotificationType {
|
||||
NIFI_STARTED,
|
||||
NIFI_STOPPED,
|
||||
NIFI_DIED;
|
||||
}
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.nifi.attribute.expression.language.Query;
|
||||
import org.apache.nifi.attribute.expression.language.Query.Range;
|
||||
import org.apache.nifi.attribute.expression.language.StandardExpressionLanguageCompiler;
|
||||
import org.apache.nifi.bootstrap.NotificationServicePropertyValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||
import org.apache.nifi.expression.ExpressionLanguageCompiler;
|
||||
|
||||
public class NotificationValidationContext implements ValidationContext {
|
||||
private final NotificationContext context;
|
||||
private final Map<String, Boolean> expressionLanguageSupported;
|
||||
|
||||
public NotificationValidationContext(final NotificationContext processContext) {
|
||||
this.context = processContext;
|
||||
|
||||
final Map<PropertyDescriptor, String> properties = processContext.getProperties();
|
||||
expressionLanguageSupported = new HashMap<>(properties.size());
|
||||
for (final PropertyDescriptor descriptor : properties.keySet()) {
|
||||
expressionLanguageSupported.put(descriptor.getName(), descriptor.isExpressionLanguageSupported());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public PropertyValue newPropertyValue(final String rawValue) {
|
||||
return new NotificationServicePropertyValue(rawValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
|
||||
return new StandardExpressionLanguageCompiler();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValidationContext getControllerServiceValidationContext(final ControllerService controllerService) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PropertyValue getProperty(final PropertyDescriptor property) {
|
||||
return context.getProperty(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<PropertyDescriptor, String> getProperties() {
|
||||
return context.getProperties();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getAnnotationData() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerServiceLookup getControllerServiceLookup() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isValidationRequired(final ControllerService service) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExpressionLanguagePresent(final String value) {
|
||||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final List<Range> elRanges = Query.extractExpressionRanges(value);
|
||||
return (elRanges != null && !elRanges.isEmpty());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isExpressionLanguageSupported(final String propertyName) {
|
||||
final Boolean supported = expressionLanguageSupported.get(propertyName);
|
||||
return Boolean.TRUE.equals(supported);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,289 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.bootstrap.notification.email;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import javax.mail.Authenticator;
|
||||
import javax.mail.Message;
|
||||
import javax.mail.MessagingException;
|
||||
import javax.mail.PasswordAuthentication;
|
||||
import javax.mail.Session;
|
||||
import javax.mail.Transport;
|
||||
import javax.mail.Message.RecipientType;
|
||||
import javax.mail.internet.AddressException;
|
||||
import javax.mail.internet.InternetAddress;
|
||||
import javax.mail.internet.MimeMessage;
|
||||
import org.apache.nifi.bootstrap.notification.AbstractNotificationService;
|
||||
import org.apache.nifi.bootstrap.notification.NotificationContext;
|
||||
import org.apache.nifi.bootstrap.notification.NotificationFailedException;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
public class EmailNotificationService extends AbstractNotificationService {
|
||||
|
||||
public static final PropertyDescriptor SMTP_HOSTNAME = new PropertyDescriptor.Builder()
|
||||
.name("SMTP Hostname")
|
||||
.description("The hostname of the SMTP Server that is used to send Email Notifications")
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SMTP_PORT = new PropertyDescriptor.Builder()
|
||||
.name("SMTP Port")
|
||||
.description("The Port used for SMTP communications")
|
||||
.required(true)
|
||||
.defaultValue("25")
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor SMTP_USERNAME = new PropertyDescriptor.Builder()
|
||||
.name("SMTP Username")
|
||||
.description("Username for the SMTP account")
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(false)
|
||||
.build();
|
||||
public static final PropertyDescriptor SMTP_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("SMTP Password")
|
||||
.description("Password for the SMTP account")
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.build();
|
||||
public static final PropertyDescriptor SMTP_AUTH = new PropertyDescriptor.Builder()
|
||||
.name("SMTP Auth")
|
||||
.description("Flag indicating whether authentication should be used")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.defaultValue("true")
|
||||
.build();
|
||||
public static final PropertyDescriptor SMTP_TLS = new PropertyDescriptor.Builder()
|
||||
.name("SMTP TLS")
|
||||
.description("Flag indicating whether TLS should be enabled")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.defaultValue("false")
|
||||
.build();
|
||||
public static final PropertyDescriptor SMTP_SOCKET_FACTORY = new PropertyDescriptor.Builder()
|
||||
.name("SMTP Socket Factory")
|
||||
.description("Socket Factory to use for SMTP Connection")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("javax.net.ssl.SSLSocketFactory")
|
||||
.build();
|
||||
public static final PropertyDescriptor HEADER_XMAILER = new PropertyDescriptor.Builder()
|
||||
.name("SMTP X-Mailer Header")
|
||||
.description("X-Mailer used in the header of the outgoing email")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("NiFi")
|
||||
.build();
|
||||
public static final PropertyDescriptor CONTENT_TYPE = new PropertyDescriptor.Builder()
|
||||
.name("Content Type")
|
||||
.description("Mime Type used to interpret the contents of the email, such as text/plain or text/html")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.defaultValue("text/plain")
|
||||
.build();
|
||||
public static final PropertyDescriptor FROM = new PropertyDescriptor.Builder()
|
||||
.name("From")
|
||||
.description("Specifies the Email address to use as the sender")
|
||||
.required(true)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor TO = new PropertyDescriptor.Builder()
|
||||
.name("To")
|
||||
.description("The recipients to include in the To-Line of the email")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor CC = new PropertyDescriptor.Builder()
|
||||
.name("CC")
|
||||
.description("The recipients to include in the CC-Line of the email")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor BCC = new PropertyDescriptor.Builder()
|
||||
.name("BCC")
|
||||
.description("The recipients to include in the BCC-Line of the email")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Mapping of the mail properties to the NiFi PropertyDescriptors that will be evaluated at runtime
|
||||
*/
|
||||
private static final Map<String, PropertyDescriptor> propertyToContext = new HashMap<>();
|
||||
|
||||
static {
|
||||
propertyToContext.put("mail.smtp.host", SMTP_HOSTNAME);
|
||||
propertyToContext.put("mail.smtp.port", SMTP_PORT);
|
||||
propertyToContext.put("mail.smtp.socketFactory.port", SMTP_PORT);
|
||||
propertyToContext.put("mail.smtp.socketFactory.class", SMTP_SOCKET_FACTORY);
|
||||
propertyToContext.put("mail.smtp.auth", SMTP_AUTH);
|
||||
propertyToContext.put("mail.smtp.starttls.enable", SMTP_TLS);
|
||||
propertyToContext.put("mail.smtp.user", SMTP_USERNAME);
|
||||
propertyToContext.put("mail.smtp.password", SMTP_PASSWORD);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(SMTP_HOSTNAME);
|
||||
properties.add(SMTP_PORT);
|
||||
properties.add(SMTP_USERNAME);
|
||||
properties.add(SMTP_PASSWORD);
|
||||
properties.add(SMTP_AUTH);
|
||||
properties.add(SMTP_TLS);
|
||||
properties.add(SMTP_SOCKET_FACTORY);
|
||||
properties.add(HEADER_XMAILER);
|
||||
properties.add(CONTENT_TYPE);
|
||||
properties.add(FROM);
|
||||
properties.add(TO);
|
||||
properties.add(CC);
|
||||
properties.add(BCC);
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
|
||||
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
|
||||
|
||||
final String to = context.getProperty(TO).getValue();
|
||||
final String cc = context.getProperty(CC).getValue();
|
||||
final String bcc = context.getProperty(BCC).getValue();
|
||||
|
||||
if (to == null && cc == null && bcc == null) {
|
||||
errors.add(new ValidationResult.Builder().subject("To, CC, BCC").valid(false).explanation("Must specify at least one To/CC/BCC address").build());
|
||||
}
|
||||
|
||||
return errors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notify(final NotificationContext context, final String subject, final String messageText) throws NotificationFailedException {
|
||||
final Properties properties = getMailProperties(context);
|
||||
final Session mailSession = createMailSession(properties);
|
||||
final Message message = new MimeMessage(mailSession);
|
||||
|
||||
try {
|
||||
message.setFrom(InternetAddress.parse(context.getProperty(FROM).evaluateAttributeExpressions().getValue())[0]);
|
||||
|
||||
final InternetAddress[] toAddresses = toInetAddresses(context.getProperty(TO).evaluateAttributeExpressions().getValue());
|
||||
message.setRecipients(RecipientType.TO, toAddresses);
|
||||
|
||||
final InternetAddress[] ccAddresses = toInetAddresses(context.getProperty(CC).evaluateAttributeExpressions().getValue());
|
||||
message.setRecipients(RecipientType.CC, ccAddresses);
|
||||
|
||||
final InternetAddress[] bccAddresses = toInetAddresses(context.getProperty(BCC).evaluateAttributeExpressions().getValue());
|
||||
message.setRecipients(RecipientType.BCC, bccAddresses);
|
||||
|
||||
message.setHeader("X-Mailer", context.getProperty(HEADER_XMAILER).evaluateAttributeExpressions().getValue());
|
||||
message.setSubject(subject);
|
||||
|
||||
final String contentType = context.getProperty(CONTENT_TYPE).evaluateAttributeExpressions().getValue();
|
||||
message.setContent(messageText, contentType);
|
||||
message.setSentDate(new Date());
|
||||
|
||||
Transport.send(message);
|
||||
} catch (final ProcessException | MessagingException e) {
|
||||
throw new NotificationFailedException("Failed to send E-mail Notification", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an array of 0 or more InternetAddresses for the given String
|
||||
*
|
||||
* @param val the String to parse for InternetAddresses
|
||||
* @return an array of 0 or more InetAddresses
|
||||
* @throws AddressException if val contains an invalid address
|
||||
*/
|
||||
private static InternetAddress[] toInetAddresses(final String val) throws AddressException {
|
||||
if (val == null) {
|
||||
return new InternetAddress[0];
|
||||
}
|
||||
return InternetAddress.parse(val);
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses the mapping of javax.mail properties to NiFi PropertyDescriptors to build the required Properties object to be used for sending this email
|
||||
*
|
||||
* @param context context
|
||||
* @return mail properties
|
||||
*/
|
||||
private Properties getMailProperties(final NotificationContext context) {
|
||||
final Properties properties = new Properties();
|
||||
|
||||
for (Entry<String, PropertyDescriptor> entry : propertyToContext.entrySet()) {
|
||||
// Evaluate the property descriptor against the flow file
|
||||
String property = entry.getKey();
|
||||
String propValue = context.getProperty(entry.getValue()).evaluateAttributeExpressions().getValue();
|
||||
|
||||
// Nullable values are not allowed, so filter out
|
||||
if (null != propValue) {
|
||||
properties.setProperty(property, propValue);
|
||||
}
|
||||
}
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Based on the input properties, determine whether an authenticate or unauthenticated session should be used. If authenticated, creates a Password Authenticator for use in sending the email.
|
||||
*
|
||||
* @param properties mail properties
|
||||
* @return session
|
||||
*/
|
||||
private Session createMailSession(final Properties properties) {
|
||||
String authValue = properties.getProperty("mail.smtp.auth");
|
||||
Boolean auth = Boolean.valueOf(authValue);
|
||||
|
||||
/*
|
||||
* Conditionally create a password authenticator if the 'auth' parameter is set.
|
||||
*/
|
||||
final Session mailSession = auth ? Session.getInstance(properties, new Authenticator() {
|
||||
@Override
|
||||
public PasswordAuthentication getPasswordAuthentication() {
|
||||
String username = properties.getProperty("mail.smtp.user"), password = properties.getProperty("mail.smtp.password");
|
||||
return new PasswordAuthentication(username, password);
|
||||
}
|
||||
}) : Session.getInstance(properties); // without auth
|
||||
|
||||
return mailSession;
|
||||
}
|
||||
|
||||
}
|
|
@ -375,10 +375,119 @@ additivity="false">
|
|||
|
||||
|
||||
|
||||
[[bootstrap_properties]]
|
||||
Bootstrap Properties
|
||||
--------------------
|
||||
The _bootstrap.conf_ file in the _conf_ directory allows users to configure settings for how NiFi should be started.
|
||||
This includes parameters, such as the size of the Java Heap, what Java command to run, and Java System Properties.
|
||||
|
||||
Here, we will address the different properties that are made available in the file. Any chances to this file will
|
||||
take affect only after NiFi has been stopped and restarted.
|
||||
|
||||
|====
|
||||
|*Property*|*Description*
|
||||
|java|Specifies the fully qualified java command to run. By default, it is simply `java` but could be changed to an absolute path or a reference an environment variable, such as `$JAVA_HOME/bin/java`
|
||||
|run.as|The username to run NiFi as. For instance, if NiFi should be run as the 'nifi' user, setting this value to 'nifi' will cause the NiFi Process to be run as the 'nifi' user.
|
||||
This property is ignored on Windows. For Linux, the specified user may require sudo permissions.
|
||||
|lib.dir|The _lib_ directory to use for NiFi. By default, this is set to `./lib`
|
||||
|conf.dir|The _conf_ directory to use for NiFi. By default, this is set to `./conf`
|
||||
|graceful.shutdown.seconds|When NiFi is instructed to shutdown, the Bootstrap will wait this number of seconds for the process to shutdown cleanly. At this amount of time,
|
||||
if the service is still running, the Bootstrap will "kill" the process, or terminate it abruptly.
|
||||
|java.arg.N|Any number of JVM arguments can be passed to the NiFi JVM when the process is started. These arguments are defined by adding properties to _bootstrap.conf_ that
|
||||
begin with `java.arg.`. The rest of the property name is not relevant, other than to different property names, and will be ignored. The default includes
|
||||
properties for minimum and maximum Java Heap size, the garbage collector to use, etc.
|
||||
|notification.services.file|When NiFi is started, or stopped, or when the Bootstrap detects that NiFi has died, the Bootstrap is able to send notifications of these events
|
||||
to interested parties. This is configured by specifying an XML file that defines which notification services can be used. More about this
|
||||
file can be found in the <<notification_services>> section.
|
||||
|notification.max.attempts|If a notification service is configured but is unable to perform its function, it will try again up to a maximum number of attempts. This property
|
||||
configures what that maximum number of attempts is. The default is `5`.
|
||||
|nifi.start.notification.services|This property is a comma-separated list of Notification Service identifiers that correspond to the Notification Services
|
||||
defined in the `notification.services.file` property. The services with the specified identifiers will be used to notify their
|
||||
configured recipients whenever NiFi is started.
|
||||
|nifi.stop.notification.services|This property is a comma-separated list of Notification Service identifiers that correspond to the Notification Services
|
||||
defined in the `notification.services.file` property. The services with the specified identifiers will be used to notify their
|
||||
configured recipients whenever NiFi is stopped.
|
||||
|nifi.died.notification.services|This property is a comma-separated list of Notification Service identifiers that correspond to the Notification Services
|
||||
defined in the `notification.services.file` property. The services with the specified identifiers will be used to notify their
|
||||
configured recipients if the bootstrap determines that NiFi has unexpectedly died.
|
||||
|====
|
||||
|
||||
|
||||
|
||||
[[notification_services]]
|
||||
Notification Services
|
||||
---------------------
|
||||
When the NiFi bootstrap starts or stops NiFi, or detects that it has died unexpectedly, it is able to notify configured recipients. At this point
|
||||
(version 0.3.0 of Apache NiFi), the only mechanism supplied is to send an e-mail notification. The notification services configuration file, however,
|
||||
is a configurable XML file so that as new notification capabilities are developed, they will be configured similarly.
|
||||
|
||||
The default location of the XML file is _conf/bootstrap-notification-services.xml_, but this value can be changed in the _conf/bootstrap.conf_ file.
|
||||
|
||||
The syntax of the XML file is as follows:
|
||||
|
||||
....
|
||||
<services>
|
||||
<!-- any number of service elements can be defined. -->
|
||||
<service>
|
||||
<id>some-identifier</id>
|
||||
<!-- The fully-qualified class name of the Notification Service. -->
|
||||
<class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
|
||||
|
||||
<!-- Any number of properties can be set using this syntax.
|
||||
The properties available depend on the Notification Service. -->
|
||||
<property name="Property Name 1">Property Value</property>
|
||||
<property name="Another Property Name">Property Value 2</property>
|
||||
</service>
|
||||
</services>
|
||||
....
|
||||
|
||||
Once the desired services have been configured, they can then be referenced in the _bootstrap.conf_ file.
|
||||
Currently, the only implementation is the `org.apache.nifi.bootstrap.notification.email.EmailNotificationService` implementation.
|
||||
It has the following properties available:
|
||||
|
||||
|====
|
||||
|*Property*|*Required*|*Description*
|
||||
|SMTP Hostname|true|The hostname of the SMTP Server that is used to send Email Notifications
|
||||
|SMTP Port|true|The Port used for SMTP communications
|
||||
|SMTP Username|true|Username for the SMTP account
|
||||
|SMTP Password||Password for the SMTP account
|
||||
|SMTP Auth||Flag indicating whether authentication should be used
|
||||
|SMTP TLS||Flag indicating whether TLS should be enabled
|
||||
|SMTP Socket Factory||javax.net.ssl.SSLSocketFactory
|
||||
|SMTP X-Mailer Header||X-Mailer used in the header of the outgoing email
|
||||
|Content Type||Mime Type used to interpret the contents of the email, such as text/plain or text/html
|
||||
|From|true|Specifies the Email address to use as the sender. Otherwise, a "friendly name" can be used as the From address, but the value
|
||||
must be enclosed in double-quotes.
|
||||
|To||The recipients to include in the To-Line of the email
|
||||
|CC||The recipients to include in the CC-Line of the email
|
||||
|BCC||The recipients to include in the BCC-Line of the email
|
||||
|====
|
||||
|
||||
|
||||
In addition to the properties above that are marked as required, at least one of the `To`, `CC`, or `BCC` properties
|
||||
must be set.
|
||||
|
||||
A complete example of configuring the Email service would look like the following:
|
||||
|
||||
....
|
||||
<service>
|
||||
<id>email-notification</id>
|
||||
<class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
|
||||
<property name="SMTP Hostname">smtp.gmail.com</property>
|
||||
<property name="SMTP Port">587</property>
|
||||
<property name="SMTP Username">username@gmail.com</property>
|
||||
<property name="SMTP Password">super-secret-password</property>
|
||||
<property name="SMTP TLS">true</property>
|
||||
<property name="From">"NiFi Service Notifier"</property>
|
||||
<property name="To">username@gmail.com</property>
|
||||
</service>
|
||||
....
|
||||
|
||||
|
||||
[[system_properties]]
|
||||
System Properties
|
||||
-----------------
|
||||
The _nifi.properties_ file in the conf directory is the main configuration file for controlling how NiFi runs. This section provides an overview of the properties in this file and includes some notes on how to configure it in a way that will make upgrading easier. *After making changes to this file, restart NiFi in order
|
||||
The _nifi.properties_ file in the _conf_ directory is the main configuration file for controlling how NiFi runs. This section provides an overview of the properties in this file and includes some notes on how to configure it in a way that will make upgrading easier. *After making changes to this file, restart NiFi in order
|
||||
for the changes to take effect.*
|
||||
|
||||
NOTE: The contents of this file are relatively stable but do change from time to time. It is always a good idea to
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
<?xml version="1.0"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<services>
|
||||
<!-- This file is used to define how interested parties are notified when events in NiFi's lifecycle occur. -->
|
||||
<!-- The format of this file is:
|
||||
<services>
|
||||
<service>
|
||||
<id>service-identifier</id>
|
||||
<class>org.apache.nifi.notifications.DesiredNotificationService</class>
|
||||
<property name="property name">property value</property>
|
||||
<property name="another property">another property value</property>
|
||||
</service>
|
||||
</services>
|
||||
|
||||
This file can contain 0 to many different service definitions.
|
||||
The id can then be referenced from the bootstrap.conf file in order to configure the notification service
|
||||
to be used when particular lifecycle events occur.
|
||||
-->
|
||||
|
||||
<!--
|
||||
<service>
|
||||
<id>email-notification</id>
|
||||
<class>org.apache.nifi.bootstrap.notification.email.EmailNotificationService</class>
|
||||
<property name="SMTP Hostname"></property>
|
||||
<property name="SMTP Port"></property>
|
||||
<property name="SMTP Username"></property>
|
||||
<property name="SMTP Password"></property>
|
||||
<property name="SMTP TLS"></property>
|
||||
<property name="From"></property>
|
||||
<property name="To"></property>
|
||||
</service>
|
||||
-->
|
||||
</services>
|
|
@ -58,3 +58,23 @@ java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
|
|||
|
||||
#Set headless mode by default
|
||||
java.arg.14=-Djava.awt.headless=true
|
||||
|
||||
|
||||
###
|
||||
# Notification Services for notifying interested parties when NiFi is stopped, started, dies
|
||||
###
|
||||
|
||||
# XML File that contains the definitions of the notification services
|
||||
notification.services.file=./conf/bootstrap-notification-services.xml
|
||||
|
||||
# In the case that we are unable to send a notification for an event, how many times should we retry?
|
||||
notification.max.attempts=5
|
||||
|
||||
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is started?
|
||||
#nifi.start.notification.services=email-notification
|
||||
|
||||
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi is stopped?
|
||||
#nifi.stop.notification.services=email-notification
|
||||
|
||||
# Comma-separated list of identifiers that are present in the notification.services.file; which services should be used to notify when NiFi dies?
|
||||
#nifi.dead.notification.services=email-notification
|
Loading…
Reference in New Issue