NIFI-1464 life-cycle refactoring part-2

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-03-14 13:29:41 -04:00 committed by joewitt
parent 56f79e1e85
commit c7df94e00f
8 changed files with 98 additions and 46 deletions

View File

@ -155,7 +155,7 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
* execute upon successful start of the Processor
*/
public abstract <T extends ProcessContext & ControllerServiceLookup> void start(ScheduledExecutorService scheduler,
long administrativeYieldMillis, T processContext, Runnable schedulingAgentCallback);
long administrativeYieldMillis, T processContext, SchedulingAgentCallback schedulingAgentCallback);
/**
* Will stop the {@link Processor} represented by this {@link ProcessorNode}.

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
public interface SchedulingAgentCallback {
void postMonitor();
Future<?> invokeMonitoringTask(Callable<?> task);
void trigger();
}

View File

@ -1231,41 +1231,41 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
*/
@Override
public <T extends ProcessContext & ControllerServiceLookup> void start(final ScheduledExecutorService taskScheduler,
final long administrativeYieldMillis, final T processContext, final Runnable schedulingAgentCallback) {
final long administrativeYieldMillis, final T processContext, final SchedulingAgentCallback schedulingAgentCallback) {
if (!this.isValid()) {
throw new IllegalStateException( "Processor " + this.getName() + " is not in a valid state due to " + this.getValidationErrors());
}
final ProcessorLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
if (this.scheduledState.compareAndSet(ScheduledState.STOPPED, ScheduledState.STARTING)) { // will ensure that the Processor represented by this node can only be started once
final Runnable startProcRunnable = new Runnable() {
@Override
public void run() {
try {
final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, getControllerServiceProvider(),
StandardProcessorNode.this, processContext.getStateManager());
invokeTaskAsCancelableFuture(taskScheduler, new Callable<Void>() {
@SuppressWarnings("deprecation")
invokeTaskAsCancelableFuture(schedulingAgentCallback, new Callable<Void>() {
@Override
public Void call() throws Exception {
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, processor, schedulingContext);
SchedulingContext schedulingContext = new StandardSchedulingContext(processContext,
getControllerServiceProvider(), StandardProcessorNode.this,
processContext.getStateManager());
ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class,
org.apache.nifi.processor.annotation.OnScheduled.class, processor,
schedulingContext);
return null;
}
}
});
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
schedulingAgentCallback.run(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
schedulingAgentCallback.trigger(); // callback provided by StandardProcessScheduler to essentially initiate component's onTrigger() cycle
} else { // can only happen if stopProcessor was called before service was transitioned to RUNNING state
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED);
}
} catch (final Exception e) {
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ProcessorLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
procLog.error( "{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
new Object[] { StandardProcessorNode.this.getProcessor(), cause, administrativeYieldMillis + " milliseconds" }, cause);
LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
@ -1281,7 +1281,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
};
taskScheduler.execute(startProcRunnable);
} else {
LOG.warn("Can not start Processor since it's already in the process of being started or it is DISABLED");
String procName = this.processor.getClass().getSimpleName();
LOG.warn("Can not start '" + procName
+ "' since it's already in the process of being started or it is DISABLED - "
+ scheduledState.get());
procLog.warn("Can not start '" + procName
+ "' since it's already in the process of being started or it is DISABLED - "
+ scheduledState.get());
}
}
@ -1313,27 +1319,23 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public <T extends ProcessContext & ControllerServiceLookup> void stop(final ScheduledExecutorService scheduler,
final T processContext, final Callable<Boolean> activeThreadMonitorCallback) {
LOG.info("Stopping processor: " + this.processor.getClass());
if (this.scheduledState.compareAndSet(ScheduledState.RUNNING, ScheduledState.STOPPING)) { // will ensure that the Processor represented by this node can only be stopped once
invokeTaskAsCancelableFuture(scheduler, new Callable<Void>() {
@Override
public Void call() throws Exception {
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
return null;
}
}
});
// will continue to monitor active threads, invoking OnStopped once
// there are none
scheduler.execute(new Runnable() {
boolean unscheduled = false;
@Override
public void run() {
if (!this.unscheduled){
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
this.unscheduled = true;
}
try {
if (activeThreadMonitorCallback.call()) {
try (final NarCloseable nc = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
}
scheduledState.set(ScheduledState.STOPPED);
} else {
scheduler.schedule(this, 100, TimeUnit.MILLISECONDS);
@ -1382,31 +1384,32 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
* be logged (WARN) informing a user so further actions could be taken.
* </p>
*/
private void invokeTaskAsCancelableFuture(ScheduledExecutorService taskScheduler, Callable<Void> task) {
Future<Void> executionResult = taskScheduler.submit(task);
private <T> void invokeTaskAsCancelableFuture(SchedulingAgentCallback callback, Callable<T> task) {
String timeoutString = NiFiProperties.getInstance().getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
long onScheduleTimeout = timeoutString == null ? Long.MAX_VALUE
long onScheduleTimeout = timeoutString == null ? 60000
: FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
Future<?> taskFuture = callback.invokeMonitoringTask(task);
try {
executionResult.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
taskFuture.get(onScheduleTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.warn("Thread was interrupted while waiting for processor '" + this.processor.getClass().getSimpleName()
+ "' lifecycle operation (OnScheduled or OnUnscheduled) to finish.");
+ "' lifecycle OnScheduled operation to finish.");
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while executing one of processor's OnScheduled tasks.", e);
} catch (TimeoutException e) {
executionResult.cancel(true);
LOG.warn("Timed out while waiting for lifecycle operation (OnScheduled or OnUnscheduled) of '"
taskFuture.cancel(true);
LOG.warn("Timed out while waiting for OnScheduled of '"
+ this.processor.getClass().getSimpleName()
+ "' processor to finish. An attempt is made to cancel the task via Thread.interrupt(). However it does not "
+ "guarantee that the task will be canceled since the code inside current lifecycle operation (OnScheduled or OnUnscheduled) may "
+ "guarantee that the task will be canceled since the code inside current OnScheduled operation may "
+ "have been written to ignore interrupts which may result in runaway thread which could lead to more issues "
+ "eventually requiring NiFi to be restarted. This is usually a bug in the target Processor '"
+ this.processor + "' that needs to be documented, reported and eventually fixed.");
throw new RuntimeException("Timed out while executing one of processor's OnScheduled task.", e);
} catch (ExecutionException e){
throw new RuntimeException(
"Failed while executing one of processor's lifecycle tasks (OnScheduled or OnUnscheduled).", e);
throw new RuntimeException("Failed while executing one of processor's OnScheduled task.", e);
} finally {
callback.postMonitor();
}
}
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.controller.scheduling;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.engine.FlowEngine;
/**
* Base implementation of the {@link SchedulingAgent} which encapsulates the
@ -33,6 +34,12 @@ import org.apache.nifi.controller.ReportingTaskNode;
*/
abstract class AbstractSchedulingAgent implements SchedulingAgent {
protected final FlowEngine flowEngine;
protected AbstractSchedulingAgent(FlowEngine flowEngine) {
this.flowEngine = flowEngine;
}
@Override
public void schedule(Connectable connectable, ScheduleState scheduleState) {
scheduleState.setScheduled(true);

View File

@ -55,7 +55,6 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
private final FlowEngine flowEngine;
private final ControllerServiceProvider serviceProvider;
private final StateManagerProvider stateManagerProvider;
private final EventDrivenWorkerQueue workerQueue;
@ -70,7 +69,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) {
this.flowEngine = flowEngine;
super(flowEngine);
this.serviceProvider = serviceProvider;
this.stateManagerProvider = stateManagerProvider;
this.workerQueue = workerQueue;

View File

@ -49,16 +49,15 @@ public class QuartzSchedulingAgent extends AbstractSchedulingAgent {
private final FlowController flowController;
private final ProcessContextFactory contextFactory;
private final FlowEngine flowEngine;
private final StringEncryptor encryptor;
private volatile String adminYieldDuration = "1 sec";
private final Map<Object, List<AtomicBoolean>> canceledTriggers = new HashMap<>();
public QuartzSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor enryptor) {
super(flowEngine);
this.flowController = flowController;
this.contextFactory = contextFactory;
this.flowEngine = flowEngine;
this.encryptor = enryptor;
}

View File

@ -23,7 +23,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -42,6 +42,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.service.ControllerServiceNode;
@ -79,7 +80,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
// thread pool for starting/stopping components
private final ScheduledExecutorService componentLifeCycleThreadPool = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private final ScheduledExecutorService componentLifeCycleThreadPool = new FlowEngine(8, "StandardProcessScheduler", true);
private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(8, "StandardProcessScheduler", true);
private final StringEncryptor encryptor;
@ -160,6 +162,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
frameworkTaskExecutor.shutdown();
componentLifeCycleThreadPool.shutdown();
componentMonitoringThreadPool.shutdown();
}
@Override
@ -295,14 +298,27 @@ public final class StandardProcessScheduler implements ProcessScheduler {
StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
this.encryptor, getStateManager(procNode.getIdentifier()));
final ScheduleState scheduleState = getScheduleState(requireNonNull(procNode));
Runnable schedulingAgentCallback = new Runnable() {
SchedulingAgentCallback callback = new SchedulingAgentCallback() {
@Override
public void run() {
public void trigger() {
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
heartbeater.heartbeat();
}
@Override
public Future<?> invokeMonitoringTask(Callable<?> task) {
scheduleState.incrementActiveThreadCount();
return componentMonitoringThreadPool.submit(task);
}
@Override
public void postMonitor() {
scheduleState.decrementActiveThreadCount();
}
};
procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, schedulingAgentCallback);
procNode.start(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, processContext, callback);
}
/**
@ -317,6 +333,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
StandardProcessContext processContext = new StandardProcessContext(procNode, this.controllerServiceProvider,
this.encryptor, getStateManager(procNode.getIdentifier()));
final ScheduleState state = getScheduleState(procNode);
procNode.stop(this.componentLifeCycleThreadPool, processContext, new Callable<Boolean>() {
@Override
public Boolean call() {

View File

@ -48,15 +48,14 @@ public class TimerDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final long noWorkYieldNanos;
private final FlowController flowController;
private final FlowEngine flowEngine;
private final ProcessContextFactory contextFactory;
private final StringEncryptor encryptor;
private volatile String adminYieldDuration = "1 sec";
public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final ProcessContextFactory contextFactory, final StringEncryptor encryptor) {
super(flowEngine);
this.flowController = flowController;
this.flowEngine = flowEngine;
this.contextFactory = contextFactory;
this.encryptor = encryptor;