diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index ef68c3d67b..f0175d1b7c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -73,6 +73,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -90,11 +91,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final StateManagerProvider stateManagerProvider; private final long processorStartTimeoutMillis; private final LifecycleStateManager lifecycleStateManager; + private final AtomicLong frameworkTaskThreadIndex = new AtomicLong(1L); - private final ScheduledExecutorService frameworkTaskExecutor; private final ConcurrentMap strategyAgentMap = new ConcurrentHashMap<>(); // thread pool for starting/stopping components + private volatile boolean shutdown = false; private final ScheduledExecutorService componentLifeCycleThreadPool; private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true); @@ -111,8 +113,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); processorStartTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); - - frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread"); } public ControllerServiceProvider getControllerServiceProvider() { @@ -123,20 +123,36 @@ public final class StandardProcessScheduler implements ProcessScheduler { return stateManagerProvider.getStateManager(componentId); } - public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { - frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - command.run(); - } catch (final Throwable t) { - LOG.error("Failed to run Framework Task {} due to {}", taskName, t.toString()); - if (LOG.isDebugEnabled()) { - LOG.error("", t); - } - } + public void scheduleFrameworkTask(final Runnable task, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { + Thread.ofVirtual() + .name(taskName) + .start(() -> invokeRepeatedly(task, taskName, initialDelay, delay, timeUnit)); + } + + private void invokeRepeatedly(final Runnable task, final String taskName, final long initialDelay, final long delayBetweenInvocations, final TimeUnit timeUnit) { + if (initialDelay > 0) { + try { + timeUnit.sleep(initialDelay); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; } - }, initialDelay, delay, timeUnit); + } + + while (!this.shutdown) { + try { + task.run(); + } catch (final Exception e) { + LOG.error("Failed to run Framework Task {}", taskName, e); + } + + try { + timeUnit.sleep(delayBetweenInvocations); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } } /** @@ -145,7 +161,29 @@ public final class StandardProcessScheduler implements ProcessScheduler { * @param task the task to perform */ public Future submitFrameworkTask(final Runnable task) { - return frameworkTaskExecutor.submit(task); + final CompletableFuture future = new CompletableFuture<>(); + + Thread.ofVirtual() + .name("Framework Task Thread-" + frameworkTaskThreadIndex.getAndIncrement()) + .start(wrapTask(task, future)); + + return future; + } + + private Runnable wrapTask(final Runnable task, final CompletableFuture future) { + return () -> { + try { + task.run(); + future.complete(null); + } catch (final Exception e) { + LOG.error("Encountered unexpected Exception when performing background Framework Task", e); + future.completeExceptionally(e); + } catch (final Throwable t) { + LOG.error("Encountered unexpected Exception when performing background Framework Task", t); + future.completeExceptionally(t); + throw t; + } + }; } @Override @@ -172,6 +210,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void shutdown() { + shutdown = true; + for (final SchedulingAgent schedulingAgent : strategyAgentMap.values()) { try { schedulingAgent.shutdown(); @@ -181,7 +221,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { } } - frameworkTaskExecutor.shutdown(); componentLifeCycleThreadPool.shutdown(); } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 397f98c595..b55bd80967 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -126,6 +126,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class NiFiClientUtil { @@ -696,7 +697,10 @@ public class NiFiClientUtil { } public void waitForProcessorState(final String processorId, final String expectedState) throws NiFiClientException, IOException, InterruptedException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + logger.info("Waiting for Processor {} to reach state {}", processorId, expectedState); + + while (System.currentTimeMillis() < maxTimestamp) { final ProcessorEntity entity = getProcessorClient().getProcessor(processorId); final String state = entity.getComponent().getState(); @@ -714,6 +718,7 @@ public class NiFiClientUtil { final ProcessorStatusSnapshotDTO snapshotDto = entity.getStatus().getAggregateSnapshot(); if (snapshotDto.getActiveThreadCount() == 0 && snapshotDto.getTerminatedThreadCount() == 0) { + logger.info("Processor {} has reached desired state of {}", processorId, expectedState); return; } @@ -722,7 +727,10 @@ public class NiFiClientUtil { } public ReportingTaskEntity waitForReportingTaskState(final String reportingTaskId, final String expectedState) throws NiFiClientException, IOException, InterruptedException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + logger.info("Waiting for Reporting Task {} to reach desired state of {}", reportingTaskId, expectedState); + + while (System.currentTimeMillis() < maxTimestamp) { final ReportingTaskEntity entity = nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId); final String state = entity.getComponent().getState(); @@ -735,15 +743,19 @@ public class NiFiClientUtil { } if ("RUNNING".equals(expectedState)) { + logger.info("Reporting task {} is now running", reportingTaskId); return entity; } if (entity.getStatus().getActiveThreadCount() == 0) { + logger.info("Reporting task {} is now stopped", reportingTaskId); return entity; } Thread.sleep(10L); } + + throw new IOException("Timed out waiting for Reporting Task " + reportingTaskId + " to reach state of " + expectedState); } public void waitForReportingTaskValid(final String reportingTaskId) throws NiFiClientException, IOException { @@ -868,9 +880,13 @@ public class NiFiClientUtil { } private void waitForNoRunningComponents(final String groupId) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + logger.info("Waiting for no more running components for group {}", groupId); + + while (System.currentTimeMillis() < maxTimestamp) { final boolean anyRunning = isAnyComponentRunning(groupId); if (!anyRunning) { + logger.info("All Process Groups have finished"); return; } @@ -906,6 +922,8 @@ public class NiFiClientUtil { } private void waitForProcessorsStopped(final String groupId) throws IOException, NiFiClientException { + logger.info("Waiting for processors in group {} to stop", groupId); + final ProcessGroupFlowEntity rootGroup = nifiClient.getFlowClient().getProcessGroup(groupId); final FlowDTO rootFlowDTO = rootGroup.getProcessGroupFlow().getFlow(); for (final ProcessorEntity processor : rootFlowDTO.getProcessors()) { @@ -920,6 +938,8 @@ public class NiFiClientUtil { for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) { waitForProcessorsStopped(group.getComponent()); } + + logger.info("All processors in group {} have stopped", groupId); } private void waitForProcessorsStopped(final ProcessGroupDTO group) throws IOException, NiFiClientException { @@ -956,6 +976,8 @@ public class NiFiClientUtil { } public ActivateControllerServicesEntity disableControllerServices(final String groupId, final boolean recurse) throws NiFiClientException, IOException { + logger.info("Starting disableControllerServices for group {}, recurse={}", groupId, recurse); + final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity(); activateControllerServicesEntity.setId(groupId); activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_DISABLED); @@ -973,6 +995,7 @@ public class NiFiClientUtil { } } + logger.info("Finished disableControllerServices for group {}", groupId); return activateControllerServices; } @@ -998,7 +1021,10 @@ public class NiFiClientUtil { } public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + logger.info("Waiting for Controller Service {} to have a Run Status of {}", id, requestedRunStatus); + + while (System.currentTimeMillis() < maxTimestamp) { final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id); final String serviceState = serviceEntity.getComponent().getState(); if (requestedRunStatus.equals(serviceState)) { @@ -1029,7 +1055,9 @@ public class NiFiClientUtil { } public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection serviceIdsOfInterest) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + while (System.currentTimeMillis() < maxTimestamp) { final List nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest); if (nonDisabledServices.isEmpty()) { logger.info("Process Group [{}] Controller Services have desired state [{}]", groupId, desiredState); @@ -1049,7 +1077,9 @@ public class NiFiClientUtil { } public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + while (System.currentTimeMillis() < maxTimestamp) { final ControllerServiceEntity controllerServiceEntity = nifiClient.getControllerServicesClient().getControllerService(controllerServiceId); final String currentValidationStatus = controllerServiceEntity.getComponent().getValidationStatus(); if (validationStatus.equals(currentValidationStatus)) { @@ -1070,7 +1100,9 @@ public class NiFiClientUtil { } public void waitForReportingTaskValidationStatus(final String reportingTaskId, final String validationStatus) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + while (System.currentTimeMillis() < maxTimestamp) { final ReportingTaskEntity reportingTaskEntity = nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId); final String currentValidationStatus = reportingTaskEntity.getStatus().getValidationStatus(); if (validationStatus.equalsIgnoreCase(currentValidationStatus)) { @@ -1306,11 +1338,17 @@ public class NiFiClientUtil { public DropRequestEntity emptyQueue(final String connectionId) throws NiFiClientException, IOException { final ConnectionClient connectionClient = getConnectionClient(); - DropRequestEntity requestEntity; - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + DropRequestEntity requestEntity = null; + while (System.currentTimeMillis() < maxTimestamp) { requestEntity = connectionClient.emptyQueue(connectionId); try { while (requestEntity.getDropRequest().getPercentCompleted() < 100) { + if (System.currentTimeMillis() > maxTimestamp) { + throw new IOException("Timed out waiting for queue " + connectionId + " to empty"); + } + try { Thread.sleep(10L); } catch (final InterruptedException ie) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index b927fae922..5168a152c5 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -99,7 +99,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { private static final NiFiInstanceCache instanceCache = new NiFiInstanceCache(); static { - Runtime.getRuntime().addShutdownHook(new Thread(() -> instanceCache.shutdown())); + Runtime.getRuntime().addShutdownHook(new Thread(instanceCache::shutdown)); } private TestInfo testInfo; @@ -107,6 +107,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { @BeforeEach public void setup(final TestInfo testInfo) throws IOException { this.testInfo = testInfo; + final String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse(""); final String friendlyTestName = testClassName + ":" + testInfo.getDisplayName(); logger.info("Beginning Test {}", friendlyTestName); @@ -133,21 +134,24 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { return true; } - protected TestInfo getTestInfo() { - return testInfo; - } @AfterAll public static void cleanup() { + logger.info("Beginning cleanup"); + final NiFiInstance nifi = nifiRef.get(); nifiRef.set(null); if (nifi != null) { instanceCache.stopOrRecycle(nifi); } + + logger.info("Finished cleanup"); } @AfterEach public void teardown() throws Exception { + logger.info("Beginning teardown"); + try { Exception destroyFlowFailure = null; @@ -182,6 +186,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { if (nifiClient != null) { nifiClient.close(); } + + logger.info("Finished teardown"); } } @@ -230,6 +236,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { } protected void destroyFlow() throws NiFiClientException, IOException, InterruptedException { + logger.info("Starting destroyFlow"); + getClientUtil().stopProcessGroupComponents("root"); getClientUtil().disableControllerServices("root", true); getClientUtil().stopReportingTasks(); @@ -238,6 +246,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { getClientUtil().deleteAll("root"); getClientUtil().deleteControllerLevelServices(); getClientUtil().deleteReportingTasks(); + + logger.info("Finished destroyFlow"); } protected void waitForAllNodesConnected() { @@ -273,7 +283,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { } if (System.currentTimeMillis() > maxTime) { - throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); + throw new RuntimeException("Waited up to 60 seconds for all nodes to connect but only " + connectedNodeCount + " nodes connected"); } try { @@ -569,7 +579,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { } protected void waitForCoordinatorElected() throws InterruptedException { - waitFor(() -> isCoordinatorElected()); + waitFor(this::isCoordinatorElected); } protected boolean isCoordinatorElected() throws NiFiClientException, IOException { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java index 11b1172771..bc1047f927 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -234,6 +235,8 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory } private void waitForStartup() throws IOException { + final long timeoutMillis = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5L); + try (final NiFiClient client = createClient()) { while (true) { try { @@ -241,6 +244,10 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory logger.info("NiFi Startup Completed [{}]", instanceDirectory.getName()); return; } catch (final Exception e) { + if (System.currentTimeMillis() > timeoutMillis) { + throw new IOException("After waiting 5 minutes, NiFi instance still has not started"); + } + try { Thread.sleep(1000L); } catch (InterruptedException ex) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java index 7c96e82edf..dc02c2cc7b 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -27,6 +27,8 @@ import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; @@ -37,10 +39,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { - + private static final Logger logger = LoggerFactory.getLogger(SingleFlowFileConcurrencyIT.class); @Test public void testSingleConcurrency() throws NiFiClientException, IOException, InterruptedException { + logger.info("Beginning test testSingleConcurrency"); + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); @@ -94,11 +98,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { // Ensure that 3 FlowFiles are queued up for Terminate waitForQueueCount(outputToTerminate.getId(), 3); + + logger.info("Finished test testSingleConcurrency"); } @Test public void testSingleConcurrencyAndBatchOutput() throws NiFiClientException, IOException, InterruptedException { + logger.info("Beginning test testSingleConcurrencyAndBatchOutput"); + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); @@ -163,11 +171,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { final Map secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); assertEquals("1", secondOutAttributes.get("batch.output.Out")); assertEquals("1", secondOutAttributes.get("batch.output.Out2")); + + logger.info("Finished test testSingleConcurrencyAndBatchOutput"); } @Test public void testBatchOutputHasCorrectNumbersOnRestart() throws NiFiClientException, IOException, InterruptedException { + logger.info("Beginning test testBatchOutputHasCorrectNumbersOnRestart"); + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); @@ -238,6 +250,8 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { final Map secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); assertEquals("1", secondOutAttributes.get("batch.output.Out")); assertEquals("1", secondOutAttributes.get("batch.output.Out2")); + + logger.info("Finished test testBatchOutputHasCorrectNumbersOnRestart"); } }