mirror of https://github.com/apache/nifi.git
NIFI-259, NIFI-1339: Added OnConfigurationRestored annotation, always invoke onPropertyModified even on restart when properties are changed from defaults, as was done previously
This commit is contained in:
parent
5b62ff0fc3
commit
eba25ecaca
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.annotation.lifecycle;
|
||||
|
||||
import java.lang.annotation.Documented;
|
||||
import java.lang.annotation.ElementType;
|
||||
import java.lang.annotation.Inherited;
|
||||
import java.lang.annotation.Retention;
|
||||
import java.lang.annotation.RetentionPolicy;
|
||||
import java.lang.annotation.Target;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Marker Annotation that a Processor, Reporting Task, or Controller Service can use to indicate
|
||||
* that the method with this Annotation should be invoked whenever the component's configuration
|
||||
* is restored after a restart of NiFi.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Methods with this annotation must take zero arguments.
|
||||
* </p>
|
||||
*
|
||||
* <p>
|
||||
* Whenever a new component is added to the flow, this method will be called immediately, since
|
||||
* there is no configuration to restore (in this case all configuration has already been restored,
|
||||
* since there is no configuration to restore).
|
||||
* </p>
|
||||
*
|
||||
* @since 0.5.0
|
||||
*/
|
||||
@Documented
|
||||
@Target({ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Inherited
|
||||
public @interface OnConfigurationRestored {
|
||||
|
||||
}
|
|
@ -19,6 +19,8 @@ package org.apache.nifi.components;
|
|||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
|
||||
public interface ConfigurableComponent {
|
||||
|
||||
/**
|
||||
|
@ -49,6 +51,14 @@ public interface ConfigurableComponent {
|
|||
* necessary lazily evaluate it. Any throwable that escapes this method will
|
||||
* simply be ignored.
|
||||
*
|
||||
* When NiFi is restarted, this method will be called for each 'dynamic' property that is
|
||||
* added, as well as for each property that is not set to the default value. I.e., if the
|
||||
* Properties are modified from the default values. If it is undesirable for your use case
|
||||
* to react to properties being modified in this situation, you can add the {@link OnConfigurationRestored}
|
||||
* annotation to a method - this will allow the Processor to know when configuration has
|
||||
* been restored, so that it can determine whether or not to perform some action in the
|
||||
* onPropertyModified method.
|
||||
*
|
||||
* @param descriptor the descriptor for the property being modified
|
||||
* @param oldValue the value that was previously set, or null if no value
|
||||
* was previously set for this property
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.nifi.processor;
|
|||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
|
||||
import org.apache.nifi.components.AbstractConfigurableComponent;
|
||||
|
@ -47,6 +48,7 @@ public abstract class AbstractSessionFactoryProcessor extends AbstractConfigurab
|
|||
private String identifier;
|
||||
private ProcessorLog logger;
|
||||
private volatile boolean scheduled = false;
|
||||
private volatile boolean configurationRestored = false;
|
||||
private ControllerServiceLookup serviceLookup;
|
||||
private String description;
|
||||
|
||||
|
@ -104,6 +106,22 @@ public abstract class AbstractSessionFactoryProcessor extends AbstractConfigurab
|
|||
scheduled = false;
|
||||
}
|
||||
|
||||
@OnConfigurationRestored
|
||||
public final void updateConfiguredRestoredTrue() {
|
||||
configurationRestored = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a boolean indicating whether or not the configuration of the Processor has already been restored.
|
||||
* See the {@link OnConfigurationRestored} annotation for more information about what it means for the configuration
|
||||
* to be restored.
|
||||
*
|
||||
* @return <code>true</code> if configuration has been restored, <code>false</code> otherwise.
|
||||
*/
|
||||
protected boolean isConfigurationRestored() {
|
||||
return configurationRestored;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String getIdentifier() {
|
||||
return identifier;
|
||||
|
|
|
@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
|
||||
import org.apache.nifi.annotation.behavior.TriggerSerially;
|
||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||
|
@ -119,6 +120,8 @@ public class StandardProcessorTestRunner implements TestRunner {
|
|||
}
|
||||
|
||||
triggerSerially = null != processor.getClass().getAnnotation(TriggerSerially.class);
|
||||
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
|
|
|
@ -71,6 +71,7 @@ import javax.xml.transform.stream.StreamResult;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.admin.service.AuditService;
|
||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||
import org.apache.nifi.cluster.BulletinsPayload;
|
||||
import org.apache.nifi.cluster.HeartbeatPayload;
|
||||
|
@ -559,6 +560,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
if (serializedReportingTasks != null && serializedReportingTasks.length > 0) {
|
||||
loadReportingTasks(serializedReportingTasks);
|
||||
}
|
||||
|
||||
notifyComponentsConfigurationRestored();
|
||||
} catch (final IOException ioe) {
|
||||
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
|
||||
stop();
|
||||
|
@ -695,6 +698,25 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private void notifyComponentsConfigurationRestored() {
|
||||
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
|
||||
final ControllerService service = serviceNode.getControllerServiceImplementation();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
}
|
||||
}
|
||||
|
||||
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
|
||||
final ReportingTask task = taskNode.getReportingTask();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Services connection requests. If the data flow management service is unable to provide a current copy of the data flow, then the returned connection response will indicate the node should try
|
||||
* later. Otherwise, the connection response will contain the the flow and the node identifier.
|
||||
|
@ -1053,7 +1075,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue(), false);
|
||||
reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1128,6 +1150,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
if (firstTimeAdded) {
|
||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
|
||||
} catch (final Exception e) {
|
||||
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
|
||||
}
|
||||
|
@ -1434,6 +1457,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
|||
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
|
||||
new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
|
||||
|
||||
if (firstTimeAdded) {
|
||||
final ControllerService service = serviceNode.getControllerServiceImplementation();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
}
|
||||
}
|
||||
|
||||
return serviceNode;
|
||||
}
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(final String name, final String value, final boolean triggerOnPropertyModified) {
|
||||
public void setProperty(final String name, final String value) {
|
||||
if (null == name || null == value) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
@ -114,7 +114,6 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
|
|||
}
|
||||
}
|
||||
|
||||
if (triggerOnPropertyModified) {
|
||||
try {
|
||||
component.onPropertyModified(descriptor, oldValue, value);
|
||||
} catch (final Exception e) {
|
||||
|
@ -122,7 +121,6 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
@ -135,12 +133,11 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
|
|||
* if was a dynamic property.
|
||||
*
|
||||
* @param name the property to remove
|
||||
* @param triggerOnPropertyModified specifies whether or not the onPropertyModified method should be called
|
||||
* @return true if removed; false otherwise
|
||||
* @throws java.lang.IllegalArgumentException if the name is null
|
||||
*/
|
||||
@Override
|
||||
public boolean removeProperty(final String name, final boolean triggerOnPropertyModified) {
|
||||
public boolean removeProperty(final String name) {
|
||||
if (null == name) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
@ -163,8 +160,10 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
|
|||
}
|
||||
}
|
||||
|
||||
if (triggerOnPropertyModified) {
|
||||
try {
|
||||
component.onPropertyModified(descriptor, value, null);
|
||||
} catch (final Exception e) {
|
||||
// nothing really to do here...
|
||||
}
|
||||
|
||||
return true;
|
||||
|
|
|
@ -39,10 +39,8 @@ public interface ConfiguredComponent {
|
|||
*
|
||||
* @param name the name of the property to update
|
||||
* @param value the value to update the property to
|
||||
* @param triggerOnPropertyModified if <code>true</code>, will trigger the #onPropertyModified method of the component
|
||||
* to be called, otherwise will not
|
||||
*/
|
||||
public void setProperty(String name, String value, boolean triggerOnPropertyModified);
|
||||
public void setProperty(String name, String value);
|
||||
|
||||
/**
|
||||
* Removes the property and value for the given property name if a
|
||||
|
@ -51,12 +49,10 @@ public interface ConfiguredComponent {
|
|||
* if was a dynamic property.
|
||||
*
|
||||
* @param name the property to remove
|
||||
* @param triggerOnPropertyModified if <code>true</code>, will trigger the #onPropertyModified method of the component
|
||||
* to be called, otherwise will not
|
||||
* @return true if removed; false otherwise
|
||||
* @throws java.lang.IllegalArgumentException if the name is null
|
||||
*/
|
||||
public boolean removeProperty(String name, boolean triggerOnPropertyModified);
|
||||
public boolean removeProperty(String name);
|
||||
|
||||
public Map<PropertyDescriptor, String> getProperties();
|
||||
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.nifi.action.Action;
|
|||
import org.apache.nifi.admin.service.AuditService;
|
||||
import org.apache.nifi.admin.service.UserService;
|
||||
import org.apache.nifi.annotation.lifecycle.OnAdded;
|
||||
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
|
||||
import org.apache.nifi.annotation.lifecycle.OnRemoved;
|
||||
import org.apache.nifi.annotation.lifecycle.OnShutdown;
|
||||
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
|
||||
|
@ -608,6 +609,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
externalSiteListener.start();
|
||||
}
|
||||
|
||||
notifyComponentsConfigurationRestored();
|
||||
|
||||
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -628,6 +631,31 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
}
|
||||
}
|
||||
|
||||
private void notifyComponentsConfigurationRestored() {
|
||||
for (final ProcessorNode procNode : getGroup(getRootGroupId()).findAllProcessors()) {
|
||||
final Processor processor = procNode.getProcessor();
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
|
||||
}
|
||||
}
|
||||
|
||||
for (final ControllerServiceNode serviceNode : getAllControllerServices()) {
|
||||
final ControllerService service = serviceNode.getControllerServiceImplementation();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
}
|
||||
}
|
||||
|
||||
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
|
||||
final ReportingTask task = taskNode.getReportingTask();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Causes any processors that were added to the flow with a 'delayStart' flag of true to now start
|
||||
|
@ -910,6 +938,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID);
|
||||
throw new ComponentLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e);
|
||||
}
|
||||
|
||||
if (firstTimeAdded) {
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, procNode.getProcessor());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return procNode;
|
||||
|
@ -1484,7 +1518,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
|
||||
for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
serviceNode.setProperty(entry.getKey(), entry.getValue(), true);
|
||||
serviceNode.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1602,7 +1636,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
if (config.getProperties() != null) {
|
||||
for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
|
||||
if (entry.getValue() != null) {
|
||||
procNode.setProperty(entry.getKey(), entry.getValue(), true);
|
||||
procNode.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2638,6 +2672,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
|
||||
try (final NarCloseable x = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, taskNode.getReportingTask());
|
||||
} catch (final Exception e) {
|
||||
throw new ComponentLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
|
||||
}
|
||||
|
@ -2721,6 +2756,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
|||
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
|
||||
new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
|
||||
|
||||
if (firstTimeAdded) {
|
||||
final ControllerService service = serviceNode.getControllerServiceImplementation();
|
||||
|
||||
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
|
||||
}
|
||||
}
|
||||
|
||||
return serviceNode;
|
||||
}
|
||||
|
||||
|
|
|
@ -403,9 +403,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
|
||||
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
|
||||
if (entry.getValue() == null) {
|
||||
reportingTask.removeProperty(entry.getKey(), false);
|
||||
reportingTask.removeProperty(entry.getKey());
|
||||
} else {
|
||||
reportingTask.setProperty(entry.getKey(), entry.getValue(), false);
|
||||
reportingTask.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -735,9 +735,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
|||
|
||||
for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) {
|
||||
if (entry.getValue() == null) {
|
||||
procNode.removeProperty(entry.getKey(), false);
|
||||
procNode.removeProperty(entry.getKey());
|
||||
} else {
|
||||
procNode.setProperty(entry.getKey(), entry.getValue(), false);
|
||||
procNode.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -125,15 +125,15 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(final String name, final String value, final boolean triggerOnPropertyModified) {
|
||||
super.setProperty(name, value, triggerOnPropertyModified);
|
||||
public void setProperty(final String name, final String value) {
|
||||
super.setProperty(name, value);
|
||||
|
||||
onConfigured();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeProperty(String name, final boolean triggerOnPropertyModified) {
|
||||
final boolean removed = super.removeProperty(name, triggerOnPropertyModified);
|
||||
public boolean removeProperty(String name) {
|
||||
final boolean removed = super.removeProperty(name);
|
||||
if (removed) {
|
||||
onConfigured();
|
||||
}
|
||||
|
|
|
@ -155,9 +155,9 @@ public class ControllerServiceLoader {
|
|||
|
||||
for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
|
||||
if (entry.getValue() == null) {
|
||||
node.removeProperty(entry.getKey(), false);
|
||||
node.removeProperty(entry.getKey());
|
||||
} else {
|
||||
node.setProperty(entry.getKey(), entry.getValue(), false);
|
||||
node.setProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -123,14 +123,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setProperty(final String name, final String value, final boolean triggerOnPropertyModified) {
|
||||
super.setProperty(name, value, triggerOnPropertyModified);
|
||||
public void setProperty(final String name, final String value) {
|
||||
super.setProperty(name, value);
|
||||
onConfigured();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeProperty(String name, final boolean triggerOnPropertyModified) {
|
||||
final boolean removed = super.removeProperty(name, triggerOnPropertyModified);
|
||||
public boolean removeProperty(String name) {
|
||||
final boolean removed = super.removeProperty(name);
|
||||
if (removed) {
|
||||
onConfigured();
|
||||
}
|
||||
|
|
|
@ -124,7 +124,7 @@ public class TestStandardProcessScheduler {
|
|||
final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(),
|
||||
new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
|
||||
|
||||
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier(), true);
|
||||
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier());
|
||||
|
||||
scheduler.enableControllerService(service);
|
||||
scheduler.startProcessor(procNode);
|
||||
|
|
|
@ -102,7 +102,7 @@ public class TestStandardControllerServiceProvider {
|
|||
final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
|
||||
final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
|
||||
|
||||
serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B", true);
|
||||
serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
|
||||
|
||||
try {
|
||||
provider.enableControllerService(serviceNodeA);
|
||||
|
@ -169,10 +169,10 @@ public class TestStandardControllerServiceProvider {
|
|||
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||
|
||||
provider.enableControllerService(serviceNode4);
|
||||
provider.enableReferencingServices(serviceNode4);
|
||||
|
@ -237,20 +237,20 @@ public class TestStandardControllerServiceProvider {
|
|||
final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
|
||||
new StandardValidationContextFactory(provider), scheduler, provider);
|
||||
procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
|
||||
procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1", true);
|
||||
procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
|
||||
procNodeA.setProcessGroup(mockProcessGroup);
|
||||
|
||||
final String id2 = UUID.randomUUID().toString();
|
||||
final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2,
|
||||
new StandardValidationContextFactory(provider), scheduler, provider);
|
||||
procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
|
||||
procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3", true);
|
||||
procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
|
||||
procNodeB.setProcessGroup(mockProcessGroup);
|
||||
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
|
||||
|
||||
provider.enableControllerService(serviceNode4);
|
||||
provider.enableReferencingServices(serviceNode4);
|
||||
|
@ -308,7 +308,7 @@ public class TestStandardControllerServiceProvider {
|
|||
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
|
||||
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceB.class.getName(), "2", false);
|
||||
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
|
||||
final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>();
|
||||
nodeMap.put("1", serviceNode1);
|
||||
|
@ -338,7 +338,7 @@ public class TestStandardControllerServiceProvider {
|
|||
|
||||
// add circular dependency on self.
|
||||
nodeMap.clear();
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
|
||||
nodeMap.put("1", serviceNode1);
|
||||
nodeMap.put("2", serviceNode2);
|
||||
|
||||
|
@ -365,8 +365,8 @@ public class TestStandardControllerServiceProvider {
|
|||
// like that.
|
||||
nodeMap.clear();
|
||||
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "3");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "1");
|
||||
nodeMap.put("1", serviceNode1);
|
||||
nodeMap.put("3", serviceNode3);
|
||||
branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
|
||||
|
@ -388,10 +388,10 @@ public class TestStandardControllerServiceProvider {
|
|||
|
||||
// Add multiple completely disparate branches.
|
||||
nodeMap.clear();
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
|
||||
final ControllerServiceNode serviceNode5 = provider.createControllerService(ServiceB.class.getName(), "5", false);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
|
||||
nodeMap.put("1", serviceNode1);
|
||||
nodeMap.put("2", serviceNode2);
|
||||
nodeMap.put("3", serviceNode3);
|
||||
|
@ -422,8 +422,8 @@ public class TestStandardControllerServiceProvider {
|
|||
|
||||
// create 2 branches both dependent on the same service
|
||||
nodeMap.clear();
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2", true);
|
||||
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
|
||||
nodeMap.put("1", serviceNode1);
|
||||
nodeMap.put("2", serviceNode2);
|
||||
nodeMap.put("3", serviceNode3);
|
||||
|
|
|
@ -246,9 +246,9 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
|
|||
final String propName = entry.getKey();
|
||||
final String propVal = entry.getValue();
|
||||
if (isNotNull(propName) && propVal == null) {
|
||||
controllerService.removeProperty(propName, true);
|
||||
controllerService.removeProperty(propName);
|
||||
} else if (isNotNull(propName)) {
|
||||
controllerService.setProperty(propName, propVal, true);
|
||||
controllerService.setProperty(propName, propVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -175,9 +175,9 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
final String propName = entry.getKey();
|
||||
final String propVal = entry.getValue();
|
||||
if (isNotNull(propName) && propVal == null) {
|
||||
processor.removeProperty(propName, true);
|
||||
processor.removeProperty(propName);
|
||||
} else if (isNotNull(propName)) {
|
||||
processor.setProperty(propName, propVal, true);
|
||||
processor.setProperty(propName, propVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -289,9 +289,9 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
|
|||
final String propName = entry.getKey();
|
||||
final String propVal = entry.getValue();
|
||||
if (isNotNull(propName) && propVal == null) {
|
||||
reportingTask.removeProperty(propName, true);
|
||||
reportingTask.removeProperty(propName);
|
||||
} else if (isNotNull(propName)) {
|
||||
reportingTask.setProperty(propName, propVal, true);
|
||||
reportingTask.setProperty(propName, propVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
|
|||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
if ( descriptor.equals(DIRECTORY) ) {
|
||||
if (isConfigurationRestored() && descriptor.equals(DIRECTORY)) {
|
||||
lastListingTime = null; // clear lastListingTime so that we have to fetch new time
|
||||
latestPathsListed = new HashSet<>();
|
||||
}
|
||||
|
|
|
@ -182,7 +182,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
if (isListingResetNecessary(descriptor)) {
|
||||
if (isConfigurationRestored() && isListingResetNecessary(descriptor)) {
|
||||
lastListingTime = null; // clear lastListingTime so that we have to fetch new time
|
||||
latestIdentifiersListed = new HashSet<>();
|
||||
resetListing = true;
|
||||
|
|
|
@ -161,7 +161,7 @@ public class TailFile extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
|
||||
if (FILENAME.equals(descriptor)) {
|
||||
if (isConfigurationRestored() && FILENAME.equals(descriptor)) {
|
||||
state = new TailFileState(newValue, null, null, 0L, 0L, null, ByteBuffer.allocate(65536));
|
||||
tailFileChanged = true;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue