NIFI-6201 Suppressed stacktraces for expected error conditions (connection timeout, missing component bundle, etc.) in application startup when the stacktrace provides no additional helpful data.

NIFI-6201 Fixed existing checkstyle complaint on missing Javadoc element.

NIFI-6201 Restored "connection refused" text to relevant registry sync error message.

NIFI-6201 Simplified error logging for bundle not found issues and removed unused bundle coordinate methods.

This closes #3427.

Signed-off-by: Kevin Doran <kdoran@apache.org>
This commit is contained in:
Andy LoPresto 2019-04-09 22:32:58 -07:00 committed by Kevin Doran
parent 7b945182af
commit 22563fcb9a
No known key found for this signature in database
GPG Key ID: 5621A6244B77AC02
5 changed files with 400 additions and 367 deletions

View File

@ -16,6 +16,12 @@
*/
package org.apache.nifi.controller;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.configuration.DefaultSettings;
@ -56,13 +62,6 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class ExtensionBuilder {
private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class);
@ -190,7 +189,11 @@ public class ExtensionBuilder {
try {
loggableComponent = createLoggableProcessor();
} catch (final ProcessorInstantiationException pie) {
logger.error("Could not create Processor of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", pie);
logger.error("Could not create Processor of type " + type + " for ID " + identifier + " due to: " + pie.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
logger.debug(pie.getMessage(), pie);
}
final GhostProcessor ghostProc = new GhostProcessor();
ghostProc.setIdentifier(identifier);
ghostProc.setCanonicalClassName(type);
@ -280,7 +283,11 @@ public class ExtensionBuilder {
try {
return createControllerServiceNode();
} catch (final Exception e) {
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", e);
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + " due to: " + e.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage(), e);
}
return createGhostControllerServiceNode();
}
}
@ -293,12 +300,12 @@ public class ExtensionBuilder {
final ProcessorNode procNode;
if (creationSuccessful) {
procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider,
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider,
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
}
applyDefaultSettings(procNode);
@ -312,14 +319,14 @@ public class ExtensionBuilder {
final ReportingTaskNode taskNode;
if (creationSuccessful) {
taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler,
validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler, validationContextFactory,
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
taskNode.setName(componentType);
}
@ -374,7 +381,7 @@ public class ExtensionBuilder {
final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger,
serviceProvider, stateManager, kerberosConfig);
serviceProvider, stateManager, kerberosConfig);
serviceImpl.initialize(initContext);
final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger);
@ -383,7 +390,7 @@ public class ExtensionBuilder {
final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
serviceNode.setName(rawClass.getSimpleName());
invocationHandler.setServiceNode(serviceNode);
@ -407,7 +414,7 @@ public class ExtensionBuilder {
final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, identifier,
validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
return serviceNode;
}
@ -417,7 +424,7 @@ public class ExtensionBuilder {
final LoggableComponent<Processor> processorComponent = createLoggableComponent(Processor.class);
final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
serviceProvider, nodeTypeProvider, kerberosConfig);
serviceProvider, nodeTypeProvider, kerberosConfig);
processorComponent.getComponent().initialize(initiContext);
return processorComponent;
@ -433,7 +440,7 @@ public class ExtensionBuilder {
final String taskName = taskComponent.getComponent().getClass().getSimpleName();
final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName,
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);
taskComponent.getComponent().initialize(config);

View File

@ -16,6 +16,23 @@
*/
package org.apache.nifi.controller.service;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
@ -51,24 +68,6 @@ import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {
private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
@ -288,7 +287,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void verifyCanDisable() {
verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
verifyCanDisable(Collections.emptySet());
}
@Override
@ -571,4 +570,21 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
}
}
}
@Override
public String toString() {
String bundleCoordinate;
try {
bundleCoordinate = controllerServiceHolder.get().getBundleCoordinate().toString();
} catch (NullPointerException e) {
bundleCoordinate = "null";
}
return "StandardControllerServiceNode{" +
"controllerServiceHolder=" + bundleCoordinate +
", versionedComponentId=" + versionedComponentId +
", comment='" + comment + '\'' +
", processGroup=" + processGroup +
", active=" + active +
'}';
}
}

View File

@ -16,23 +16,7 @@
*/
package org.apache.nifi.controller.service;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
import java.util.ArrayList;
import java.util.Collection;
@ -51,8 +35,23 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardControllerServiceProvider implements ControllerServiceProvider {
@ -64,6 +63,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final FlowManager flowManager;
private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
private final String INVALID_CS_MESSAGE_SEGMENT = "cannot be enabled because it is not currently valid";
public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
this.flowController = flowController;
@ -208,8 +208,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
future.get(30, TimeUnit.SECONDS);
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
} catch (final Exception e) {
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
// If the service is not currently valid, there is no need to print the entire stacktrace
if (e.getLocalizedMessage().contains(INVALID_CS_MESSAGE_SEGMENT)) {
logger.warn("Failed to enable service {} because {}", controllerServiceNode, e.getLocalizedMessage());
} else {
// Print the whole stacktrace
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
}
}
}
} catch (Exception e) {
@ -475,13 +481,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLED == node.getState();
return node != null && ControllerServiceState.ENABLED == node.getState();
}
@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLING == node.getState();
return node != null && ControllerServiceState.ENABLING == node.getState();
}
@Override

View File

