NIFI-12161: This closes #7829. Ensuring framework threads use lightweight threads instead of a capped thread pool. This prevents framework threads from livelocking in the event enough framework threads are holding threads while those needing to run cannot get them.

Signed-off-by: Joseph Witt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2023-10-02 11:12:10 -04:00 committed by Joseph Witt
parent 746dad7f46
commit 1d9ccb3857
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
5 changed files with 142 additions and 34 deletions

View File

@ -73,6 +73,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
@ -90,11 +91,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final StateManagerProvider stateManagerProvider;
private final long processorStartTimeoutMillis;
private final LifecycleStateManager lifecycleStateManager;
private final AtomicLong frameworkTaskThreadIndex = new AtomicLong(1L);
private final ScheduledExecutorService frameworkTaskExecutor;
private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
// thread pool for starting/stopping components
private volatile boolean shutdown = false;
private final ScheduledExecutorService componentLifeCycleThreadPool;
private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true);
@ -111,8 +113,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
processorStartTimeoutMillis = timeoutString == null ? 60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
}
public ControllerServiceProvider getControllerServiceProvider() {
@ -123,20 +123,36 @@ public final class StandardProcessScheduler implements ProcessScheduler {
return stateManagerProvider.getStateManager(componentId);
}
public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
command.run();
} catch (final Throwable t) {
LOG.error("Failed to run Framework Task {} due to {}", taskName, t.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", t);
}
}
public void scheduleFrameworkTask(final Runnable task, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
Thread.ofVirtual()
.name(taskName)
.start(() -> invokeRepeatedly(task, taskName, initialDelay, delay, timeUnit));
}
private void invokeRepeatedly(final Runnable task, final String taskName, final long initialDelay, final long delayBetweenInvocations, final TimeUnit timeUnit) {
if (initialDelay > 0) {
try {
timeUnit.sleep(initialDelay);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}, initialDelay, delay, timeUnit);
}
while (!this.shutdown) {
try {
task.run();
} catch (final Exception e) {
LOG.error("Failed to run Framework Task {}", taskName, e);
}
try {
timeUnit.sleep(delayBetweenInvocations);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
/**
@ -145,7 +161,29 @@ public final class StandardProcessScheduler implements ProcessScheduler {
* @param task the task to perform
*/
public Future<?> submitFrameworkTask(final Runnable task) {
return frameworkTaskExecutor.submit(task);
final CompletableFuture<?> future = new CompletableFuture<>();
Thread.ofVirtual()
.name("Framework Task Thread-" + frameworkTaskThreadIndex.getAndIncrement())
.start(wrapTask(task, future));
return future;
}
private Runnable wrapTask(final Runnable task, final CompletableFuture<?> future) {
return () -> {
try {
task.run();
future.complete(null);
} catch (final Exception e) {
LOG.error("Encountered unexpected Exception when performing background Framework Task", e);
future.completeExceptionally(e);
} catch (final Throwable t) {
LOG.error("Encountered unexpected Exception when performing background Framework Task", t);
future.completeExceptionally(t);
throw t;
}
};
}
@Override
@ -172,6 +210,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void shutdown() {
shutdown = true;
for (final SchedulingAgent schedulingAgent : strategyAgentMap.values()) {
try {
schedulingAgent.shutdown();
@ -181,7 +221,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
}
frameworkTaskExecutor.shutdown();
componentLifeCycleThreadPool.shutdown();
}

View File

@ -126,6 +126,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class NiFiClientUtil {
@ -696,7 +697,10 @@ public class NiFiClientUtil {
}
public void waitForProcessorState(final String processorId, final String expectedState) throws NiFiClientException, IOException, InterruptedException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
logger.info("Waiting for Processor {} to reach state {}", processorId, expectedState);
while (System.currentTimeMillis() < maxTimestamp) {
final ProcessorEntity entity = getProcessorClient().getProcessor(processorId);
final String state = entity.getComponent().getState();
@ -714,6 +718,7 @@ public class NiFiClientUtil {
final ProcessorStatusSnapshotDTO snapshotDto = entity.getStatus().getAggregateSnapshot();
if (snapshotDto.getActiveThreadCount() == 0 && snapshotDto.getTerminatedThreadCount() == 0) {
logger.info("Processor {} has reached desired state of {}", processorId, expectedState);
return;
}
@ -722,7 +727,10 @@ public class NiFiClientUtil {
}
public ReportingTaskEntity waitForReportingTaskState(final String reportingTaskId, final String expectedState) throws NiFiClientException, IOException, InterruptedException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L);
logger.info("Waiting for Reporting Task {} to reach desired state of {}", reportingTaskId, expectedState);
while (System.currentTimeMillis() < maxTimestamp) {
final ReportingTaskEntity entity = nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId);
final String state = entity.getComponent().getState();
@ -735,15 +743,19 @@ public class NiFiClientUtil {
}
if ("RUNNING".equals(expectedState)) {
logger.info("Reporting task {} is now running", reportingTaskId);
return entity;
}
if (entity.getStatus().getActiveThreadCount() == 0) {
logger.info("Reporting task {} is now stopped", reportingTaskId);
return entity;
}
Thread.sleep(10L);
}
throw new IOException("Timed out waiting for Reporting Task " + reportingTaskId + " to reach state of " + expectedState);
}
public void waitForReportingTaskValid(final String reportingTaskId) throws NiFiClientException, IOException {
@ -868,9 +880,13 @@ public class NiFiClientUtil {
}
private void waitForNoRunningComponents(final String groupId) throws NiFiClientException, IOException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
logger.info("Waiting for no more running components for group {}", groupId);
while (System.currentTimeMillis() < maxTimestamp) {
final boolean anyRunning = isAnyComponentRunning(groupId);
if (!anyRunning) {
logger.info("All Process Groups have finished");
return;
}
@ -906,6 +922,8 @@ public class NiFiClientUtil {
}
private void waitForProcessorsStopped(final String groupId) throws IOException, NiFiClientException {
logger.info("Waiting for processors in group {} to stop", groupId);
final ProcessGroupFlowEntity rootGroup = nifiClient.getFlowClient().getProcessGroup(groupId);
final FlowDTO rootFlowDTO = rootGroup.getProcessGroupFlow().getFlow();
for (final ProcessorEntity processor : rootFlowDTO.getProcessors()) {
@ -920,6 +938,8 @@ public class NiFiClientUtil {
for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) {
waitForProcessorsStopped(group.getComponent());
}
logger.info("All processors in group {} have stopped", groupId);
}
private void waitForProcessorsStopped(final ProcessGroupDTO group) throws IOException, NiFiClientException {
@ -956,6 +976,8 @@ public class NiFiClientUtil {
}
public ActivateControllerServicesEntity disableControllerServices(final String groupId, final boolean recurse) throws NiFiClientException, IOException {
logger.info("Starting disableControllerServices for group {}, recurse={}", groupId, recurse);
final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity();
activateControllerServicesEntity.setId(groupId);
activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_DISABLED);
@ -973,6 +995,7 @@ public class NiFiClientUtil {
}
}
logger.info("Finished disableControllerServices for group {}", groupId);
return activateControllerServices;
}
@ -998,7 +1021,10 @@ public class NiFiClientUtil {
}
public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L);
logger.info("Waiting for Controller Service {} to have a Run Status of {}", id, requestedRunStatus);
while (System.currentTimeMillis() < maxTimestamp) {
final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id);
final String serviceState = serviceEntity.getComponent().getState();
if (requestedRunStatus.equals(serviceState)) {
@ -1029,7 +1055,9 @@ public class NiFiClientUtil {
}
public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L);
while (System.currentTimeMillis() < maxTimestamp) {
final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
if (nonDisabledServices.isEmpty()) {
logger.info("Process Group [{}] Controller Services have desired state [{}]", groupId, desiredState);
@ -1049,7 +1077,9 @@ public class NiFiClientUtil {
}
public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L);
while (System.currentTimeMillis() < maxTimestamp) {
final ControllerServiceEntity controllerServiceEntity = nifiClient.getControllerServicesClient().getControllerService(controllerServiceId);
final String currentValidationStatus = controllerServiceEntity.getComponent().getValidationStatus();
if (validationStatus.equals(currentValidationStatus)) {
@ -1070,7 +1100,9 @@ public class NiFiClientUtil {
}
public void waitForReportingTaskValidationStatus(final String reportingTaskId, final String validationStatus) throws NiFiClientException, IOException {
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L);
while (System.currentTimeMillis() < maxTimestamp) {
final ReportingTaskEntity reportingTaskEntity = nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId);
final String currentValidationStatus = reportingTaskEntity.getStatus().getValidationStatus();
if (validationStatus.equalsIgnoreCase(currentValidationStatus)) {
@ -1306,11 +1338,17 @@ public class NiFiClientUtil {
public DropRequestEntity emptyQueue(final String connectionId) throws NiFiClientException, IOException {
final ConnectionClient connectionClient = getConnectionClient();
DropRequestEntity requestEntity;
while (true) {
final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L);
DropRequestEntity requestEntity = null;
while (System.currentTimeMillis() < maxTimestamp) {
requestEntity = connectionClient.emptyQueue(connectionId);
try {
while (requestEntity.getDropRequest().getPercentCompleted() < 100) {
if (System.currentTimeMillis() > maxTimestamp) {
throw new IOException("Timed out waiting for queue " + connectionId + " to empty");
}
try {
Thread.sleep(10L);
} catch (final InterruptedException ie) {

View File

@ -99,7 +99,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
private static final NiFiInstanceCache instanceCache = new NiFiInstanceCache();
static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> instanceCache.shutdown()));
Runtime.getRuntime().addShutdownHook(new Thread(instanceCache::shutdown));
}
private TestInfo testInfo;
@ -107,6 +107,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
@BeforeEach
public void setup(final TestInfo testInfo) throws IOException {
this.testInfo = testInfo;
final String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test Class>");
final String friendlyTestName = testClassName + ":" + testInfo.getDisplayName();
logger.info("Beginning Test {}", friendlyTestName);
@ -133,21 +134,24 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
return true;
}
protected TestInfo getTestInfo() {
return testInfo;
}
@AfterAll
public static void cleanup() {
logger.info("Beginning cleanup");
final NiFiInstance nifi = nifiRef.get();
nifiRef.set(null);
if (nifi != null) {
instanceCache.stopOrRecycle(nifi);
}
logger.info("Finished cleanup");
}
@AfterEach
public void teardown() throws Exception {
logger.info("Beginning teardown");
try {
Exception destroyFlowFailure = null;
@ -182,6 +186,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
if (nifiClient != null) {
nifiClient.close();
}
logger.info("Finished teardown");
}
}
@ -230,6 +236,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
}
protected void destroyFlow() throws NiFiClientException, IOException, InterruptedException {
logger.info("Starting destroyFlow");
getClientUtil().stopProcessGroupComponents("root");
getClientUtil().disableControllerServices("root", true);
getClientUtil().stopReportingTasks();
@ -238,6 +246,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
getClientUtil().deleteAll("root");
getClientUtil().deleteControllerLevelServices();
getClientUtil().deleteReportingTasks();
logger.info("Finished destroyFlow");
}
protected void waitForAllNodesConnected() {
@ -273,7 +283,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
}
if (System.currentTimeMillis() > maxTime) {
throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected");
throw new RuntimeException("Waited up to 60 seconds for all nodes to connect but only " + connectedNodeCount + " nodes connected");
}
try {
@ -569,7 +579,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
}
protected void waitForCoordinatorElected() throws InterruptedException {
waitFor(() -> isCoordinatorElected());
waitFor(this::isCoordinatorElected);
}
protected boolean isCoordinatorElected() throws NiFiClientException, IOException {

View File

@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -234,6 +235,8 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
}
private void waitForStartup() throws IOException {
final long timeoutMillis = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5L);
try (final NiFiClient client = createClient()) {
while (true) {
try {
@ -241,6 +244,10 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory
logger.info("NiFi Startup Completed [{}]", instanceDirectory.getName());
return;
} catch (final Exception e) {
if (System.currentTimeMillis() > timeoutMillis) {
throw new IOException("After waiting 5 minutes, NiFi instance still has not started");
}
try {
Thread.sleep(1000L);
} catch (InterruptedException ex) {

View File

@ -27,6 +27,8 @@ import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
@ -37,10 +39,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
private static final Logger logger = LoggerFactory.getLogger(SingleFlowFileConcurrencyIT.class);
@Test
public void testSingleConcurrency() throws NiFiClientException, IOException, InterruptedException {
logger.info("Beginning test testSingleConcurrency");
final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root");
final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId());
final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId());
@ -94,11 +98,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
// Ensure that 3 FlowFiles are queued up for Terminate
waitForQueueCount(outputToTerminate.getId(), 3);
logger.info("Finished test testSingleConcurrency");
}
@Test
public void testSingleConcurrencyAndBatchOutput() throws NiFiClientException, IOException, InterruptedException {
logger.info("Beginning test testSingleConcurrencyAndBatchOutput");
final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root");
final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId());
final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId());
@ -163,11 +171,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
final Map<String, String> secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes();
assertEquals("1", secondOutAttributes.get("batch.output.Out"));
assertEquals("1", secondOutAttributes.get("batch.output.Out2"));
logger.info("Finished test testSingleConcurrencyAndBatchOutput");
}
@Test
public void testBatchOutputHasCorrectNumbersOnRestart() throws NiFiClientException, IOException, InterruptedException {
logger.info("Beginning test testBatchOutputHasCorrectNumbersOnRestart");
final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root");
final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId());
final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId());
@ -238,6 +250,8 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
final Map<String, String> secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes();
assertEquals("1", secondOutAttributes.get("batch.output.Out"));
assertEquals("1", secondOutAttributes.get("batch.output.Out2"));
logger.info("Finished test testBatchOutputHasCorrectNumbersOnRestart");
}
}