NIFI-10246: When syncing Controller Services, wait until any DISABLING Controller Service has fully disabled before attempting to synchronize them. Also performed some minor refactoring/cleanup for System Tests in order to make writing a system test for this simpler

This closes #6219

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-07-18 14:29:10 -04:00 committed by exceptionfactory
parent 74b25c7306
commit bc4e3e850b
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
10 changed files with 170 additions and 33 deletions

View File

@ -394,6 +394,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
return this.active.get();
}
@Override
public boolean awaitEnabled(final long timePeriod, final TimeUnit timeUnit) throws InterruptedException {
LOG.debug("Waiting up to {} {} for {} to be enabled", timePeriod, timeUnit, this);
final boolean enabled = stateTransition.awaitStateOrInvalid(ControllerServiceState.ENABLED, timePeriod, timeUnit);
@ -407,6 +408,20 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
return enabled;
}
@Override
public boolean awaitDisabled(final long timePeriod, final TimeUnit timeUnit) throws InterruptedException {
    // Delegates the wait to the state transition and logs the outcome at DEBUG level.
    LOG.debug("Waiting up to {} {} for {} to be disabled", timePeriod, timeUnit, this);

    final boolean reachedDisabled = stateTransition.awaitState(ControllerServiceState.DISABLED, timePeriod, timeUnit);
    if (!reachedDisabled) {
        LOG.debug("After {} {}, {} is NOT disabled", timePeriod, timeUnit, this);
        return false;
    }

    LOG.debug("{} is now disabled", this);
    return true;
}
@Override
public void verifyCanPerformVerification() {
if (getState() != ControllerServiceState.DISABLED) {

View File

@ -426,6 +426,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
future.completeExceptionally(e);
}
}
} else {
boolean disabled = false;
while (!disabled) {
try {
disabled = serviceNode.awaitDisabled(1, TimeUnit.SECONDS);
} catch (final Exception e) {
logger.error("Failed to disable {}", serviceNode, e);
future.completeExceptionally(e);
}
}
}
}
}

View File

