NIFI-9289: On startup, when enabling a Controller Service & its dependencies, do not wait for the dependencies to fully enable. Doing so can take 30 seconds per each Controller Service (and per each reference). Due to some previous refactoring, this waiting period is no longer necessary, as the referencing service can now be enabled and will asynchronously complete the enabling once it becomes valid (due to the referenced service becoming enabled).

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5449.
This commit is contained in:
Mark Payne 2021-10-07 13:31:00 -04:00 committed by Joe Gresock
parent 650da75f2d
commit 231dd57e11
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
2 changed files with 42 additions and 4 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.service;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.controller.ComponentNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,6 +31,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BooleanSupplier;
public class ServiceStateTransition {
private static final Logger logger = LoggerFactory.getLogger(ServiceStateTransition.class);
@ -140,19 +142,27 @@ public class ServiceStateTransition {
}
}
public boolean awaitState(final ControllerServiceState desiredState, final long timePeriod, final TimeUnit timeUnit) throws InterruptedException {
/**
* Waits up to the specified max amount of time for the given predicate to become true.
* @param predicate the condition under which the wait should stop
* @param timePeriod the max period of time to wait
* @param timeUnit the time unit associated with the time period
* @return true if the predicate becomes true before the given time period, false if the time period elapses first
* @throws InterruptedException if interrupted while waiting for the condition to become true
*/
public boolean awaitCondition(final BooleanSupplier predicate, final long timePeriod, final TimeUnit timeUnit, final String desiredConditionDescription) throws InterruptedException {
Objects.requireNonNull(timeUnit);
final long timeout = System.currentTimeMillis() + timeUnit.toMillis(timePeriod);
writeLock.lock();
try {
while (desiredState != state) {
while (!predicate.getAsBoolean()) {
final long millisLeft = timeout - System.currentTimeMillis();
if (millisLeft <= 0) {
return false;
}
logger.debug("State of {} is currently {}. Will wait up to {} milliseconds for state to transition to {}", controllerServiceNode, state, millisLeft, desiredState);
logger.debug("State of {} is currently {}. Will wait up to {} milliseconds for condition to become {}", controllerServiceNode, state, millisLeft, desiredConditionDescription);
stateChangeCondition.await(millisLeft, TimeUnit.MILLISECONDS);
}
@ -162,4 +172,22 @@ public class ServiceStateTransition {
writeLock.unlock();
}
}
public boolean awaitState(final ControllerServiceState desiredState, final long timePeriod, final TimeUnit timeUnit) throws InterruptedException {
return awaitCondition(() -> desiredState == state, timePeriod, timeUnit, "service has a state of " + desiredState.name());
}
public boolean awaitStateOrInvalid(final ControllerServiceState desiredState, final long timePeriod, final TimeUnit timeUnit) throws InterruptedException {
final BooleanSupplier predicate = () -> desiredState == state || controllerServiceNode.getValidationStatus() == ValidationStatus.INVALID;
return awaitCondition(predicate, timePeriod, timeUnit, "service has a state of " + desiredState.name());
}
public void signalInvalid() {
writeLock.lock();
try {
stateChangeCondition.signalAll();
} finally {
writeLock.unlock();
}
}
}

View File

@ -31,6 +31,7 @@ import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.validation.ValidationState;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
@ -378,7 +379,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
public boolean awaitEnabled(final long timePeriod, final TimeUnit timeUnit) throws InterruptedException {
LOG.debug("Waiting up to {} {} for {} to be enabled", timePeriod, timeUnit, this);
final boolean enabled = stateTransition.awaitState(ControllerServiceState.ENABLED, timePeriod, timeUnit);
final boolean enabled = stateTransition.awaitStateOrInvalid(ControllerServiceState.ENABLED, timePeriod, timeUnit);
if (enabled) {
LOG.debug("{} is enabled", this);
@ -481,6 +482,15 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
}
}
@Override
public ValidationState performValidation(final ValidationContext validationContext) {
final ValidationState state = super.performValidation(validationContext);
if (state.getStatus() == ValidationStatus.INVALID) {
stateTransition.signalInvalid();
}
return state;
}
/**
* Will atomically enable this service by invoking its @OnEnabled operation.
* It uses CAS operation on {@link #stateTransition} to transition this service