mirror of https://github.com/apache/nifi.git
NIFI-277: Added verifyCanXX methods
This commit is contained in:
parent
7bcfc93d6e
commit
d734220d1e
|
@ -53,4 +53,20 @@ public interface ReportingTaskNode extends ConfiguredComponent {
|
|||
ConfigurationContext getConfigurationContext();
|
||||
|
||||
boolean isRunning();
|
||||
|
||||
/**
|
||||
* Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
|
||||
* value of stopped does NOT indicate that the <code>ReportingTask</code> has
|
||||
* no active threads, only that it is not currently scheduled to be given
|
||||
* any more threads. To determine whether or not the
|
||||
* <code>ReportingTask</code> has any active threads, see
|
||||
* {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
ScheduledState getScheduledState();
|
||||
|
||||
void setScheduledState(ScheduledState state);
|
||||
|
||||
void verifyCanDelete();
|
||||
}
|
||||
|
|
|
@ -22,7 +22,9 @@ import org.apache.nifi.controller.ControllerService;
|
|||
|
||||
public interface ControllerServiceNode extends ConfiguredComponent {
|
||||
|
||||
ControllerService getControllerService();
|
||||
ControllerService getProxiedControllerService();
|
||||
|
||||
ControllerService getControllerServiceImplementation();
|
||||
|
||||
Availability getAvailability();
|
||||
|
||||
|
@ -37,4 +39,6 @@ public interface ControllerServiceNode extends ConfiguredComponent {
|
|||
void addReference(ConfiguredComponent referringComponent);
|
||||
|
||||
void removeReference(ConfiguredComponent referringComponent);
|
||||
|
||||
void verifyCanDelete();
|
||||
}
|
||||
|
|
|
@ -19,18 +19,25 @@ package org.apache.nifi.controller.reporting;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
||||
import org.apache.nifi.controller.Availability;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceLookup;
|
||||
import org.apache.nifi.controller.ProcessScheduler;
|
||||
import org.apache.nifi.controller.ReportingTaskNode;
|
||||
import org.apache.nifi.controller.ScheduledState;
|
||||
import org.apache.nifi.controller.StandardProcessorNode;
|
||||
import org.apache.nifi.controller.ValidationContextFactory;
|
||||
import org.apache.nifi.controller.annotation.OnConfigured;
|
||||
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
|
||||
import org.apache.nifi.controller.service.ControllerServiceProvider;
|
||||
import org.apache.nifi.controller.service.StandardConfigurationContext;
|
||||
import org.apache.nifi.nar.NarCloseable;
|
||||
import org.apache.nifi.reporting.ReportingTask;
|
||||
import org.apache.nifi.scheduling.SchedulingStrategy;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
import org.apache.nifi.util.ReflectionUtils;
|
||||
|
||||
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
|
||||
|
||||
|
@ -42,6 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
|||
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
|
||||
private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
|
||||
|
||||
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
|
||||
|
||||
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
|
||||
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
|
||||
final ValidationContextFactory validationContextFactory) {
|
||||
|
@ -108,4 +117,46 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledState getScheduledState() {
|
||||
return scheduledState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setScheduledState(final ScheduledState state) {
|
||||
this.scheduledState = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(final String name, final String value) {
|
||||
super.setProperty(name, value);
|
||||
|
||||
onConfigured();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeProperty(String name) {
|
||||
final boolean removed = super.removeProperty(name);
|
||||
if ( removed ) {
|
||||
onConfigured();
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
private void onConfigured() {
|
||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void verifyCanDelete() {
|
||||
if (isRunning()) {
|
||||
throw new IllegalStateException(this + " is running");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,13 +26,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||
|
||||
import org.apache.nifi.controller.AbstractConfiguredComponent;
|
||||
import org.apache.nifi.controller.Availability;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ConfiguredComponent;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.ValidationContextFactory;
|
||||
import org.apache.nifi.controller.annotation.OnConfigured;
|
||||
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
|
||||
import org.apache.nifi.nar.NarCloseable;
|
||||
import org.apache.nifi.util.ReflectionUtils;
|
||||
|
||||
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
|
||||
|
||||
private final ControllerService controllerService;
|
||||
private final ControllerService proxedControllerService;
|
||||
private final ControllerService implementation;
|
||||
private final ControllerServiceProvider serviceProvider;
|
||||
|
||||
private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
|
||||
private final AtomicBoolean disabled = new AtomicBoolean(true);
|
||||
|
@ -43,10 +50,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
|||
|
||||
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
|
||||
|
||||
public StandardControllerServiceNode(final ControllerService controllerService, final String id,
|
||||
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
|
||||
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
|
||||
super(controllerService, id, validationContextFactory, serviceProvider);
|
||||
this.controllerService = controllerService;
|
||||
super(proxiedControllerService, id, validationContextFactory, serviceProvider);
|
||||
this.proxedControllerService = proxiedControllerService;
|
||||
this.implementation = implementation;
|
||||
this.serviceProvider = serviceProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -57,7 +66,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
|||
@Override
|
||||
public void setDisabled(final boolean disabled) {
|
||||
if (!disabled && !isValid()) {
|
||||
throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid");
|
||||
throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
|
||||
}
|
||||
|
||||
if (disabled) {
|
||||
|
@ -82,8 +91,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
|||
}
|
||||
|
||||
@Override
|
||||
public ControllerService getControllerService() {
|
||||
return controllerService;
|
||||
public ControllerService getProxiedControllerService() {
|
||||
return proxedControllerService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerService getControllerServiceImplementation() {
|
||||
return implementation;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,4 +136,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
|||
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(final String name, final String value) {
|
||||
super.setProperty(name, value);
|
||||
|
||||
onConfigured();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeProperty(String name) {
|
||||
final boolean removed = super.removeProperty(name);
|
||||
if ( removed ) {
|
||||
onConfigured();
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
private void onConfigured() {
|
||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||
final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
|
||||
} catch (final Exception e) {
|
||||
throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void verifyCanDelete() {
|
||||
if ( !isDisabled() ) {
|
||||
throw new IllegalStateException(this + " cannot be deleted because it has not been disabled");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue