NIFI-12101: This closes #7768. Fixed bugs that caused StatelessBasicsIT.testChangeFlowVersion to fail. There were a couple of related issues. Firstly, for Stateless Groups we were not waiting for the group to fully stop before attempting to update the flow but were waiting only for child/descendant groups to stop. This was due to misleading naming in the ProcessGroupUpdateStrategy enum. Renamed enum values to more clearly convey what they mean. Additionally, renamed the enum to more clearly convey what how it is used, since it is used not only for updating groups but also for retrieving groups. Additionally, when waiting for stateless groups to stop, we inadvertently checked the Set of affectedComponents' IDs against Process Group IDs so addressed the logic there and also ensured that we were consistent in setting the AffectedComponentEntity's ProcessGroup (i.e., its ProcessGroupNameDTO) as we were inconsistently setting it across the codebase.

Fixed issue in logic of LocalComponentLifecycle, which was waiting for all PGs to reach desired stateless run schedule, even when the group itself was not stateless

Fixed JavaScript to specify correct value for recursivity when updating process group parameter context
This commit is contained in:
Mark Payne 2023-09-21 10:02:41 -04:00 committed by Joseph Witt
parent 9ae6921272
commit 713be8b539
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
13 changed files with 96 additions and 40 deletions

View File

@ -16,7 +16,7 @@
*/
package org.apache.nifi.web.api.entity;
public enum ProcessGroupUpdateStrategy {
CURRENT_GROUP,
CURRENT_GROUP_WITH_CHILDREN
public enum ProcessGroupRecursivity {
DIRECT_CHILDREN,
ALL_DESCENDANTS
}

View File

@ -90,8 +90,6 @@ import org.apache.nifi.web.api.entity.AccessPolicyEntity;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity;
import org.apache.nifi.web.api.entity.FlowRegistryBucketEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConfigurationAnalysisEntity;
@ -103,10 +101,12 @@ import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.CurrentUserEntity;
import org.apache.nifi.web.api.entity.FlowAnalysisResultEntity;
import org.apache.nifi.web.api.entity.FlowAnalysisRuleEntity;
import org.apache.nifi.web.api.entity.FlowComparisonEntity;
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
import org.apache.nifi.web.api.entity.FlowEntity;
import org.apache.nifi.web.api.entity.FlowRegistryBucketEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
import org.apache.nifi.web.api.entity.FunnelEntity;
import org.apache.nifi.web.api.entity.LabelEntity;
@ -117,8 +117,8 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
@ -1121,13 +1121,13 @@ public interface NiFiServiceFacade {
void verifyDeleteParameterContext(String parameterContextId);
/**
* Gets all process groups in the specified parent group.
* Gets all child/descendant process groups in the specified parent group.
*
* @param parentGroupId The id of the parent group
* @param processGroupUpdateStrategy if process groups with its child groups should be included
* @return List of process groups
* @param processGroupRecursivity how far to recurse into child/descendant groups
* @return Set of all child or descendant process groups
*/
Set<ProcessGroupEntity> getProcessGroups(String parentGroupId, ProcessGroupUpdateStrategy processGroupUpdateStrategy);
Set<ProcessGroupEntity> getProcessGroups(String parentGroupId, ProcessGroupRecursivity processGroupRecursivity);
/**
* Verifies the contents of the specified process group can be scheduled or unscheduled.

View File

@ -226,6 +226,7 @@ import org.apache.nifi.web.api.dto.PermissionsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessGroupNameDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
@ -305,9 +306,9 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
@ -4631,10 +4632,10 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId, final ProcessGroupUpdateStrategy processGroupUpdateStrategy) {
final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId, processGroupUpdateStrategy);
public Set<ProcessGroupEntity> getProcessGroups(final String parentGroupId, final ProcessGroupRecursivity processGroupRecursivity) {
final Set<ProcessGroup> groups = processGroupDAO.getProcessGroups(parentGroupId, processGroupRecursivity);
return groups.stream()
.map(group -> createProcessGroupEntity(group))
.map(this::createProcessGroupEntity)
.collect(Collectors.toSet());
}
@ -5696,11 +5697,23 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
private ProcessGroupNameDTO createProcessGroupNameDto(final ProcessGroup group) {
if (group == null) {
return null;
}
final ProcessGroupNameDTO dto = new ProcessGroupNameDTO();
dto.setId(group.getIdentifier());
dto.setName(group.getName());
return dto;
}
private AffectedComponentEntity createAffectedComponentEntity(final Connectable connectable) {
final AffectedComponentEntity entity = new AffectedComponentEntity();
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(connectable.getIdentifier())));
entity.setId(connectable.getIdentifier());
entity.setReferenceType(connectable.getConnectableType().name());
entity.setProcessGroup(createProcessGroupNameDto(connectable.getProcessGroup()));
final Authorizable authorizable = getAuthorizable(connectable);
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
@ -5724,6 +5737,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier())));
entity.setId(serviceNode.getIdentifier());
entity.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE);
entity.setProcessGroup(createProcessGroupNameDto(serviceNode.getProcessGroup()));
final Authorizable authorizable = authorizableLookup.getControllerService(serviceNode.getIdentifier()).getAuthorizable();
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
@ -5744,6 +5758,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
entity.setRevision(dtoFactory.createRevisionDTO(revisionManager.getRevision(group.getIdentifier())));
entity.setId(group.getIdentifier());
entity.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_STATELESS_GROUP);
entity.setProcessGroup(createProcessGroupNameDto(group.getParent()));
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(group);
entity.setPermissions(permissionsDto);
@ -5765,6 +5780,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
entity.setId(instance.getInstanceIdentifier());
entity.setReferenceType(componentTypeName);
final String groupId = instance.getInstanceGroupId();
if (groupId != null) {
final ProcessGroupNameDTO groupNameDto = new ProcessGroupNameDTO();
groupNameDto.setId(groupId);
entity.setProcessGroup(groupNameDto);
}
final Authorizable authorizable = getAuthorizable(componentTypeName, instance);
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(authorizable);
entity.setPermissions(permissionsDto);

View File

@ -30,10 +30,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -70,7 +72,6 @@ import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import java.util.ArrayList;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizableLookup;
import org.apache.nifi.authorization.AuthorizeAccess;
@ -154,8 +155,8 @@ import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessGroupUploadEntity;
import org.apache.nifi.web.api.entity.ProcessGroupsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
@ -176,8 +177,6 @@ import org.slf4j.LoggerFactory;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.HashSet;
/**
* RESTful endpoint for managing a Group.
*/
@ -532,11 +531,11 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
}
final String processGroupUpdateStrategy = requestProcessGroupEntity.getProcessGroupUpdateStrategy();
final ProcessGroupUpdateStrategy updateStrategy;
final ProcessGroupRecursivity updateStrategy;
if (processGroupUpdateStrategy == null) {
updateStrategy = ProcessGroupUpdateStrategy.CURRENT_GROUP;
updateStrategy = ProcessGroupRecursivity.DIRECT_CHILDREN;
} else {
updateStrategy = ProcessGroupUpdateStrategy.valueOf(processGroupUpdateStrategy);
updateStrategy = ProcessGroupRecursivity.valueOf(processGroupUpdateStrategy);
}
final String executionEngine = requestProcessGroupDTO.getExecutionEngine();
@ -575,7 +574,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
updatableProcessGroups.put(requestProcessGroupEntity, getRevision(requestProcessGroupEntity, requestGroupId));
if (updateStrategy == ProcessGroupUpdateStrategy.CURRENT_GROUP_WITH_CHILDREN) {
if (updateStrategy == ProcessGroupRecursivity.ALL_DESCENDANTS) {
for (ProcessGroupEntity processGroupEntity : serviceFacade.getProcessGroups(requestGroupId, updateStrategy)) {
final ProcessGroupDTO processGroupDTO = processGroupEntity.getComponent();
final String processGroupId = processGroupDTO == null ? processGroupEntity.getId() : processGroupDTO.getId();
@ -2242,7 +2241,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
});
// get the process groups
final Set<ProcessGroupEntity> entities = serviceFacade.getProcessGroups(groupId, ProcessGroupUpdateStrategy.CURRENT_GROUP);
final Set<ProcessGroupEntity> entities = serviceFacade.getProcessGroups(groupId, ProcessGroupRecursivity.DIRECT_CHILDREN);
// always prune the contents
for (final ProcessGroupEntity entity : entities) {

View File

@ -118,9 +118,9 @@ import org.apache.nifi.diagnostics.GarbageCollection;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flowanalysis.FlowAnalysisRule;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
@ -2242,6 +2242,13 @@ public final class DtoFactory {
component.setUri(groupEntity.getUri());
component.setReferenceType(AffectedComponentDTO.COMPONENT_TYPE_STATELESS_GROUP);
final String parentGroupId = groupEntity.getComponent().getParentGroupId();
if (parentGroupId != null) {
final ProcessGroupNameDTO groupName = new ProcessGroupNameDTO();
groupName.setId(parentGroupId);
component.setProcessGroup(groupName);
}
final ProcessGroupDTO groupDto = groupEntity.getComponent();
final AffectedComponentDTO componentDto = new AffectedComponentDTO();
if (groupDto == null) {

View File

@ -24,7 +24,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import java.util.Collection;
import java.util.Map;
@ -60,10 +60,10 @@ public interface ProcessGroupDAO {
* Gets all of the process groups.
*
* @param parentGroupId The parent group id
* @param processGroupUpdateStrategy if process groups with its child groups should be included
* @param processGroupRecursivity how far into child/descendant groups to recurse
* @return The process groups
*/
Set<ProcessGroup> getProcessGroups(String parentGroupId, ProcessGroupUpdateStrategy processGroupUpdateStrategy);
Set<ProcessGroup> getProcessGroups(String parentGroupId, ProcessGroupRecursivity processGroupRecursivity);
/**
* Gets all of the process groups.

View File

@ -46,7 +46,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.VariableRegistryDTO;
import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.entity.ParameterContextReferenceEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.slf4j.Logger;
@ -156,9 +156,9 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
}
@Override
public Set<ProcessGroup> getProcessGroups(final String parentGroupId, final ProcessGroupUpdateStrategy processGroupUpdateStrategy) {
public Set<ProcessGroup> getProcessGroups(final String parentGroupId, final ProcessGroupRecursivity processGroupRecursivity) {
ProcessGroup group = locateProcessGroup(flowController, parentGroupId);
if (processGroupUpdateStrategy == ProcessGroupUpdateStrategy.CURRENT_GROUP_WITH_CHILDREN) {
if (processGroupRecursivity == ProcessGroupRecursivity.ALL_DESCENDANTS) {
return new HashSet<>(group.findAllProcessGroups());
} else {
return group.getProcessGroups();

View File

@ -21,6 +21,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.flow.ExecutionEngine;
import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.api.dto.AffectedComponentDTO;
@ -31,7 +32,7 @@ import org.apache.nifi.web.api.dto.ProcessorRunStatusDetailsDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
@ -191,7 +192,14 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true);
final boolean processorsComplete = isProcessorActionComplete(processorEntities, affectedComponents, desiredState, invalidComponentAction);
final Set<ProcessGroupEntity> groupEntities = serviceFacade.getProcessGroups(groupId, ProcessGroupUpdateStrategy.CURRENT_GROUP_WITH_CHILDREN);
// Gather the Process Group in question and all of its descendants
final Set<ProcessGroupEntity> groupEntities = new HashSet<>();
groupEntities.add(serviceFacade.getProcessGroup(groupId));
final Set<ProcessGroupEntity> descendantEntities = serviceFacade.getProcessGroups(groupId, ProcessGroupRecursivity.ALL_DESCENDANTS);
groupEntities.addAll(descendantEntities);
// Wait until the stateless group has reached the desired state
final boolean statelessGroupsComplete = isStatelessGroupActionComplete(groupEntities, affectedComponents, desiredState);
if (processorsComplete && statelessGroupsComplete) {
@ -293,8 +301,16 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
updateAffectedGroupComponents(groupEntities, affectedComponents);
final Set<String> affectedGroupIds = affectedComponents.values().stream()
.map(entity -> {
final boolean isGroup = AffectedComponentDTO.COMPONENT_TYPE_STATELESS_GROUP.equals(entity.getReferenceType());
return isGroup ? entity.getId() : entity.getComponent().getProcessGroupId();
})
.collect(Collectors.toSet());
for (final ProcessGroupEntity entity : groupEntities) {
if (!affectedComponents.containsKey(entity.getId())) {
// Only account for groups that are in our affected component list
if (!affectedGroupIds.contains(entity.getId())) {
continue;
}
@ -326,6 +342,12 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
}
private boolean isDesiredStatelessGroupStateReached(final ProcessGroupEntity groupEntity, final ScheduledState desiredState) {
// Only consider this case for stateless groups
final String executionEngine = groupEntity.getComponent().getExecutionEngine();
if (!ExecutionEngine.STATELESS.name().equals(executionEngine)) {
return true;
}
final String runStatus = groupEntity.getComponent().getStatelessGroupScheduledState();
final boolean stateMatches = desiredState.name().equalsIgnoreCase(runStatus);

View File

@ -25,7 +25,7 @@ import org.apache.nifi.web.NiFiServiceFacade;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupUpdateStrategy;
import org.apache.nifi.web.api.entity.ProcessGroupRecursivity;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@ -37,12 +37,12 @@ import javax.ws.rs.core.Response;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class TestProcessGroupResource {
@ -84,7 +84,7 @@ public class TestProcessGroupResource {
revisionDTO.setVersion(1L);
processGroupEntity.setRevision(revisionDTO);
processGroupEntity.setProcessGroupUpdateStrategy(ProcessGroupUpdateStrategy.CURRENT_GROUP.name());
processGroupEntity.setProcessGroupUpdateStrategy(ProcessGroupRecursivity.DIRECT_CHILDREN.name());
processGroupEntity.setComponent(groupDTO);
doThrow(AccessDeniedException.class).when(serviceFacade).authorizeAccess(any(AuthorizeAccess.class));

View File

@ -95,9 +95,9 @@
// build the entity
var updateStrategy;
if ($('#parameter-contexts-recursive').hasClass('checkbox-unchecked')) {
updateStrategy = 'CURRENT_GROUP';
updateStrategy = 'DIRECT_CHILDREN';
} else {
updateStrategy = 'CURRENT_GROUP_WITH_CHILDREN';
updateStrategy = 'ALL_DESCENDANTS';
};
var entity = {

View File

@ -258,4 +258,6 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=
nifi.variable.registry.properties=
nifi.flow.analysis.background.task.schedule=5 mins

View File

@ -258,4 +258,6 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=
nifi.variable.registry.properties=
nifi.flow.analysis.background.task.schedule=5 mins

View File

@ -260,3 +260,5 @@ nifi.kerberos.spnego.authentication.expiration=12 hours
# external properties files for variable registry
# supports a comma delimited list of file locations
nifi.variable.registry.properties=
nifi.flow.analysis.background.task.schedule=5 mins