@ -201,6 +201,16 @@ public interface ControllerServiceNode extends ComponentNode, VersionedComponent
*/
boolean awaitEnabled(long timePeriod, TimeUnit timeUnit) throws InterruptedException;
/**
* Waits up to the given amount of time for the Controller Service to transition to a DISABLED state.
* @param timePeriod maximum amount of time to wait
* @param timeUnit the unit for the time period
* @return <code>true</code> if the Controller Service finished disabling, <code>false</code> otherwise
* @throws InterruptedException if interrupted while waiting for the service to complete its disabling
*/
boolean awaitDisabled(long timePeriod, TimeUnit timeUnit) throws InterruptedException;
/**
* Verifies that the Controller Service is in a state in which it can verify a configuration by calling
* {@link #verifyConfiguration(ConfigurationContext, ComponentLog, Map, ExtensionManager)}.

View File

@ -45,6 +45,7 @@ import org.apache.nifi.remote.RemoteGroupPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@ -62,6 +63,11 @@ import java.util.stream.Collectors;
*/
public class AffectedComponentSet {
private static final Logger logger = LoggerFactory.getLogger(AffectedComponentSet.class);
// Controller Service states that are treated as "active" when selecting the active subset of components.
// DISABLING is included alongside ENABLED and ENABLING.
private static final Set<ControllerServiceState> ACTIVE_CONTROLLER_SERVICE_STATES = new HashSet<>(Arrays.asList(
ControllerServiceState.ENABLED,
ControllerServiceState.ENABLING,
ControllerServiceState.DISABLING
));
private final FlowController flowController;
private final FlowManager flowManager;
@ -451,7 +457,7 @@ public class AffectedComponentSet {
processors.stream().filter(this::isActive).forEach(active::addProcessor);
reportingTasks.stream().filter(task -> task.getScheduledState() == ScheduledState.STARTING || task.getScheduledState() == ScheduledState.RUNNING || task.isRunning())
.forEach(active::addReportingTask);
controllerServices.stream().filter(service -> service.getState() == ControllerServiceState.ENABLING || service.getState() == ControllerServiceState.ENABLED)
controllerServices.stream().filter(service -> ACTIVE_CONTROLLER_SERVICE_STATES.contains(service.getState()))
.forEach(active::addControllerServiceWithoutReferences);
return active;

View File

@ -55,7 +55,7 @@ public class StandardSleepService extends AbstractControllerService implements S
.build();
public static final PropertyDescriptor ON_DISABLED_SLEEP_TIME = new PropertyDescriptor.Builder()
.name("@OnDisabled Sleep Time")
.description("The amount of time to sleep when disabeld")
.description("The amount of time to sleep when disabled")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("0 sec")

View File

@ -799,7 +799,7 @@ public class NiFiClientUtil {
return servicesEntity.getControllerServices().stream()
.filter(svc -> serviceIds == null || serviceIds.isEmpty() || serviceIds.contains(svc.getId()))
.filter(svc -> !desiredState.equals(svc.getStatus().getRunStatus()))
.filter(svc -> !desiredState.equalsIgnoreCase(svc.getComponent().getState()))
.collect(Collectors.toList());
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.tests.system;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
@ -63,6 +64,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
// Group ID | Source Name | Dest Name | Conn Name | Queue Size |
private static final String QUEUE_SIZES_FORMAT = "| %1$-36.36s | %2$-30.30s | %3$-30.30s | %4$-30.30s | %5$-30.30s |";
public static final RequestConfig DO_NOT_REPLICATE = () -> Collections.singletonMap("X-Request-Replicated", "value");
public static final int CLIENT_API_PORT = 5671;
public static final int CLIENT_API_BASE_PORT = 5670;
public static final String NIFI_GROUP_ID = "org.apache.nifi";
@ -157,12 +160,22 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
@Override
public NiFiInstanceFactory getInstanceFactory() {
return createStandaloneInstanceFactory();
}
public NiFiInstanceFactory createStandaloneInstanceFactory() {
return new SpawnedStandaloneNiFiInstanceFactory(
new InstanceConfiguration.Builder()
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
.instanceDirectory("target/standalone-instance")
.overrideNifiProperties(getNifiPropertiesOverrides())
.build());
new InstanceConfiguration.Builder()
.bootstrapConfig("src/test/resources/conf/default/bootstrap.conf")
.instanceDirectory("target/standalone-instance")
.overrideNifiProperties(getNifiPropertiesOverrides())
.build());
}
/**
 * Builds a {@code SpawnedClusterNiFiInstanceFactory} configured with the bootstrap files for node1 and node2.
 *
 * @return the instance factory for the two clustered bootstrap configurations
 */
public NiFiInstanceFactory createTwoNodeInstanceFactory() {
    final String node1BootstrapConf = "src/test/resources/conf/clustered/node1/bootstrap.conf";
    final String node2BootstrapConf = "src/test/resources/conf/clustered/node2/bootstrap.conf";
    return new SpawnedClusterNiFiInstanceFactory(node1BootstrapConf, node2BootstrapConf);
}
protected String getTestName() {
@ -471,4 +484,18 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
}
/**
 * Looks up the cluster node whose API port is {@link #CLIENT_API_BASE_PORT} plus the given node index.
 *
 * @param nodeIndex the index of the node, added to the base API port
 * @return the matching node
 * @throws NiFiClientException if the lookup against the NiFi client fails
 * @throws IOException if the lookup against the NiFi client fails
 */
public NodeDTO getNodeDtoByNodeIndex(final int nodeIndex) throws NiFiClientException, IOException {
    // Use the shared constant rather than repeating the literal base port.
    return getNodeDtoByApiPort(CLIENT_API_BASE_PORT + nodeIndex);
}
/**
 * Fetches the cluster's nodes and returns one whose API port equals the given port.
 *
 * @param apiPort the API port to search for
 * @return a node reporting the given API port
 * @throws NiFiClientException if fetching the cluster nodes fails
 * @throws IOException if fetching the cluster nodes fails
 * @throws RuntimeException if no node reports the given API port
 */
public NodeDTO getNodeDtoByApiPort(final int apiPort) throws NiFiClientException, IOException {
    final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
    final Integer expectedPort = apiPort;
    return clusterEntity.getCluster().getNodes().stream()
        // equals() on the boxed value avoids an NPE from auto-unboxing when a node reports no API port
        .filter(nodeDto -> expectedPort.equals(nodeDto.getApiPort()))
        .findAny()
        .orElseThrow(() -> new RuntimeException("Could not locate node with API port " + apiPort));
}
}

View File

@ -38,7 +38,6 @@ import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.entity.AffectedComponentEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.NodeEntity;
@ -114,7 +113,7 @@ public class JoinClusterWithDifferentFlow extends NiFiSystemIT {
node2.start(true);
final File backupFile = getBackupFile(node2ConfDir);
final NodeDTO node2Dto = getNodeDTO(5672);
final NodeDTO node2Dto = getNodeDtoByApiPort(5672);
verifyFlowContentsOnDisk(backupFile);
disconnectNode(node2Dto);
@ -182,16 +181,6 @@ public class JoinClusterWithDifferentFlow extends NiFiSystemIT {
}
private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, IOException {
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
.filter(nodeDto -> nodeDto.getApiPort() == apiPort)
.findAny()
.orElseThrow(() -> new RuntimeException("Could not locate Node 2"));
return node2Dto;
}
private void disconnectNode(final NodeDTO nodeDto) throws NiFiClientException, IOException, InterruptedException {
// Disconnect Node 2 so that we can go to the node directly via the REST API and ensure that the flow is correct.
final NodeEntity nodeEntity = new NodeEntity();

View File

@ -23,7 +23,6 @@ import org.apache.nifi.tests.system.SpawnedClusterNiFiInstanceFactory;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
@ -71,7 +70,7 @@ public class OffloadIT extends NiFiSystemIT {
waitForQueueNotEmpty(connectionEntity.getId());
final NodeDTO node2Dto = getNodeDTO(5672);
final NodeDTO node2Dto = getNodeDtoByApiPort(5672);
disconnectNode(node2Dto);
waitForNodeStatus(node2Dto, "DISCONNECTED");
@ -84,16 +83,6 @@ public class OffloadIT extends NiFiSystemIT {
waitForAllNodesConnected();
}
private NodeDTO getNodeDTO(final int apiPort) throws NiFiClientException, IOException {
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
final NodeDTO node2Dto = clusterEntity.getCluster().getNodes().stream()
.filter(nodeDto -> nodeDto.getApiPort() == apiPort)
.findAny()
.orElseThrow(() -> new RuntimeException("Could not locate Node 2"));
return node2Dto;
}
private void disconnectNode(final NodeDTO nodeDto) throws NiFiClientException, IOException, InterruptedException {
getClientUtil().disconnectNode(nodeDto.getNodeId());
@ -101,7 +90,7 @@ public class OffloadIT extends NiFiSystemIT {
final Integer apiPort = nodeDto.getApiPort();
waitFor(() -> {
try {
final NodeDTO dto = getNodeDTO(apiPort);
final NodeDTO dto = getNodeDtoByApiPort(apiPort);
final String status = dto.getStatus();
return "DISCONNECTED".equals(status);
} catch (final Exception e) {

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.controllerservice;
import org.apache.nifi.tests.system.NiFiInstance;
import org.apache.nifi.tests.system.NiFiInstanceFactory;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.util.Collections;
/**
 * System test covering a node that rejoins a two-node cluster while its copy of a Controller Service
 * is being disabled, when the cluster has that service enabled.
 */
public class ControllerServiceEnableDisableConflictIT extends NiFiSystemIT {
    private static final String SLEEP_SERVICE_TYPE = "StandardSleepService";
    private static final String ON_DISABLED_SLEEP_PROPERTY = "@OnDisabled Sleep Time";
    private static final String ON_DISABLED_SLEEP_VALUE = "10 secs";
    private static final int SECOND_NODE_INDEX = 2;

    @Override
    public NiFiInstanceFactory getInstanceFactory() {
        return createTwoNodeInstanceFactory();
    }

    @Override
    protected boolean isDestroyEnvironmentAfterEachTest() {
        // The environment is torn down after every test. Otherwise, on the second invocation of
        // #testJoinClusterWithEnabledServiceWhileDisabling, restarting the node can race: Node 2 is
        // disconnected but has not yet acknowledged the disconnection when it is restarted, while Node 1
        // keeps trying to notify Node 2 of the disconnection. After Node 2 restarts and connects, Node 1
        // delivers the Disconnection request and Node 2 disconnects again, which can cause issues during
        // shutdown / cleanup. Tearing down between tests avoids this.
        return true;
    }

    @ParameterizedTest
    @EnumSource(NodeReconnectMode.class)
    public void testJoinClusterWithEnabledServiceWhileDisabling(final NodeReconnectMode reconnectMode) throws NiFiClientException, IOException, InterruptedException {
        // Create and enable a service configured with a 10 second @OnDisabled sleep time.
        final ControllerServiceEntity service = getClientUtil().createControllerService(SLEEP_SERVICE_TYPE);
        getClientUtil().updateControllerServiceProperties(service, Collections.singletonMap(ON_DISABLED_SLEEP_PROPERTY, ON_DISABLED_SLEEP_VALUE));
        getClientUtil().enableControllerService(service);

        // Disconnect the second node and wait for the cluster to report it as disconnected.
        final NodeDTO secondNode = getNodeDtoByNodeIndex(SECOND_NODE_INDEX);
        final String secondNodeId = secondNode.getNodeId();
        getClientUtil().disconnectNode(secondNodeId);
        waitForNodeStatus(secondNode, "DISCONNECTED");

        // Issue the disable request directly against the disconnected node, then go back to node 1.
        switchClientToNode(SECOND_NODE_INDEX);
        getClientUtil().disableControllerService(service);
        switchClientToNode(1);

        rejoinSecondNode(reconnectMode, secondNodeId);
        waitForAllNodesConnected();

        // Poll until the service is reported as ENABLED.
        final String serviceId = service.getId();
        waitFor(() -> {
            final ControllerServiceEntity current = getNifiClient().getControllerServicesClient().getControllerService(serviceId);
            return "ENABLED".equalsIgnoreCase(current.getComponent().getState());
        });
    }

    /**
     * Brings the second node back into the cluster using the requested mode.
     *
     * @param reconnectMode how the node should rejoin
     * @param nodeId the identifier of the second node, used when reconnecting directly
     */
    private void rejoinSecondNode(final NodeReconnectMode reconnectMode, final String nodeId) throws NiFiClientException, IOException, InterruptedException {
        switch (reconnectMode) {
            case RECONNECT_DIRECTLY:
                getClientUtil().connectNode(nodeId);
                break;
            case RESTART_NODE:
                final NiFiInstance secondInstance = getNiFiInstance().getNodeInstance(SECOND_NODE_INDEX);
                secondInstance.stop();
                secondInstance.start(true);
                break;
        }
    }

    /** The ways in which the second node is brought back into the cluster. */
    private enum NodeReconnectMode {
        RECONNECT_DIRECTLY,
        RESTART_NODE
    }
}