diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 7e2c51b8f0..2c0c99be55 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -320,15 +320,23 @@ public final class StandardProcessScheduler implements ProcessScheduler { } } - attemptOnScheduled: - while (true) { + boolean needSleep = false; + attemptOnScheduled: while (true) { try { + // We put this here so that we can sleep outside of the synchronized block, as + // we can't hold the synchronized block the whole time. If we do hold it the whole time, + // we will not be able to stop the controller service if it has trouble starting because + // the call to disable the service will block when attempting to synchronize on scheduleState. + if (needSleep) { + Thread.sleep(administrativeYieldMillis); + } + synchronized (scheduleState) { for (final String serviceId : serviceIds) { final boolean enabled = processContext.isControllerServiceEnabled(serviceId); if (!enabled) { LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode); - Thread.sleep(administrativeYieldMillis); + needSleep = true; continue attemptOnScheduled; } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/NoStartService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/NoStartService.java new file mode 100644 index 0000000000..fac5531a50 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/NoStartService.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.controller.ControllerService; + +public interface NoStartService extends ControllerService { + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/NoStartServiceImpl.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/NoStartServiceImpl.java new file mode 100644 index 0000000000..d6a47be63d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/NoStartServiceImpl.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.controller.AbstractControllerService; + +public class NoStartServiceImpl extends AbstractControllerService implements NoStartService { + + @OnEnabled + public void failIntentionally() { + throw new RuntimeException("Failed intentionally for unit test"); + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index 72eaa84531..d8c5d084c8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -18,19 +18,31 @@ package org.apache.nifi.controller.scheduling; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.Heartbeater; +import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.StandardProcessorNode; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.reporting.StandardReportingInitializationContext; import org.apache.nifi.controller.reporting.StandardReportingTaskNode; +import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.controller.service.StandardControllerServiceProvider; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.StandardValidationContextFactory; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.ReportingContext; @@ -84,6 +96,26 @@ public class TestStandardProcessScheduler { assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1); } + @Test(timeout = 6000) + public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException { + final Processor proc = new ServiceReferencingProcessor(); + + final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null); + final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true); + final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(), + new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider); + + procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier()); + + scheduler.enableControllerService(service); + scheduler.startProcessor(procNode); + + Thread.sleep(1000L); + + scheduler.stopProcessor(procNode); + scheduler.disableControllerService(service); + } + private class TestReportingTask extends AbstractReportingTask { private final AtomicBoolean failOnScheduled = new AtomicBoolean(true); @@ -104,4 +136,24 @@ public class TestStandardProcessScheduler { triggerCount.getAndIncrement(); } } + + + private static class ServiceReferencingProcessor extends AbstractProcessor { + static final PropertyDescriptor SERVICE_DESC = new PropertyDescriptor.Builder() + .name("service") + .identifiesControllerService(NoStartService.class) + .required(true) + .build(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(SERVICE_DESC); + return properties; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + } + } }