diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index 0da7c98884..02189a1579 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -324,7 +324,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme @Override public void verifyCanDisable(final Set ignoreReferences) { if (!this.isActive()) { - throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation().getIdentifier() + " because it is not enabled"); + return; } final ControllerServiceReference references = getReferences(); @@ -608,16 +608,12 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e; final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this); - componentLog.error("Failed to invoke @OnEnabled method due to {}", cause); - LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString()); + componentLog.error("Failed to invoke @OnEnabled method", cause); invokeDisable(configContext); if (isActive()) { scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS); } else { - try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext); - } stateTransition.disable(); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 414d629f0d..735414d376 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -48,9 +48,11 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; @@ -224,12 +226,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi if (shouldStart) { for (ControllerServiceNode controllerServiceNode : serviceNodes) { try { - if (!controllerServiceNode.isActive()) { - final Future future = enableControllerServiceAndDependencies(controllerServiceNode); + final Future future = enableControllerServiceAndDependencies(controllerServiceNode); - future.get(30, TimeUnit.SECONDS); - logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState()); - } + future.get(30, TimeUnit.SECONDS); + logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState()); } catch (final ControllerServiceNotValidException csnve) { logger.warn("Failed to enable service {} because it is not currently valid", controllerServiceNode); } catch (Exception e) { @@ -247,14 +247,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public Future enableControllerServicesAsync(final Collection serviceNodes) { final CompletableFuture future = new CompletableFuture<>(); processScheduler.submitFrameworkTask(() -> { - enableControllerServices(serviceNodes, future); - future.complete(null); + try { + enableControllerServices(serviceNodes, future); + future.complete(null); + } catch (final Exception e) { + future.completeExceptionally(e); + } }); return future; } - private void enableControllerServices(final Collection serviceNodes, final CompletableFuture completableFuture) { + private void enableControllerServices(final Collection serviceNodes, final CompletableFuture completableFuture) throws Exception { + Exception firstFailure = null; + // validate that we are able to start all of the services. for (final ControllerServiceNode controllerServiceNode : serviceNodes) { if (completableFuture.isCancelled()) { @@ -262,29 +268,37 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } try { - if (!controllerServiceNode.isActive()) { - final Future future = enableControllerServiceAndDependencies(controllerServiceNode); + // If service is already active, just move on to the next + if (controllerServiceNode.isActive()) { + continue; + } - while (true) { - try { - future.get(1, TimeUnit.SECONDS); - logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState()); - break; - } catch (final TimeoutException e) { - if (completableFuture.isCancelled()) { - return; - } - } catch (final Exception e) { - logger.warn("Failed to enable service {}", controllerServiceNode, e); - completableFuture.completeExceptionally(e); - - if (this.bulletinRepo != null) { - this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", - Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e)); - } + final Future future = enableControllerServiceAndDependencies(controllerServiceNode); + // Wait for the future to complete. But if the completableFuture ever is canceled, we want to stop waiting and return. + while (true) { + try { + future.get(1, TimeUnit.SECONDS); + logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState()); + break; + } catch (final TimeoutException e) { + if (completableFuture.isCancelled()) { return; } + } catch (final Exception e) { + logger.warn("Failed to enable service {}", controllerServiceNode, e); + if (firstFailure == null) { + firstFailure = e; + } else { + firstFailure.addSuppressed(e); + } + + if (this.bulletinRepo != null) { + this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service", + Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e)); + } + + break; } } } catch (Exception e) { @@ -295,6 +309,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } } + + if (firstFailure != null) { + throw firstFailure; + } } @Override @@ -382,14 +400,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi public Future disableControllerServicesAsync(final Collection serviceNodes) { final CompletableFuture future = new CompletableFuture<>(); processScheduler.submitFrameworkTask(() -> { - disableControllerServices(serviceNodes, future); - future.complete(null); + try { + disableControllerServices(serviceNodes, future); + future.complete(null); + } catch (final Exception e) { + future.completeExceptionally(e); + } }); return future; } - private void disableControllerServices(final Collection serviceNodes, final CompletableFuture future) { + private void disableControllerServices(final Collection serviceNodes, final CompletableFuture future) throws Exception { final Set serviceNodeSet = new HashSet<>(serviceNodes); // Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection @@ -406,24 +428,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + Exception firstFailure = null; for (final ControllerServiceNode serviceNode : serviceNodes) { if (serviceNode.isActive()) { - disableReferencingServices(serviceNode); - final CompletableFuture serviceFuture = disableControllerService(serviceNode); - - while (true) { - try { - serviceFuture.get(1, TimeUnit.SECONDS); - break; - } catch (final TimeoutException e) { - if (future.isCancelled()) { - return; - } - - continue; - } catch (final Exception e) { - logger.error("Failed to disable {}", serviceNode, e); - future.completeExceptionally(e); + try { + disableControllerServiceAndReferencingServices(serviceNode, future::isCancelled); + } catch (final Exception e) { + if (firstFailure == null) { + firstFailure = e; + } else { + firstFailure.addSuppressed(e); } } } else { @@ -438,6 +452,26 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } } + + if (firstFailure != null) { + throw firstFailure; + } + } + + private void disableControllerServiceAndReferencingServices(final ControllerServiceNode serviceNode, final BooleanSupplier cancelSupplier) throws ExecutionException, InterruptedException { + disableReferencingServices(serviceNode); + final CompletableFuture serviceFuture = disableControllerService(serviceNode); + + while (true) { + try { + serviceFuture.get(1, TimeUnit.SECONDS); + break; + } catch (final TimeoutException e) { + if (cancelSupplier.getAsBoolean()) { + return; + } + } + } } @Override diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java new file mode 100644 index 0000000000..59df1c404f --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cs.tests.system; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class LifecycleFailureService extends AbstractControllerService { + static final PropertyDescriptor ENABLE_FAILURE_COUNT = new PropertyDescriptor.Builder() + .name("Enable Failure Count") + .description("How many times the CS should fail to enable before succeeding") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("0") + .build(); + + static final PropertyDescriptor FAIL_ON_DISABLE = new PropertyDescriptor.Builder() + .name("Fail on Disable") + .displayName("Fail on Disable") + .description("Whether or not hte Controller Service should fail when disabled") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + private final AtomicInteger invocationCount = new AtomicInteger(0); + + @Override + protected List getSupportedPropertyDescriptors() { + return Arrays.asList(ENABLE_FAILURE_COUNT, FAIL_ON_DISABLE); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final int maxFailureCount = context.getProperty(ENABLE_FAILURE_COUNT).asInteger(); + final int currentInvocationCount = invocationCount.getAndIncrement(); + if (currentInvocationCount >= maxFailureCount) { + getLogger().info("Enabling successfully because invocation count is {}", currentInvocationCount); + return; + } + + getLogger().info("Will fail to enable because invocation count is {}", currentInvocationCount); + throw new RuntimeException("Failing to enable because configured to fail " + maxFailureCount + " times and current failure count is only " + currentInvocationCount); + } + + @OnDisabled + public void onDisabled(final ConfigurationContext context) { + if (context.getProperty(FAIL_ON_DISABLE).asBoolean()) { + getLogger().info("Throwing Exception in onDisabled as configured"); + throw new RuntimeException("Failing to disable because configured to fail on disable"); + } + + getLogger().info("Completing onDisabled successfully as configured"); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 10f6db7935..01e184617a 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -15,6 +15,7 @@ org.apache.nifi.cs.tests.system.EnsureControllerServiceConfigurationCorrect org.apache.nifi.cs.tests.system.FakeControllerService1 +org.apache.nifi.cs.tests.system.LifecycleFailureService org.apache.nifi.cs.tests.system.SensitiveDynamicPropertiesService org.apache.nifi.cs.tests.system.StandardCountService org.apache.nifi.cs.tests.system.StandardSleepService diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 15512cc694..38e8a83fce 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -550,6 +550,20 @@ public class NiFiClientUtil { return nifiClient.getControllerServicesClient().updateControllerService(entity); } + public ActivateControllerServicesEntity enableControllerServices(final String groupId, final boolean waitForEnabled) throws NiFiClientException, IOException { + final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity(); + activateControllerServicesEntity.setId(groupId); + activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_ENABLED); + activateControllerServicesEntity.setDisconnectedNodeAcknowledged(true); + + final ActivateControllerServicesEntity activateControllerServices = nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity); + if (waitForEnabled) { + waitForControllerSerivcesEnabled(groupId); + } + + return activateControllerServices; + } + public ControllerServiceEntity enableControllerService(final ControllerServiceEntity entity) throws NiFiClientException, IOException { final ControllerServiceRunStatusEntity runStatusEntity = new ControllerServiceRunStatusEntity(); runStatusEntity.setState("ENABLED"); @@ -732,6 +746,10 @@ public class NiFiClientUtil { waitForControllerServiceState(groupId, "ENABLED", Arrays.asList(serviceIdsOfInterest)); } + public void waitForControllerSerivcesEnabled(final String groupId, final List serviceIdsOfInterest) throws NiFiClientException, IOException { + waitForControllerServiceState(groupId, "ENABLED", serviceIdsOfInterest); + } + public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection serviceIdsOfInterest) throws NiFiClientException, IOException { while (true) { final List nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java index faf1409b9f..ba8a606a6f 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java @@ -157,7 +157,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT { final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i); assertEquals("updated", flowFile.getFlowFile().getAttributes().get("attr")); } - } @Test diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java new file mode 100644 index 0000000000..6295f6ee24 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.controllerservice; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ControllerServiceEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ControllerServiceLifecycleIT extends NiFiSystemIT { + @Test + public void testControllerServiceFailingToEnableAllowsOthersToEnable() throws NiFiClientException, IOException { + for (int i=0; i < 12; i++) { + ControllerServiceEntity failureService = getClientUtil().createControllerService("LifecycleFailureService"); + getClientUtil().updateControllerServiceProperties(failureService, Collections.singletonMap("Enable Failure Count", "1000")); + } + + final List countServiceIds = new ArrayList<>(); + for (int i=0; i < 12; i++) { + ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService"); + countServiceIds.add(countService.getId()); + } + + getClientUtil().enableControllerServices("root", false); + getClientUtil().waitForControllerSerivcesEnabled("root", countServiceIds); + } + + @Test + public void testControllerServiceEnableFailureCausesRetry() throws NiFiClientException, IOException { + ControllerServiceEntity service = getClientUtil().createControllerService("LifecycleFailureService"); + getClientUtil().updateControllerServiceProperties(service, Collections.singletonMap("Enable Failure Count", "1")); + + getClientUtil().enableControllerServices("root", false); + getClientUtil().waitForControllerSerivcesEnabled("root"); + } + +}