mirror of https://github.com/apache/nifi.git
NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services
This closes #6236 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7767a5b85d
commit
0497907829
|
@ -324,7 +324,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
@Override
|
||||
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
|
||||
if (!this.isActive()) {
|
||||
throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation().getIdentifier() + " because it is not enabled");
|
||||
return;
|
||||
}
|
||||
|
||||
final ControllerServiceReference references = getReferences();
|
||||
|
@ -608,16 +608,12 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
|
|||
|
||||
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
|
||||
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
|
||||
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
|
||||
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
|
||||
componentLog.error("Failed to invoke @OnEnabled method", cause);
|
||||
invokeDisable(configContext);
|
||||
|
||||
if (isActive()) {
|
||||
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
|
||||
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
|
||||
}
|
||||
stateTransition.disable();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,9 +48,11 @@ import java.util.Set;
|
|||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
@ -224,12 +226,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
if (shouldStart) {
|
||||
for (ControllerServiceNode controllerServiceNode : serviceNodes) {
|
||||
try {
|
||||
if (!controllerServiceNode.isActive()) {
|
||||
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
|
||||
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
|
||||
|
||||
future.get(30, TimeUnit.SECONDS);
|
||||
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
|
||||
}
|
||||
future.get(30, TimeUnit.SECONDS);
|
||||
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
|
||||
} catch (final ControllerServiceNotValidException csnve) {
|
||||
logger.warn("Failed to enable service {} because it is not currently valid", controllerServiceNode);
|
||||
} catch (Exception e) {
|
||||
|
@ -247,14 +247,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
processScheduler.submitFrameworkTask(() -> {
|
||||
enableControllerServices(serviceNodes, future);
|
||||
future.complete(null);
|
||||
try {
|
||||
enableControllerServices(serviceNodes, future);
|
||||
future.complete(null);
|
||||
} catch (final Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
});
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
|
||||
private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) throws Exception {
|
||||
Exception firstFailure = null;
|
||||
|
||||
// validate that we are able to start all of the services.
|
||||
for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
|
||||
if (completableFuture.isCancelled()) {
|
||||
|
@ -262,29 +268,37 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
}
|
||||
|
||||
try {
|
||||
if (!controllerServiceNode.isActive()) {
|
||||
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
|
||||
// If service is already active, just move on to the next
|
||||
if (controllerServiceNode.isActive()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
future.get(1, TimeUnit.SECONDS);
|
||||
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
|
||||
break;
|
||||
} catch (final TimeoutException e) {
|
||||
if (completableFuture.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to enable service {}", controllerServiceNode, e);
|
||||
completableFuture.completeExceptionally(e);
|
||||
|
||||
if (this.bulletinRepo != null) {
|
||||
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
|
||||
Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
|
||||
}
|
||||
final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
|
||||
|
||||
// Wait for the future to complete. But if the completableFuture ever is canceled, we want to stop waiting and return.
|
||||
while (true) {
|
||||
try {
|
||||
future.get(1, TimeUnit.SECONDS);
|
||||
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
|
||||
break;
|
||||
} catch (final TimeoutException e) {
|
||||
if (completableFuture.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
logger.warn("Failed to enable service {}", controllerServiceNode, e);
|
||||
if (firstFailure == null) {
|
||||
firstFailure = e;
|
||||
} else {
|
||||
firstFailure.addSuppressed(e);
|
||||
}
|
||||
|
||||
if (this.bulletinRepo != null) {
|
||||
this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
|
||||
Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -295,6 +309,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (firstFailure != null) {
|
||||
throw firstFailure;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -382,14 +400,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
processScheduler.submitFrameworkTask(() -> {
|
||||
disableControllerServices(serviceNodes, future);
|
||||
future.complete(null);
|
||||
try {
|
||||
disableControllerServices(serviceNodes, future);
|
||||
future.complete(null);
|
||||
} catch (final Exception e) {
|
||||
future.completeExceptionally(e);
|
||||
}
|
||||
});
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) {
|
||||
private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) throws Exception {
|
||||
final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes);
|
||||
|
||||
// Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection
|
||||
|
@ -406,24 +428,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
}
|
||||
}
|
||||
|
||||
Exception firstFailure = null;
|
||||
for (final ControllerServiceNode serviceNode : serviceNodes) {
|
||||
if (serviceNode.isActive()) {
|
||||
disableReferencingServices(serviceNode);
|
||||
final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
serviceFuture.get(1, TimeUnit.SECONDS);
|
||||
break;
|
||||
} catch (final TimeoutException e) {
|
||||
if (future.isCancelled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
continue;
|
||||
} catch (final Exception e) {
|
||||
logger.error("Failed to disable {}", serviceNode, e);
|
||||
future.completeExceptionally(e);
|
||||
try {
|
||||
disableControllerServiceAndReferencingServices(serviceNode, future::isCancelled);
|
||||
} catch (final Exception e) {
|
||||
if (firstFailure == null) {
|
||||
firstFailure = e;
|
||||
} else {
|
||||
firstFailure.addSuppressed(e);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -438,6 +452,26 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (firstFailure != null) {
|
||||
throw firstFailure;
|
||||
}
|
||||
}
|
||||
|
||||
private void disableControllerServiceAndReferencingServices(final ControllerServiceNode serviceNode, final BooleanSupplier cancelSupplier) throws ExecutionException, InterruptedException {
|
||||
disableReferencingServices(serviceNode);
|
||||
final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
serviceFuture.get(1, TimeUnit.SECONDS);
|
||||
break;
|
||||
} catch (final TimeoutException e) {
|
||||
if (cancelSupplier.getAsBoolean()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.cs.tests.system;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class LifecycleFailureService extends AbstractControllerService {
|
||||
static final PropertyDescriptor ENABLE_FAILURE_COUNT = new PropertyDescriptor.Builder()
|
||||
.name("Enable Failure Count")
|
||||
.description("How many times the CS should fail to enable before succeeding")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||
.defaultValue("0")
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor FAIL_ON_DISABLE = new PropertyDescriptor.Builder()
|
||||
.name("Fail on Disable")
|
||||
.displayName("Fail on Disable")
|
||||
.description("Whether or not hte Controller Service should fail when disabled")
|
||||
.required(true)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.build();
|
||||
|
||||
private final AtomicInteger invocationCount = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return Arrays.asList(ENABLE_FAILURE_COUNT, FAIL_ON_DISABLE);
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
final int maxFailureCount = context.getProperty(ENABLE_FAILURE_COUNT).asInteger();
|
||||
final int currentInvocationCount = invocationCount.getAndIncrement();
|
||||
if (currentInvocationCount >= maxFailureCount) {
|
||||
getLogger().info("Enabling successfully because invocation count is {}", currentInvocationCount);
|
||||
return;
|
||||
}
|
||||
|
||||
getLogger().info("Will fail to enable because invocation count is {}", currentInvocationCount);
|
||||
throw new RuntimeException("Failing to enable because configured to fail " + maxFailureCount + " times and current failure count is only " + currentInvocationCount);
|
||||
}
|
||||
|
||||
@OnDisabled
|
||||
public void onDisabled(final ConfigurationContext context) {
|
||||
if (context.getProperty(FAIL_ON_DISABLE).asBoolean()) {
|
||||
getLogger().info("Throwing Exception in onDisabled as configured");
|
||||
throw new RuntimeException("Failing to disable because configured to fail on disable");
|
||||
}
|
||||
|
||||
getLogger().info("Completing onDisabled successfully as configured");
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
org.apache.nifi.cs.tests.system.EnsureControllerServiceConfigurationCorrect
|
||||
org.apache.nifi.cs.tests.system.FakeControllerService1
|
||||
org.apache.nifi.cs.tests.system.LifecycleFailureService
|
||||
org.apache.nifi.cs.tests.system.SensitiveDynamicPropertiesService
|
||||
org.apache.nifi.cs.tests.system.StandardCountService
|
||||
org.apache.nifi.cs.tests.system.StandardSleepService
|
||||
|
|
|
@ -550,6 +550,20 @@ public class NiFiClientUtil {
|
|||
return nifiClient.getControllerServicesClient().updateControllerService(entity);
|
||||
}
|
||||
|
||||
public ActivateControllerServicesEntity enableControllerServices(final String groupId, final boolean waitForEnabled) throws NiFiClientException, IOException {
|
||||
final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity();
|
||||
activateControllerServicesEntity.setId(groupId);
|
||||
activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_ENABLED);
|
||||
activateControllerServicesEntity.setDisconnectedNodeAcknowledged(true);
|
||||
|
||||
final ActivateControllerServicesEntity activateControllerServices = nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
|
||||
if (waitForEnabled) {
|
||||
waitForControllerSerivcesEnabled(groupId);
|
||||
}
|
||||
|
||||
return activateControllerServices;
|
||||
}
|
||||
|
||||
public ControllerServiceEntity enableControllerService(final ControllerServiceEntity entity) throws NiFiClientException, IOException {
|
||||
final ControllerServiceRunStatusEntity runStatusEntity = new ControllerServiceRunStatusEntity();
|
||||
runStatusEntity.setState("ENABLED");
|
||||
|
@ -732,6 +746,10 @@ public class NiFiClientUtil {
|
|||
waitForControllerServiceState(groupId, "ENABLED", Arrays.asList(serviceIdsOfInterest));
|
||||
}
|
||||
|
||||
public void waitForControllerSerivcesEnabled(final String groupId, final List<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
|
||||
waitForControllerServiceState(groupId, "ENABLED", serviceIdsOfInterest);
|
||||
}
|
||||
|
||||
public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
|
||||
while (true) {
|
||||
final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
|
||||
|
|
|
@ -157,7 +157,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
|
|||
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i);
|
||||
assertEquals("updated", flowFile.getFlowFile().getAttributes().get("attr"));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.tests.system.controllerservice;
|
||||
|
||||
import org.apache.nifi.tests.system.NiFiSystemIT;
|
||||
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
|
||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class ControllerServiceLifecycleIT extends NiFiSystemIT {
|
||||
@Test
|
||||
public void testControllerServiceFailingToEnableAllowsOthersToEnable() throws NiFiClientException, IOException {
|
||||
for (int i=0; i < 12; i++) {
|
||||
ControllerServiceEntity failureService = getClientUtil().createControllerService("LifecycleFailureService");
|
||||
getClientUtil().updateControllerServiceProperties(failureService, Collections.singletonMap("Enable Failure Count", "1000"));
|
||||
}
|
||||
|
||||
final List<String> countServiceIds = new ArrayList<>();
|
||||
for (int i=0; i < 12; i++) {
|
||||
ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService");
|
||||
countServiceIds.add(countService.getId());
|
||||
}
|
||||
|
||||
getClientUtil().enableControllerServices("root", false);
|
||||
getClientUtil().waitForControllerSerivcesEnabled("root", countServiceIds);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testControllerServiceEnableFailureCausesRetry() throws NiFiClientException, IOException {
|
||||
ControllerServiceEntity service = getClientUtil().createControllerService("LifecycleFailureService");
|
||||
getClientUtil().updateControllerServiceProperties(service, Collections.singletonMap("Enable Failure Count", "1"));
|
||||
|
||||
getClientUtil().enableControllerServices("root", false);
|
||||
getClientUtil().waitForControllerSerivcesEnabled("root");
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue