NIFI-10703 - Updated VersionedDataflow to support MaxEventDrivenThreadCount

NIFI-10703 - Also setting the maxEventDrivenThreadCount on the controller when using a versioned flow.
NIFI-10703 - Corrected to use getMaxEventDrivenThreadCount(), set a default value for event driven thread count in VersionedDataflow
NIFI-10703 - Updated log message for setMaxThreadCount in FlowController.java
NIFI-10703 - Updated default value for maxEventDrivenThreadCount
NIFI-10703 - Set private for DEFAULT_MAX_EVENT_DRIVEN_THREAD_COUNT

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6638
This commit is contained in:
Nathan Gough 2022-11-08 22:49:41 -05:00 committed by Matthew Burgess
parent 709110da35
commit 1b8cd8349b
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 14 additions and 1 deletions

View File

@ -30,6 +30,7 @@ import java.util.Set;
public class VersionedDataflow { public class VersionedDataflow {
private VersionedFlowEncodingVersion encodingVersion; private VersionedFlowEncodingVersion encodingVersion;
private int maxTimerDrivenThreadCount; private int maxTimerDrivenThreadCount;
private int maxEventDrivenThreadCount;
private List<VersionedFlowRegistryClient> registries; private List<VersionedFlowRegistryClient> registries;
private List<VersionedParameterContext> parameterContexts; private List<VersionedParameterContext> parameterContexts;
private List<VersionedParameterProvider> parameterProviders; private List<VersionedParameterProvider> parameterProviders;
@ -38,6 +39,8 @@ public class VersionedDataflow {
private Set<VersionedTemplate> templates; private Set<VersionedTemplate> templates;
private VersionedProcessGroup rootGroup; private VersionedProcessGroup rootGroup;
private final static int DEFAULT_MAX_EVENT_DRIVEN_THREAD_COUNT = 1;
public VersionedFlowEncodingVersion getEncodingVersion() { public VersionedFlowEncodingVersion getEncodingVersion() {
return encodingVersion; return encodingVersion;
} }
@ -54,6 +57,14 @@ public class VersionedDataflow {
this.maxTimerDrivenThreadCount = maxTimerDrivenThreadCount; this.maxTimerDrivenThreadCount = maxTimerDrivenThreadCount;
} }
public int getMaxEventDrivenThreadCount() {
return maxEventDrivenThreadCount < 1 ? DEFAULT_MAX_EVENT_DRIVEN_THREAD_COUNT : maxEventDrivenThreadCount;
}
public void setMaxEventDrivenThreadCount(final int maxEventDrivenThreadCount) {
this.maxEventDrivenThreadCount = maxEventDrivenThreadCount;
}
public List<VersionedFlowRegistryClient> getRegistries() { public List<VersionedFlowRegistryClient> getRegistries() {
return registries; return registries;
} }

View File

@ -1591,7 +1591,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
*/ */
private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) { private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
if (maxThreadCount < 1) { if (maxThreadCount < 1) {
throw new IllegalArgumentException("Cannot set max number of threads to less than 2"); throw new IllegalArgumentException("Cannot set max number of threads to less than 1");
} }
maxThreads.getAndSet(maxThreadCount); maxThreads.getAndSet(maxThreadCount);

View File

@ -82,6 +82,7 @@ public class VersionedDataflowMapper {
final VersionedDataflow dataflow = new VersionedDataflow(); final VersionedDataflow dataflow = new VersionedDataflow();
dataflow.setEncodingVersion(ENCODING_VERSION); dataflow.setEncodingVersion(ENCODING_VERSION);
dataflow.setMaxTimerDrivenThreadCount(flowController.getMaxTimerDrivenThreadCount()); dataflow.setMaxTimerDrivenThreadCount(flowController.getMaxTimerDrivenThreadCount());
dataflow.setMaxEventDrivenThreadCount(flowController.getMaxEventDrivenThreadCount());
dataflow.setControllerServices(mapControllerServices()); dataflow.setControllerServices(mapControllerServices());
dataflow.setParameterContexts(mapParameterContexts()); dataflow.setParameterContexts(mapParameterContexts());
dataflow.setRegistries(mapRegistries()); dataflow.setRegistries(mapRegistries());

View File

@ -370,6 +370,7 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
if (versionedFlow != null) { if (versionedFlow != null) {
controller.setMaxTimerDrivenThreadCount(versionedFlow.getMaxTimerDrivenThreadCount()); controller.setMaxTimerDrivenThreadCount(versionedFlow.getMaxTimerDrivenThreadCount());
controller.setMaxEventDrivenThreadCount(versionedFlow.getMaxEventDrivenThreadCount());
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup(); ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
final Map<String, VersionedParameterContext> versionedParameterContextMap = new HashMap<>(); final Map<String, VersionedParameterContext> versionedParameterContextMap = new HashMap<>();