mirror of
https://github.com/apache/nifi.git
synced 2025-02-06 10:08:42 +00:00
NIFI-5222: Prevent validating components multiple times for each update
- Avoid triggering async validation for each update to component when instantiating a template (such as copy/paste or templates). Added debug logging to indicate when and why we are triggering validation; removed unit test that made poor assumptions about the inner workings of the FlowSynchronizer that resulted in failures when we make calls into processors that the unit test doesn't know about" This closes #2731.
This commit is contained in:
parent
0973c2d8d1
commit
eb0b4283e8
@ -79,6 +79,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||
private volatile String additionalResourcesFingerprint;
|
||||
private AtomicReference<ValidationState> validationState = new AtomicReference<>(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList()));
|
||||
private final ValidationTrigger validationTrigger;
|
||||
private volatile boolean triggerValidation = true;
|
||||
|
||||
public AbstractComponentNode(final String id,
|
||||
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider,
|
||||
@ -129,6 +130,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||
@Override
|
||||
public void setAnnotationData(final String data) {
|
||||
annotationData.set(CharacterFilterUtils.filterInvalidXmlCharacters(data));
|
||||
logger.debug("Resetting Validation State of {} due to setting annotation data", this);
|
||||
resetValidationState();
|
||||
}
|
||||
|
||||
@ -198,7 +200,7 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("Setting properties to {}; resetting validation state", properties);
|
||||
logger.debug("Resetting Validation State of {} due to setting properties", this);
|
||||
resetValidationState();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
@ -609,7 +611,34 @@ public abstract class AbstractComponentNode implements ComponentNode {
|
||||
protected void resetValidationState() {
|
||||
validationContext.set(null);
|
||||
validationState.set(new ValidationState(ValidationStatus.VALIDATING, Collections.emptyList()));
|
||||
validationTrigger.triggerAsync(this);
|
||||
|
||||
if (isTriggerValidation()) {
|
||||
validationTrigger.triggerAsync(this);
|
||||
} else {
|
||||
logger.debug("Reset validation state of {} but will not trigger async validation because trigger has been paused", this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseValidationTrigger() {
|
||||
triggerValidation = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeValidationTrigger() {
|
||||
triggerValidation = true;
|
||||
|
||||
final ValidationStatus validationStatus = getValidationStatus();
|
||||
if (validationStatus == ValidationStatus.VALIDATING) {
|
||||
logger.debug("Resuming Triggering of Validation State for {}; status is VALIDATING so will trigger async validation now", this);
|
||||
validationTrigger.triggerAsync(this);
|
||||
} else {
|
||||
logger.debug("Resuming Triggering of Validation State for {}; status is {} so will not trigger async validation now", this, validationStatus);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isTriggerValidation() {
|
||||
return triggerValidation;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -58,6 +58,39 @@ public interface ComponentNode extends ComponentAuthorizable {
|
||||
|
||||
public void setProperties(Map<String, String> properties, boolean allowRemovalOfRequiredProperties);
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Pause triggering asynchronous validation to occur when the component is updated. Often times, it is necessary
|
||||
* to update several aspects of a component, such as the properties and annotation data, at once. When this occurs,
|
||||
* we don't want to trigger validation for each update, so we can follow the pattern:
|
||||
* </p>
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* componentNode.pauseValidationTrigger();
|
||||
* try {
|
||||
* componentNode.setProperties(properties);
|
||||
* componentNode.setAnnotationData(annotationData);
|
||||
* } finally {
|
||||
* componentNode.resumeValidationTrigger();
|
||||
* }
|
||||
* </code>
|
||||
* </pre>
|
||||
*
|
||||
* <p>
|
||||
* When calling this method, it is imperative that {@link #resumeValidationTrigger()} is always called within a {@code finally} block to
|
||||
* ensure that validation occurs.
|
||||
* </p>
|
||||
*/
|
||||
void pauseValidationTrigger();
|
||||
|
||||
/**
|
||||
* Resume triggering asynchronous validation to occur when the component is updated. This method is to be used in conjunction
|
||||
* with {@link #pauseValidationTrigger()} as illustrated in its documentation. When this method is called, if the component's Validation Status
|
||||
* is {@link ValidationStatus#VALIDATING}, component validation will immediately be triggered asynchronously.
|
||||
*/
|
||||
void resumeValidationTrigger();
|
||||
|
||||
public Map<PropertyDescriptor, String> getProperties();
|
||||
|
||||
public String getProperty(final PropertyDescriptor property);
|
||||
|
@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.net.URL;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.nifi.authorization.Resource;
|
||||
import org.apache.nifi.authorization.resource.Authorizable;
|
||||
@ -41,24 +43,57 @@ public class TestAbstractComponentNode {
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void testGetValidationStatusWithTimeout() {
|
||||
final ValidationControlledAbstractComponentNode node = new ValidationControlledAbstractComponentNode();
|
||||
final ValidationControlledAbstractComponentNode node = new ValidationControlledAbstractComponentNode(5000, Mockito.mock(ValidationTrigger.class));
|
||||
final ValidationStatus status = node.getValidationStatus(1, TimeUnit.MILLISECONDS);
|
||||
assertEquals(ValidationStatus.VALIDATING, status);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testValidationTriggerPaused() throws InterruptedException {
|
||||
final AtomicLong validationCount = new AtomicLong(0L);
|
||||
|
||||
final ValidationControlledAbstractComponentNode node = new ValidationControlledAbstractComponentNode(0, new ValidationTrigger() {
|
||||
@Override
|
||||
public void triggerAsync(ComponentNode component) {
|
||||
validationCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trigger(ComponentNode component) {
|
||||
validationCount.incrementAndGet();
|
||||
}
|
||||
});
|
||||
|
||||
node.pauseValidationTrigger();
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
node.setProperties(Collections.emptyMap());
|
||||
assertEquals(0, validationCount.get());
|
||||
}
|
||||
node.resumeValidationTrigger();
|
||||
|
||||
// wait for validation count to be 1 (this is asynchronous so we want to just keep checking).
|
||||
while (validationCount.get() != 1) {
|
||||
Thread.sleep(50L);
|
||||
}
|
||||
|
||||
assertEquals(1L, validationCount.get());
|
||||
}
|
||||
|
||||
private static class ValidationControlledAbstractComponentNode extends AbstractComponentNode {
|
||||
private final long pauseMillis;
|
||||
|
||||
public ValidationControlledAbstractComponentNode() {
|
||||
public ValidationControlledAbstractComponentNode(final long pauseMillis, final ValidationTrigger validationTrigger) {
|
||||
super("id", Mockito.mock(ValidationContextFactory.class), Mockito.mock(ControllerServiceProvider.class), "unit test component",
|
||||
ValidationControlledAbstractComponentNode.class.getCanonicalName(), Mockito.mock(ComponentVariableRegistry.class), Mockito.mock(ReloadComponent.class),
|
||||
Mockito.mock(ValidationTrigger.class), false);
|
||||
validationTrigger, false);
|
||||
|
||||
this.pauseMillis = pauseMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> computeValidationErrors(ValidationContext context) {
|
||||
try {
|
||||
Thread.sleep(5000L);
|
||||
Thread.sleep(pauseMillis);
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
|
||||
@ -76,7 +111,7 @@ public class TestAbstractComponentNode {
|
||||
|
||||
@Override
|
||||
public ConfigurableComponent getComponent() {
|
||||
return null;
|
||||
return Mockito.mock(ConfigurableComponent.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -847,7 +847,35 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
// Perform validation of all components before attempting to start them.
|
||||
LOG.debug("Triggering initial validation of all components");
|
||||
final long start = System.nanoTime();
|
||||
new TriggerValidationTask(this, validationTrigger).run();
|
||||
|
||||
final ValidationTrigger triggerIfValidating = new ValidationTrigger() {
|
||||
@Override
|
||||
public void triggerAsync(final ComponentNode component) {
|
||||
final ValidationStatus status = component.getValidationStatus();
|
||||
|
||||
if (component.getValidationStatus() == ValidationStatus.VALIDATING) {
|
||||
LOG.debug("Will trigger async validation for {} because its status is VALIDATING", component);
|
||||
validationTrigger.triggerAsync(component);
|
||||
} else {
|
||||
LOG.debug("Will not trigger async validation for {} because its status is {}", component, status);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void trigger(final ComponentNode component) {
|
||||
final ValidationStatus status = component.getValidationStatus();
|
||||
|
||||
if (component.getValidationStatus() == ValidationStatus.VALIDATING) {
|
||||
LOG.debug("Will trigger immediate validation for {} because its status is VALIDATING", component);
|
||||
validationTrigger.trigger(component);
|
||||
} else {
|
||||
LOG.debug("Will not trigger immediate validation for {} because its status is {}", component, status);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
new TriggerValidationTask(this, triggerIfValidating).run();
|
||||
|
||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
|
||||
LOG.info("Performed initial validation of all components in {} milliseconds", millis);
|
||||
|
||||
@ -1231,7 +1259,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
}
|
||||
}
|
||||
|
||||
validationTrigger.triggerAsync(procNode);
|
||||
return procNode;
|
||||
}
|
||||
|
||||
@ -1312,6 +1339,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
// need to refresh the properties in case we are changing from ghost component to real component
|
||||
existingNode.refreshProperties();
|
||||
|
||||
LOG.debug("Triggering async validation of {} due to processor reload", existingNode);
|
||||
validationTrigger.triggerAsync(existingNode);
|
||||
}
|
||||
|
||||
@ -1855,26 +1883,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
//
|
||||
// Instantiate Controller Services
|
||||
//
|
||||
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
|
||||
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
|
||||
final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
|
||||
final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
|
||||
try {
|
||||
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
|
||||
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
|
||||
final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), bundleCoordinate, Collections.emptySet(), true);
|
||||
serviceNode.pauseValidationTrigger();
|
||||
serviceNodes.add(serviceNode);
|
||||
|
||||
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
|
||||
serviceNode.setComments(controllerServiceDTO.getComments());
|
||||
serviceNode.setName(controllerServiceDTO.getName());
|
||||
if (!topLevel) {
|
||||
serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
|
||||
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
|
||||
serviceNode.setComments(controllerServiceDTO.getComments());
|
||||
serviceNode.setName(controllerServiceDTO.getName());
|
||||
if (!topLevel) {
|
||||
serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
|
||||
}
|
||||
|
||||
group.addControllerService(serviceNode);
|
||||
}
|
||||
|
||||
group.addControllerService(serviceNode);
|
||||
}
|
||||
|
||||
// configure controller services. We do this after creating all of them in case 1 service
|
||||
// references another service.
|
||||
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
|
||||
final String serviceId = controllerServiceDTO.getId();
|
||||
final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
|
||||
serviceNode.setProperties(controllerServiceDTO.getProperties());
|
||||
// configure controller services. We do this after creating all of them in case 1 service
|
||||
// references another service.
|
||||
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
|
||||
final String serviceId = controllerServiceDTO.getId();
|
||||
final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
|
||||
serviceNode.setProperties(controllerServiceDTO.getProperties());
|
||||
}
|
||||
} finally {
|
||||
serviceNodes.stream().forEach(ControllerServiceNode::resumeValidationTrigger);
|
||||
}
|
||||
|
||||
//
|
||||
@ -1963,61 +1998,66 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
for (final ProcessorDTO processorDTO : dto.getProcessors()) {
|
||||
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(processorDTO.getType(), processorDTO.getBundle());
|
||||
final ProcessorNode procNode = createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
|
||||
procNode.pauseValidationTrigger();
|
||||
|
||||
procNode.setPosition(toPosition(processorDTO.getPosition()));
|
||||
procNode.setProcessGroup(group);
|
||||
if (!topLevel) {
|
||||
procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
|
||||
}
|
||||
|
||||
final ProcessorConfigDTO config = processorDTO.getConfig();
|
||||
procNode.setComments(config.getComments());
|
||||
if (config.isLossTolerant() != null) {
|
||||
procNode.setLossTolerant(config.isLossTolerant());
|
||||
}
|
||||
procNode.setName(processorDTO.getName());
|
||||
|
||||
procNode.setYieldPeriod(config.getYieldDuration());
|
||||
procNode.setPenalizationPeriod(config.getPenaltyDuration());
|
||||
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
|
||||
procNode.setAnnotationData(config.getAnnotationData());
|
||||
procNode.setStyle(processorDTO.getStyle());
|
||||
|
||||
if (config.getRunDurationMillis() != null) {
|
||||
procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
if (config.getSchedulingStrategy() != null) {
|
||||
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
|
||||
}
|
||||
|
||||
if (config.getExecutionNode() != null) {
|
||||
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
|
||||
}
|
||||
|
||||
if (processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
|
||||
procNode.disable();
|
||||
}
|
||||
|
||||
// ensure that the scheduling strategy is set prior to these values
|
||||
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
|
||||
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
if (processorDTO.getRelationships() != null) {
|
||||
for (final RelationshipDTO rel : processorDTO.getRelationships()) {
|
||||
if (rel.isAutoTerminate()) {
|
||||
relationships.add(procNode.getRelationship(rel.getName()));
|
||||
}
|
||||
try {
|
||||
procNode.setPosition(toPosition(processorDTO.getPosition()));
|
||||
procNode.setProcessGroup(group);
|
||||
if (!topLevel) {
|
||||
procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
|
||||
}
|
||||
procNode.setAutoTerminatedRelationships(relationships);
|
||||
}
|
||||
|
||||
if (config.getProperties() != null) {
|
||||
procNode.setProperties(config.getProperties());
|
||||
}
|
||||
final ProcessorConfigDTO config = processorDTO.getConfig();
|
||||
procNode.setComments(config.getComments());
|
||||
if (config.isLossTolerant() != null) {
|
||||
procNode.setLossTolerant(config.isLossTolerant());
|
||||
}
|
||||
procNode.setName(processorDTO.getName());
|
||||
|
||||
group.addProcessor(procNode);
|
||||
procNode.setYieldPeriod(config.getYieldDuration());
|
||||
procNode.setPenalizationPeriod(config.getPenaltyDuration());
|
||||
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
|
||||
procNode.setAnnotationData(config.getAnnotationData());
|
||||
procNode.setStyle(processorDTO.getStyle());
|
||||
|
||||
if (config.getRunDurationMillis() != null) {
|
||||
procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
if (config.getSchedulingStrategy() != null) {
|
||||
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
|
||||
}
|
||||
|
||||
if (config.getExecutionNode() != null) {
|
||||
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
|
||||
}
|
||||
|
||||
if (processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
|
||||
procNode.disable();
|
||||
}
|
||||
|
||||
// ensure that the scheduling strategy is set prior to these values
|
||||
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
|
||||
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
if (processorDTO.getRelationships() != null) {
|
||||
for (final RelationshipDTO rel : processorDTO.getRelationships()) {
|
||||
if (rel.isAutoTerminate()) {
|
||||
relationships.add(procNode.getRelationship(rel.getName()));
|
||||
}
|
||||
}
|
||||
procNode.setAutoTerminatedRelationships(relationships);
|
||||
}
|
||||
|
||||
if (config.getProperties() != null) {
|
||||
procNode.setProperties(config.getProperties());
|
||||
}
|
||||
|
||||
group.addProcessor(procNode);
|
||||
} finally {
|
||||
procNode.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
@ -3530,7 +3570,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
|
||||
}
|
||||
|
||||
validationTrigger.triggerAsync(taskNode);
|
||||
return taskNode;
|
||||
}
|
||||
|
||||
@ -3606,6 +3645,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
// need to refresh the properties in case we are changing from ghost component to real component
|
||||
existingNode.refreshProperties();
|
||||
|
||||
LOG.debug("Triggering async validation of {} due to reporting task reload", existingNode);
|
||||
validationTrigger.triggerAsync(existingNode);
|
||||
}
|
||||
|
||||
@ -3697,7 +3737,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
}
|
||||
}
|
||||
|
||||
validationTrigger.triggerAsync(serviceNode);
|
||||
return serviceNode;
|
||||
}
|
||||
|
||||
@ -3751,6 +3790,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
|
||||
// need to refresh the properties in case we are changing from ghost component to real component
|
||||
existingNode.refreshProperties();
|
||||
|
||||
LOG.debug("Triggering async validation of {} due to controller service reload", existingNode);
|
||||
validationTrigger.triggerAsync(existingNode);
|
||||
}
|
||||
|
||||
|
@ -515,20 +515,25 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||
private void updateReportingTaskControllerServices(final Set<ReportingTaskNode> reportingTasks, final Map<String, ControllerServiceNode> controllerServiceMapping) {
|
||||
for (ReportingTaskNode reportingTask : reportingTasks) {
|
||||
if (reportingTask.getProperties() != null) {
|
||||
final Set<Map.Entry<PropertyDescriptor, String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream()
|
||||
.filter(e -> e.getKey().getControllerServiceDefinition() != null)
|
||||
.filter(e -> controllerServiceMapping.containsKey(e.getValue()))
|
||||
.collect(Collectors.toSet());
|
||||
reportingTask.pauseValidationTrigger();
|
||||
try {
|
||||
final Set<Map.Entry<PropertyDescriptor, String>> propertyDescriptors = reportingTask.getProperties().entrySet().stream()
|
||||
.filter(e -> e.getKey().getControllerServiceDefinition() != null)
|
||||
.filter(e -> controllerServiceMapping.containsKey(e.getValue()))
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
final Map<String,String> controllerServiceProps = new HashMap<>();
|
||||
final Map<String,String> controllerServiceProps = new HashMap<>();
|
||||
|
||||
for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) {
|
||||
final PropertyDescriptor propertyDescriptor = propEntry.getKey();
|
||||
final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue());
|
||||
controllerServiceProps.put(propertyDescriptor.getName(), clone.getIdentifier());
|
||||
for (Map.Entry<PropertyDescriptor, String> propEntry : propertyDescriptors) {
|
||||
final PropertyDescriptor propertyDescriptor = propEntry.getKey();
|
||||
final ControllerServiceNode clone = controllerServiceMapping.get(propEntry.getValue());
|
||||
controllerServiceProps.put(propertyDescriptor.getName(), clone.getIdentifier());
|
||||
}
|
||||
|
||||
reportingTask.setProperties(controllerServiceProps);
|
||||
} finally {
|
||||
reportingTask.resumeValidationTrigger();
|
||||
}
|
||||
|
||||
reportingTask.setProperties(controllerServiceProps);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -820,6 +825,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that all services have been validated, so that we don't attempt to enable a service that is still in a 'validating' state
|
||||
toEnable.stream().forEach(ControllerServiceNode::performValidation);
|
||||
|
||||
controller.disableControllerServicesAsync(toDisable);
|
||||
controller.enableControllerServices(toEnable);
|
||||
|
||||
@ -842,6 +850,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||
break;
|
||||
case RUNNING:
|
||||
// we want to run now. Make sure processor is not disabled and then start it.
|
||||
procNode.performValidation();
|
||||
procNode.getProcessGroup().enableProcessor(procNode);
|
||||
controller.startProcessor(procNode.getProcessGroupIdentifier(), procNode.getIdentifier(), false);
|
||||
break;
|
||||
@ -1070,48 +1079,55 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
|
||||
|
||||
private void updateProcessor(final ProcessorNode procNode, final ProcessorDTO processorDTO, final ProcessGroup processGroup, final FlowController controller)
|
||||
throws ProcessorInstantiationException {
|
||||
final ProcessorConfigDTO config = processorDTO.getConfig();
|
||||
procNode.setProcessGroup(processGroup);
|
||||
procNode.setLossTolerant(config.isLossTolerant());
|
||||
procNode.setPenalizationPeriod(config.getPenaltyDuration());
|
||||
procNode.setYieldPeriod(config.getYieldDuration());
|
||||
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
|
||||
updateNonFingerprintedProcessorSettings(procNode, processorDTO);
|
||||
|
||||
if (config.getSchedulingStrategy() != null) {
|
||||
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
|
||||
}
|
||||
procNode.pauseValidationTrigger();
|
||||
try {
|
||||
final ProcessorConfigDTO config = processorDTO.getConfig();
|
||||
procNode.setProcessGroup(processGroup);
|
||||
procNode.setLossTolerant(config.isLossTolerant());
|
||||
procNode.setPenalizationPeriod(config.getPenaltyDuration());
|
||||
procNode.setYieldPeriod(config.getYieldDuration());
|
||||
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
|
||||
updateNonFingerprintedProcessorSettings(procNode, processorDTO);
|
||||
|
||||
if (config.getExecutionNode() != null) {
|
||||
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
|
||||
}
|
||||
|
||||
// must set scheduling strategy before these two
|
||||
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
|
||||
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
|
||||
if (config.getRunDurationMillis() != null) {
|
||||
procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
procNode.setAnnotationData(config.getAnnotationData());
|
||||
|
||||
if (config.getAutoTerminatedRelationships() != null) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
for (final String rel : config.getAutoTerminatedRelationships()) {
|
||||
relationships.add(procNode.getRelationship(rel));
|
||||
if (config.getSchedulingStrategy() != null) {
|
||||
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
|
||||
}
|
||||
procNode.setAutoTerminatedRelationships(relationships);
|
||||
}
|
||||
|
||||
procNode.setProperties(config.getProperties());
|
||||
if (config.getExecutionNode() != null) {
|
||||
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
|
||||
}
|
||||
|
||||
final ScheduledState scheduledState = ScheduledState.valueOf(processorDTO.getState());
|
||||
if (ScheduledState.RUNNING.equals(scheduledState)) {
|
||||
controller.startProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
||||
} else if (ScheduledState.DISABLED.equals(scheduledState)) {
|
||||
processGroup.disableProcessor(procNode);
|
||||
} else if (ScheduledState.STOPPED.equals(scheduledState)) {
|
||||
controller.stopProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
||||
// must set scheduling strategy before these two
|
||||
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
|
||||
procNode.setScheduldingPeriod(config.getSchedulingPeriod());
|
||||
if (config.getRunDurationMillis() != null) {
|
||||
procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
procNode.setAnnotationData(config.getAnnotationData());
|
||||
|
||||
if (config.getAutoTerminatedRelationships() != null) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
for (final String rel : config.getAutoTerminatedRelationships()) {
|
||||
relationships.add(procNode.getRelationship(rel));
|
||||
}
|
||||
procNode.setAutoTerminatedRelationships(relationships);
|
||||
}
|
||||
|
||||
procNode.setProperties(config.getProperties());
|
||||
|
||||
final ScheduledState scheduledState = ScheduledState.valueOf(processorDTO.getState());
|
||||
if (ScheduledState.RUNNING.equals(scheduledState)) {
|
||||
procNode.performValidation(); // ensure that processor has been validated
|
||||
controller.startProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
||||
} else if (ScheduledState.DISABLED.equals(scheduledState)) {
|
||||
processGroup.disableProcessor(procNode);
|
||||
} else if (ScheduledState.STOPPED.equals(scheduledState)) {
|
||||
controller.stopProcessor(processGroup.getIdentifier(), procNode.getIdentifier());
|
||||
}
|
||||
} finally {
|
||||
procNode.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -410,6 +410,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
}
|
||||
|
||||
undefinedRelationshipsToTerminate.set(new HashSet<>(terminate));
|
||||
LOG.debug("Resetting Validation State of {} due to setting auto-terminated relationships", this);
|
||||
resetValidationState();
|
||||
}
|
||||
|
||||
@ -773,6 +774,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
setIncomingConnections(Collections.unmodifiableList(updatedIncoming));
|
||||
}
|
||||
} finally {
|
||||
LOG.debug("Resetting Validation State of {} due to connection added", this);
|
||||
resetValidationState();
|
||||
}
|
||||
}
|
||||
@ -851,6 +853,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
}
|
||||
} finally {
|
||||
// need to perform validation in case selected relationships were changed.
|
||||
LOG.debug("Resetting Validation State of {} due to updating connection", this);
|
||||
resetValidationState();
|
||||
}
|
||||
}
|
||||
@ -890,11 +893,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
"Cannot remove a connection from a ProcessorNode for which the ProcessorNode is not the Source");
|
||||
}
|
||||
|
||||
LOG.debug("Resetting Validation State of {} due to connection removed", this);
|
||||
resetValidationState();
|
||||
}
|
||||
|
||||
private void setIncomingConnections(final List<Connection> incoming) {
|
||||
this.incomingConnections.set(incoming);
|
||||
LOG.debug("Resetting Validation State of {} due to setting incoming connections", this);
|
||||
resetValidationState();
|
||||
}
|
||||
|
||||
@ -1147,6 +1152,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
@Override
|
||||
public synchronized void setProcessGroup(final ProcessGroup group) {
|
||||
this.processGroup.set(group);
|
||||
LOG.debug("Resetting Validation State of {} due to setting process group", this);
|
||||
resetValidationState();
|
||||
}
|
||||
|
||||
|
@ -213,8 +213,13 @@ public class ControllerServiceLoader {
|
||||
|
||||
private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) {
|
||||
final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
|
||||
node.setAnnotationData(dto.getAnnotationData());
|
||||
node.setProperties(dto.getProperties());
|
||||
node.pauseValidationTrigger();
|
||||
try {
|
||||
node.setAnnotationData(dto.getAnnotationData());
|
||||
node.setProperties(dto.getProperties());
|
||||
} finally {
|
||||
node.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -222,6 +222,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
||||
writeLock.lock();
|
||||
try {
|
||||
this.processGroup = group;
|
||||
LOG.debug("Resetting Validation State of {} due to setting process group", this);
|
||||
resetValidationState();
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
|
@ -4043,18 +4043,23 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||
}
|
||||
|
||||
private void updateControllerService(final ControllerServiceNode service, final VersionedControllerService proposed) {
|
||||
service.setAnnotationData(proposed.getAnnotationData());
|
||||
service.setComments(proposed.getComments());
|
||||
service.setName(proposed.getName());
|
||||
service.pauseValidationTrigger();
|
||||
try {
|
||||
service.setAnnotationData(proposed.getAnnotationData());
|
||||
service.setComments(proposed.getComments());
|
||||
service.setName(proposed.getName());
|
||||
|
||||
final Map<String, String> properties = populatePropertiesMap(service.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
|
||||
service.setProperties(properties, true);
|
||||
final Map<String, String> properties = populatePropertiesMap(service.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), service.getProcessGroup());
|
||||
service.setProperties(properties, true);
|
||||
|
||||
if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
|
||||
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getProperties().keySet());
|
||||
final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors);
|
||||
flowController.reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
|
||||
if (!isEqual(service.getBundleCoordinate(), proposed.getBundle())) {
|
||||
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(service.getProperties().keySet());
|
||||
final Set<URL> additionalUrls = service.getAdditionalClasspathResources(descriptors);
|
||||
flowController.reload(service, proposed.getType(), newBundleCoordinate, additionalUrls);
|
||||
}
|
||||
} finally {
|
||||
service.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
@ -4161,28 +4166,33 @@ public final class StandardProcessGroup implements ProcessGroup {
|
||||
}
|
||||
|
||||
private void updateProcessor(final ProcessorNode processor, final VersionedProcessor proposed) throws ProcessorInstantiationException {
|
||||
processor.setAnnotationData(proposed.getAnnotationData());
|
||||
processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
|
||||
processor.setComments(proposed.getComments());
|
||||
processor.setName(proposed.getName());
|
||||
processor.setPenalizationPeriod(proposed.getPenaltyDuration());
|
||||
processor.pauseValidationTrigger();
|
||||
try {
|
||||
processor.setAnnotationData(proposed.getAnnotationData());
|
||||
processor.setBulletinLevel(LogLevel.valueOf(proposed.getBulletinLevel()));
|
||||
processor.setComments(proposed.getComments());
|
||||
processor.setName(proposed.getName());
|
||||
processor.setPenalizationPeriod(proposed.getPenaltyDuration());
|
||||
|
||||
final Map<String, String> properties = populatePropertiesMap(processor.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
|
||||
processor.setProperties(properties, true);
|
||||
processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
|
||||
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
|
||||
processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
|
||||
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
|
||||
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
|
||||
processor.setStyle(proposed.getStyle());
|
||||
processor.setYieldPeriod(proposed.getYieldDuration());
|
||||
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
|
||||
final Map<String, String> properties = populatePropertiesMap(processor.getProperties(), proposed.getProperties(), proposed.getPropertyDescriptors(), processor.getProcessGroup());
|
||||
processor.setProperties(properties, true);
|
||||
processor.setRunDuration(proposed.getRunDurationMillis(), TimeUnit.MILLISECONDS);
|
||||
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(proposed.getSchedulingStrategy()));
|
||||
processor.setScheduldingPeriod(proposed.getSchedulingPeriod());
|
||||
processor.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
|
||||
processor.setExecutionNode(ExecutionNode.valueOf(proposed.getExecutionNode()));
|
||||
processor.setStyle(proposed.getStyle());
|
||||
processor.setYieldPeriod(proposed.getYieldDuration());
|
||||
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
|
||||
|
||||
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
|
||||
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
|
||||
final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
|
||||
flowController.reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
|
||||
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
|
||||
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
|
||||
final Set<URL> additionalUrls = processor.getAdditionalClasspathResources(descriptors);
|
||||
flowController.reload(processor, proposed.getType(), newBundleCoordinate, additionalUrls);
|
||||
}
|
||||
} finally {
|
||||
processor.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,274 +0,0 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.controller
|
||||
|
||||
import groovy.xml.XmlUtil
|
||||
import org.apache.nifi.authorization.Authorizer
|
||||
import org.apache.nifi.bundle.BundleCoordinate
|
||||
import org.apache.nifi.cluster.protocol.DataFlow
|
||||
import org.apache.nifi.connectable.*
|
||||
import org.apache.nifi.controller.label.Label
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue
|
||||
import org.apache.nifi.groups.ProcessGroup
|
||||
import org.apache.nifi.groups.RemoteProcessGroup
|
||||
import org.apache.nifi.nar.ExtensionManager
|
||||
import org.apache.nifi.nar.SystemBundle
|
||||
import org.apache.nifi.processor.Relationship
|
||||
import org.apache.nifi.reporting.BulletinRepository
|
||||
import org.apache.nifi.util.NiFiProperties
|
||||
import spock.lang.Shared
|
||||
import spock.lang.Specification
|
||||
import spock.lang.Unroll
|
||||
|
||||
class StandardFlowSynchronizerSpec extends Specification {
|
||||
|
||||
@Shared
|
||||
def systemBundle
|
||||
@Shared
|
||||
def nifiProperties
|
||||
|
||||
def setupSpec() {
|
||||
def propFile = StandardFlowSynchronizerSpec.class.getResource("/standardflowsynchronizerspec.nifi.properties").getFile()
|
||||
|
||||
nifiProperties = NiFiProperties.createBasicNiFiProperties(propFile, null)
|
||||
systemBundle = SystemBundle.create(nifiProperties)
|
||||
ExtensionManager.discoverExtensions(systemBundle, Collections.emptySet())
|
||||
}
|
||||
|
||||
def teardownSpec() {
|
||||
|
||||
}
|
||||
|
||||
@Unroll
|
||||
def "scaling of #filename with encoding version \"#flowEncodingVersion\""() {
|
||||
given: "a StandardFlowSynchronizer with mocked collaborators"
|
||||
def controller = Mock FlowController
|
||||
def proposedFlow = Mock DataFlow
|
||||
def snippetManager = Mock SnippetManager
|
||||
def bulletinRepository = Mock BulletinRepository
|
||||
def flowFileQueue = Mock FlowFileQueue
|
||||
def authorizer = Mock Authorizer
|
||||
def flowFile = new File(StandardFlowSynchronizerSpec.getResource(filename).toURI())
|
||||
def flowControllerXml = new XmlSlurper().parse(flowFile)
|
||||
def Map<String, Position> originalPositionablePositionsById = flowControllerXml.rootGroup.'**'
|
||||
.findAll { !it.name().equals('connection') && it.id.size() == 1 && it.position.size() == 1 }
|
||||
.collectEntries { [it.id.text(), new Position(it.position.@x.toDouble(), it.position.@y.toDouble())] }
|
||||
def Map<String, List<Position>> originalBendPointsByConnectionId = flowControllerXml.rootGroup.'**'
|
||||
.findAll { it.name().equals('connection') && it.bendPoints.size() > 0 }
|
||||
.collectEntries { [it.id.text(), it.bendPoints.children().collect { new Position(it.@x.toDouble(), it.@y.toDouble()) }] }
|
||||
flowControllerXml.@'encoding-version' = flowEncodingVersion
|
||||
def testFlowBytes = XmlUtil.serialize(flowControllerXml).bytes
|
||||
def Map<String, Position> positionablePositionsById = [:]
|
||||
def Map<String, Positionable> positionableMocksById = [:]
|
||||
def Map<String, Connection> connectionMocksById = [:]
|
||||
def Map<String, List<Position>> bendPointPositionsByConnectionId = [:]
|
||||
// the unit under test
|
||||
def flowSynchronizer = new StandardFlowSynchronizer(null, nifiProperties)
|
||||
def firstRootGroup = Mock ProcessGroup
|
||||
|
||||
when: "the flow is synchronized with the current state of the controller"
|
||||
flowSynchronizer.sync controller, proposedFlow, null
|
||||
|
||||
then: "establish interactions for the mocked collaborators of StandardFlowSynchronizer to store the ending positions of components"
|
||||
1 * firstRootGroup.findAllProcessors() >> []
|
||||
1 * controller.isFlowSynchronized() >> false
|
||||
_ * controller.rootGroupId >> flowControllerXml.rootGroup.id.text()
|
||||
_ * controller.getGroup(_) >> { String id -> positionableMocksById.get(id) }
|
||||
_ * controller.snippetManager >> snippetManager
|
||||
_ * controller.bulletinRepository >> bulletinRepository
|
||||
_ * controller.authorizer >> authorizer
|
||||
_ * controller./set.*/(*_)
|
||||
_ * controller.getAllControllerServices() >> []
|
||||
_ * controller.getAllReportingTasks() >> []
|
||||
_ * controller.getRootGroup() >>> [
|
||||
firstRootGroup,
|
||||
positionableMocksById.get(controller.rootGroupId)
|
||||
]
|
||||
_ * controller.createProcessGroup(_) >> { String pgId ->
|
||||
def processGroup = Mock(ProcessGroup)
|
||||
_ * processGroup.getIdentifier() >> pgId
|
||||
_ * processGroup.getPosition() >> { positionablePositionsById.get(pgId) }
|
||||
_ * processGroup.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put pgId, pos
|
||||
}
|
||||
_ * processGroup./(add|set).*/(*_)
|
||||
_ * processGroup.isEmpty() >> true
|
||||
_ * processGroup.isRootGroup() >> { pgId == flowControllerXml.rootGroup.id }
|
||||
_ * processGroup.getConnectable(_) >> { String connId -> positionableMocksById.get(connId) }
|
||||
_ * processGroup.findAllPositionables() >> {
|
||||
def foundProcessGroup = flowControllerXml.rootGroup.'**'.find { it.id == pgId }
|
||||
def idsUnderPg = foundProcessGroup.'**'.findAll { it.name() == 'id' }.collect { it.text() }
|
||||
positionableMocksById.entrySet().collect {
|
||||
if (idsUnderPg.contains(it.key)) {
|
||||
it.value
|
||||
}
|
||||
}
|
||||
}
|
||||
_ * processGroup.findAllConnections() >> {
|
||||
def foundProcessGroup = flowControllerXml.rootGroup.'**'.find { it.id == pgId }
|
||||
def foundConnections = foundProcessGroup.'**'.findAll { it.name() == 'connection' }.collect { it.id.text() }
|
||||
connectionMocksById.entrySet().collect {
|
||||
if (foundConnections.contains(it.key)) {
|
||||
it.value
|
||||
}
|
||||
}
|
||||
}
|
||||
_ * processGroup.findAllRemoteProcessGroups() >> []
|
||||
|
||||
positionableMocksById.put(pgId, processGroup)
|
||||
return processGroup
|
||||
}
|
||||
|
||||
_ * controller.createProcessor(_, _, _, _) >> { String type, String id, BundleCoordinate coordinate, boolean firstTimeAdded ->
|
||||
def processor = Mock(ProcessorNode)
|
||||
_ * processor.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * processor.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * processor./(add|set).*/(*_)
|
||||
_ * processor.getIdentifier() >> id
|
||||
_ * processor.getBundleCoordinate() >> coordinate
|
||||
_ * processor.getRelationship(_) >> { String n -> new Relationship.Builder().name(n).build() }
|
||||
positionableMocksById.put(id, processor)
|
||||
return processor
|
||||
}
|
||||
_ * controller.createFunnel(_) >> { String id ->
|
||||
def funnel = Mock(Funnel)
|
||||
_ * funnel.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * funnel.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * funnel./(add|set).*/(*_)
|
||||
positionableMocksById.put id, funnel
|
||||
return funnel
|
||||
}
|
||||
_ * controller.createLabel(_, _) >> { String id, String text ->
|
||||
def l = Mock(Label)
|
||||
_ * l.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * l.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * l./(add|set).*/(*_)
|
||||
positionableMocksById.put(id, l)
|
||||
return l
|
||||
}
|
||||
_ * controller./create.*Port/(_, _) >> { String id, String text ->
|
||||
def port = Mock(Port)
|
||||
_ * port.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * port.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * port./(add|set).*/(*_)
|
||||
positionableMocksById.put(id, port)
|
||||
return port
|
||||
}
|
||||
_ * controller.createRemoteProcessGroup(_, _) >> { String id, String uri ->
|
||||
def rpg = Mock(RemoteProcessGroup)
|
||||
_ * rpg.getPosition() >> { positionablePositionsById.get(id) }
|
||||
_ * rpg.setPosition(_) >> { Position pos ->
|
||||
positionablePositionsById.put id, pos
|
||||
}
|
||||
_ * rpg./(add|set).*/(*_)
|
||||
_ * rpg.getOutputPort(_) >> { String rpgId -> positionableMocksById.get(rpgId) }
|
||||
_ * rpg.getIdentifier() >> id
|
||||
positionableMocksById.put(id, rpg)
|
||||
return rpg
|
||||
}
|
||||
_ * controller.createConnection(_, _, _, _, _) >> { String id, String name, Connectable source, Connectable destination, Collection<String> relationshipNames ->
|
||||
def connection = Mock(Connection)
|
||||
_ * connection.getIdentifier() >> id
|
||||
_ * connection.getBendPoints() >> {
|
||||
def bendpoints = bendPointPositionsByConnectionId.get(id)
|
||||
return bendpoints
|
||||
}
|
||||
_ * connection.setBendPoints(_) >> {
|
||||
// There seems to be a bug in Spock method matching where a list of arguments to a method
|
||||
// is being coerced into an Arrays$ArrayList with the actual list of bend points as an
|
||||
// ArrayList in the 0th element.
|
||||
// Need to keep an eye on this...
|
||||
bendPointPositionsByConnectionId.put id, it[0]
|
||||
}
|
||||
_ * connection./set.*/(*_)
|
||||
_ * connection.flowFileQueue >> flowFileQueue
|
||||
connectionMocksById.put(id, connection)
|
||||
return connection
|
||||
}
|
||||
_ * controller.startProcessor(*_)
|
||||
_ * controller.startConnectable(_)
|
||||
_ * controller.enableControllerServices(_)
|
||||
_ * snippetManager.export() >> {
|
||||
[] as byte[]
|
||||
}
|
||||
_ * snippetManager.clear()
|
||||
1 * proposedFlow.flow >> testFlowBytes
|
||||
_ * proposedFlow.snippets >> {
|
||||
[] as byte[]
|
||||
}
|
||||
_ * proposedFlow.authorizerFingerprint >> null
|
||||
_ * proposedFlow.missingComponents >> []
|
||||
|
||||
_ * flowFileQueue./set.*/(*_)
|
||||
_ * _.hashCode() >> 1
|
||||
0 * _ // no other mock calls allowed
|
||||
|
||||
then: "verify that the flow was scaled properly"
|
||||
originalPositionablePositionsById.entrySet().forEach { entry ->
|
||||
assert positionablePositionsById.containsKey(entry.key)
|
||||
def originalPosition = entry.value
|
||||
def position = positionablePositionsById.get(entry.key)
|
||||
compareOriginalPointToScaledPoint(originalPosition, position, isSyncedPositionGreater)
|
||||
}
|
||||
originalBendPointsByConnectionId.entrySet().forEach { entry ->
|
||||
assert bendPointPositionsByConnectionId.containsKey(entry.key)
|
||||
def originalBendPoints = entry.value
|
||||
def sortedBendPoints = bendPointPositionsByConnectionId.get(entry.key).sort { it.x }
|
||||
def sortedOriginalBendPoints = originalBendPoints.sort { it.x }
|
||||
assert sortedOriginalBendPoints.size() == sortedBendPoints.size()
|
||||
[sortedOriginalBendPoints, sortedBendPoints].transpose().forEach { Position originalPosition, Position position ->
|
||||
compareOriginalPointToScaledPoint(originalPosition, position, isSyncedPositionGreater)
|
||||
}
|
||||
}
|
||||
|
||||
where: "the each flowfile and flow encoding version is run through the StandardFlowSynchronizer"
|
||||
filename | flowEncodingVersion | isSyncedPositionGreater
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | null | true
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | '0.7' | true
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | '1.0' | false
|
||||
'/conf/scale-positions-flow-0.7.0.xml' | '99.0' | false
|
||||
}
|
||||
|
||||
private void compareOriginalPointToScaledPoint(Position originalPosition, Position position, boolean isSyncedPositionGreater) {
|
||||
if (originalPosition.x == 0) {
|
||||
assert position.x == 0
|
||||
}
|
||||
if (originalPosition.y == 0) {
|
||||
assert position.y == 0
|
||||
}
|
||||
if (originalPosition.x > 0) {
|
||||
assert isSyncedPositionGreater == position.x > originalPosition.x
|
||||
}
|
||||
if (originalPosition.y > 0) {
|
||||
assert isSyncedPositionGreater == position.y > originalPosition.y
|
||||
}
|
||||
if (originalPosition.x < 0) {
|
||||
assert isSyncedPositionGreater == position.x < originalPosition.x
|
||||
}
|
||||
if (originalPosition.y < 0) {
|
||||
assert isSyncedPositionGreater == position.y < originalPosition.y
|
||||
}
|
||||
}
|
||||
}
|
@ -333,17 +333,22 @@ public class StandardControllerServiceDAO extends ComponentDAO implements Contro
|
||||
final String comments = controllerServiceDTO.getComments();
|
||||
final Map<String, String> properties = controllerServiceDTO.getProperties();
|
||||
|
||||
if (isNotNull(name)) {
|
||||
controllerService.setName(name);
|
||||
}
|
||||
if (isNotNull(annotationData)) {
|
||||
controllerService.setAnnotationData(annotationData);
|
||||
}
|
||||
if (isNotNull(comments)) {
|
||||
controllerService.setComments(comments);
|
||||
}
|
||||
if (isNotNull(properties)) {
|
||||
controllerService.setProperties(properties);
|
||||
controllerService.pauseValidationTrigger(); // avoid causing validation to be triggered multiple times
|
||||
try {
|
||||
if (isNotNull(name)) {
|
||||
controllerService.setName(name);
|
||||
}
|
||||
if (isNotNull(annotationData)) {
|
||||
controllerService.setAnnotationData(annotationData);
|
||||
}
|
||||
if (isNotNull(comments)) {
|
||||
controllerService.setComments(comments);
|
||||
}
|
||||
if (isNotNull(properties)) {
|
||||
controllerService.setProperties(properties);
|
||||
}
|
||||
} finally {
|
||||
controllerService.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,51 +142,56 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
||||
final String bulletinLevel = config.getBulletinLevel();
|
||||
final Set<String> undefinedRelationshipsToTerminate = config.getAutoTerminatedRelationships();
|
||||
|
||||
// ensure scheduling strategy is set first
|
||||
if (isNotNull(schedulingStrategy)) {
|
||||
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
|
||||
}
|
||||
|
||||
if (isNotNull(executionNode)) {
|
||||
processor.setExecutionNode(ExecutionNode.valueOf(executionNode));
|
||||
}
|
||||
if (isNotNull(comments)) {
|
||||
processor.setComments(comments);
|
||||
}
|
||||
if (isNotNull(annotationData)) {
|
||||
processor.setAnnotationData(annotationData);
|
||||
}
|
||||
if (isNotNull(maxTasks)) {
|
||||
processor.setMaxConcurrentTasks(maxTasks);
|
||||
}
|
||||
if (isNotNull(schedulingPeriod)) {
|
||||
processor.setScheduldingPeriod(schedulingPeriod);
|
||||
}
|
||||
if (isNotNull(penaltyDuration)) {
|
||||
processor.setPenalizationPeriod(penaltyDuration);
|
||||
}
|
||||
if (isNotNull(yieldDuration)) {
|
||||
processor.setYieldPeriod(yieldDuration);
|
||||
}
|
||||
if (isNotNull(runDurationMillis)) {
|
||||
processor.setRunDuration(runDurationMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (isNotNull(bulletinLevel)) {
|
||||
processor.setBulletinLevel(LogLevel.valueOf(bulletinLevel));
|
||||
}
|
||||
if (isNotNull(config.isLossTolerant())) {
|
||||
processor.setLossTolerant(config.isLossTolerant());
|
||||
}
|
||||
if (isNotNull(configProperties)) {
|
||||
processor.setProperties(configProperties);
|
||||
}
|
||||
|
||||
if (isNotNull(undefinedRelationshipsToTerminate)) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
for (final String relName : undefinedRelationshipsToTerminate) {
|
||||
relationships.add(new Relationship.Builder().name(relName).build());
|
||||
processor.pauseValidationTrigger(); // ensure that we don't trigger many validations to occur
|
||||
try {
|
||||
// ensure scheduling strategy is set first
|
||||
if (isNotNull(schedulingStrategy)) {
|
||||
processor.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
|
||||
}
|
||||
processor.setAutoTerminatedRelationships(relationships);
|
||||
|
||||
if (isNotNull(executionNode)) {
|
||||
processor.setExecutionNode(ExecutionNode.valueOf(executionNode));
|
||||
}
|
||||
if (isNotNull(comments)) {
|
||||
processor.setComments(comments);
|
||||
}
|
||||
if (isNotNull(annotationData)) {
|
||||
processor.setAnnotationData(annotationData);
|
||||
}
|
||||
if (isNotNull(maxTasks)) {
|
||||
processor.setMaxConcurrentTasks(maxTasks);
|
||||
}
|
||||
if (isNotNull(schedulingPeriod)) {
|
||||
processor.setScheduldingPeriod(schedulingPeriod);
|
||||
}
|
||||
if (isNotNull(penaltyDuration)) {
|
||||
processor.setPenalizationPeriod(penaltyDuration);
|
||||
}
|
||||
if (isNotNull(yieldDuration)) {
|
||||
processor.setYieldPeriod(yieldDuration);
|
||||
}
|
||||
if (isNotNull(runDurationMillis)) {
|
||||
processor.setRunDuration(runDurationMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
if (isNotNull(bulletinLevel)) {
|
||||
processor.setBulletinLevel(LogLevel.valueOf(bulletinLevel));
|
||||
}
|
||||
if (isNotNull(config.isLossTolerant())) {
|
||||
processor.setLossTolerant(config.isLossTolerant());
|
||||
}
|
||||
if (isNotNull(configProperties)) {
|
||||
processor.setProperties(configProperties);
|
||||
}
|
||||
|
||||
if (isNotNull(undefinedRelationshipsToTerminate)) {
|
||||
final Set<Relationship> relationships = new HashSet<>();
|
||||
for (final String relName : undefinedRelationshipsToTerminate) {
|
||||
relationships.add(new Relationship.Builder().name(relName).build());
|
||||
}
|
||||
processor.setAutoTerminatedRelationships(relationships);
|
||||
}
|
||||
} finally {
|
||||
processor.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -313,25 +313,30 @@ public class StandardReportingTaskDAO extends ComponentDAO implements ReportingT
|
||||
final String comments = reportingTaskDTO.getComments();
|
||||
final Map<String, String> properties = reportingTaskDTO.getProperties();
|
||||
|
||||
// ensure scheduling strategy is set first
|
||||
if (isNotNull(schedulingStrategy)) {
|
||||
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
|
||||
}
|
||||
reportingTask.pauseValidationTrigger(); // avoid triggering validation multiple times
|
||||
try {
|
||||
// ensure scheduling strategy is set first
|
||||
if (isNotNull(schedulingStrategy)) {
|
||||
reportingTask.setSchedulingStrategy(SchedulingStrategy.valueOf(schedulingStrategy));
|
||||
}
|
||||
|
||||
if (isNotNull(name)) {
|
||||
reportingTask.setName(name);
|
||||
}
|
||||
if (isNotNull(schedulingPeriod)) {
|
||||
reportingTask.setSchedulingPeriod(schedulingPeriod);
|
||||
}
|
||||
if (isNotNull(annotationData)) {
|
||||
reportingTask.setAnnotationData(annotationData);
|
||||
}
|
||||
if (isNotNull(comments)) {
|
||||
reportingTask.setComments(comments);
|
||||
}
|
||||
if (isNotNull(properties)) {
|
||||
reportingTask.setProperties(properties);
|
||||
if (isNotNull(name)) {
|
||||
reportingTask.setName(name);
|
||||
}
|
||||
if (isNotNull(schedulingPeriod)) {
|
||||
reportingTask.setSchedulingPeriod(schedulingPeriod);
|
||||
}
|
||||
if (isNotNull(annotationData)) {
|
||||
reportingTask.setAnnotationData(annotationData);
|
||||
}
|
||||
if (isNotNull(comments)) {
|
||||
reportingTask.setComments(comments);
|
||||
}
|
||||
if (isNotNull(properties)) {
|
||||
reportingTask.setProperties(properties);
|
||||
}
|
||||
} finally {
|
||||
reportingTask.resumeValidationTrigger();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user