mirror of
https://github.com/apache/nifi.git
synced 2025-03-03 16:09:19 +00:00
NIFI-13777 Fixed Stateless Group Startup around Controller Services (#9291)
Fixed stateless group startup around Controller Services by changing the startup logic in two ways. First, we enable Controller Services before we create the Stateless Flow. Second, we do not bother enabling the ephemeral Controller Services that are created during the Stateless Flow creation/synchronization. This ensures that if a Processor makes use of a Controller Service during its @OnScheduled method (which is part of the Stateless Flow initialization process), the Controller Service is already enabled. It also avoids calling @OnEnabled methods of Controller Services that will never actually be used, as Processors make use of the existing Controller Services when running within standard NiFi. Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
c6103519eb
commit
e9577ded29
@ -1472,7 +1472,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
|
||||
final Processor processor = processorRef.get().getProcessor();
|
||||
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor, new StandardLoggingContext(StandardProcessorNode.this));
|
||||
LOG.info("Starting {}", this);
|
||||
LOG.debug("Starting {}", this);
|
||||
|
||||
ScheduledState currentState;
|
||||
boolean starting;
|
||||
@ -1804,7 +1804,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
||||
final SchedulingAgent schedulingAgent, final LifecycleState lifecycleState, final boolean triggerLifecycleMethods) {
|
||||
|
||||
final Processor processor = processorRef.get().getProcessor();
|
||||
LOG.info("Stopping processor: {}", this);
|
||||
LOG.debug("Stopping processor: {}", this);
|
||||
desiredState = ScheduledState.STOPPED;
|
||||
|
||||
final CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
@ -74,7 +74,6 @@ import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
|
||||
import org.apache.nifi.stateless.repository.StatelessProvenanceRepository;
|
||||
import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Collections;
|
||||
@ -89,6 +88,7 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BooleanSupplier;
|
||||
import java.util.function.Supplier;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
public class StandardStatelessGroupNodeFactory implements StatelessGroupNodeFactory {
|
||||
private final FlowController flowController;
|
||||
@ -296,9 +296,11 @@ public class StandardStatelessGroupNodeFactory implements StatelessGroupNodeFact
|
||||
|
||||
child.synchronizeFlow(versionedExternalFlow, synchronizationOptions, flowMappingOptions);
|
||||
child.setParent(group);
|
||||
|
||||
return child;
|
||||
}
|
||||
|
||||
|
||||
private FlowEngine lazyInitializeThreadPool(final AtomicReference<FlowEngine> reference, final Supplier<FlowEngine> factory) {
|
||||
FlowEngine threadPool = reference.get();
|
||||
if (threadPool == null) {
|
||||
|
@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -438,18 +439,32 @@ public final class StandardProcessScheduler implements ProcessScheduler {
|
||||
|
||||
// Start each Processor. This won't trigger the Processor to run because the Execution Engine will be Stateless.
|
||||
// However, it will transition its scheduled state to the appropriate value.
|
||||
LOG.debug("All Controller Services for {} have been enabled; starting Processors and ports.", groupNode);
|
||||
groupNode.getProcessGroup().findAllProcessors().forEach(proc -> startProcessor(proc, false));
|
||||
groupNode.getProcessGroup().findAllInputPorts().forEach(port -> startConnectable(port));
|
||||
groupNode.getProcessGroup().findAllOutputPorts().forEach(port -> startConnectable(port));
|
||||
groupNode.getProcessGroup().findAllControllerServices().forEach(service -> enableControllerService(service));
|
||||
|
||||
getSchedulingAgent(groupNode).schedule(groupNode, lifecycleState);
|
||||
future.complete(null);
|
||||
}
|
||||
};
|
||||
|
||||
LOG.info("Starting {}", groupNode);
|
||||
groupNode.start(componentMonitoringThreadPool, callback, lifecycleState);
|
||||
// Enable all of the Controller Services. Once they have all become enabled, we will start the Process Group.
|
||||
// We have to enable the Controller Services first in case any of the Processors use a Controller Service in its
|
||||
// @OnScheduled method. While a new copy of the Processor is created for each Concurrent Task in the stateless group,
|
||||
// we do not use a separate copy of the Controller Service, because doing so would cause problems for services that
|
||||
// perform functions such as caching or connection pooling.
|
||||
final List<CompletableFuture<?>> serviceStartFutures = new ArrayList<>();
|
||||
for (final ControllerServiceNode serviceNode : groupNode.getProcessGroup().findAllControllerServices()) {
|
||||
serviceStartFutures.add(enableControllerService(serviceNode));
|
||||
}
|
||||
|
||||
final CompletableFuture<?> allServiceStartFutures = CompletableFuture.allOf(serviceStartFutures.toArray(new CompletableFuture<?>[0]));
|
||||
allServiceStartFutures.thenRun(() -> {
|
||||
LOG.info("Starting {}", groupNode);
|
||||
groupNode.start(componentMonitoringThreadPool, callback, lifecycleState);
|
||||
});
|
||||
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -56,6 +56,7 @@ import org.apache.nifi.stateless.engine.ProcessContextFactory;
|
||||
import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
|
||||
import org.apache.nifi.stateless.flow.StandardDataflowDefinition;
|
||||
import org.apache.nifi.stateless.flow.StandardStatelessFlow;
|
||||
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
|
||||
import org.apache.nifi.stateless.flow.TransactionThresholds;
|
||||
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
|
||||
import org.apache.nifi.util.FormatUtils;
|
||||
@ -355,7 +356,20 @@ public class StandardStatelessGroupNode implements StatelessGroupNode {
|
||||
lifecycleStateManager,
|
||||
Duration.of(10, ChronoUnit.SECONDS));
|
||||
|
||||
dataflow.initialize();
|
||||
// We don't want to enable Controller Services because we want to use the actual Controller Services that exist within the
|
||||
// Standard NiFi instance, not the ephemeral ones that are created during the initialization of the Stateless Group.
|
||||
// This may not matter for Controller Services such as Record Readers and Writers, but it can matter for other types
|
||||
// of Controller Services, such as connection pooling services. We don't want to create a new instance of a Connection Pool
|
||||
// for each Concurrent Task in a Stateless Group, for example.
|
||||
// Since we will not be using the ephemeral Controller Services, we also do not want to enable them.
|
||||
final StatelessDataflowInitializationContext initializationContext = new StatelessDataflowInitializationContext() {
|
||||
@Override
|
||||
public boolean isEnableControllerServices() {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
dataflow.initialize(initializationContext);
|
||||
|
||||
return dataflow;
|
||||
}
|
||||
|
@ -29,7 +29,20 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.servlet.ServletContext;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.ws.rs.Consumes;
|
||||
import jakarta.ws.rs.DefaultValue;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.HttpMethod;
|
||||
import jakarta.ws.rs.PUT;
|
||||
import jakarta.ws.rs.Path;
|
||||
import jakarta.ws.rs.PathParam;
|
||||
import jakarta.ws.rs.Produces;
|
||||
import jakarta.ws.rs.QueryParam;
|
||||
import jakarta.ws.rs.core.Context;
|
||||
import jakarta.ws.rs.core.HttpHeaders;
|
||||
import jakarta.ws.rs.core.MediaType;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import jakarta.ws.rs.core.StreamingOutput;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.authorization.Authorizer;
|
||||
import org.apache.nifi.authorization.RequestAction;
|
||||
@ -142,20 +155,6 @@ import org.apache.nifi.web.api.request.FlowMetricsProducer;
|
||||
import org.apache.nifi.web.api.request.FlowMetricsRegistry;
|
||||
import org.apache.nifi.web.api.request.IntegerParameter;
|
||||
import org.apache.nifi.web.api.request.LongParameter;
|
||||
|
||||
import jakarta.ws.rs.Consumes;
|
||||
import jakarta.ws.rs.DefaultValue;
|
||||
import jakarta.ws.rs.GET;
|
||||
import jakarta.ws.rs.HttpMethod;
|
||||
import jakarta.ws.rs.PUT;
|
||||
import jakarta.ws.rs.Path;
|
||||
import jakarta.ws.rs.PathParam;
|
||||
import jakarta.ws.rs.Produces;
|
||||
import jakarta.ws.rs.QueryParam;
|
||||
import jakarta.ws.rs.core.HttpHeaders;
|
||||
import jakarta.ws.rs.core.MediaType;
|
||||
import jakarta.ws.rs.core.Response;
|
||||
import jakarta.ws.rs.core.StreamingOutput;
|
||||
import org.apache.nifi.web.servlet.shared.RequestUriBuilder;
|
||||
import org.apache.nifi.web.util.PaginationHelper;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -946,7 +945,7 @@ public class FlowResource extends ApplicationResource {
|
||||
)
|
||||
@PathParam("id") String id,
|
||||
@Parameter(
|
||||
description = "The request to schedule or unschedule. If the comopnents in the request are not specified, all authorized components will be considered.",
|
||||
description = "The request to schedule or unschedule. If the components in the request are not specified, all authorized components will be considered.",
|
||||
required = true
|
||||
) final ScheduleComponentsEntity requestScheduleComponentsEntity) {
|
||||
|
||||
|
@ -31,7 +31,7 @@ public interface StatelessDataflow {
|
||||
* Triggers the dataflow to run, returning a DataflowTrigger that can be used to wait for the result. Uses the {@link DataflowTriggerContext#IMPLICIT_CONTEXT}.
|
||||
* @return a DataflowTrigger that can be used to wait for the result
|
||||
*
|
||||
* @throws IllegalStateException if called before {@link #initialize()} is called.
|
||||
* @throws IllegalStateException if called before {@link #initialize(StatelessDataflowInitializationContext)} is called.
|
||||
*/
|
||||
default DataflowTrigger trigger() {
|
||||
return trigger(DataflowTriggerContext.IMPLICIT_CONTEXT);
|
||||
@ -43,7 +43,7 @@ public interface StatelessDataflow {
|
||||
* @param triggerContext the trigger context to use
|
||||
* @return a DataflowTrigger that can be used to wait for the result
|
||||
*
|
||||
* @throws IllegalStateException if called before {@link #initialize()} is called.
|
||||
* @throws IllegalStateException if called before {@link #initialize(StatelessDataflowInitializationContext)} is called.
|
||||
*/
|
||||
DataflowTrigger trigger(DataflowTriggerContext triggerContext);
|
||||
|
||||
@ -66,7 +66,7 @@ public interface StatelessDataflow {
|
||||
* This method MUST be called prior to calling {@link #trigger()}.
|
||||
* </p>
|
||||
*/
|
||||
void initialize();
|
||||
void initialize(StatelessDataflowInitializationContext initializationContext);
|
||||
|
||||
default void shutdown() {
|
||||
shutdown(true, false);
|
||||
|
@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.stateless.flow;
|
||||
|
||||
/**
 * Context passed to {@code StatelessDataflow.initialize(StatelessDataflowInitializationContext)}
 * when a stateless dataflow is initialized.
 */
public interface StatelessDataflowInitializationContext {
|
||||
|
||||
/**
 * Indicates whether the dataflow should enable its Controller Services as part of initialization.
 *
 * @return {@code true} if Controller Services should be enabled (and waited on) during initialization;
 *         {@code false} if enabling them should be skipped
 */
boolean isEnableControllerServices();
|
||||
|
||||
}
|
@ -24,6 +24,7 @@ import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
|
||||
import org.apache.nifi.stateless.flow.DataflowDefinition;
|
||||
import org.apache.nifi.stateless.flow.DataflowTrigger;
|
||||
import org.apache.nifi.stateless.flow.StatelessDataflow;
|
||||
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
|
||||
import org.apache.nifi.stateless.flow.StatelessDataflowValidation;
|
||||
import org.apache.nifi.stateless.flow.TriggerResult;
|
||||
import org.slf4j.Logger;
|
||||
@ -92,7 +93,12 @@ public class RunStatelessFlow {
|
||||
final DataflowDefinition dataflowDefinition = bootstrap.parseDataflowDefinition(flowDefinitionFile, parameterOverrides);
|
||||
|
||||
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
|
||||
dataflow.initialize();
|
||||
dataflow.initialize(new StatelessDataflowInitializationContext() {
|
||||
@Override
|
||||
public boolean isEnableControllerServices() {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
final StatelessDataflowValidation validation = dataflow.performValidation();
|
||||
if (!validation.isValid()) {
|
||||
|
@ -147,7 +147,6 @@ public class StatelessProcessScheduler implements ProcessScheduler {
|
||||
final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
|
||||
procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, this.processorStartTimeoutMillis, processContextSupplier, callback, failIfStopping, true);
|
||||
return future;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -230,7 +230,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize() {
|
||||
public void initialize(final StatelessDataflowInitializationContext initializationContext) {
|
||||
if (initialized) {
|
||||
logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", this);
|
||||
return;
|
||||
@ -248,9 +248,14 @@ public class StandardStatelessFlow implements StatelessDataflow {
|
||||
// before proceeding any further.
|
||||
try {
|
||||
final long serviceEnableStart = System.currentTimeMillis();
|
||||
enableControllerServices(rootGroup);
|
||||
|
||||
waitForServicesEnabled(rootGroup);
|
||||
if (initializationContext.isEnableControllerServices()) {
|
||||
enableControllerServices(rootGroup);
|
||||
waitForServicesEnabled(rootGroup);
|
||||
} else {
|
||||
logger.debug("Skipping Controller Service enablement because initializationContext.isEnableControllerServices() returned false");
|
||||
}
|
||||
|
||||
final long serviceEnableMillis = System.currentTimeMillis() - serviceEnableStart;
|
||||
|
||||
// Perform validation again so that any processors that reference controller services that were just
|
||||
@ -474,9 +479,14 @@ public class StandardStatelessFlow implements StatelessDataflow {
|
||||
try {
|
||||
future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
} catch (final Exception e) {
|
||||
final String validationErrors = performValidation().toString();
|
||||
throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is "
|
||||
+ processor.getValidationStatus() + ". All validation errors: " + validationErrors);
|
||||
final StatelessDataflowValidation validation = performValidation();
|
||||
if (validation.isValid()) {
|
||||
throw new IllegalStateException("Processor " + processor + " is valid but has not fully started", e);
|
||||
} else {
|
||||
final String validationErrors = performValidation().toString();
|
||||
throw new IllegalStateException("Processor " + processor + " has not fully started. Current Validation Status is "
|
||||
+ processor.getValidationStatus() + ". All validation errors: " + validationErrors);
|
||||
}
|
||||
}
|
||||
|
||||
final long millis = System.currentTimeMillis() - start;
|
||||
|
@ -33,6 +33,7 @@ import org.apache.nifi.stateless.config.StatelessConfigurationException;
|
||||
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
|
||||
import org.apache.nifi.stateless.flow.DataflowDefinition;
|
||||
import org.apache.nifi.stateless.flow.StatelessDataflow;
|
||||
import org.apache.nifi.stateless.flow.StatelessDataflowInitializationContext;
|
||||
import org.apache.nifi.stateless.flow.TransactionThresholds;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@ -232,7 +233,12 @@ public class StatelessSystemIT {
|
||||
|
||||
final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(getEngineConfiguration());
|
||||
final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition);
|
||||
dataflow.initialize();
|
||||
dataflow.initialize(new StatelessDataflowInitializationContext() {
|
||||
@Override
|
||||
public boolean isEnableControllerServices() {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
|
||||
createdFlows.add(dataflow);
|
||||
return dataflow;
|
||||
|
Loading…
x
Reference in New Issue
Block a user