NIFI-10001: When enabling a collection of Controller Services, change… (#6042)

* NIFI-10001: When enabling a collection of Controller Services, changed logic. Instead of enabling dependent services and waiting for them to complete enablement before starting a given service, just start the services given. The previous logic was necessary long ago because we couldn't enable a service unless all dependent services were fully enabled. But that changed a while ago. Now, we can enable a service when it's invalid. It'll just keep trying to enable until it becomes valid. At that point, it will complete its transition from ENABLING to ENABLED.

* NIFI-10001: Restored previous implementation for StandardControllerServiceProvider, as the changes were not ultimately what we needed. Changed StandardProcessGroup to use a ConcurrentHashMap for controller services instead of a HashMap guarded by the readLock. The read lock was causing a deadlock when we enabled a Controller Service that references another service during flow synchronization: Flow Synchronization was happening within a write lock, and enabling the service required a read lock on the group. Eventually the thread holding the write lock would time out and release the write lock, but this caused significant delays on startup. By changing to a ConcurrentHashMap, we eliminate the need for the Read Lock. Also noticed in testing that the StandardNiFiServiceFacade did not save flow changes when enabling dependent services, so added a call to controllerFacade.save().
This commit is contained in:
markap14 2022-05-13 15:09:23 -04:00 committed by GitHub
parent 30f7c1ba1e
commit 8031b62351
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 25 deletions

View File

@ -470,11 +470,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return (rootServiceNode == null) ? null : rootServiceNode.getProxiedControllerService();
}
final Set<ControllerServiceNode> servicesForGroup = groupOfInterest.getControllerServices(true);
for (final ControllerServiceNode serviceNode : servicesForGroup) {
if (serviceIdentifier.equals(serviceNode.getIdentifier())) {
return serviceNode.getProxiedControllerService();
}
final ControllerServiceNode serviceNode = groupOfInterest.findControllerService(serviceIdentifier, false, true);
if (serviceNode != null) {
return serviceNode.getProxiedControllerService();
}
return null;

View File

@ -183,6 +183,12 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
continue;
}
// If the difference type is a Scheduled State Change, we want to ignore it, because we are just trying to
// find components that need to be stopped in order to be updated. We don't need to stop a component in order
// to change its Scheduled State.
if (diff.getDifferenceType() == DifferenceType.SCHEDULED_STATE_CHANGED) {
continue;
}
// If this update adds a new Controller Service, then we need to check if the service already exists at a higher level
// and if so compare our VersionedControllerService to the existing service.
@ -214,12 +220,17 @@ public class StandardVersionedComponentSynchronizer implements VersionedComponen
}
if (LOG.isInfoEnabled()) {
final String differencesByLine = flowComparison.getDifferences().stream()
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
final Set<FlowDifference> differences = flowComparison.getDifferences();
if (differences.isEmpty()) {
LOG.info("No differences between current flow and proposed flow for {}", group);
} else {
final String differencesByLine = differences.stream()
.map(FlowDifference::toString)
.collect(Collectors.joining("\n"));
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
flowComparison.getDifferences().size(), differencesByLine);
LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", group, versionedExternalFlow,
differences.size(), differencesByLine);
}
}
final Set<String> knownVariables = getKnownVariableNames(group);

View File