@ -16,6 +16,31 @@
*/
package org.apache.nifi.groups;
import static java.util.Objects.requireNonNull;
import java.io.IOException;
import java.net.ConnectException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
@ -123,31 +148,6 @@ import org.apache.nifi.web.api.dto.TemplateDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public final class StandardProcessGroup implements ProcessGroup {
private final String id;
@ -185,8 +185,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class);
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final StandardProcessScheduler scheduler,
final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController,
final MutableVariableRegistry variableRegistry) {
final NiFiProperties nifiProps, final StringEncryptor encryptor, final FlowController flowController,
final MutableVariableRegistry variableRegistry) {
this.id = id;
this.controllerServiceProvider = serviceProvider;
this.parent = new AtomicReference<>();
@ -1683,11 +1683,8 @@ public final class StandardProcessGroup implements ProcessGroup {
owner = owner.getParent();
}
if (owner == this) {
return true;
}
return owner == this;
return false;
}
@Override
@ -2131,14 +2128,14 @@ public final class StandardProcessGroup implements ProcessGroup {
// and notify the Process Group that a component has been modified. This way, we know to re-calculate
// whether or not the Process Group has local modifications.
service.getReferences().getReferencingComponents().stream()
.map(ComponentNode::getProcessGroupIdentifier)
.filter(id -> !id.equals(getIdentifier()))
.forEach(groupId -> {
final ProcessGroup descendant = findProcessGroup(groupId);
if (descendant != null) {
descendant.onComponentModified();
}
});
.map(ComponentNode::getProcessGroupIdentifier)
.filter(id -> !id.equals(getIdentifier()))
.forEach(groupId -> {
final ProcessGroup descendant = findProcessGroup(groupId);
if (descendant != null) {
descendant.onComponentModified();
}
});
flowController.getStateManagerProvider().onComponentRemoved(service.getIdentifier());
@ -2485,9 +2482,9 @@ public final class StandardProcessGroup implements ProcessGroup {
* {@link IllegalStateException}.
*
* @param snippet the snippet
* @throws NullPointerException if the argument is null
* @throws NullPointerException if the argument is null
* @throws IllegalStateException if the snippet contains an ID that
* references a component that is not part of this ProcessGroup
* references a component that is not part of this ProcessGroup
*/
private void verifyContents(final Snippet snippet) throws NullPointerException, IllegalStateException {
requireNonNull(snippet);
@ -2505,7 +2502,7 @@ public final class StandardProcessGroup implements ProcessGroup {
/**
* Verifies that a move request cannot attempt to move a process group into itself.
*
* @param snippet the snippet
* @param snippet the snippet
* @param destination the destination
* @throws IllegalStateException if the snippet contains an ID that is equal to the identifier of the destination
*/
@ -2531,8 +2528,8 @@ public final class StandardProcessGroup implements ProcessGroup {
* If the ids given are null, will do no validation.
* </p>
*
* @param ids ids
* @param map map
* @param ids ids
* @param map map
* @param componentType type
*/
private void verifyAllKeysExist(final Set<String> ids, final Map<String, ?> map, final String componentType) {
@ -2806,7 +2803,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// ensure the configured service is an allowed service if it's still a valid service
if (currentControllerServiceIds.contains(serviceId) && !proposedControllerServiceIds.contains(serviceId)) {
throw new IllegalStateException("Cannot perform Move Operation because Processor with ID " + processorNode.getIdentifier()
+ " references a service that is not available in the destination Process Group");
+ " references a service that is not available in the destination Process Group");
}
}
}
@ -2821,8 +2818,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final Set<ProcessorNode> processors = new HashSet<>();
snippet.getProcessors().keySet().stream()
.map(this::getProcessor)
.forEach(processors::add);
.map(this::getProcessor)
.forEach(processors::add);
for (final String groupId : snippet.getProcessGroups().keySet()) {
processors.addAll(getProcessGroup(groupId).findAllProcessors());
@ -2988,12 +2985,12 @@ public final class StandardProcessGroup implements ProcessGroup {
private List<VariableImpact> getVariableImpact(final ComponentNode component) {
return component.getProperties().keySet().stream()
.map(descriptor -> {
final String configuredVal = component.getProperty(descriptor);
return configuredVal == null ? descriptor.getDefaultValue() : configuredVal;
})
.map(propVal -> Query.prepare(propVal).getVariableImpact())
.collect(Collectors.toList());
.map(descriptor -> {
final String configuredVal = component.getProperty(descriptor);
return configuredVal == null ? descriptor.getDefaultValue() : configuredVal;
})
.map(propVal -> Query.prepare(propVal).getVariableImpact())
.collect(Collectors.toList());
}
@Override
@ -3044,13 +3041,13 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void setVersionControlInformation(final VersionControlInformation versionControlInformation, final Map<String, String> versionedComponentIds) {
final StandardVersionControlInformation svci = new StandardVersionControlInformation(
versionControlInformation.getRegistryIdentifier(),
versionControlInformation.getRegistryName(),
versionControlInformation.getBucketIdentifier(),
versionControlInformation.getFlowIdentifier(),
versionControlInformation.getVersion(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
versionControlInformation.getStatus()) {
versionControlInformation.getRegistryIdentifier(),
versionControlInformation.getRegistryName(),
versionControlInformation.getBucketIdentifier(),
versionControlInformation.getFlowIdentifier(),
versionControlInformation.getVersion(),
stripContentsFromRemoteDescendantGroups(versionControlInformation.getFlowSnapshot(), true),
versionControlInformation.getStatus()) {
@Override
public String getRegistryName() {
@ -3219,27 +3216,27 @@ public final class StandardProcessGroup implements ProcessGroup {
processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
processGroup.getConnections()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getProcessors()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getInputPorts()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getOutputPorts()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getLabels()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getFunnels()
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getControllerServices(false)
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
.forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
processGroup.getRemoteProcessGroups()
.forEach(rpg -> {
rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
.forEach(rpg -> {
rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
rpg.getInputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
rpg.getOutputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
});
rpg.getInputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
rpg.getOutputPorts().forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
});
for (final ProcessGroup childGroup : processGroup.getProcessGroups()) {
if (childGroup.getVersionControlInformation() == null) {
@ -3262,11 +3259,11 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowRegistry flowRegistry = flowRegistryClient.getFlowRegistry(registryId);
if (flowRegistry == null) {
final String message = String.format("Unable to synchronize Process Group with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
+ "with identifier %s but cannot find any Flow Registry with this identifier", registryId);
versionControlFields.setSyncFailureExplanation(message);
LOG.error("Unable to synchronize {} with Flow Registry because Process Group was placed under Version Control using Flow Registry "
+ "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
+ "with identifier {} but cannot find any Flow Registry with this identifier", this, registryId);
return;
}
@ -3280,11 +3277,17 @@ public final class StandardProcessGroup implements ProcessGroup {
vci.setFlowSnapshot(registryFlow);
} catch (final IOException | NiFiRegistryException e) {
final String message = String.format("Failed to synchronize Process Group with Flow Registry because could not retrieve version %s of flow with identifier %s in bucket %s",
vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier());
versionControlFields.setSyncFailureExplanation(message);
LOG.error("Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}",
this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
final String logErrorMessage = "Failed to synchronize {} with Flow Registry because could not retrieve version {} of flow with identifier {} in bucket {}";
// No need to print a full stacktrace for connection refused
if (e instanceof ConnectException) {
LOG.error(logErrorMessage + " due to: {}", this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e.getLocalizedMessage());
} else {
LOG.error(logErrorMessage, this, vci.getVersion(), vci.getFlowIdentifier(), vci.getBucketIdentifier(), e);
}
return;
}
}
@ -3302,7 +3305,7 @@ public final class StandardProcessGroup implements ProcessGroup {
versionControlFields.setStale(false);
} else {
LOG.info("{} is not the most recent version of the flow that is under Version Control; current version is {}; most recent version is {}",
new Object[] {this, vci.getVersion(), latestVersion});
this, vci.getVersion(), latestVersion);
versionControlFields.setStale(true);
}
@ -3318,7 +3321,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings,
final boolean updateDescendantVersionedFlows) {
final boolean updateDescendantVersionedFlows) {
writeLock.lock();
try {
verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
@ -3369,8 +3372,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (LOG.isInfoEnabled()) {
final String differencesByLine = flowComparison.getDifferences().stream()
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
}
@ -3408,18 +3411,18 @@ public final class StandardProcessGroup implements ProcessGroup {
ancestorServiceIds = Collections.emptySet();
} else {
ancestorServiceIds = parentGroup.getControllerServices(true).stream()
.map(cs -> {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
final Optional<String> versionedId = cs.getVersionedComponentId();
if (versionedId.isPresent()) {
return versionedId.get();
}
.map(cs -> {
// We want to map the Controller Service to its Versioned Component ID, if it has one.
// If it does not have one, we want to generate it in the same way that our Flow Mapper does
// because this allows us to find the Controller Service when doing a Flow Diff.
final Optional<String> versionedId = cs.getVersionedComponentId();
if (versionedId.isPresent()) {
return versionedId.get();
}
return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
})
.collect(Collectors.toSet());
return UUID.nameUUIDFromBytes(cs.getIdentifier().getBytes(StandardCharsets.UTF_8)).toString();
})
.collect(Collectors.toSet());
}
return ancestorServiceIds;
@ -3447,8 +3450,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private void populateKnownVariableNames(final ProcessGroup group, final Set<String> knownVariables) {
group.getVariableRegistry().getVariableMap().keySet().stream()
.map(VariableDescriptor::getName)
.forEach(knownVariables::add);
.map(VariableDescriptor::getName)
.forEach(knownVariables::add);
final ProcessGroup parent = group.getParent();
if (parent != null) {
@ -3458,8 +3461,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
final Set<String> variablesToSkip) throws ProcessorInstantiationException {
final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName, final boolean updateDescendantVersionedGroups,
final Set<String> variablesToSkip) throws ProcessorInstantiationException {
// During the flow update, we will use temporary names for process group ports. This is because port names must be
// unique within a process group, but during an update we might temporarily be in a state where two ports have the same name.
@ -3484,8 +3487,8 @@ public final class StandardProcessGroup implements ProcessGroup {
// As a result, once imported, we won't update variables to match the remote flow, but we will add any missing variables
// and remove any variables that are no longer part of the remote flow.
final Set<String> existingVariableNames = group.getVariableRegistry().getVariableMap().keySet().stream()
.map(VariableDescriptor::getName)
.collect(Collectors.toSet());
.map(VariableDescriptor::getName)
.collect(Collectors.toSet());
final Map<String, String> updatedVariableMap = new HashMap<>();
@ -3513,16 +3516,16 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionedFlowState flowState = remoteCoordinates.getLatest() ? VersionedFlowState.UP_TO_DATE : VersionedFlowState.STALE;
final VersionControlInformation vci = new StandardVersionControlInformation.Builder()
.registryId(registryId)
.registryName(registryName)
.bucketId(bucketId)
.bucketName(bucketId)
.flowId(flowId)
.flowName(flowId)
.version(version)
.flowSnapshot(proposed)
.status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
.build();
.registryId(registryId)
.registryName(registryName)
.bucketId(bucketId)
.bucketName(bucketId)
.flowId(flowId)
.flowName(flowId)
.version(version)
.flowSnapshot(proposed)
.status(new StandardVersionedFlowStatus(flowState, flowState.getDescription()))
.build();
group.setVersionControlInformation(vci, Collections.emptyMap());
}
@ -3535,7 +3538,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Controller Service. This way, we ensure that all services have been created before setting the properties. This allows us to
// properly obtain the correct mapping of Controller Service VersionedComponentID to Controller Service instance id.
final Map<String, ControllerServiceNode> servicesByVersionedId = group.getControllerServices(false).stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> controllerServicesRemoved = new HashSet<>(servicesByVersionedId.keySet());
@ -3567,7 +3570,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Child groups
final Map<String, ProcessGroup> childGroupsByVersionedId = group.getProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> childGroupsRemoved = new HashSet<>(childGroupsByVersionedId.keySet());
for (final VersionedProcessGroup proposedChildGroup : proposed.getProcessGroups()) {
@ -3589,7 +3592,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Funnels
final Map<String, Funnel> funnelsByVersionedId = group.getFunnels().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> funnelsRemoved = new HashSet<>(funnelsByVersionedId.keySet());
for (final VersionedFunnel proposedFunnel : proposed.getFunnels()) {
@ -3611,7 +3614,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Input Ports
final Map<String, Port> inputPortsByVersionedId = group.getInputPorts().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> inputPortsRemoved = new HashSet<>(inputPortsByVersionedId.keySet());
for (final VersionedPort proposedPort : proposed.getInputPorts()) {
@ -3636,7 +3639,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Output Ports
final Map<String, Port> outputPortsByVersionedId = group.getOutputPorts().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> outputPortsRemoved = new HashSet<>(outputPortsByVersionedId.keySet());
for (final VersionedPort proposedPort : proposed.getOutputPorts()) {
@ -3662,7 +3665,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Labels
final Map<String, Label> labelsByVersionedId = group.getLabels().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> labelsRemoved = new HashSet<>(labelsByVersionedId.keySet());
for (final VersionedLabel proposedLabel : proposed.getLabels()) {
@ -3683,7 +3686,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Processors
final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
@ -3722,7 +3725,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Remote Groups
final Map<String, RemoteProcessGroup> rpgsByVersionedId = group.getRemoteProcessGroups().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> rpgsRemoved = new HashSet<>(rpgsByVersionedId.keySet());
for (final VersionedRemoteProcessGroup proposedRpg : proposed.getRemoteProcessGroups()) {
@ -3743,7 +3746,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// Connections
final Map<String, Connection> connectionsByVersionedId = group.getConnections().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> connectionsRemoved = new HashSet<>(connectionsByVersionedId.keySet());
for (final VersionedConnection proposedConnection : proposed.getConnections()) {
@ -3847,11 +3850,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
final Connectable destination = connection.getDestination();
if (destination.getConnectableType() != ConnectableType.FUNNEL && destination.isRunning()) {
return false;
}
return true;
return destination.getConnectableType() == ConnectableType.FUNNEL || !destination.isRunning();
}
private String generateTemporaryPortName(final VersionedPort proposedPort) {
@ -3898,16 +3897,16 @@ public final class StandardProcessGroup implements ProcessGroup {
private void updateConnection(final Connection connection, final VersionedConnection proposed) {
connection.setBendPoints(proposed.getBends() == null ? Collections.emptyList() :
proposed.getBends().stream()
.map(pos -> new Position(pos.getX(), pos.getY()))
.collect(Collectors.toList()));
proposed.getBends().stream()
.map(pos -> new Position(pos.getX(), pos.getY()))
.collect(Collectors.toList()));
connection.setDestination(getConnectable(connection.getProcessGroup(), proposed.getDestination()));
connection.setLabelIndex(proposed.getLabelIndex());
connection.setName(proposed.getName());
connection.setRelationships(proposed.getSelectedRelationships().stream()
.map(name -> new Relationship.Builder().name(name).build())
.collect(Collectors.toSet()));
.map(name -> new Relationship.Builder().name(name).build())
.collect(Collectors.toSet()));
connection.setZIndex(proposed.getzIndex());
final FlowFileQueue queue = connection.getFlowFileQueue();
@ -3916,14 +3915,14 @@ public final class StandardProcessGroup implements ProcessGroup {
queue.setFlowFileExpiration(proposed.getFlowFileExpiration());
final List<FlowFilePrioritizer> prioritizers = proposed.getPrioritizers() == null ? Collections.emptyList() : proposed.getPrioritizers().stream()
.map(prioritizerName -> {
try {
return flowManager.createPrioritizer(prioritizerName);
} catch (final Exception e) {
throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
}
})
.collect(Collectors.toList());
.map(prioritizerName -> {
try {
return flowManager.createPrioritizer(prioritizerName);
} catch (final Exception e) {
throw new IllegalStateException("Failed to create Prioritizer of type " + prioritizerName + " for Connection with ID " + connection.getIdentifier());
}
})
.collect(Collectors.toList());
queue.setPriorities(prioritizers);
@ -3949,17 +3948,17 @@ public final class StandardProcessGroup implements ProcessGroup {
final Connectable source = getConnectable(destinationGroup, proposed.getSource());
if (source == null) {
throw new IllegalArgumentException("Connection has a source with identifier " + proposed.getIdentifier()
+ " but no component could be found in the Process Group with a corresponding identifier");
+ " but no component could be found in the Process Group with a corresponding identifier");
}
final Connectable destination = getConnectable(destinationGroup, proposed.getDestination());
if (destination == null) {
throw new IllegalArgumentException("Connection has a destination with identifier " + proposed.getDestination().getId()
+ " but no component could be found in the Process Group with a corresponding identifier");
+ " but no component could be found in the Process Group with a corresponding identifier");
}
final Connection connection = flowController.createConnection(generateUuid(proposed.getIdentifier(), destination.getIdentifier(), componentIdSeed), proposed.getName(), source, destination,
proposed.getSelectedRelationships());
proposed.getSelectedRelationships());
connection.setVersionedComponentId(proposed.getIdentifier());
destinationGroup.addConnection(connection);
updateConnection(connection, proposed);
@ -3974,15 +3973,15 @@ public final class StandardProcessGroup implements ProcessGroup {
switch (connectableComponent.getType()) {
case FUNNEL:
return group.getFunnels().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
case INPUT_PORT: {
final Optional<Port> port = group.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
if (port.isPresent()) {
return port.get();
@ -3990,34 +3989,34 @@ public final class StandardProcessGroup implements ProcessGroup {
// Attempt to locate child group by versioned component id
final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
.filter(child -> child.getVersionedComponentId().isPresent())
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
.findFirst();
.filter(child -> child.getVersionedComponentId().isPresent())
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
.findFirst();
if (optionalSpecifiedGroup.isPresent()) {
final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
return specifiedGroup.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
}
// If no child group matched the versioned component id, then look at all child groups. This is done because
// in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
// if the flow doesn't contain the properly mapped group id, we need to search all child groups.
return group.getProcessGroups().stream()
.flatMap(gr -> gr.getInputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
.flatMap(gr -> gr.getInputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
}
case OUTPUT_PORT: {
final Optional<Port> port = group.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
if (port.isPresent()) {
return port.get();
@ -4025,88 +4024,88 @@ public final class StandardProcessGroup implements ProcessGroup {
// Attempt to locate child group by versioned component id
final Optional<ProcessGroup> optionalSpecifiedGroup = group.getProcessGroups().stream()
.filter(child -> child.getVersionedComponentId().isPresent())
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
.findFirst();
.filter(child -> child.getVersionedComponentId().isPresent())
.filter(child -> child.getVersionedComponentId().get().equals(connectableComponent.getGroupId()))
.findFirst();
if (optionalSpecifiedGroup.isPresent()) {
final ProcessGroup specifiedGroup = optionalSpecifiedGroup.get();
return specifiedGroup.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
}
// If no child group matched the versioned component id, then look at all child groups. This is done because
// in older versions, we did not properly map Versioned Component ID's to Ports' parent groups. As a result,
// if the flow doesn't contain the properly mapped group id, we need to search all child groups.
return group.getProcessGroups().stream()
.flatMap(gr -> gr.getOutputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
.flatMap(gr -> gr.getOutputPorts().stream())
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
}
case PROCESSOR:
return group.getProcessors().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny()
.orElse(null);
case REMOTE_INPUT_PORT: {
final String rpgId = connectableComponent.getGroupId();
final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.findAny();
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.findAny();
if (!rpgOption.isPresent()) {
throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID "
+ rpgId + " but could not find a Remote Process Group corresponding to that ID");
+ rpgId + " but could not find a Remote Process Group corresponding to that ID");
}
final RemoteProcessGroup rpg = rpgOption.get();
final Optional<RemoteGroupPort> portByIdOption = rpg.getInputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
if (portByIdOption.isPresent()) {
return portByIdOption.get();
}
return rpg.getInputPorts().stream()
.filter(component -> connectableComponent.getName().equals(component.getName()))
.findAny()
.orElse(null);
.filter(component -> connectableComponent.getName().equals(component.getName()))
.findAny()
.orElse(null);
}
case REMOTE_OUTPUT_PORT: {
final String rpgId = connectableComponent.getGroupId();
final Optional<RemoteProcessGroup> rpgOption = group.getRemoteProcessGroups().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.findAny();
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> rpgId.equals(component.getVersionedComponentId().get()))
.findAny();
if (!rpgOption.isPresent()) {
throw new IllegalArgumentException("Connection refers to a Port with ID " + id + " within Remote Process Group with ID "
+ rpgId + " but could not find a Remote Process Group corresponding to that ID");
+ rpgId + " but could not find a Remote Process Group corresponding to that ID");
}
final RemoteProcessGroup rpg = rpgOption.get();
final Optional<RemoteGroupPort> portByIdOption = rpg.getOutputPorts().stream()
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
.filter(component -> component.getVersionedComponentId().isPresent())
.filter(component -> id.equals(component.getVersionedComponentId().get()))
.findAny();
if (portByIdOption.isPresent()) {
return portByIdOption.get();
}
return rpg.getOutputPorts().stream()
.filter(component -> connectableComponent.getName().equals(component.getName()))
.findAny()
.orElse(null);
.filter(component -> connectableComponent.getName().equals(component.getName()))
.findAny()
.orElse(null);
}
}
@ -4143,11 +4142,7 @@ public final class StandardProcessGroup implements ProcessGroup {
return false;
}
if (!bundle.getVersion().equals(coordinate.getVersion())) {
return false;
}
return true;
return bundle.getVersion().equals(coordinate.getVersion());
}
private BundleCoordinate toCoordinate(final Bundle bundle) {
@ -4272,7 +4267,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties,
final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {
final Map<String, VersionedPropertyDescriptor> proposedDescriptors, final ProcessGroup group) {
// since VersionedPropertyDescriptor currently doesn't know if it is sensitive or not,
// keep track of which property descriptors are sensitive from the current properties
@ -4348,13 +4343,13 @@ public final class StandardProcessGroup implements ProcessGroup {
rpg.setComments(proposed.getComments());
rpg.setCommunicationsTimeout(proposed.getCommunicationsTimeout());
rpg.setInputPorts(proposed.getInputPorts() == null ? Collections.emptySet() : proposed.getInputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet()), false);
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet()), false);
rpg.setName(proposed.getName());
rpg.setNetworkInterface(proposed.getLocalNetworkInterface());
rpg.setOutputPorts(proposed.getOutputPorts() == null ? Collections.emptySet() : proposed.getOutputPorts().stream()
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet()), false);
.map(port -> createPortDescriptor(port, componentIdSeed, rpg.getIdentifier()))
.collect(Collectors.toSet()), false);
rpg.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
rpg.setProxyHost(proposed.getProxyHost());
rpg.setProxyPort(proposed.getProxyPort());
@ -4443,17 +4438,17 @@ public final class StandardProcessGroup implements ProcessGroup {
if (modified) {
final String changes = modifications.stream()
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
LOG.error("Cannot change the Version of the flow for {} because the Process Group has been modified ({} modifications) "
+ "since it was last synchronized with the Flow Registry. The following differences were found:\n{}",
this, modifications.size(), changes);
+ "since it was last synchronized with the Flow Registry. The following differences were found:\n{}",
this, modifications.size(), changes);
throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ " because the Process Group has been modified (" + modifications.size()
+ " modifications) since it was last synchronized with the Flow Registry. The Process Group must be"
+ " reverted to its original form before changing the version.");
+ " because the Process Group has been modified (" + modifications.size()
+ " modifications) since it was last synchronized with the Flow Registry. The Process Group must be"
+ " reverted to its original form before changing the version.");
}
}
@ -4471,7 +4466,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// that were removed. We can then check for any connections that have data in them. If any Connection is to be removed but
// has data, then we should throw an IllegalStateException.
findAllConnections().stream()
.forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().orElse(conn.getIdentifier()), conn));
.forEach(conn -> removedConnectionByVersionedId.put(conn.getVersionedComponentId().orElse(conn.getIdentifier()), conn));
final Set<String> proposedFlowConnectionIds = new HashSet<>();
findAllConnectionIds(flowContents, proposedFlowConnectionIds);
@ -4485,8 +4480,8 @@ public final class StandardProcessGroup implements ProcessGroup {
final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
if (!flowFileQueue.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the "
+ "proposed version does not contain "
+ connection + " and the connection currently has data in the queue.");
+ "proposed version does not contain "
+ connection + " and the connection currently has data in the queue.");
}
}
}
@ -4494,36 +4489,36 @@ public final class StandardProcessGroup implements ProcessGroup {
// Determine which input ports were removed from this process group
final Map<String, Port> removedInputPortsByVersionId = new HashMap<>();
getInputPorts().stream()
.filter(port -> port.getVersionedComponentId().isPresent())
.forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
.filter(port -> port.getVersionedComponentId().isPresent())
.forEach(port -> removedInputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
flowContents.getInputPorts().stream()
.map(VersionedPort::getIdentifier)
.forEach(removedInputPortsByVersionId::remove);
.map(VersionedPort::getIdentifier)
.forEach(removedInputPortsByVersionId::remove);
// Ensure that there are no incoming connections for any Input Port that was removed.
for (final Port inputPort : removedInputPortsByVersionId.values()) {
final List<Connection> incomingConnections = inputPort.getIncomingConnections();
if (!incomingConnections.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Input Port "
+ inputPort + " and the Input Port currently has an incoming connections");
+ inputPort + " and the Input Port currently has an incoming connections");
}
}
// Determine which output ports were removed from this process group
final Map<String, Port> removedOutputPortsByVersionId = new HashMap<>();
getOutputPorts().stream()
.filter(port -> port.getVersionedComponentId().isPresent())
.forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
.filter(port -> port.getVersionedComponentId().isPresent())
.forEach(port -> removedOutputPortsByVersionId.put(port.getVersionedComponentId().get(), port));
flowContents.getOutputPorts().stream()
.map(VersionedPort::getIdentifier)
.forEach(removedOutputPortsByVersionId::remove);
.map(VersionedPort::getIdentifier)
.forEach(removedOutputPortsByVersionId::remove);
// Ensure that there are no outgoing connections for any Output Port that was removed.
for (final Port outputPort : removedOutputPortsByVersionId.values()) {
final Set<Connection> outgoingConnections = outputPort.getConnections();
if (!outgoingConnections.isEmpty()) {
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the proposed version does not contain the Output Port "
+ outputPort + " and the Output Port currently has an outgoing connections");
+ outputPort + " and the Output Port currently has an outgoing connections");
}
}
@ -4546,8 +4541,8 @@ public final class StandardProcessGroup implements ProcessGroup {
if (!proposedProcessGroups.containsKey(versionedId)) {
// Process Group was removed.
throw new IllegalStateException(this + " cannot be updated to the proposed version of the flow because the child " + childGroup
+ " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
+ "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to change the version of the flow.");
+ " that exists locally has one or more Templates, and the proposed flow does not contain these templates. "
+ "A Process Group cannot be deleted while it contains Templates. Please remove the Templates before attempting to change the version of the flow.");
}
}
@ -4556,8 +4551,8 @@ public final class StandardProcessGroup implements ProcessGroup {
findAllProcessors(updatedFlow.getFlowContents(), proposedProcessors);
findAllProcessors().stream()
.filter(proc -> proc.getVersionedComponentId().isPresent())
.forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get()));
.filter(proc -> proc.getVersionedComponentId().isPresent())
.forEach(proc -> proposedProcessors.remove(proc.getVersionedComponentId().get()));
for (final VersionedProcessor processorToAdd : proposedProcessors.values()) {
final String processorToAddClass = processorToAdd.getType();
@ -4576,8 +4571,8 @@ public final class StandardProcessGroup implements ProcessGroup {
findAllControllerServices(updatedFlow.getFlowContents(), proposedServices);
findAllControllerServices().stream()
.filter(service -> service.getVersionedComponentId().isPresent())
.forEach(service -> proposedServices.remove(service.getVersionedComponentId().get()));
.filter(service -> service.getVersionedComponentId().isPresent())
.forEach(service -> proposedServices.remove(service.getVersionedComponentId().get()));
for (final VersionedControllerService serviceToAdd : proposedServices.values()) {
final String serviceToAddClass = serviceToAdd.getType();
@ -4596,8 +4591,8 @@ public final class StandardProcessGroup implements ProcessGroup {
findAllConnections(updatedFlow.getFlowContents(), proposedConnections);
findAllConnections().stream()
.filter(conn -> conn.getVersionedComponentId().isPresent())
.forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().get()));
.filter(conn -> conn.getVersionedComponentId().isPresent())
.forEach(conn -> proposedConnections.remove(conn.getVersionedComponentId().get()));
for (final VersionedConnection connectionToAdd : proposedConnections.values()) {
if (connectionToAdd.getPrioritizers() != null) {
@ -4685,20 +4680,20 @@ public final class StandardProcessGroup implements ProcessGroup {
final VersionedFlowState state = vci.getStatus().getState();
if (state == VersionedFlowState.STALE || state == VersionedFlowState.LOCALLY_MODIFIED_AND_STALE) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
+ " because the Process Group in the flow is not synchronized with the most recent version of the Flow in the Flow Registry. "
+ "In order to publish a new version of the Flow, the Process Group must first be in synch with the latest version in the Flow Registry.");
}
// Flow ID matches. We want to publish the Process Group as the next version of the Flow, so we must
// ensure that all other parameters match as well.
if (!bucketId.equals(vci.getBucketIdentifier())) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
if (!registryId.equals(vci.getRegistryIdentifier())) {
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
} else if (flowId != null) {
// Flow ID is specified but different. This is not allowed, because Flow ID's are automatically generated,
@ -4707,7 +4702,7 @@ public final class StandardProcessGroup implements ProcessGroup {
// not allowed because the Process Group must be in synch with the latest version of the flow before that
// can be done.
throw new IllegalStateException("Cannot update Version Control Information for Process Group with ID " + getIdentifier()
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
+ " because the Process Group is currently synchronized with a different Versioned Flow than the one specified in the request.");
}
}
}
@ -4736,14 +4731,14 @@ public final class StandardProcessGroup implements ProcessGroup {
if (modified) {
throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
+ "has local modifications. Each descendant Process Group that is under Version Control must first be reverted or have its changes pushed to the Flow Registry before "
+ "this action can be performed on the parent Process Group.");
}
if (flowState == VersionedFlowState.SYNC_FAILURE) {
throw new IllegalStateException("Process Group cannot " + action + " because it contains a child or descendant Process Group that is under Version Control and "
+ "is not synchronized with the Flow Registry. Each descendant Process Group must first be synchronized with the Flow Registry before this action can be "
+ "performed on the parent Process Group. NiFi will continue to attempt to communicate with the Flow Registry periodically in the background.");
+ "is not synchronized with the Flow Registry. Each descendant Process Group must first be synchronized with the Flow Registry before this action can be "
+ "performed on the parent Process Group. NiFi will continue to attempt to communicate with the Flow Registry periodically in the background.");
}
}
}

View File

@ -16,6 +16,38 @@
*/
package org.apache.nifi.remote;
import static java.util.Objects.requireNonNull;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
@ -49,39 +81,6 @@ import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.Response;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
/**
* Represents the Root Process Group of a remote NiFi Instance. Holds
* information about that remote instance, as well as Incoming Ports and
@ -189,7 +188,12 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
try {
refreshFlowContents();
} catch (final Exception e) {
logger.warn("Unable to communicate with remote instance {}", new Object[] {this, e});
// If the root cause is a connection error, don't print the entire stacktrace
if (isConnectionTimeoutError(e)) {
logger.warn("Unable to communicate with remote instance {}", this);
} else {
logger.warn("Unable to communicate with remote instance {}", this, e);
}
}
});
@ -197,6 +201,16 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
backgroundThreadExecutor.scheduleWithFixedDelay(checkAuthorizations, 0L, 60L, TimeUnit.SECONDS);
}
/**
* Returns true if the exception indicates the root cause is a connection timeout error.
*
* @param e the exception thrown connecting to the remote instance
* @return true if the error is due to a connection timeout
*/
private boolean isConnectionTimeoutError(Exception e) {
return e instanceof CommunicationsException && e.getLocalizedMessage().contains("connect timed out");
}
@Override
public void setTargetUris(final String targetUris) {
requireNonNull(targetUris);
@ -442,10 +456,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* not configured and is in the set given, that port will be instantiated
* and started.
*
* @param ports the new ports
* @param ports the new ports
* @param pruneUnusedPorts if true, any ports that are not included in the given set of ports
* and that do not have any incoming connections will be removed.
*
* and that do not have any incoming connections will be removed.
* @throws NullPointerException if the given argument is null
*/
@Override
@ -457,10 +470,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
newPortTargetIds.add(descriptor.getTargetId());
final Map<String, StandardRemoteGroupPort> inputPortByTargetId = inputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
final Map<String, StandardRemoteGroupPort> inputPortByName = inputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
.collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
// Check if we have a matching port already and add the port if not. We determine a matching port
// by first finding a port that has the same Target ID. If none exists, then we try to find a port with
@ -532,10 +545,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* not configured and is in the set given, that port will be instantiated
* and started.
*
* @param ports the new ports
* @param ports the new ports
* @param pruneUnusedPorts if true, will remove any ports that are not in the given list and that have
* no outgoing connections
*
* no outgoing connections
* @throws NullPointerException if the given argument is null
*/
@Override
@ -547,10 +559,10 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
newPortTargetIds.add(descriptor.getTargetId());
final Map<String, StandardRemoteGroupPort> outputPortByTargetId = outputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
.collect(Collectors.toMap(StandardRemoteGroupPort::getTargetIdentifier, Function.identity()));
final Map<String, StandardRemoteGroupPort> outputPortByName = outputPorts.values().stream()
.collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
.collect(Collectors.toMap(StandardRemoteGroupPort::getName, Function.identity()));
// Check if we have a matching port already and add the port if not. We determine a matching port
// by first finding a port that has the same Target ID. If none exists, then we try to find a port with
@ -601,10 +613,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
/**
* Shuts down and removes the given port
*
*
* @throws NullPointerException if the given output Port is null
* @throws NullPointerException if the given output Port is null
* @throws IllegalStateException if the port does not belong to this remote
* process group
* process group
*/
@Override
public void removeNonExistentPort(final RemoteGroupPort port) {
@ -638,10 +649,9 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* Adds an Output Port to this Remote Process Group that is described by
* this DTO.
*
* @param descriptor
*
* @param descriptor the RPG port descriptor
* @throws IllegalStateException if an Output Port already exists with the
* ID given by dto.getId()
* ID given by dto.getId()
*/
private StandardRemoteGroupPort addOutputPort(final RemoteProcessGroupPortDescriptor descriptor) {
writeLock.lock();
@ -651,7 +661,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(),
this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
this, TransferDirection.RECEIVE, ConnectableType.REMOTE_OUTPUT_PORT, sslContext, scheduler, nifiProperties);
outputPorts.put(descriptor.getId(), port);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
@ -717,9 +727,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
* DTO.
*
* @param descriptor port descriptor
*
* @throws IllegalStateException if an Input Port already exists with the ID
* given by the ID of the DTO.
* given by the ID of the DTO.
*/
private StandardRemoteGroupPort addInputPort(final RemoteProcessGroupPortDescriptor descriptor) {
writeLock.lock();
@ -733,7 +742,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
// unique for each Remote Group Port, so that if we have multiple RPG's pointing
// to the same target, we have unique ID's for each of those ports.
final StandardRemoteGroupPort port = new StandardRemoteGroupPort(descriptor.getId(), descriptor.getTargetId(), descriptor.getName(), getProcessGroup(), this,
TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
TransferDirection.SEND, ConnectableType.REMOTE_INPUT_PORT, sslContext, scheduler, nifiProperties);
if (descriptor.getConcurrentlySchedulableTaskCount() != null) {
port.setMaxConcurrentTasks(descriptor.getConcurrentlySchedulableTaskCount());
@ -909,20 +918,20 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
} else {
this.localAddress = null;
this.nicValidationResult = new ValidationResult.Builder()
.input(interfaceName)
.subject("Network Interface Name")
.valid(false)
.explanation("No IP Address could be found that is bound to the interface with name " + interfaceName)
.build();
.input(interfaceName)
.subject("Network Interface Name")
.valid(false)
.explanation("No IP Address could be found that is bound to the interface with name " + interfaceName)
.build();
}
} catch (final Exception e) {
this.localAddress = null;
this.nicValidationResult = new ValidationResult.Builder()
.input(interfaceName)
.subject("Network Interface Name")
.valid(false)
.explanation("Could not obtain Network Interface with name " + interfaceName)
.build();
.input(interfaceName)
.subject("Network Interface Name")
.valid(false)
.explanation("Could not obtain Network Interface with name " + interfaceName)
.build();
}
}
} finally {