mirror of https://github.com/apache/nifi.git
NIFI-4: Fixed issue with lifecycle of Controller Services
This commit is contained in:
parent
413a0b22a7
commit
a4e4af7cba
|
@ -1320,6 +1320,18 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
controllerServiceProvider.removeControllerService(serviceNode);
|
controllerServiceProvider.removeControllerService(serviceNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void enableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
|
controllerServiceProvider.enableControllerService(serviceNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void disableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
|
controllerServiceProvider.disableControllerService(serviceNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handle a bulletins message.
|
* Handle a bulletins message.
|
||||||
*
|
*
|
||||||
|
|
|
@ -53,4 +53,17 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
|
||||||
* @throws IllegalStateException if the controller service is not disabled or is not a part of this flow
|
* @throws IllegalStateException if the controller service is not disabled or is not a part of this flow
|
||||||
*/
|
*/
|
||||||
void removeControllerService(ControllerServiceNode serviceNode);
|
void removeControllerService(ControllerServiceNode serviceNode);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enables the given controller service that it can be used by other components
|
||||||
|
* @param serviceNode
|
||||||
|
*/
|
||||||
|
void enableControllerService(ControllerServiceNode serviceNode);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disables the given controller service so that it cannot be used by other components. This allows
|
||||||
|
* configuration to be updated or allows service to be removed.
|
||||||
|
* @param serviceNode
|
||||||
|
*/
|
||||||
|
void disableControllerService(ControllerServiceNode serviceNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2569,16 +2569,16 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
|
||||||
processScheduler.disableReportingTask(reportingTaskNode);
|
processScheduler.disableReportingTask(reportingTaskNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void enableControllerService(final ControllerServiceNode serviceNode) {
|
public void enableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
serviceNode.verifyCanEnable();
|
serviceNode.verifyCanEnable();
|
||||||
|
controllerServiceProvider.enableControllerService(serviceNode);
|
||||||
processScheduler.enableControllerService(serviceNode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void disableControllerService(final ControllerServiceNode serviceNode) {
|
public void disableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
serviceNode.verifyCanDisable();
|
serviceNode.verifyCanDisable();
|
||||||
|
controllerServiceProvider.disableControllerService(serviceNode);
|
||||||
processScheduler.disableControllerService(serviceNode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -581,37 +581,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void enableControllerService(final ControllerServiceNode serviceNode) {
|
|
||||||
if ( !serviceNode.isDisabled() ) {
|
|
||||||
throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled");
|
|
||||||
}
|
|
||||||
|
|
||||||
// we set the service to enabled before invoking the @OnEnabled methods. We do this because it must be
|
|
||||||
// done in this order for disabling (serviceNode.setDisabled(true) will throw Exceptions if the service
|
|
||||||
// is currently known to be in use) and we want to be consistent with the ordering of calling setDisabled
|
|
||||||
// before annotated methods.
|
|
||||||
serviceNode.setDisabled(false);
|
|
||||||
|
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
|
||||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized void disableControllerService(final ControllerServiceNode serviceNode) {
|
|
||||||
if ( serviceNode.isDisabled() ) {
|
|
||||||
throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled");
|
|
||||||
}
|
|
||||||
|
|
||||||
// We must set the service to disabled before we invoke the OnDisabled methods because the service node
|
|
||||||
// can throw Exceptions if we attempt to disable the service while it's known to be in use.
|
|
||||||
serviceNode.setDisabled(true);
|
|
||||||
|
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
|
||||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isScheduled(final Object scheduled) {
|
public boolean isScheduled(final Object scheduled) {
|
||||||
final ScheduleState scheduleState = scheduleStates.get(scheduled);
|
final ScheduleState scheduleState = scheduleStates.get(scheduled);
|
||||||
|
|
|
@ -25,9 +25,7 @@ import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardOpenOption;
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import javax.xml.XMLConstants;
|
import javax.xml.XMLConstants;
|
||||||
import javax.xml.parsers.DocumentBuilder;
|
import javax.xml.parsers.DocumentBuilder;
|
||||||
|
@ -36,11 +34,10 @@ import javax.xml.parsers.ParserConfigurationException;
|
||||||
import javax.xml.validation.Schema;
|
import javax.xml.validation.Schema;
|
||||||
import javax.xml.validation.SchemaFactory;
|
import javax.xml.validation.SchemaFactory;
|
||||||
|
|
||||||
import org.apache.nifi.util.file.FileUtils;
|
|
||||||
import org.apache.nifi.util.DomUtils;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.nifi.util.DomUtils;
|
||||||
|
import org.apache.nifi.util.file.FileUtils;
|
||||||
import org.w3c.dom.Document;
|
import org.w3c.dom.Document;
|
||||||
import org.w3c.dom.Element;
|
import org.w3c.dom.Element;
|
||||||
import org.w3c.dom.NodeList;
|
import org.w3c.dom.NodeList;
|
||||||
|
@ -139,7 +136,7 @@ public class ControllerServiceLoader {
|
||||||
}
|
}
|
||||||
|
|
||||||
services.add(serviceNode);
|
services.add(serviceNode);
|
||||||
serviceNode.setDisabled(false);
|
provider.enableControllerService(serviceNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SAXException | ParserConfigurationException sxe) {
|
} catch (SAXException | ParserConfigurationException sxe) {
|
||||||
|
|
|
@ -154,6 +154,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
||||||
return removed;
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
private void onConfigured() {
|
private void onConfigured() {
|
||||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
|
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
|
||||||
|
|
|
@ -31,6 +31,8 @@ import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ControllerService;
|
import org.apache.nifi.controller.ControllerService;
|
||||||
|
@ -165,6 +167,31 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void enableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
|
serviceNode.verifyCanEnable();
|
||||||
|
|
||||||
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
|
final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, this);
|
||||||
|
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceNode.setDisabled(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void disableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
|
serviceNode.verifyCanDisable();
|
||||||
|
|
||||||
|
// We must set the service to disabled before we invoke the OnDisabled methods because the service node
|
||||||
|
// can throw Exceptions if we attempt to disable the service while it's known to be in use.
|
||||||
|
serviceNode.setDisabled(true);
|
||||||
|
|
||||||
|
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||||
|
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ControllerService getControllerService(final String serviceIdentifier) {
|
public ControllerService getControllerService(final String serviceIdentifier) {
|
||||||
|
|
|
@ -412,6 +412,16 @@ public class ControllerFacade implements ControllerServiceProvider {
|
||||||
return flowController.isControllerServiceEnabled(serviceIdentifier);
|
return flowController.isControllerServiceEnabled(serviceIdentifier);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void enableControllerService(final ControllerServiceNode serviceNode) {
|
||||||
|
flowController.enableControllerService(serviceNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void disableControllerService(ControllerServiceNode serviceNode) {
|
||||||
|
flowController.disableControllerService(serviceNode);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the status of this controller.
|
* Gets the status of this controller.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in New Issue