mirror of https://github.com/apache/nifi.git
NIFI-6735 - validate components before restarting processors following parameter context update.
This closes #3782. Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
555004cdde
commit
df90c65246
|
@ -865,6 +865,7 @@ public class ParameterContextResource extends ApplicationResource {
|
||||||
final ParameterContextEntity updatedEntity;
|
final ParameterContextEntity updatedEntity;
|
||||||
try {
|
try {
|
||||||
updatedEntity = performParameterContextUpdate(asyncRequest, uri, replicateRequest, revision, updatedContextEntity);
|
updatedEntity = performParameterContextUpdate(asyncRequest, uri, replicateRequest, revision, updatedContextEntity);
|
||||||
|
asyncRequest.markStepComplete();
|
||||||
logger.info("Successfully updated Parameter Context with ID {}", updatedContextEntity.getId());
|
logger.info("Successfully updated Parameter Context with ID {}", updatedContextEntity.getId());
|
||||||
} finally {
|
} finally {
|
||||||
// TODO: can almost certainly be refactored so that the same code is shared between VersionsResource and ParameterContextResource.
|
// TODO: can almost certainly be refactored so that the same code is shared between VersionsResource and ParameterContextResource.
|
||||||
|
@ -968,8 +969,6 @@ public class ParameterContextResource extends ApplicationResource {
|
||||||
logger.info("Restarting {} Processors after having updated Parameter Context", processors.size());
|
logger.info("Restarting {} Processors after having updated Parameter Context", processors.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
asyncRequest.markStepComplete();
|
|
||||||
|
|
||||||
// Step 14. Restart all components
|
// Step 14. Restart all components
|
||||||
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(processors);
|
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(processors);
|
||||||
|
|
||||||
|
@ -978,6 +977,7 @@ public class ParameterContextResource extends ApplicationResource {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
componentLifecycle.scheduleComponents(uri, "root", componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP);
|
componentLifecycle.scheduleComponents(uri, "root", componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP);
|
||||||
|
asyncRequest.markStepComplete();
|
||||||
} catch (final IllegalStateException ise) {
|
} catch (final IllegalStateException ise) {
|
||||||
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
|
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||||
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
|
||||||
|
@ -1003,8 +1003,6 @@ public class ParameterContextResource extends ApplicationResource {
|
||||||
logger.info("Re-Enabling {} Controller Services after having updated Parameter Context", controllerServices.size());
|
logger.info("Re-Enabling {} Controller Services after having updated Parameter Context", controllerServices.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
asyncRequest.markStepComplete();
|
|
||||||
|
|
||||||
// Step 13. Re-enable all disabled controller services
|
// Step 13. Re-enable all disabled controller services
|
||||||
final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||||
asyncRequest.setCancelCallback(enableServicesPause::cancel);
|
asyncRequest.setCancelCallback(enableServicesPause::cancel);
|
||||||
|
@ -1012,6 +1010,7 @@ public class ParameterContextResource extends ApplicationResource {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
|
componentLifecycle.activateControllerServices(uri, "root", servicesToEnable, ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
|
||||||
|
asyncRequest.markStepComplete();
|
||||||
} catch (final IllegalStateException ise) {
|
} catch (final IllegalStateException ise) {
|
||||||
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
|
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
|
||||||
// a more intelligent error message as to exactly what happened, rather than indicate that the Parameter Context could not be updated.
|
// a more intelligent error message as to exactly what happened, rather than indicate that the Parameter Context could not be updated.
|
||||||
|
|
|
@ -101,6 +101,16 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
||||||
|
|
||||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||||
|
|
||||||
|
// If attempting to run the processors, validation must complete first
|
||||||
|
if (desiredState == ScheduledState.RUNNING) {
|
||||||
|
try {
|
||||||
|
waitForProcessorValidation(user, exampleUri, groupId, componentMap, pause);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new LifecycleManagementException("Interrupted while waiting for processors to complete validation");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
||||||
try {
|
try {
|
||||||
final NodeResponse clusterResponse;
|
final NodeResponse clusterResponse;
|
||||||
|
@ -156,6 +166,64 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
||||||
return processorRevisions.stream().collect(Collectors.toMap(Revision::getComponentId, Function.identity()));
|
return processorRevisions.stream().collect(Collectors.toMap(Revision::getComponentId, Function.identity()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean waitForProcessorValidation(final NiFiUser user, final URI originalUri, final String groupId,
|
||||||
|
final Map<String, AffectedComponentEntity> processors, final Pause pause) throws InterruptedException {
|
||||||
|
URI groupUri;
|
||||||
|
try {
|
||||||
|
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||||
|
originalUri.getPort(), "/nifi-api/process-groups/" + groupId + "/processors", "includeDescendantGroups=true", originalUri.getFragment());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, String> headers = new HashMap<>();
|
||||||
|
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
|
||||||
|
|
||||||
|
boolean continuePolling = true;
|
||||||
|
while (continuePolling) {
|
||||||
|
|
||||||
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
||||||
|
final NodeResponse clusterResponse;
|
||||||
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
|
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
|
||||||
|
} else {
|
||||||
|
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||||
|
getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ProcessorsEntity processorsEntity = getResponseEntity(clusterResponse, ProcessorsEntity.class);
|
||||||
|
final Set<ProcessorEntity> processorEntities = processorsEntity.getProcessors();
|
||||||
|
|
||||||
|
if (isProcessorValidationComplete(processorEntities, processors)) {
|
||||||
|
logger.debug("All {} processors of interest now have been validated", processors.size());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not all of the processors are done validating. Pause for a bit and poll again.
|
||||||
|
continuePolling = pause.pause();
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isProcessorValidationComplete(Set<ProcessorEntity> processorEntities, Map<String, AffectedComponentEntity> affectedComponents) {
|
||||||
|
updateAffectedProcessors(processorEntities, affectedComponents);
|
||||||
|
for (final ProcessorEntity entity : processorEntities) {
|
||||||
|
if (!affectedComponents.containsKey(entity.getId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
|
* Periodically polls the process group with the given ID, waiting for all processors whose ID's are given to have the given Scheduled State.
|
||||||
*
|
*
|
||||||
|
@ -230,30 +298,32 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
||||||
return entity;
|
return entity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateAffectedProcessors(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
|
||||||
|
// update the affected processors
|
||||||
|
processorEntities.stream()
|
||||||
|
.filter(entity -> affectedComponents.containsKey(entity.getId()))
|
||||||
|
.forEach(entity -> {
|
||||||
|
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId());
|
||||||
|
affectedComponentEntity.setRevision(entity.getRevision());
|
||||||
|
|
||||||
|
// only consider update this component if the user had permissions to it
|
||||||
|
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
|
||||||
|
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
|
||||||
|
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
|
||||||
|
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
|
||||||
|
|
||||||
|
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
|
||||||
|
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState,
|
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState,
|
||||||
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||||
|
|
||||||
final String desiredStateName = desiredState.name();
|
final String desiredStateName = desiredState.name();
|
||||||
|
|
||||||
// update the affected processors
|
updateAffectedProcessors(processorEntities, affectedComponents);
|
||||||
processorEntities.stream()
|
|
||||||
.filter(entity -> affectedComponents.containsKey(entity.getId()))
|
|
||||||
.forEach(entity -> {
|
|
||||||
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId());
|
|
||||||
affectedComponentEntity.setRevision(entity.getRevision());
|
|
||||||
|
|
||||||
// only consider update this component if the user had permissions to it
|
|
||||||
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
|
|
||||||
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
|
|
||||||
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
|
|
||||||
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
|
|
||||||
|
|
||||||
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
|
|
||||||
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (final ProcessorEntity entity : processorEntities) {
|
for (final ProcessorEntity entity : processorEntities) {
|
||||||
if (!affectedComponents.containsKey(entity.getId())) {
|
if (!affectedComponents.containsKey(entity.getId())) {
|
||||||
|
@ -320,6 +390,16 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
||||||
|
|
||||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||||
|
|
||||||
|
// If enabling services, validation must complete first
|
||||||
|
if (desiredState == ControllerServiceState.ENABLED) {
|
||||||
|
try {
|
||||||
|
waitForControllerServiceValidation(user, originalUri, groupId, affectedServiceIds, pause);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new LifecycleManagementException("Interrupted while waiting for Controller Services to complete validation");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
||||||
try {
|
try {
|
||||||
final NodeResponse clusterResponse;
|
final NodeResponse clusterResponse;
|
||||||
|
@ -352,6 +432,67 @@ public class ClusterReplicationComponentLifecycle implements ComponentLifecycle
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private boolean waitForControllerServiceValidation(final NiFiUser user, final URI originalUri, final String groupId,
|
||||||
|
final Set<String> serviceIds, final Pause pause)
|
||||||
|
throws InterruptedException {
|
||||||
|
|
||||||
|
URI groupUri;
|
||||||
|
try {
|
||||||
|
groupUri = new URI(originalUri.getScheme(), originalUri.getUserInfo(), originalUri.getHost(),
|
||||||
|
originalUri.getPort(), "/nifi-api/flow/process-groups/" + groupId + "/controller-services", "includeAncestorGroups=false,includeDescendantGroups=true", originalUri.getFragment());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, String> headers = new HashMap<>();
|
||||||
|
final MultivaluedMap<String, String> requestEntity = new MultivaluedHashMap<>();
|
||||||
|
|
||||||
|
boolean continuePolling = true;
|
||||||
|
while (continuePolling) {
|
||||||
|
|
||||||
|
// Determine whether we should replicate only to the cluster coordinator, or if we should replicate directly to the cluster nodes themselves.
|
||||||
|
final NodeResponse clusterResponse;
|
||||||
|
if (getReplicationTarget() == ReplicationTarget.CLUSTER_NODES) {
|
||||||
|
clusterResponse = getRequestReplicator().replicate(user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
|
||||||
|
} else {
|
||||||
|
clusterResponse = getRequestReplicator().forwardToCoordinator(
|
||||||
|
getClusterCoordinatorNode(), user, HttpMethod.GET, groupUri, requestEntity, headers).awaitMergedResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (clusterResponse.getStatus() != Status.OK.getStatusCode()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ControllerServicesEntity controllerServicesEntity = getResponseEntity(clusterResponse, ControllerServicesEntity.class);
|
||||||
|
final Set<ControllerServiceEntity> serviceEntities = controllerServicesEntity.getControllerServices();
|
||||||
|
|
||||||
|
final Map<String, AffectedComponentEntity> affectedServices = serviceEntities.stream()
|
||||||
|
.collect(Collectors.toMap(ControllerServiceEntity::getId, dtoFactory::createAffectedComponentEntity));
|
||||||
|
|
||||||
|
if (isControllerServiceValidationComplete(serviceEntities, affectedServices)) {
|
||||||
|
logger.debug("All {} controller services of interest have completed validation", affectedServices.size());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
continuePolling = pause.pause();
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isControllerServiceValidationComplete(final Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
|
||||||
|
updateAffectedControllerServices(controllerServiceEntities, affectedComponents);
|
||||||
|
for (final ControllerServiceEntity entity : controllerServiceEntities) {
|
||||||
|
if (!affectedComponents.containsKey(entity.getId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
|
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
|
||||||
*
|
*
|
||||||
|
|
|
@ -105,6 +105,9 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
||||||
|
|
||||||
logger.debug("Starting components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId);
|
logger.debug("Starting components with ID's {} from Process Group {}", componentRevisions.keySet(), processGroupId);
|
||||||
|
|
||||||
|
// Wait for all affected processors to be either VALID or INVALID
|
||||||
|
waitForProcessorValidation(processGroupId, affectedComponents, pause);
|
||||||
|
|
||||||
serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions.keySet());
|
serviceFacade.verifyScheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions.keySet());
|
||||||
serviceFacade.scheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions);
|
serviceFacade.scheduleComponents(processGroupId, ScheduledState.RUNNING, componentRevisions);
|
||||||
|
|
||||||
|
@ -130,6 +133,42 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
||||||
waitForProcessorState(processGroupId, affectedComponents, ScheduledState.STOPPED, pause, invalidComponentAction);
|
waitForProcessorState(processGroupId, affectedComponents, ScheduledState.STOPPED, pause, invalidComponentAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for all given Processors to complete validation
|
||||||
|
*
|
||||||
|
* @return <code>true</code> if all processors have completed validation, <code>false</code> if the given {@link Pause}
|
||||||
|
* indicated to give up before all of the processors have completed validation
|
||||||
|
*/
|
||||||
|
private boolean waitForProcessorValidation(final String groupId, final Map<String, AffectedComponentEntity> affectedComponents, final Pause pause) {
|
||||||
|
|
||||||
|
logger.debug("Waiting for {} processors to complete validation", affectedComponents.size());
|
||||||
|
boolean continuePolling = true;
|
||||||
|
while (continuePolling) {
|
||||||
|
final Set<ProcessorEntity> processorEntities = serviceFacade.getProcessors(groupId, true);
|
||||||
|
if (isProcessorValidationComplete(processorEntities, affectedComponents)) {
|
||||||
|
logger.debug("All {} processors of interest have completed validation", affectedComponents.size());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
continuePolling = pause.pause();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isProcessorValidationComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
|
||||||
|
updateAffectedProcessors(processorEntities, affectedComponents);
|
||||||
|
for (final ProcessorEntity entity : processorEntities) {
|
||||||
|
if (!affectedComponents.containsKey(entity.getId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ProcessorDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits for all of the given Processors to reach the given Scheduled State.
|
* Waits for all of the given Processors to reach the given Scheduled State.
|
||||||
*
|
*
|
||||||
|
@ -157,29 +196,33 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateAffectedProcessors(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
|
||||||
|
// update the affected processors
|
||||||
|
processorEntities.stream()
|
||||||
|
.filter(entity -> affectedComponents.containsKey(entity.getId()))
|
||||||
|
.forEach(entity -> {
|
||||||
|
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId());
|
||||||
|
affectedComponentEntity.setRevision(entity.getRevision());
|
||||||
|
|
||||||
|
// only consider updating this component if the user has permissions to it
|
||||||
|
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
|
||||||
|
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
|
||||||
|
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
|
||||||
|
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
|
||||||
|
|
||||||
|
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
|
||||||
|
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState,
|
private boolean isProcessorActionComplete(final Set<ProcessorEntity> processorEntities, final Map<String, AffectedComponentEntity> affectedComponents, final ScheduledState desiredState,
|
||||||
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
final InvalidComponentAction invalidComponentAction) throws LifecycleManagementException {
|
||||||
|
|
||||||
final String desiredStateName = desiredState.name();
|
final String desiredStateName = desiredState.name();
|
||||||
|
|
||||||
// update the affected processors
|
updateAffectedProcessors(processorEntities, affectedComponents);
|
||||||
processorEntities.stream()
|
|
||||||
.filter(entity -> affectedComponents.containsKey(entity.getId()))
|
|
||||||
.forEach(entity -> {
|
|
||||||
final AffectedComponentEntity affectedComponentEntity = affectedComponents.get(entity.getId());
|
|
||||||
affectedComponentEntity.setRevision(entity.getRevision());
|
|
||||||
|
|
||||||
// only consider updating this component if the user has permissions to it
|
|
||||||
if (Boolean.TRUE.equals(affectedComponentEntity.getPermissions().getCanRead())) {
|
|
||||||
final AffectedComponentDTO affectedComponent = affectedComponentEntity.getComponent();
|
|
||||||
affectedComponent.setState(entity.getStatus().getAggregateSnapshot().getRunStatus());
|
|
||||||
affectedComponent.setActiveThreadCount(entity.getStatus().getAggregateSnapshot().getActiveThreadCount());
|
|
||||||
|
|
||||||
if (Boolean.TRUE.equals(entity.getPermissions().getCanRead())) {
|
|
||||||
affectedComponent.setValidationErrors(entity.getComponent().getValidationErrors());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
for (final ProcessorEntity entity : processorEntities) {
|
for (final ProcessorEntity entity : processorEntities) {
|
||||||
if (!affectedComponents.containsKey(entity.getId())) {
|
if (!affectedComponents.containsKey(entity.getId())) {
|
||||||
|
@ -224,6 +267,8 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
||||||
|
|
||||||
logger.debug("Enabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId);
|
logger.debug("Enabling Controller Services with ID's {} from Process Group {}", serviceRevisions.keySet(), processGroupId);
|
||||||
|
|
||||||
|
waitForControllerServiceValidation(processGroupId, affectedServices, pause);
|
||||||
|
|
||||||
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet());
|
serviceFacade.verifyActivateControllerServices(processGroupId, ControllerServiceState.ENABLED, affectedServices.keySet());
|
||||||
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
|
serviceFacade.activateControllerServices(processGroupId, ControllerServiceState.ENABLED, serviceRevisions);
|
||||||
waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, invalidComponentAction);
|
waitForControllerServiceState(processGroupId, affectedServices, ControllerServiceState.ENABLED, pause, invalidComponentAction);
|
||||||
|
@ -282,6 +327,39 @@ public class LocalComponentLifecycle implements ComponentLifecycle {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for all given Controller Services to complete validation
|
||||||
|
*
|
||||||
|
* @return <code>true</code> if all processors have completed validation, <code>false</code> if the given {@link Pause}
|
||||||
|
* indicated to give up before all of the controller services have completed validation
|
||||||
|
*/
|
||||||
|
private boolean waitForControllerServiceValidation(final String groupId, final Map<String, AffectedComponentEntity> affectedComponents, final Pause pause) {
|
||||||
|
logger.debug("Waiting for {} controller services to complete validation", affectedComponents.size());
|
||||||
|
boolean continuePolling = true;
|
||||||
|
while (continuePolling) {
|
||||||
|
final Set<ControllerServiceEntity> serviceEntities = serviceFacade.getControllerServices(groupId, false, true);
|
||||||
|
if (isControllerServiceValidationComplete(serviceEntities, affectedComponents)) {
|
||||||
|
logger.debug("All {} controller services of interest have completed validation", affectedComponents.size());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
continuePolling = pause.pause();
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isControllerServiceValidationComplete(final Set<ControllerServiceEntity> controllerServiceEntities, final Map<String, AffectedComponentEntity> affectedComponents) {
|
||||||
|
updateAffectedControllerServices(controllerServiceEntities, affectedComponents);
|
||||||
|
for (final ControllerServiceEntity entity : controllerServiceEntities) {
|
||||||
|
if (!affectedComponents.containsKey(entity.getId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ControllerServiceDTO.VALIDATING.equals(entity.getComponent().getValidationStatus())) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
|
* Periodically polls the process group with the given ID, waiting for all controller services whose ID's are given to have the given Controller Service State.
|
||||||
|
|
Loading…
Reference in New Issue