NIFI-5686: Updated StandardProcessScheduler so that if it fails to schedule a Reporting Task, it re-schedules the @OnScheduled task instead of looping and calling Thread.sleep. As it was, the single-threaded Process Scheduler was, when calling ProcessScheduler.unschedule(), the unschedule task was not executing because the schedule task was using the only thread. But switching the logic to schedule the task for later and return, instead of calling Thread.sleep and looping, we are able to avoid blocking the one thread in the thread pool. Also, performed some trivial code cleanup and updated erroneous links in Java-docs.

NIFI-5686: Fixed unit test in TestSocketLoadBalancedFlowFileQueue; renamed TestProcessorLifecycle to ProcessorLifecycleIT as it is testing integration between many components and largely focuses on high numbers of concurrent tasks to see if it can trigger any threading bugs that may get introduced

NIFI-5686: Extended unit test timeouts
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #3062
This commit is contained in:
Mark Payne 2018-10-11 11:45:41 -04:00 committed by Matthew Burgess
parent 218063a0b5
commit 32db43b306
5 changed files with 130 additions and 137 deletions

View File

@ -16,19 +16,6 @@
*/
package org.apache.nifi.controller.scheduling;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -53,6 +40,7 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
@ -64,9 +52,21 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.Objects.requireNonNull;
/**
* Responsible for scheduling Processors, Ports, and Funnels to run at regular
* intervals
* Responsible for scheduling Processors, Ports, and Funnels to run at regular intervals
*/
public final class StandardProcessScheduler implements ProcessScheduler {
@ -196,44 +196,39 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final long lastStopTime = lifecycleState.getLastStopTime();
final ReportingTask reportingTask = taskNode.getReportingTask();
// Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
while (true) {
try {
synchronized (lifecycleState) {
// if no longer scheduled to run, then we're finished. This can happen, for example,
// if the @OnScheduled method throws an Exception and the user stops the reporting task
// while we're administratively yielded.
// we also check if the schedule state's last start time is equal to what it was before.
// if not, then means that the reporting task has been stopped and started again, so we should just
// bail; another thread will be responsible for invoking the @OnScheduled methods.
if (!lifecycleState.isScheduled() || lifecycleState.getLastStopTime() != lastStopTime) {
return;
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
agent.schedule(taskNode, lifecycleState);
// Attempt to start the Reporting Task, and if we fail re-schedule the task again after #administrativeYielMillis milliseconds
try {
synchronized (lifecycleState) {
// if no longer scheduled to run, then we're finished. This can happen, for example,
// if the @OnScheduled method throws an Exception and the user stops the reporting task
// while we're administratively yielded.
// we also check if the schedule state's last start time is equal to what it was before.
// if not, then means that the reporting task has been stopped and started again, so we should just
// bail; another thread will be responsible for invoking the @OnScheduled methods.
if (!lifecycleState.isScheduled() || lifecycleState.getLastStopTime() != lastStopTime) {
LOG.debug("Did not complete invocation of @OnScheduled task for {} but Lifecycle State is no longer scheduled. Will not attempt to invoke task anymore", reportingTask);
return;
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
+ "ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
agent.schedule(taskNode, lifecycleState);
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
componentLog.error("Failed to invoke @OnScheduled method due to {}", cause);
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this "
+ "ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
componentLifeCycleThreadPool.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
}
}
};
@ -262,10 +257,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
synchronized (lifecycleState) {
lifecycleState.setScheduled(false);
try {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
}
try (final NarCloseable x = NarCloseable.withComponentNarLoader(reportingTask.getClass(), reportingTask.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
@ -274,11 +267,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
reportingTask, cause.toString(), administrativeYieldDuration);
LOG.error("", cause);
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
}
}
agent.unschedule(taskNode, lifecycleState);
@ -295,10 +283,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
/**
* Starts the given {@link Processor} by invoking its
* {@link ProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)}
* {@link ProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)}
* method.
*
* @see StandardProcessorNode#start(ScheduledExecutorService, long, org.apache.nifi.processor.ProcessContext, Runnable)
* @see StandardProcessorNode#start(ScheduledExecutorService, long, ProcessContext, SchedulingAgentCallback, boolean)
*/
@Override
public synchronized CompletableFuture<Void> startProcessor(final ProcessorNode procNode, final boolean failIfStopping) {
@ -335,10 +323,10 @@ public final class StandardProcessScheduler implements ProcessScheduler {
/**
* Stops the given {@link Processor} by invoking its
* {@link ProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, LifecycleState)}
* {@link ProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)}
* method.
*
* @see StandardProcessorNode#stop(ScheduledExecutorService, org.apache.nifi.processor.ProcessContext, SchedulingAgent, LifecycleState)
* @see StandardProcessorNode#stop(ProcessScheduler, ScheduledExecutorService, ProcessContext, SchedulingAgent, LifecycleState)
*/
@Override
public synchronized CompletableFuture<Void> stopProcessor(final ProcessorNode procNode) {

View File

@ -96,12 +96,12 @@ public class TestSocketLoadBalancedFlowFileQueue {
swapManager = new MockSwapManager();
eventReporter = EventReporter.NO_OP;
final NodeIdentifier localNodeIdentifier = createNodeIdentifier();
final NodeIdentifier localNodeIdentifier = createNodeIdentifier("00000000-0000-0000-0000-000000000000");
nodeIds = new ArrayList<>();
nodeIds.add(localNodeIdentifier);
nodeIds.add(createNodeIdentifier());
nodeIds.add(createNodeIdentifier());
nodeIds.add(createNodeIdentifier("11111111-1111-1111-1111-111111111111"));
nodeIds.add(createNodeIdentifier("22222222-2222-2222-2222-222222222222"));
Mockito.doAnswer(new Answer<Set<NodeIdentifier>>() {
@Override
@ -128,7 +128,11 @@ public class TestSocketLoadBalancedFlowFileQueue {
}
private NodeIdentifier createNodeIdentifier() {
return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", nodePort++, "localhost", nodePort++,
return createNodeIdentifier(UUID.randomUUID().toString());
}
private NodeIdentifier createNodeIdentifier(final String uuid) {
return new NodeIdentifier(uuid, "localhost", nodePort++, "localhost", nodePort++,
"localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
}
@ -339,9 +343,12 @@ public class TestSocketLoadBalancedFlowFileQueue {
}
}
@Test(timeout = 10000)
@Test(timeout = 30000)
public void testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() throws InterruptedException {
// Create partitioner that sends first 2 FlowFiles to Partition 0, next 2 to Partition 1, and then next 4 to Partition 3.
// Create partitioner that sends first 1 FlowFile to Partition 0, next to Partition 2, and then next 2 to Partition 2.
// Then, cycle back to partitions 0 and 1. This will result in partitions 0 & 1 getting 1 FlowFile each and Partition 2
// getting 2 FlowFiles. Then, when Partition 2 is removed, those 2 FlowFiles will be rebalanced to Partitions 0 and 1.
queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] {0, 1, 2, 2, 0, 1}, false));
for (int i = 0; i < 4; i++) {
@ -359,6 +366,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
final int[] expectedPartitionSizes = new int[] {2, 2};
final int[] partitionSizes = new int[2];
while (!Arrays.equals(expectedPartitionSizes, partitionSizes)) {
Thread.sleep(10L);

View File

@ -81,9 +81,9 @@ import static org.mockito.Mockito.mock;
* Validate Processor's life-cycle operation within the context of
* {@link FlowController} and {@link StandardProcessScheduler}
*/
public class TestProcessorLifecycle {
public class ProcessorLifecycleIT {
private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class);
private static final Logger logger = LoggerFactory.getLogger(ProcessorLifecycleIT.class);
private static final long SHORT_DELAY_TOLERANCE = 10000L;
private static final long MEDIUM_DELAY_TOLERANCE = 15000L;
private static final long LONG_DELAY_TOLERANCE = 20000L;

View File

@ -16,26 +16,6 @@
*/
package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -53,7 +33,6 @@ import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.TerminationAwareLogger;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.scheduling.processors.FailOnScheduledProcessor;
@ -92,6 +71,28 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
public class TestStandardProcessScheduler {
private StandardProcessScheduler scheduler = null;
@ -136,7 +137,7 @@ public class TestStandardProcessScheduler {
final ConcurrentMap<String, ProcessorNode> processorMap = new ConcurrentHashMap<>();
Mockito.doAnswer(new Answer<ProcessorNode>() {
@Override
public ProcessorNode answer(InvocationOnMock invocation) throws Throwable {
public ProcessorNode answer(InvocationOnMock invocation) {
final String id = invocation.getArgumentAt(0, String.class);
return processorMap.get(id);
}
@ -144,7 +145,7 @@ public class TestStandardProcessScheduler {
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
public Object answer(InvocationOnMock invocation) {
final ProcessorNode procNode = invocation.getArgumentAt(0, ProcessorNode.class);
processorMap.putIfAbsent(procNode.getIdentifier(), procNode);
return null;
@ -169,8 +170,7 @@ public class TestStandardProcessScheduler {
* run. This unit test is intended to verify that we have this resolved.
*/
@Test
@Ignore("This test appears to be buggy")
public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException, InitializationException {
public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedException {
taskNode.performValidation();
scheduler.schedule(taskNode);
@ -222,15 +222,15 @@ public class TestStandardProcessScheduler {
scheduler.stopProcessor(procNode);
assertTrue(service.isActive());
assertTrue(service.getState() == ControllerServiceState.ENABLING);
assertSame(service.getState(), ControllerServiceState.ENABLING);
scheduler.disableControllerService(service);
assertTrue(service.getState() == ControllerServiceState.DISABLING);
assertSame(service.getState(), ControllerServiceState.DISABLING);
assertFalse(service.isActive());
while (service.getState() != ControllerServiceState.DISABLED) {
Thread.sleep(5L);
}
assertTrue(service.getState() == ControllerServiceState.DISABLED);
assertSame(service.getState(), ControllerServiceState.DISABLED);
}
public class TestReportingTask extends AbstractReportingTask {
@ -414,7 +414,7 @@ public class TestStandardProcessScheduler {
* services, shut down processors etc) before shutting down itself
*/
assertTrue(serviceNode.isActive());
assertTrue(serviceNode.getState() == ControllerServiceState.ENABLING);
assertSame(serviceNode.getState(), ControllerServiceState.ENABLING);
}
/**
@ -422,11 +422,11 @@ public class TestStandardProcessScheduler {
* be disabled. This test is set up in such way that disabling of the
* service could be initiated by both disable and enable methods. In other
* words it tests two conditions in
* {@link StandardControllerServiceNode#disable(java.util.concurrent.ScheduledExecutorService, Heartbeater)}
* {@link StandardControllerServiceNode#disable(ScheduledExecutorService)}
* where the disabling of the service can be initiated right there (if
* ENABLED), or if service is still enabling its disabling will be deferred
* to the logic in
* {@link StandardControllerServiceNode#enable(java.util.concurrent.ScheduledExecutorService, long, Heartbeater)}
* {@link StandardControllerServiceNode#enable(ScheduledExecutorService, long)}
* IN any even the resulting state of the service is DISABLED
*/
@Test
@ -455,7 +455,7 @@ public class TestStandardProcessScheduler {
});
Thread.sleep(100);
assertFalse(serviceNode.isActive());
assertTrue(serviceNode.getState() == ControllerServiceState.DISABLED);
assertEquals(ControllerServiceState.DISABLED, serviceNode.getState());
}
// need to sleep a while since we are emulating async invocations on

View File

@ -16,28 +16,16 @@
*/
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.http.HttpContextMap;
import org.apache.nifi.processors.standard.util.HTTPUtils;
@ -53,17 +41,26 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import com.google.api.client.util.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import okhttp3.MediaType;
import okhttp3.MultipartBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import static org.junit.Assert.assertEquals;
public class TestHandleHttpRequest {
@ -97,7 +94,7 @@ public class TestHandleHttpRequest {
return service.createSSLContext(SSLContextService.ClientAuth.WANT);
}
@Test(timeout=10000)
@Test(timeout=30000)
public void testRequestAddedToService() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
@ -157,7 +154,7 @@ public class TestHandleHttpRequest {
}
@Test(timeout=10000)
@Test(timeout=30000)
public void testMultipartFormDataRequest() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
@ -275,7 +272,7 @@ public class TestHandleHttpRequest {
}
}
@Test(timeout=10000)
@Test(timeout=30000)
public void testMultipartFormDataRequestFailToRegisterContext() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");
@ -365,7 +362,7 @@ public class TestHandleHttpRequest {
}
@Test(timeout=10000)
@Test(timeout=30000)
public void testFailToRegister() throws InitializationException, MalformedURLException, IOException, InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(HandleHttpRequest.class);
runner.setProperty(HandleHttpRequest.PORT, "0");