NIFI-10037: When system test fails to clean up flow, destroy the entire environment so that the next test starts in a healthy state. Name troubleshooting directories with the name of the test class to avoid ambiguity. Also added a log statement so that we know which test is running when looking at the log output from the tests themselves. Finally, found an issue in AbstractComponentNode in which we iterate over the elements in a Map and call setProperty, which can update the underlying Map - updated to first create a copy of the HashMap. Updated that in this Jira because I suspect it is causing one of the tests failures that I've been investigating.

This closes #6059

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-05-19 11:58:52 -04:00 committed by exceptionfactory
parent 1cfca0d978
commit 38b51b0dde
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 64 additions and 23 deletions

View File

@ -66,6 +66,7 @@ import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
@ -591,7 +592,10 @@ public abstract class AbstractComponentNode implements ComponentNode {
// use setProperty instead of setProperties so we can bypass the class loading logic. // use setProperty instead of setProperties so we can bypass the class loading logic.
// Consider value changed if it is different than the PropertyDescriptor's default value because we need to call the #onPropertiesModified // Consider value changed if it is different than the PropertyDescriptor's default value because we need to call the #onPropertiesModified
// method on the component if the current value is not the default value, since the component itself is being reloaded. // method on the component if the current value is not the default value, since the component itself is being reloaded.
for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry : this.properties.entrySet()) { // Also, create a copy of this.properties instead of iterating directly over this.properties since the call to setProperty can change the
// underlying map, and the behavior of modifying the map while iterating over its elements is undefined.
final Map<PropertyDescriptor, PropertyConfiguration> copyOfPropertiesMap = new HashMap<>(this.properties);
for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry : copyOfPropertiesMap.entrySet()) {
final PropertyDescriptor propertyDescriptor = entry.getKey(); final PropertyDescriptor propertyDescriptor = entry.getKey();
final PropertyConfiguration configuration = entry.getValue(); final PropertyConfiguration configuration = entry.getValue();

View File

@ -73,6 +73,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
@BeforeEach @BeforeEach
public void setup(final TestInfo testInfo) throws IOException { public void setup(final TestInfo testInfo) throws IOException {
this.testInfo = testInfo; this.testInfo = testInfo;
final String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test Class>");
logger.info("Beginning Test {}:{}", testClassName, testInfo.getDisplayName());
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
setupClient(); setupClient();
@ -116,6 +118,12 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
if (isDestroyEnvironmentAfterEachTest()) { if (isDestroyEnvironmentAfterEachTest()) {
cleanup(); cleanup();
} else if (destroyFlowFailure != null) {
// If unable to destroy the flow, we need to shutdown the instance and delete the flow and completely recreate the environment.
// Otherwise, we will be left in an unknown state for the next test, and that can cause cascading failures that are very difficult
// to understand and troubleshoot.
logger.info("Because there was a failure when destroying the flow, will completely tear down the environments and start with a clean environment for the next test.");
cleanup();
} }
if (destroyFlowFailure != null) { if (destroyFlowFailure != null) {

View File

@ -40,7 +40,8 @@ public class TroubleshootingTestWatcher implements TestWatcher {
final NiFiInstanceProvider provider = (NiFiInstanceProvider) testInstance; final NiFiInstanceProvider provider = (NiFiInstanceProvider) testInstance;
final String displayName = context.getDisplayName(); final String displayName = context.getDisplayName();
try { try {
final File dir = quarantineTroubleshootingInfo(provider, displayName, cause); final String testClassName = context.getTestClass().map(Class::getSimpleName).orElse("TestClassUnknown");
final File dir = quarantineTroubleshootingInfo(provider, testClassName, displayName, cause);
logger.info("Test Failed [{}]: Troubleshooting information stored [{}]", displayName, dir.getAbsolutePath()); logger.info("Test Failed [{}]: Troubleshooting information stored [{}]", displayName, dir.getAbsolutePath());
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Test Failed [{}]: Troubleshooting information not stored", displayName, e); logger.error("Test Failed [{}]: Troubleshooting information not stored", displayName, e);
@ -49,17 +50,21 @@ public class TroubleshootingTestWatcher implements TestWatcher {
} }
} }
private File quarantineTroubleshootingInfo(final NiFiInstanceProvider provider, final String methodName, final Throwable failureCause) throws IOException { private File quarantineTroubleshootingInfo(final NiFiInstanceProvider provider, final String testClassName, final String methodName, final Throwable failureCause) throws IOException {
NiFiInstance instance = provider.getNiFiInstance(); NiFiInstance instance = provider.getNiFiInstance();
// The teardown method may or may not have already run at this point. If it has, the instance will be null. // The teardown method may or may not have already run at this point. If it has, the instance will be null.
// In that case, just create a new instance and use it - it will map to the same directories. // In that case, just create a new instance and use it - it will map to the same directories.
if (instance == null) { if (instance == null) {
logger.warn("While capturing troubleshooting info for {}, the NiFi Instance is not available. Will create a new one for Diagnostics purposes, but some of the diagnostics may be less " +
"accurate, since it's not the same instance that ran the test", methodName);
instance = provider.getInstanceFactory().createInstance(); instance = provider.getInstanceFactory().createInstance();
} }
final File troubleshooting = new File("target/troubleshooting"); final File troubleshooting = new File("target/troubleshooting");
final File quarantineDir = new File(troubleshooting, methodName); final String quarantineDirName = testClassName + "-" + methodName.replace("()", "");
final File quarantineDir = new File(troubleshooting, quarantineDirName);
quarantineDir.mkdirs(); quarantineDir.mkdirs();
instance.quarantineTroubleshootingInfo(quarantineDir, failureCause); instance.quarantineTroubleshootingInfo(quarantineDir, failureCause);

View File

@ -42,12 +42,13 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.LongSummaryStatistics; import java.util.LongSummaryStatistics;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class LoadBalanceIT extends NiFiSystemIT { public class LoadBalanceIT extends NiFiSystemIT {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@ -277,13 +278,13 @@ public class LoadBalanceIT extends NiFiSystemIT {
private int getQueueSize(final String connectionId) { private int getQueueSize(final String connectionId) {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
final ConnectionStatusDTO connectionStatusDto = statusEntity.getConnectionStatus(); final ConnectionStatusDTO connectionStatusDto = statusEntity.getConnectionStatus();
return connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued().intValue(); return connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued();
} }
private long getQueueBytes(final String connectionId) { private long getQueueBytes(final String connectionId) {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
final ConnectionStatusDTO connectionStatusDto = statusEntity.getConnectionStatus(); final ConnectionStatusDTO connectionStatusDto = statusEntity.getConnectionStatus();
return connectionStatusDto.getAggregateSnapshot().getBytesQueued().longValue(); return connectionStatusDto.getAggregateSnapshot().getBytesQueued();
} }
private boolean isConnectionDoneLoadBalancing(final String connectionId) { private boolean isConnectionDoneLoadBalancing(final String connectionId) {
@ -372,22 +373,45 @@ public class LoadBalanceIT extends NiFiSystemIT {
instance2.start(true); instance2.start(true);
waitForAllNodesConnected(); waitForAllNodesConnected();
// Generate the data again
generate = getNifiClient().getProcessorClient().getProcessor(generate.getId()); generate = getNifiClient().getProcessorClient().getProcessor(generate.getId());
getNifiClient().getProcessorClient().startProcessor(generate);
// Wait until all 20 FlowFiles are queued up // Generate data and wait for it to be spread across the cluster. We do this in an infinite while() loop because
waitFor(() -> { // there can be a failure, in which case we'll retry. If that happens, we just want to keep retrying until the test
final ConnectionStatusEntity secondRoundStatusEntity = getConnectionStatus(connection.getId()); // times out.
return secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20; while (true) {
}); // Generate the data.
getNifiClient().getProcessorClient().startProcessor(generate);
// Wait until load balancing is complete // Wait until all 20 FlowFiles are queued up
waitFor(() -> isConnectionDoneLoadBalancing(connection.getId())); waitFor(() -> {
final ConnectionStatusEntity secondRoundStatusEntity = getConnectionStatus(connection.getId());
return secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
});
// Ensure that the FlowFiles are evenly distributed between the nodes. // Wait until load balancing is complete
final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = getConnectionStatus(connection.getId()); waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
// Log the distribution of data between nodes for easier troubleshooting in case there's a failure.
final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = getConnectionStatus(connection.getId());
final List<NodeConnectionStatusSnapshotDTO> nodeSnapshots = afterSecondDataGenerationStatusEntity.getConnectionStatus().getNodeSnapshots();
logger.info("FlowFiles Queued Per Node:");
nodeSnapshots.forEach(snapshot ->
logger.info("{}:{} - {}", snapshot.getAddress(), snapshot.getApiPort(), snapshot.getStatusSnapshot().getFlowFilesQueued())
);
// Check if the FlowFiles are evenly distributed between the nodes. If so, we're done.
final boolean evenlyDistributed = isEvenlyDistributed(afterSecondDataGenerationStatusEntity);
if (evenlyDistributed) {
break;
}
// If there's an IOException thrown while communicating between the nodes, the data will be rebalanced and will go to
// the local partition. There's nothing we can do about that in this test. However, we can verify that NiFi recovers
// from this and continues to distribute data. To do that, we will stop the processor so that it can be started again
// (and produce more data) and we can empty the queue so that we know how much data to expect.
getNifiClient().getProcessorClient().stopProcessor(generate);
getClientUtil().emptyQueue(connection.getId());
}
assertEquals(20, getQueueSize(connection.getId())); assertEquals(20, getQueueSize(connection.getId()));
assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId())); assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));

View File

@ -77,7 +77,7 @@ nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
nifi.content.claim.max.appendable.size=50 KB nifi.content.claim.max.appendable.size=50 KB
nifi.content.repository.directory.default=./content_repository nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.retention.period=12 hours nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50% nifi.content.repository.archive.max.usage.percentage=90%
nifi.content.repository.archive.enabled=true nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/ nifi.content.viewer.url=../nifi-content-viewer/

View File

@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
java.arg.14=-Djava.awt.headless=true java.arg.14=-Djava.awt.headless=true
java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8003 #java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8003
java.arg.nodeNum=-DnodeNumber=2 java.arg.nodeNum=-DnodeNumber=2

View File

@ -77,7 +77,7 @@ nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
nifi.content.claim.max.appendable.size=50 KB nifi.content.claim.max.appendable.size=50 KB
nifi.content.repository.directory.default=./content_repository nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.retention.period=12 hours nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50% nifi.content.repository.archive.max.usage.percentage=90%
nifi.content.repository.archive.enabled=true nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/ nifi.content.viewer.url=../nifi-content-viewer/

View File

@ -77,7 +77,7 @@ nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
nifi.content.claim.max.appendable.size=50 KB nifi.content.claim.max.appendable.size=50 KB
nifi.content.repository.directory.default=./content_repository nifi.content.repository.directory.default=./content_repository
nifi.content.repository.archive.max.retention.period=12 hours nifi.content.repository.archive.max.retention.period=12 hours
nifi.content.repository.archive.max.usage.percentage=50% nifi.content.repository.archive.max.usage.percentage=90%
nifi.content.repository.archive.enabled=true nifi.content.repository.archive.enabled=true
nifi.content.repository.always.sync=false nifi.content.repository.always.sync=false
nifi.content.viewer.url=../nifi-content-viewer/ nifi.content.viewer.url=../nifi-content-viewer/