NIFI-862: Ensure that if we are unable to stop a controller service, we can still stop processors referencing it and the service itself

This commit is contained in:
Mark Payne 2015-08-17 20:16:28 -04:00
parent e409b6c5c1
commit f171756a88
4 changed files with 115 additions and 3 deletions

View File

@ -320,15 +320,23 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
}
attemptOnScheduled:
while (true) {
boolean needSleep = false;
attemptOnScheduled: while (true) {
try {
// We put this here so that we can sleep outside of the synchronized block, as
// we can't hold the synchronized block the whole time. If we do hold it the whole time,
// we will not be able to stop the controller service if it has trouble starting because
// the call to disable the service will block when attempting to synchronize on scheduleState.
if (needSleep) {
Thread.sleep(administrativeYieldMillis);
}
synchronized (scheduleState) {
for (final String serviceId : serviceIds) {
final boolean enabled = processContext.isControllerServiceEnabled(serviceId);
if (!enabled) {
LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode);
Thread.sleep(administrativeYieldMillis);
needSleep = true;
continue attemptOnScheduled;
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import org.apache.nifi.controller.ControllerService;
public interface NoStartService extends ControllerService {
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.controller.AbstractControllerService;
public class NoStartServiceImpl extends AbstractControllerService implements NoStartService {
@OnEnabled
public void failIntentionally() {
throw new RuntimeException("Failed intentionally for unit test");
}
}

View File

@ -18,19 +18,31 @@ package org.apache.nifi.controller.scheduling;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
@ -84,6 +96,26 @@ public class TestStandardProcessScheduler {
assertTrue("After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times", attemptsAfterStop <= 1);
}
@Test(timeout = 6000)
public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException {
final Processor proc = new ServiceReferencingProcessor();
final ControllerServiceProvider serviceProvider = new StandardControllerServiceProvider(scheduler, null);
final ControllerServiceNode service = serviceProvider.createControllerService(NoStartServiceImpl.class.getName(), "service", true);
final ProcessorNode procNode = new StandardProcessorNode(proc, UUID.randomUUID().toString(),
new StandardValidationContextFactory(serviceProvider), scheduler, serviceProvider);
procNode.setProperty(ServiceReferencingProcessor.SERVICE_DESC.getName(), service.getIdentifier());
scheduler.enableControllerService(service);
scheduler.startProcessor(procNode);
Thread.sleep(1000L);
scheduler.stopProcessor(procNode);
scheduler.disableControllerService(service);
}
private class TestReportingTask extends AbstractReportingTask {
private final AtomicBoolean failOnScheduled = new AtomicBoolean(true);
@ -104,4 +136,24 @@ public class TestStandardProcessScheduler {
triggerCount.getAndIncrement();
}
}
private static class ServiceReferencingProcessor extends AbstractProcessor {
static final PropertyDescriptor SERVICE_DESC = new PropertyDescriptor.Builder()
.name("service")
.identifiesControllerService(NoStartService.class)
.required(true)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SERVICE_DESC);
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
}