NIFI-2484: Ensure that if a Processor throws an Exception from an @OnScheduled method that @OnStopped methods get called

This closes #791.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2016-08-04 13:05:03 -04:00 committed by Bryan Bende
parent aca2a92d6b
commit 7ba10a6dea
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
3 changed files with 187 additions and 0 deletions

View File

@ -1258,6 +1258,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
if (scheduledState.get() != ScheduledState.STOPPING) { // make sure we only continue retry loop if STOP action wasn't initiated
taskScheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else {

View File

@ -219,6 +219,9 @@ public final class StandardProcessScheduler implements ProcessScheduler {
+ "ReportingTask and will attempt to schedule it again after {}",
new Object[] { reportingTask, e.toString(), administrativeYieldDuration }, e);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, taskNode.getConfigurationContext());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, taskNode.getConfigurationContext());
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {

View File

@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.expression.ExpressionLanguageCompiler;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Assert;
import org.junit.Test;
public class TestStandardProcessorNode {
@Test(timeout = 10000)
public void testStart() throws InterruptedException {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
final ProcessorThatThrowsExceptionOnScheduled processor = new ProcessorThatThrowsExceptionOnScheduled();
final String uuid = UUID.randomUUID().toString();
final StandardProcessorNode procNode = new StandardProcessorNode(processor, uuid, createValidationContextFactory(), null, null);
final ScheduledExecutorService taskScheduler = new FlowEngine(2, "TestStandardProcessorNode", true);
final StandardProcessContext processContext = new StandardProcessContext(procNode, null, null, null, null);
final SchedulingAgentCallback schedulingAgentCallback = new SchedulingAgentCallback() {
@Override
public void postMonitor() {
}
@Override
public Future<?> invokeMonitoringTask(final Callable<?> task) {
return taskScheduler.submit(task);
}
@Override
public void trigger() {
Assert.fail("Should not have completed");
}
};
procNode.start(taskScheduler, 20000L, processContext, schedulingAgentCallback);
Thread.sleep(1000L);
assertEquals(1, processor.onScheduledCount);
assertEquals(1, processor.onUnscheduledCount);
assertEquals(1, processor.onStoppedCount);
}
private ValidationContextFactory createValidationContextFactory() {
return new ValidationContextFactory() {
@Override
public ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData, String groupId, String componentId) {
return new ValidationContext() {
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return null;
}
@Override
public ValidationContext getControllerServiceValidationContext(ControllerService controllerService) {
return null;
}
@Override
public ExpressionLanguageCompiler newExpressionLanguageCompiler() {
return null;
}
@Override
public PropertyValue getProperty(PropertyDescriptor property) {
return newPropertyValue(properties.get(property));
}
@Override
public PropertyValue newPropertyValue(String value) {
return new MockPropertyValue(value);
}
@Override
public Map<PropertyDescriptor, String> getProperties() {
return Collections.unmodifiableMap(properties);
}
@Override
public String getAnnotationData() {
return null;
}
@Override
public boolean isValidationRequired(ControllerService service) {
return false;
}
@Override
public boolean isExpressionLanguagePresent(String value) {
return false;
}
@Override
public boolean isExpressionLanguageSupported(String propertyName) {
return false;
}
@Override
public String getProcessGroupIdentifier() {
return groupId;
}
};
}
@Override
public ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData, String groupId,
String componentId) {
return newValidationContext(properties, annotationData, groupId, componentId);
}
};
}
public static class ProcessorThatThrowsExceptionOnScheduled extends AbstractProcessor {
private int onScheduledCount = 0;
private int onUnscheduledCount = 0;
private int onStoppedCount = 0;
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
@OnScheduled
public void onScheduled() {
onScheduledCount++;
throw new ProcessException("OnScheduled called - Unit Test throws Exception intentionally");
}
@OnUnscheduled
public void onUnscheduled() {
onUnscheduledCount++;
}
@OnStopped
public void onStopped() {
onStoppedCount++;
}
}
}