@ -135,6 +135,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -186,7 +187,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
private final Map<String, ProcessorNode> processors = new HashMap<>();
private final Map<String, Funnel> funnels = new HashMap<>();
private final Map<String, ControllerServiceNode> controllerServices = new HashMap<>();
private final Map<String, ControllerServiceNode> controllerServices = new ConcurrentHashMap<>();
private final Map<String, Template> templates = new HashMap<>();
private final PropertyEncryptor encryptor;
private final MutableVariableRegistry variableRegistry;
@ -2313,24 +2314,12 @@ public final class StandardProcessGroup implements ProcessGroup {
/**
 * Looks up a Controller Service by identifier in this group's {@code controllerServices} map.
 * The identifier is passed through {@code requireNonNull} before it is used as the map key.
 *
 * NOTE(review): this span contains two lookups of the same map - one wrapped in
 * {@code readLock.lock()} / {@code readLock.unlock()} and one performed directly. This looks
 * like a diff rendering in which the removed (lock-guarded) and added (lock-free) lines are
 * interleaved without +/- markers - confirm against the actual file before relying on it.
 *
 * @param id the identifier of the Controller Service to look up
 * @return the result of {@code controllerServices.get(id)}
 */
@Override
public ControllerServiceNode getControllerService(final String id) {
// Lock-guarded lookup: the read lock is released in the finally block.
readLock.lock();
try {
return controllerServices.get(requireNonNull(id));
} finally {
readLock.unlock();
}
// Direct lookup with no lock held around the map access.
return controllerServices.get(requireNonNull(id));
}
@Override
public Set<ControllerServiceNode> getControllerServices(final boolean recursive) {
final Set<ControllerServiceNode> services = new HashSet<>();
readLock.lock();
try {
services.addAll(controllerServices.values());
} finally {
readLock.unlock();
}
final Set<ControllerServiceNode> services = new HashSet<>(controllerServices.values());
if (recursive) {
final ProcessGroup parentGroup = parent.get();

View File

@ -2890,6 +2890,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public RevisionUpdate<ControllerServiceReferencingComponentsEntity> update() {
final Set<ComponentNode> updated = controllerServiceDAO.updateControllerServiceReferencingComponents(controllerServiceId, scheduledState, controllerServiceState);
controllerFacade.save();
final ControllerServiceReference updatedReference = controllerServiceDAO.getControllerService(controllerServiceId).getReferences();
// get the revisions of the updated components

View File

@ -873,6 +873,40 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
});
}
/**
 * Creates a "StandardCountService" Controller Service, enables it while node 2 is disconnected,
 * reconnects node 2, and then waits until node 2 itself reports the service state as ENABLED.
 */
@Test
public void testRejoinAfterControllerServiceEnabled() throws NiFiClientException, IOException, InterruptedException {
    final ControllerServiceEntity service = getClientUtil().createControllerService("StandardCountService");

    // The enable request is issued while node 2 is disconnected.
    disconnectNode(2);
    getClientUtil().enableControllerService(service);

    reconnectNode(2);
    waitForAllNodesConnected();

    // Switch the client to node 2 and poll it with DO_NOT_REPLICATE until the expected state shows up.
    switchClientToNode(2);
    final String expectedState = ControllerServiceState.ENABLED.name();
    waitFor(() -> {
        final ControllerServiceEntity nodeView = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(service.getId());
        final String reportedState = nodeView.getComponent().getState();
        return expectedState.equals(reportedState);
    });
}
/**
 * Creates and enables a "StandardCountService" Controller Service, disables it while node 2 is
 * disconnected, reconnects node 2, and then waits until node 2 itself reports the service state
 * as DISABLED.
 */
@Test
public void testRejoinAfterControllerServiceDisabled() throws NiFiClientException, IOException, InterruptedException {
    final ControllerServiceEntity service = getClientUtil().createControllerService("StandardCountService");
    getClientUtil().enableControllerService(service);

    // The disable request is issued while node 2 is disconnected.
    disconnectNode(2);
    getClientUtil().disableControllerService(service);

    reconnectNode(2);
    waitForAllNodesConnected();

    // Switch the client to node 2 and poll it with DO_NOT_REPLICATE until the expected state shows up.
    switchClientToNode(2);
    final String expectedState = ControllerServiceState.DISABLED.name();
    waitFor(() -> {
        final ControllerServiceEntity nodeView = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(service.getId());
        final String reportedState = nodeView.getComponent().getState();
        return expectedState.equals(reportedState);
    });
}
private VersionedDataflow getNode2Flow() throws IOException {
final File instanceDir = getNiFiInstance().getNodeInstance(2).getInstanceDirectory();