mirror of https://github.com/apache/nifi.git
NIFI-11814 Optimized Cluster Replication Component status evaluation
When waiting for all controller services to reach the desired status in the ClusterReplicationComponentLifecycle component, return immediately if there are no services to wait for. Otherwise, request that referencing components not be included in the return value of the Controller Services This closes #7493 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7db956fea7
commit
7405dcb510
|
@ -2095,17 +2095,23 @@ public interface NiFiServiceFacade {
|
|||
* @param groupId the id of the process group of interest
|
||||
* @param includeAncestorGroups if true, parent and ancestor groups' services will be returned as well
|
||||
* @param includeDescendantGroups if true, child and descendant groups' services will be returned as well
|
||||
* @param includeReferencingComponents whether or not the response should include the referencing component services.
|
||||
* These can be expensive to include so should not be included when the are not necessary.
|
||||
*
|
||||
* @return services
|
||||
*/
|
||||
Set<ControllerServiceEntity> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups);
|
||||
Set<ControllerServiceEntity> getControllerServices(String groupId, boolean includeAncestorGroups, boolean includeDescendantGroups, boolean includeReferencingComponents);
|
||||
|
||||
/**
|
||||
* Gets the specified controller service.
|
||||
*
|
||||
* @param controllerServiceId id
|
||||
* @param includeReferencingComponents whether or not the response should include the referencing component services.
|
||||
* These can be expensive to include so should not be included when the are not necessary.
|
||||
*
|
||||
* @return service
|
||||
*/
|
||||
ControllerServiceEntity getControllerService(String controllerServiceId);
|
||||
ControllerServiceEntity getControllerService(String controllerServiceId, boolean includeReferencingComponents);
|
||||
|
||||
/**
|
||||
* Get the descriptor for the specified property of the specified controller service.
|
||||
|
|
|
@ -2913,8 +2913,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
awaitValidationCompletion(cs);
|
||||
final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(cs);
|
||||
final ControllerServiceReference ref = controllerService.getReferences();
|
||||
final ControllerServiceReferencingComponentsEntity referencingComponentsEntity =
|
||||
createControllerServiceReferencingComponentsEntity(ref, Collections.singleton(controllerService.getIdentifier()));
|
||||
final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref);
|
||||
dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
|
||||
return dto;
|
||||
});
|
||||
|
@ -3004,7 +3003,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
* @param reference ControllerServiceReference
|
||||
* @return The entity
|
||||
*/
|
||||
private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference, final Set<String> lockedIds) {
|
||||
private ControllerServiceReferencingComponentsEntity createControllerServiceReferencingComponentsEntity(final ControllerServiceReference reference) {
|
||||
final Set<ControllerServiceNode> visited = new HashSet<>();
|
||||
visited.add(reference.getReferencedComponent());
|
||||
findControllerServiceReferencingComponentIdentifiers(reference, visited);
|
||||
|
@ -4674,12 +4673,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
return createProcessGroupEntity(processGroup);
|
||||
}
|
||||
|
||||
private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final Set<String> serviceIds) {
|
||||
private ControllerServiceEntity createControllerServiceEntity(final ControllerServiceNode serviceNode, final boolean includeReferencingComponents) {
|
||||
final ControllerServiceDTO dto = dtoFactory.createControllerServiceDto(serviceNode);
|
||||
|
||||
final ControllerServiceReference ref = serviceNode.getReferences();
|
||||
final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref, serviceIds);
|
||||
dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
|
||||
if (includeReferencingComponents) {
|
||||
final ControllerServiceReference ref = serviceNode.getReferences();
|
||||
final ControllerServiceReferencingComponentsEntity referencingComponentsEntity = createControllerServiceReferencingComponentsEntity(ref);
|
||||
dto.setReferencingComponents(referencingComponentsEntity.getControllerServiceReferencingComponents());
|
||||
}
|
||||
|
||||
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(serviceNode.getIdentifier()));
|
||||
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(serviceNode, NiFiUserUtils.getNiFiUser());
|
||||
|
@ -4740,19 +4741,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups) {
|
||||
public Set<ControllerServiceEntity> getControllerServices(final String groupId, final boolean includeAncestorGroups, final boolean includeDescendantGroups,
|
||||
final boolean includeReferencingComponents) {
|
||||
final Set<ControllerServiceNode> serviceNodes = controllerServiceDAO.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
|
||||
final Set<String> serviceIds = serviceNodes.stream().map(service -> service.getIdentifier()).collect(Collectors.toSet());
|
||||
|
||||
return serviceNodes.stream()
|
||||
.map(serviceNode -> createControllerServiceEntity(serviceNode, serviceIds))
|
||||
.map(serviceNode -> createControllerServiceEntity(serviceNode, includeReferencingComponents))
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ControllerServiceEntity getControllerService(final String controllerServiceId) {
|
||||
public ControllerServiceEntity getControllerService(final String controllerServiceId, final boolean includeReferencingComponents) {
|
||||
final ControllerServiceNode controllerService = controllerServiceDAO.getControllerService(controllerServiceId);
|
||||
return createControllerServiceEntity(controllerService, Collections.singleton(controllerServiceId));
|
||||
return createControllerServiceEntity(controllerService, includeReferencingComponents);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -4767,7 +4768,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
public ControllerServiceReferencingComponentsEntity getControllerServiceReferencingComponents(final String controllerServiceId) {
|
||||
final ControllerServiceNode service = controllerServiceDAO.getControllerService(controllerServiceId);
|
||||
final ControllerServiceReference ref = service.getReferences();
|
||||
return createControllerServiceReferencingComponentsEntity(ref, Collections.singleton(controllerServiceId));
|
||||
return createControllerServiceReferencingComponentsEntity(ref);
|
||||
}
|
||||
|
||||
private ReportingTaskEntity createReportingTaskEntity(final ReportingTaskNode reportingTask) {
|
||||
|
@ -5954,7 +5955,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
final ProcessorDiagnosticsDTO dto = controllerFacade.getProcessorDiagnostics(processor, processorStatus, bulletinRepository, serviceId -> {
|
||||
final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
|
||||
return createControllerServiceEntity(serviceNode, Collections.emptySet());
|
||||
return createControllerServiceEntity(serviceNode, true);
|
||||
});
|
||||
|
||||
// Filter anything out of diagnostics that the user is not authorized to see.
|
||||
|
|
|
@ -573,7 +573,7 @@ public class StandardNiFiWebConfigurationContext implements NiFiWebConfiguration
|
|||
}
|
||||
controllerService = entity.getComponent();
|
||||
} else {
|
||||
controllerService = serviceFacade.getControllerService(id).getComponent();
|
||||
controllerService = serviceFacade.getControllerService(id, true).getComponent();
|
||||
}
|
||||
|
||||
// return the controller service info
|
||||
|
|
|
@ -210,7 +210,7 @@ public class ControllerServiceResource extends ApplicationResource {
|
|||
});
|
||||
|
||||
// get the controller service
|
||||
final ControllerServiceEntity entity = serviceFacade.getControllerService(id);
|
||||
final ControllerServiceEntity entity = serviceFacade.getControllerService(id, true);
|
||||
if (uiOnly) {
|
||||
stripNonUiRelevantFields(entity);
|
||||
}
|
||||
|
|
|
@ -70,8 +70,6 @@ import org.apache.nifi.web.api.entity.AboutEntity;
|
|||
import org.apache.nifi.web.api.entity.ActionEntity;
|
||||
import org.apache.nifi.web.api.entity.ActivateControllerServicesEntity;
|
||||
import org.apache.nifi.web.api.entity.BannerEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowRegistryBucketEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowRegistryBucketsEntity;
|
||||
import org.apache.nifi.web.api.entity.BulletinBoardEntity;
|
||||
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
|
||||
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
|
||||
|
@ -85,6 +83,8 @@ import org.apache.nifi.web.api.entity.ControllerServicesEntity;
|
|||
import org.apache.nifi.web.api.entity.ControllerStatusEntity;
|
||||
import org.apache.nifi.web.api.entity.CurrentUserEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowConfigurationEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowRegistryBucketEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowRegistryBucketsEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
|
||||
import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
|
||||
import org.apache.nifi.web.api.entity.HistoryEntity;
|
||||
|
@ -497,7 +497,9 @@ public class FlowResource extends ApplicationResource {
|
|||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
}
|
||||
)
|
||||
public Response getControllerServicesFromController(@QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly) {
|
||||
public Response getControllerServicesFromController(@QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly,
|
||||
@QueryParam("includeReferencingComponents") @DefaultValue("true")
|
||||
@ApiParam("Whether or not to include services' referencing components in the response") boolean includeReferences) {
|
||||
|
||||
authorizeFlow();
|
||||
|
||||
|
@ -506,7 +508,7 @@ public class FlowResource extends ApplicationResource {
|
|||
}
|
||||
|
||||
// get all the controller services
|
||||
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null, false, false);
|
||||
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(null, false, false, includeReferences);
|
||||
if (uiOnly) {
|
||||
controllerServices.forEach(this::stripNonUiRelevantFields);
|
||||
}
|
||||
|
@ -550,8 +552,9 @@ public class FlowResource extends ApplicationResource {
|
|||
)
|
||||
public Response getControllerServicesFromGroup(
|
||||
@ApiParam(value = "The process group id.", required = true) @PathParam("id") String groupId,
|
||||
@ApiParam("Whether or not to include parent/ancestory process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups,
|
||||
@ApiParam("Whether or not to include parent/ancestor process groups") @QueryParam("includeAncestorGroups") @DefaultValue("true") boolean includeAncestorGroups,
|
||||
@ApiParam("Whether or not to include descendant process groups") @QueryParam("includeDescendantGroups") @DefaultValue("false") boolean includeDescendantGroups,
|
||||
@ApiParam("Whether or not to include services' referencing components in the response") @QueryParam("includeReferencingComponents") @DefaultValue("true") boolean includeReferences,
|
||||
@QueryParam("uiOnly") @DefaultValue("false") final boolean uiOnly) {
|
||||
|
||||
authorizeFlow();
|
||||
|
@ -561,7 +564,7 @@ public class FlowResource extends ApplicationResource {
|
|||
}
|
||||
|
||||
// get all the controller services
|
||||
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups);
|
||||
final Set<ControllerServiceEntity> controllerServices = serviceFacade.getControllerServices(groupId, includeAncestorGroups, includeDescendantGroups, includeReferences);
|
||||
if (uiOnly) {
|
||||
controllerServices.forEach(this::stripNonUiRelevantFields);
|
||||
}
|
||||
|
|
|
@ -28,57 +28,6 @@ import io.swagger.annotations.ApiParam;
|
|||
import io.swagger.annotations.ApiResponse;
|
||||
import io.swagger.annotations.ApiResponses;
|
||||
import io.swagger.annotations.Authorization;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedHashMap;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.UriBuilder;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import javax.xml.bind.JAXBContext;
|
||||
import javax.xml.bind.JAXBElement;
|
||||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.bind.Unmarshaller;
|
||||
import javax.xml.stream.XMLStreamReader;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.text.StringEscapeUtils;
|
||||
import org.apache.nifi.authorization.AuthorizableLookup;
|
||||
|
@ -185,6 +134,58 @@ import org.slf4j.LoggerFactory;
|
|||
import org.springframework.security.core.Authentication;
|
||||
import org.springframework.security.core.context.SecurityContextHolder;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.DefaultValue;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.HttpMethod;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.QueryParam;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedHashMap;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
import javax.ws.rs.core.UriBuilder;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import javax.xml.bind.JAXBContext;
|
||||
import javax.xml.bind.JAXBElement;
|
||||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.bind.Unmarshaller;
|
||||
import javax.xml.stream.XMLStreamReader;
|
||||
import javax.xml.transform.stream.StreamSource;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* RESTful endpoint for managing a Group.
|
||||
*/
|
||||
|
@ -1242,7 +1243,7 @@ public class ProcessGroupResource extends FlowUpdateResource<ProcessGroupImportE
|
|||
|
||||
boolean continuePolling = true;
|
||||
while (continuePolling) {
|
||||
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true);
|
||||
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, false);
|
||||
|
||||
// update the affected controller services
|
||||
updateAffectedControllerServices(serviceEntities, updateRequest);
|
||||
|
|
|
@ -47,7 +47,7 @@ public class AffectedComponentUtils {
|
|||
return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
|
||||
}
|
||||
case AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE:
|
||||
final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId());
|
||||
final ControllerServiceEntity serviceEntity = serviceFacade.getControllerService(componentEntity.getId(), false);
|
||||
return dtoFactory.createAffectedComponentEntity(serviceEntity);
|
||||
case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT: {
|
||||
final RemoteProcessGroupEntity remoteGroupEntity = serviceFacade.getRemoteProcessGroup(componentEntity.getComponent().getProcessGroupId());
|
||||
|
|
|
@ -54,6 +54,7 @@ import javax.ws.rs.core.Response.Status;
|
|||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -73,6 +74,11 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
public Set<AffectedComponentEntity> scheduleComponents(final URI exampleUri, final String groupId, final Set<AffectedComponentEntity> components,
|
||||
final ScheduledState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
if (components.isEmpty()) {
|
||||
logger.debug("No components to schedule for group {} so will not issue request", groupId);
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
final Set<String> componentIds = components.stream()
|
||||
.map(ComponentEntity::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
@ -169,6 +175,12 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
|
||||
private boolean waitForProcessorValidation(final NiFiUser user, final URI originalUri, final String groupId,
|
||||
final Map<String, AffectedComponentEntity> processors, final Pause pause) throws InterruptedException {
|
||||
|
||||
if (processors.isEmpty()) {
|
||||
logger.debug("No processors to wait for so will not wait for Processor Validation");
|
||||
return true;
|
||||
}
|
||||
|
||||
URI groupUri;
|
||||
try {
|
||||
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||
|
@ -247,6 +259,11 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
private boolean waitForProcessorStatus(final NiFiUser user, final URI originalUri, final String groupId, final Map<String, AffectedComponentEntity> processors,
|
||||
final ScheduledState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction) throws InterruptedException, LifecycleManagementException {
|
||||
|
||||
if (processors.isEmpty()) {
|
||||
logger.debug("No processors to wait for, so will not wait for processors to reach state of {}", desiredState);
|
||||
return true;
|
||||
}
|
||||
|
||||
URI groupUri;
|
||||
try {
|
||||
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(),
|
||||
|
@ -405,6 +422,11 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
final Set<AffectedComponentEntity> servicesRequiringDesiredState, final ControllerServiceState desiredState,
|
||||
final Pause pause, final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
if (affectedServices.isEmpty() && servicesRequiringDesiredState.isEmpty()) {
|
||||
logger.debug("No Controller Services to activate for group {} so will not issue request", groupId);
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
final Set<String> affectedServiceIds = affectedServices.stream()
|
||||
.map(ComponentEntity::getId)
|
||||
.collect(Collectors.toSet());
|
||||
|
@ -436,7 +458,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||
|
||||
// If enabling services, validation must complete first
|
||||
if (desiredState == ControllerServiceState.ENABLED && !affectedServiceIds.isEmpty()) {
|
||||
if (desiredState == ControllerServiceState.ENABLED) {
|
||||
try {
|
||||
waitForControllerServiceValidation(user, originalUri, groupId, affectedServiceIds, pause);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -447,20 +469,18 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
|
||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
||||
try {
|
||||
if (!affectedServiceIds.isEmpty()) {
|
||||
final NodeResponse clusterResponse;
|
||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
|
||||
} else {
|
||||
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||
getClusterCoordinatorNode(), user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
|
||||
}
|
||||
final NodeResponse clusterResponse;
|
||||
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
|
||||
} else {
|
||||
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||
getClusterCoordinatorNode(), user, HttpMethod.PUT, controllerServicesUri, activateServicesEntity, headers).awaitMergedResponse();
|
||||
}
|
||||
|
||||
final int disableServicesStatus = clusterResponse.getStatus();
|
||||
if (disableServicesStatus != Status.OK.getStatusCode()) {
|
||||
final String explanation = getResponseEntity(clusterResponse, String.class);
|
||||
throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
|
||||
}
|
||||
final int disableServicesStatus = clusterResponse.getStatus();
|
||||
if (disableServicesStatus != Status.OK.getStatusCode()) {
|
||||
final String explanation = getResponseEntity(clusterResponse, String.class);
|
||||
throw new LifecycleManagementException("Failed to update Controller Services to a state of " + desiredState + " due to " + explanation);
|
||||
}
|
||||
|
||||
final boolean serviceTransitioned = waitForControllerServiceStatus(user, originalUri, groupId, idsOfServicesRequiringDesiredState, desiredState, pause, invalidComponentAction);
|
||||
|
@ -474,7 +494,7 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
}
|
||||
|
||||
return affectedServices.stream()
|
||||
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId()))
|
||||
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId(), false))
|
||||
.map(dtoFactory::createAffectedComponentEntity)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
@ -486,7 +506,8 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
URI groupUri;
|
||||
try {
|
||||
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
|
||||
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services",
|
||||
"includeAncestorGroups=false&includeDescendantGroups=true&includeReferencingComponents=false", originalUri.getFragment());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -555,10 +576,15 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
|||
final ControllerServiceState desiredState, final Pause pause, final InvalidComponentAction invalidComponentAction)
|
||||
throws InterruptedException, LifecycleManagementException {
|
||||
|
||||
if (serviceIds.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
URI groupUri;
|
||||
try {
|
||||
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false&includeDescendantGroups=true", originalUri.getFragment());
|
||||
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services",
|
||||
"includeAncestorGroups=false&includeDescendantGroups=true&includeReferencingComponents=false", originalUri.getFragment());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
|||
}
|
||||
|
||||
return servicesRequiringDesiredState.stream()
|
||||
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId()))
|
||||
.map(componentEntity -> serviceFacade.getControllerService(componentEntity.getId(), false))
|
||||
.map(dtoFactory::createAffectedComponentEntity)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
@ -370,7 +370,7 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
|||
logger.debug("Waiting for {} controller services to complete validation", affectedComponents.size());
|
||||
boolean continuePolling = true;
|
||||
while (continuePolling) {
|
||||
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true);
|
||||
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, false);
|
||||
if (isControllerServiceValidationComplete(serviceEntities, affectedComponents)) {
|
||||
logger.debug("All {} controller services of interest have completed validation", affectedComponents.size());
|
||||
return true;
|
||||
|
@ -407,10 +407,13 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
|||
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||
|
||||
logger.debug("Waiting for {} Controller Services to transition their states to {}", affectedServices.size(), desiredState);
|
||||
if (affectedServices.isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
boolean continuePolling = true;
|
||||
while (continuePolling) {
|
||||
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true);
|
||||
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true, false);
|
||||
|
||||
// update the affected controller services
|
||||
updateAffectedControllerServices(serviceEntities, affectedServices);
|
||||
|
|
Loading…
Reference in New Issue