NIFI-9233 - Improve reliability of system integration tests (#5749)

* NIFI-9233 - Improve reliability of system integration tests
This commit is contained in:
greyp9 2022-02-15 13:14:01 -05:00 committed by GitHub
parent 51202d7c2d
commit 87cfd43f6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 63 additions and 12 deletions

View File

@ -339,17 +339,21 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void verifyCanEnable() {
final ControllerServiceState state = getState();
if (state != ControllerServiceState.DISABLED) {
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
switch (state) {
case DISABLED:
return;
case DISABLING:
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
default:
if (isReloadAdditionalResourcesNecessary()) {
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because additional resources are needed - it has a state of " + state);
}
}
}
@Override
public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
final ControllerServiceState state = getState();
if (state != ControllerServiceState.DISABLED) {
throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
}
verifyCanEnable();
}
@Override

View File

@ -625,6 +625,25 @@ public abstract class AbstractComponentNode implements ComponentNode {
return new PropertyConfiguration(value, references, references.toReferenceList(), variableImpact);
}
/**
* Generates fingerprint for the additional urls and compares it with the previous
* fingerprint value.
*/
@Override
public synchronized boolean isReloadAdditionalResourcesNecessary() {
// Components that don't have any PropertyDescriptors marked `dynamicallyModifiesClasspath`
// won't have the fingerprint i.e. will be null, in such cases do nothing
if (additionalResourcesFingerprint == null) {
return false;
}
final Set<PropertyDescriptor> descriptors = this.getProperties().keySet();
final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors);
final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
return (!StringUtils.equals(additionalResourcesFingerprint, newFingerprint));
}
/**
* Generates fingerprint for the additional urls and compares it with the previous
* fingerprint value. If the fingerprint values don't match, the function calls the

View File

@ -147,6 +147,8 @@ public interface ComponentNode extends ComponentAuthorizable {
void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws IllegalStateException;
boolean isReloadAdditionalResourcesNecessary();
void reloadAdditionalResourcesIfNecessary();
void resetValidationState();

View File

@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLServerSocket;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.CertificateUtils;
@ -96,7 +98,7 @@ public class ConnectionLoadBalanceServer {
}
public void stop() {
stopped = false;
stopped = true;
if (acceptConnection != null) {
acceptConnection.stop();
@ -148,6 +150,7 @@ public class ConnectionLoadBalanceServer {
public void stop() {
this.stopped = true;
IOUtils.closeQuietly(socket);
}
@Override

View File

@ -20,7 +20,9 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.junit.After;
import org.junit.AfterClass;
@ -36,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -192,6 +195,7 @@ public abstract class NiFiSystemIT {
final ClusteSummaryEntity clusterSummary = client.getFlowClient().getClusterSummary();
final int connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount();
if (connectedNodeCount == expectedNumberOfNodes) {
logger.info("Wait successful, {} nodes connected", expectedNumberOfNodes);
return;
}
@ -326,6 +330,21 @@ public abstract class NiFiSystemIT {
}
}
protected void waitForNodeStatus(final NodeDTO nodeDto, final String status) throws InterruptedException {
waitFor(() -> {
try {
final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
final Collection<NodeDTO> nodes = clusterEntity.getCluster().getNodes();
final NodeDTO nodeDtoMatch = nodes.stream()
.filter(n -> n.getApiPort().equals(nodeDto.getApiPort())).findFirst().get();
return nodeDtoMatch.getStatus().equals(status);
} catch (final Exception e) {
logger.error("Failed to determine node status", e);
}
return false;
});
}
protected void waitForQueueNotEmpty(final String connectionId) throws InterruptedException {
logger.info("Waiting for Queue on Connection {} to not be empty", connectionId);

View File

@ -303,7 +303,10 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
final ControllerServiceEntity node2SleepService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(sleepService.getId());
assertEquals(sleepService.getId(), node2SleepService.getId());
waitFor(() -> node2SleepService.getComponent().getState().equals(ENABLED_STATE));
waitFor(() -> {
final ControllerServiceEntity updatedNode2SleepService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(sleepService.getId());
return updatedNode2SleepService.getComponent().getState().equals(ENABLED_STATE);
});
final ReportingTaskEntity node2ReportingTask = getNifiClient().getReportingTasksClient(DO_NOT_REPLICATE).getReportingTask(reportingTask.getId());
waitFor(() -> node2ReportingTask.getComponent().getState().equals(RUNNING_STATE));
@ -683,7 +686,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
connection.setDisconnectedNodeAcknowledged(true);
// Delete the CountFlowFiles processor, and countB and countC services, disable A.
getNifiClient().getProcessorClient().stopProcessor(countFlowFiles);
getClientUtil().stopProcessor(countFlowFiles);
getNifiClient().getConnectionClient().deleteConnection(connection);
getNifiClient().getProcessorClient().deleteProcessor(countFlowFiles);
getClientUtil().disableControllerServices("root", true);

View File

@ -75,10 +75,11 @@ public class OffloadIT extends NiFiSystemIT {
final NodeDTO node2Dto = getNodeDTO(5672);
disconnectNode(node2Dto);
waitForNodeStatus(node2Dto, "DISCONNECTED");
final String nodeId = node2Dto.getNodeId();
getClientUtil().offloadNode(nodeId);
waitFor(this::isNodeOffloaded);
waitForNodeStatus(node2Dto, "OFFLOADED");
getClientUtil().connectNode(nodeId);
waitForAllNodesConnected();

View File

@ -254,9 +254,9 @@ public class LoadBalanceIT extends NiFiSystemIT {
final String nodeId = firstNodeDto.getNodeId();
getClientUtil().disconnectNode(nodeId);
waitForNodeStatus(firstNodeDto, "DISCONNECTED");
getClientUtil().offloadNode(nodeId);
waitFor(this::isNodeOffloaded);
waitForNodeStatus(firstNodeDto, "OFFLOADED");
assertEquals(20, getQueueSize(connection.getId()));
assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));