NIFI-11813 Removed Event Driven Scheduling Strategy

This closes #7483

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-07-16 20:06:13 +02:00 committed by exceptionfactory
parent 779eea6453
commit 150bfbe990
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
172 changed files with 83 additions and 1637 deletions

View File

@ -38,7 +38,6 @@ public class ProcessorDefinition extends ConfigurableExtensionDefinition {
private boolean triggerWhenEmpty;
private boolean triggerWhenAnyDestinationAvailable;
private boolean supportsBatching;
private boolean supportsEventDriven;
private boolean primaryNodeOnly;
private boolean sideEffectFree;
@ -128,17 +127,6 @@ public class ProcessorDefinition extends ConfigurableExtensionDefinition {
this.supportsBatching = supportsBatching;
}
@ApiModelProperty("Whether or not this processor supports event driven scheduling. Indicates to the framework that the " +
"Processor is eligible to be scheduled to run based on the occurrence of an \"Event\" " +
"(e.g., when a FlowFile is enqueued in an incoming Connection), rather than being triggered periodically.")
public boolean getSupportsEventDriven() {
return supportsEventDriven;
}
public void setSupportsEventDriven(boolean supportsEventDriven) {
this.supportsEventDriven = supportsEventDriven;
}
@ApiModelProperty("Whether or not this processor should be scheduled only on the primary node in a cluster.")
public boolean getPrimaryNodeOnly() {
return primaryNodeOnly;

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.behavior;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* Annotation that may be placed on a Processor that indicates to the framework
* that the Processor is eligible to be scheduled to run based on the occurrence
* of an "Event" (e.g., when a FlowFile is enqueued in an incoming Connection),
* rather than being triggered periodically.
* </p>
*
* <p>
* This Annotation should not be used in conjunction with
* {@link TriggerSerially} or {@link TriggerWhenEmpty}. If this Annotation is
* used with either of these other Annotations, the Processor will not be
* eligible to be scheduled in Event-Driven mode.
* </p>
*
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface EventDriven {
}

View File

@ -116,14 +116,6 @@ public class NodeStatus implements Cloneable {
this.totalThreads = totalThreads;
}
public long getEventDrivenThreads() {
return eventDrivenThreads;
}
public void setEventDrivenThreads(final long eventDrivenThreads) {
this.eventDrivenThreads = eventDrivenThreads;
}
public long getTimerDrivenThreads() {
return timerDrivenThreads;
}

View File

@ -19,7 +19,6 @@ package org.apache.nifi.documentation;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -151,7 +150,6 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat
writeTriggerWhenEmpty(processor.getClass().getAnnotation(TriggerWhenEmpty.class));
writeTriggerWhenAnyDestinationAvailable(processor.getClass().getAnnotation(TriggerWhenAnyDestinationAvailable.class));
writeSupportsBatching(processor.getClass().getAnnotation(SupportsBatching.class));
writeEventDriven(processor.getClass().getAnnotation(EventDriven.class));
writePrimaryNodeOnly(processor.getClass().getAnnotation(PrimaryNodeOnly.class));
writeSideEffectFree(processor.getClass().getAnnotation(SideEffectFree.class));
writeDefaultSettings(processor.getClass().getAnnotation(DefaultSettings.class));
@ -317,8 +315,6 @@ public abstract class AbstractDocumentationWriter implements ExtensionDocumentat
protected abstract void writeSupportsSensitiveDynamicProperties(SupportsSensitiveDynamicProperties supportsSensitiveDynamicProperties) throws IOException;
protected abstract void writeEventDriven(EventDriven eventDriven) throws IOException;
protected abstract void writePrimaryNodeOnly(PrimaryNodeOnly primaryNodeOnly) throws IOException;
protected abstract void writeSideEffectFree(SideEffectFree sideEffectFree) throws IOException;

View File

@ -18,7 +18,6 @@ package org.apache.nifi.documentation.xml;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -471,14 +470,6 @@ public class XmlDocumentationWriter extends AbstractDocumentationWriter {
writeBooleanElement("supportsSensitiveDynamicProperties", true);
}
@Override
protected void writeEventDriven(EventDriven eventDriven) throws IOException {
if (eventDriven == null) {
return;
}
writeBooleanElement("eventDriven", true);
}
@Override
protected void writePrimaryNodeOnly(PrimaryNodeOnly primaryNodeOnly) throws IOException {
if (primaryNodeOnly == null) {

View File

@ -52,7 +52,7 @@ public class VersionedProcessor extends VersionedConfigurableExtension {
this.schedulingPeriod = setSchedulingPeriod;
}
@ApiModelProperty("Indicates whether the processor should be scheduled to run in event or timer driven mode.")
@ApiModelProperty("Indicates how the processor should be scheduled to run.")
public String getSchedulingStrategy() {
return schedulingStrategy;
}

View File

@ -22,33 +22,6 @@ package org.apache.nifi.scheduling;
*/
public enum SchedulingStrategy {
/**
* Components should be scheduled to run whenever a relevant Event occurs.
* Examples of "relevant Events" are:
*
* <ul>
* <li>A FlowFile is added to one of the Component's incoming
* Connections</li>
* <li>A FlowFile is removed from one of the Component's outgoing
* Connections</li>
* <li>The Component is scheduled to run (started)</li>
* </ul>
*
* <p>
* When using this mode, the user will be unable to configure the scheduling
* period. Instead, the framework will manage this.
* </p>
*
* <p>
* When using this mode, the maximum number of concurrent tasks can be set
* to 0, indicating no maximum.
* </p>
*
* <p>
* Not all Components support Event-Driven mode.
* </p>
*/
EVENT_DRIVEN(0, null),
/**
* Components should be scheduled to run on a periodic interval that is
* user-defined with a user-defined number of concurrent tasks. All

View File

@ -567,19 +567,6 @@ The first configuration option is the Scheduling Strategy. There are three possi
*Timer driven*: This is the default mode. The Processor will be scheduled to run on a regular interval. The interval
at which the Processor is run is defined by the 'Run Schedule' option (see below).
*Event driven*: When this mode is selected, the Processor will be triggered to run by an event, and that event occurs when FlowFiles enter Connections
feeding this Processor. This mode is currently considered experimental and is not supported by all Processors. When this mode is
selected, the 'Run Schedule' option is not configurable, as the Processor is not triggered to run periodically but
as the result of an event. Additionally, this is the only mode for which the 'Concurrent Tasks'
option can be set to 0. In this case, the number of threads is limited only by the size of the Event-Driven Thread Pool that
the administrator has configured.
[WARNING]
.Experimental
============
This implementation is marked <<experimental_warning, *experimental*>> as of Apache NiFi 1.10.0 (October 2019). The API, configuration, and internal behavior may change without warning, and such changes may occur during a minor release. Use at your own risk.
============
*CRON driven*: When using the CRON driven scheduling mode, the Processor is scheduled to run periodically, similar to the
Timer driven scheduling mode. However, the CRON driven mode provides significantly more flexibility at the expense of
increasing the complexity of the configuration. The CRON driven scheduling value is a string of six required fields and one
@ -644,11 +631,11 @@ most Processors. There are, however, some types of Processors that can only be s
===== Run Schedule
The 'Run Schedule' dictates how often the Processor should be scheduled to run. The valid values for this field depend on the selected
Scheduling Strategy (see above). If using the Event driven Scheduling Strategy, this field is not available. When using the Timer driven
Scheduling Strategy, this value is a time duration specified by a number followed by a time unit. For example, `1 second` or `5 mins`.
The default value of `0 sec` means that the Processor should run as often as possible as long as it has data to process. This is true
for any time duration of 0, regardless of the time unit (e.g., `0 sec`, `0 mins`, `0 days`). For an explanation of values that are
applicable for the CRON driven Scheduling Strategy, see the description of the CRON driven Scheduling Strategy itself.
Scheduling Strategy (see above). When using the Timer driven Scheduling Strategy, this value is a time duration specified by a number
followed by a time unit. For example, `1 second` or `5 mins`. The default value of `0 sec` means that the Processor should run as often
as possible as long as it has data to process. This is true for any time duration of 0, regardless of the time unit (e.g., `0 sec`,
`0 mins`, `0 days`). For an explanation of values that are applicable for the CRON driven Scheduling Strategy, see the description of
the CRON driven Scheduling Strategy itself.
===== Execution
The Execution setting is used to determine on which node(s) the Processor will be
@ -1964,8 +1951,6 @@ The supported keywords are the following:
- *Scheduling strategy*
** *event*: Adds Processors to the result list where the Scheduling Strategy is "Event Driven".
** *timer*: Adds Processors to the result list where the Scheduling Strategy is "Timer Driven".
- *Execution*

View File

@ -98,7 +98,6 @@ public class Extension {
private boolean triggerWhenEmpty;
private boolean triggerWhenAnyDestinationAvailable;
private boolean supportsBatching;
private boolean eventDriven;
private boolean primaryNodeOnly;
private boolean sideEffectFree;
@ -317,15 +316,6 @@ public class Extension {
this.supportsBatching = supportsBatching;
}
@ApiModelProperty(value = "Indicates that a processor supports event driven scheduling")
public boolean getEventDriven() {
return eventDriven;
}
public void setEventDriven(boolean eventDriven) {
this.eventDriven = eventDriven;
}
@ApiModelProperty(value = "Indicates that a processor should be scheduled only on the primary node")
public boolean getPrimaryNodeOnly() {
return primaryNodeOnly;

View File

@ -164,7 +164,6 @@ public class TestJAXBExtensionManifestParser {
assertTrue(processor1.getTriggerWhenEmpty());
assertTrue(processor1.getTriggerWhenAnyDestinationAvailable());
assertTrue(processor1.getPrimaryNodeOnly());
assertTrue(processor1.getEventDriven());
assertTrue(processor1.getSupportsBatching());
assertTrue(processor1.getSideEffectFree());
@ -189,7 +188,6 @@ public class TestJAXBExtensionManifestParser {
assertFalse(processor2.getTriggerWhenEmpty());
assertFalse(processor2.getTriggerWhenAnyDestinationAvailable());
assertFalse(processor2.getPrimaryNodeOnly());
assertFalse(processor2.getEventDriven());
assertFalse(processor2.getSupportsBatching());
assertFalse(processor2.getSideEffectFree());

View File

@ -27,7 +27,6 @@ public class SchedulingDefaultsFactory {
public static SchedulingDefaults getNifiSchedulingDefaults() {
final Map<String, Integer> defaultConcurrentTasks = new LinkedHashMap<>(3);
defaultConcurrentTasks.put(SchedulingStrategy.TIMER_DRIVEN.name(), SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks());
defaultConcurrentTasks.put(SchedulingStrategy.EVENT_DRIVEN.name(), SchedulingStrategy.EVENT_DRIVEN.getDefaultConcurrentTasks());
defaultConcurrentTasks.put(SchedulingStrategy.CRON_DRIVEN.name(), SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks());
final Map<String, String> defaultSchedulingPeriods = new LinkedHashMap<>(2);

View File

@ -217,7 +217,6 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
processorDefinition.setTriggerSerially(extension.getTriggerSerially());
processorDefinition.setTriggerWhenAnyDestinationAvailable(extension.getTriggerWhenAnyDestinationAvailable());
processorDefinition.setSupportsBatching(extension.getSupportsBatching());
processorDefinition.setSupportsEventDriven(extension.getEventDriven());
processorDefinition.setPrimaryNodeOnly(extension.getPrimaryNodeOnly());
processorDefinition.setSideEffectFree(extension.getSideEffectFree());
@ -235,9 +234,6 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
final List<String> schedulingStrategies = new ArrayList<>();
schedulingStrategies.add(SchedulingStrategy.TIMER_DRIVEN.name());
schedulingStrategies.add(SchedulingStrategy.CRON_DRIVEN.name());
if (extension.getEventDriven()) {
schedulingStrategies.add(SchedulingStrategy.EVENT_DRIVEN.name());
}
// If a default schedule is provided then use that, otherwise default to TIMER_DRIVEN
final DefaultSchedule defaultSchedule = extension.getDefaultSchedule();
@ -247,9 +243,6 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
final Map<String, Integer> defaultConcurrentTasks = new LinkedHashMap<>(3);
defaultConcurrentTasks.put(SchedulingStrategy.TIMER_DRIVEN.name(), SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks());
defaultConcurrentTasks.put(SchedulingStrategy.CRON_DRIVEN.name(), SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks());
if (extension.getEventDriven()) {
defaultConcurrentTasks.put(SchedulingStrategy.EVENT_DRIVEN.name(), SchedulingStrategy.EVENT_DRIVEN.getDefaultConcurrentTasks());
}
final Map<String, String> defaultSchedulingPeriods = new LinkedHashMap<>(2);
defaultSchedulingPeriods.put(SchedulingStrategy.TIMER_DRIVEN.name(), SchedulingStrategy.TIMER_DRIVEN.getDefaultSchedulingPeriod());
@ -557,7 +550,7 @@ public class StandardRuntimeManifestBuilder implements RuntimeManifestBuilder {
final DependentValues dependentValues = dependency.getDependentValues();
if (dependentValues != null && dependentValues.getValues() != null) {
final List<String> values = new ArrayList();
final List<String> values = new ArrayList<String>();
values.addAll(dependentValues.getValues());
propertyDependency.setDependentValues(values);
}

View File

@ -82,9 +82,8 @@ class TestRuntimeManifest {
final Map<String, Integer> defaultConcurrentTasks = schedulingDefaults.getDefaultConcurrentTasksBySchedulingStrategy();
assertNotNull(defaultConcurrentTasks);
assertEquals(3, defaultConcurrentTasks.size());
assertEquals(2, defaultConcurrentTasks.size());
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks(), defaultConcurrentTasks.get(SchedulingStrategy.TIMER_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.EVENT_DRIVEN.getDefaultConcurrentTasks(), defaultConcurrentTasks.get(SchedulingStrategy.EVENT_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks(), defaultConcurrentTasks.get(SchedulingStrategy.CRON_DRIVEN.name()).intValue());
final Map<String, String> defaultSchedulingPeriods = schedulingDefaults.getDefaultSchedulingPeriodsBySchedulingStrategy();
@ -103,7 +102,6 @@ class TestRuntimeManifest {
assertTrue(listHdfsDefinition.getTriggerSerially());
assertTrue(listHdfsDefinition.getTriggerWhenEmpty());
assertFalse(listHdfsDefinition.getSupportsBatching());
assertFalse(listHdfsDefinition.getSupportsEventDriven());
assertFalse(listHdfsDefinition.getSideEffectFree());
assertFalse(listHdfsDefinition.getTriggerWhenAnyDestinationAvailable());
assertFalse(listHdfsDefinition.getSupportsDynamicProperties());
@ -221,25 +219,21 @@ class TestRuntimeManifest {
assertEquals(REPORTING_TASK_DEFAULT_SCHEDULE_TIME, prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.TIMER_DRIVEN.name()));
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultSchedulingPeriod(), prometheusDefaultSchedulingPeriods.get(SchedulingStrategy.CRON_DRIVEN.name()));
// Verify JoltTransformRecord which has @EventDriven
final ProcessorDefinition joltTransformDef = getProcessorDefinition(bundles, "nifi-jolt-record-nar",
"org.apache.nifi.processors.jolt.record.JoltTransformRecord");
assertEquals(SchedulingStrategy.TIMER_DRIVEN.name(), joltTransformDef.getDefaultSchedulingStrategy());
final List<String> joltTransformSchedulingStrategies = joltTransformDef.getSupportedSchedulingStrategies();
assertNotNull(joltTransformSchedulingStrategies);
assertEquals(3, joltTransformSchedulingStrategies.size());
assertEquals(2, joltTransformSchedulingStrategies.size());
assertTrue(joltTransformSchedulingStrategies.contains(SchedulingStrategy.TIMER_DRIVEN.name()));
assertTrue(joltTransformSchedulingStrategies.contains(SchedulingStrategy.CRON_DRIVEN.name()));
assertTrue(joltTransformSchedulingStrategies.contains(SchedulingStrategy.EVENT_DRIVEN.name()));
final Map<String, Integer> joltTransformDefaultConcurrentTasks = joltTransformDef.getDefaultConcurrentTasksBySchedulingStrategy();
assertNotNull(joltTransformDefaultConcurrentTasks);
assertEquals(3, joltTransformDefaultConcurrentTasks.size());
assertEquals(2, joltTransformDefaultConcurrentTasks.size());
assertEquals(SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks(), joltTransformDefaultConcurrentTasks.get(SchedulingStrategy.TIMER_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks(), joltTransformDefaultConcurrentTasks.get(SchedulingStrategy.CRON_DRIVEN.name()).intValue());
assertEquals(SchedulingStrategy.EVENT_DRIVEN.getDefaultConcurrentTasks(), joltTransformDefaultConcurrentTasks.get(SchedulingStrategy.EVENT_DRIVEN.name()).intValue());
final Map<String, String> joltTransformDefaultSchedulingPeriods = listHdfsDefinition.getDefaultSchedulingPeriodBySchedulingStrategy();
assertNotNull(joltTransformDefaultSchedulingPeriods);

View File

@ -33,7 +33,6 @@ import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -79,7 +78,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "accumulo", "put", "record"})

View File

@ -26,7 +26,6 @@ import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
import org.apache.nifi.accumulo.controllerservices.BaseAccumuloService;
import org.apache.nifi.accumulo.data.KeySchema;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
@ -76,7 +75,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("Scan the given table and writes result in a flowfile. Value will be represented as UTF-8 Encoded String.")

View File

@ -20,7 +20,6 @@ import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ConflictException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
@ -55,7 +54,6 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
@EventDriven
@Tags({ "azure", "cosmos", "insert", "record", "put" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and " +

View File

@ -28,7 +28,6 @@ import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -72,7 +71,6 @@ import java.util.regex.Pattern;
@SupportsBatching
@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement on a Cassandra 1.x, 2.x, or 3.0.x cluster. "
+ "The content of an incoming FlowFile is expected to be the CQL command to execute. The CQL command may use "

View File

@ -34,7 +34,6 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@ -78,7 +77,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@Tags({"cassandra", "cql", "select"})
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("Execute provided Cassandra Query Language (CQL) select query on a Cassandra 1.x, 2.x, or 3.0.x cluster. Query result "
+ "may be converted to Avro or JSON format. Streaming is used so arbitrarily large result sets are supported. This processor can be "

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.cybersecurity;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -56,7 +55,6 @@ import java.util.Objects;
import java.util.Set;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)

View File

@ -18,7 +18,6 @@ package org.apache.nifi.processors.cybersecurity;
import com.idealista.tlsh.exceptions.InsufficientComplexityException;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -54,7 +53,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -43,7 +42,6 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "query", "read", "get", "json"})
@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " +
"Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
@ -46,7 +45,6 @@ import java.util.List;
@WritesAttribute(attribute = "elasticsearch.query.error", description = "The error message provided by Elasticsearch if there is an error querying the index.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags({"elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "query", "scroll", "page", "read", "json"})
@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
"It will use the flowfile's content for the query unless the QUERY attribute is populated. " +

View File

@ -36,7 +36,6 @@ import javax.mail.internet.MimeMessage;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.util.MimeMessageParser;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -59,9 +58,7 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
@SupportsBatching
@EventDriven
@SideEffectFree
@Tags({"split", "email"})
@InputRequirement(Requirement.INPUT_REQUIRED)

View File

@ -39,7 +39,6 @@ import javax.mail.internet.MimeMessage;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.mail.util.MimeMessageParser;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -62,7 +61,6 @@ import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@EventDriven
@SideEffectFree
@Tags({"split", "email"})
@InputRequirement(Requirement.INPUT_REQUIRED)

View File

@ -29,7 +29,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -54,7 +53,6 @@ import org.apache.poi.hmef.HMEFMessage;
@SupportsBatching
@EventDriven
@SideEffectFree
@Tags({"split", "email"})
@InputRequirement(Requirement.INPUT_REQUIRED)

View File

@ -22,7 +22,6 @@ import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import com.maxmind.geoip2.record.Subdivision;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -44,7 +43,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"geo", "enrich", "ip", "maxmind"})

View File

@ -20,7 +20,6 @@ import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.IspResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -41,7 +40,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"ISP", "enrich", "ip", "maxmind"})

View File

@ -37,7 +37,6 @@ import javax.naming.directory.InitialDirContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -54,8 +53,6 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"dns", "enrich", "ip"})

View File

@ -20,7 +20,6 @@ package org.apache.nifi.processors.enrich;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.net.whois.WhoisClient;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -49,7 +48,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"whois", "enrich", "ip"})

View File

@ -18,7 +18,6 @@
package org.apache.nifi.processors.evtx;
import com.google.common.annotations.VisibleForTesting;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@ -55,7 +54,6 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@SideEffectFree
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"logs", "windows", "event", "evtx", "message", "file"})

View File

@ -27,7 +27,6 @@ import javax.xml.bind.annotation.XmlType;
public class ControllerConfigurationDTO {
private Integer maxTimerDrivenThreadCount;
private Integer maxEventDrivenThreadCount;
/**
* @return maximum number of timer driven threads this NiFi has available
@ -43,17 +42,4 @@ public class ControllerConfigurationDTO {
this.maxTimerDrivenThreadCount = maxTimerDrivenThreadCount;
}
/**
* @return maximum number of event driven thread this NiFi has available
*/
@ApiModelProperty(
value = "The maximum number of event driven threads the NiFi has available."
)
public Integer getMaxEventDrivenThreadCount() {
return maxEventDrivenThreadCount;
}
public void setMaxEventDrivenThreadCount(Integer maxEventDrivenThreadCount) {
this.maxEventDrivenThreadCount = maxEventDrivenThreadCount;
}
}

View File

@ -79,12 +79,12 @@ public class ProcessorConfigDTO {
}
/**
* Indicates whether the processor should be scheduled to run in event-driven mode or timer-driven mode
* Indicates how the processor should be scheduled to run
*
* @return scheduling strategy
*/
@ApiModelProperty(
value = "Indcates whether the prcessor should be scheduled to run in event or timer driven mode."
value = "Indicates how the processor should be scheduled to run."
)
public String getSchedulingStrategy() {
return schedulingStrategy;

View File

@ -40,7 +40,6 @@ public class ProcessorDTO extends ComponentDTO {
private List<RelationshipDTO> relationships;
private String description;
private Boolean supportsParallelProcessing;
private Boolean supportsEventDriven;
private Boolean supportsBatching;
private Boolean supportsSensitiveDynamicProperties;
private Boolean persistsState;
@ -249,20 +248,6 @@ public class ProcessorDTO extends ComponentDTO {
this.inputRequirement = inputRequirement;
}
/**
* @return whether this processor supports event driven scheduling
*/
@ApiModelProperty(
value = "Whether the processor supports event driven scheduling."
)
public Boolean getSupportsEventDriven() {
return supportsEventDriven;
}
public void setSupportsEventDriven(Boolean supportsEventDriven) {
this.supportsEventDriven = supportsEventDriven;
}
/**
* @return whether this processor supports batching
*/

View File

@ -26,7 +26,6 @@ public class JVMControllerDiagnosticsSnapshotDTO implements Cloneable {
private Boolean primaryNode;
private Boolean clusterCoordinator;
private Integer maxTimerDrivenThreads;
private Integer maxEventDrivenThreads;
@ApiModelProperty("Whether or not this node is primary node")
public Boolean getPrimaryNode() {
@ -55,21 +54,11 @@ public class JVMControllerDiagnosticsSnapshotDTO implements Cloneable {
this.maxTimerDrivenThreads = maxTimerDrivenThreads;
}
@ApiModelProperty("The maximum number of event-driven threads")
public Integer getMaxEventDrivenThreads() {
return maxEventDrivenThreads;
}
public void setMaxEventDrivenThreads(Integer maxEventDrivenThreads) {
this.maxEventDrivenThreads = maxEventDrivenThreads;
}
@Override
public JVMControllerDiagnosticsSnapshotDTO clone() {
final JVMControllerDiagnosticsSnapshotDTO clone = new JVMControllerDiagnosticsSnapshotDTO();
clone.clusterCoordinator = clusterCoordinator;
clone.primaryNode = primaryNode;
clone.maxEventDrivenThreads = maxEventDrivenThreads;
clone.maxTimerDrivenThreads = maxTimerDrivenThreads;
return clone;
}

View File

@ -31,7 +31,6 @@ public class JVMFlowDiagnosticsSnapshotDTO implements Cloneable {
private String uptime;
private String timeZone;
private Integer activeTimerDrivenThreads;
private Integer activeEventDrivenThreads;
private Set<BundleDTO> bundlesLoaded;
@ApiModelProperty("How long this node has been running, formatted as hours:minutes:seconds.milliseconds")
@ -62,15 +61,6 @@ public class JVMFlowDiagnosticsSnapshotDTO implements Cloneable {
this.activeTimerDrivenThreads = activeTimerDrivenThreads;
}
@ApiModelProperty("The number of event-driven threads that are active")
public Integer getActiveEventDrivenThreads() {
return activeEventDrivenThreads;
}
public void setActiveEventDrivenThreads(Integer activeEventDrivenThreads) {
this.activeEventDrivenThreads = activeEventDrivenThreads;
}
@ApiModelProperty("The NiFi Bundles (NARs) that are loaded by NiFi")
public Set<BundleDTO> getBundlesLoaded() {
return bundlesLoaded;
@ -83,7 +73,6 @@ public class JVMFlowDiagnosticsSnapshotDTO implements Cloneable {
@Override
public JVMFlowDiagnosticsSnapshotDTO clone() {
final JVMFlowDiagnosticsSnapshotDTO clone = new JVMFlowDiagnosticsSnapshotDTO();
clone.activeEventDrivenThreads = activeEventDrivenThreads;
clone.activeTimerDrivenThreads = activeTimerDrivenThreads;
clone.bundlesLoaded = bundlesLoaded == null ? null : new HashSet<>(bundlesLoaded);
clone.timeZone = timeZone;

View File

@ -737,7 +737,6 @@ public class StatusMerger {
return;
}
target.setMaxEventDrivenThreads(add(target.getMaxEventDrivenThreads(), toMerge.getMaxEventDrivenThreads()));
target.setMaxTimerDrivenThreads(add(target.getMaxTimerDrivenThreads(), toMerge.getMaxTimerDrivenThreads()));
target.setClusterCoordinator(null);
target.setPrimaryNode(null);
@ -748,7 +747,6 @@ public class StatusMerger {
return;
}
target.setActiveEventDrivenThreads(add(target.getActiveEventDrivenThreads(), toMerge.getActiveEventDrivenThreads()));
target.setActiveTimerDrivenThreads(add(target.getActiveTimerDrivenThreads(), toMerge.getActiveTimerDrivenThreads()));
target.setBundlesLoaded(null);
target.setUptime(null);

View File

@ -128,24 +128,24 @@ class StandardHttpResponseMapperSpec extends Specification {
expectedEntity
'nifi-api/controller/config' | 'get' | [
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10)),
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10)),
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] ||
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10))] ||
// expectedEntity
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10))
'nifi-api/controller/config' | 'put' | [
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10)),
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10)),
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10)),
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: true),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))] ||
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10))] ||
// expectedEntity
new ControllerConfigurationEntity(permissions: new PermissionsDTO(canRead: true, canWrite: false),
component: new ControllerConfigurationDTO(maxEventDrivenThreadCount: 10, maxTimerDrivenThreadCount: 10))
component: new ControllerConfigurationDTO(maxTimerDrivenThreadCount: 10))
"nifi-api/process-groups/${UUID.randomUUID()}/connections" | 'get' | [
new ConnectionsEntity(connections: [new ConnectionEntity(id: '1', permissions: new PermissionsDTO(canRead: true, canWrite: true), status: new
ConnectionStatusDTO(aggregateSnapshot: new ConnectionStatusSnapshotDTO(bytesIn: 300)), component: new ConnectionDTO())] as Set),

View File

@ -28,7 +28,6 @@ import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@ -38,7 +37,6 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.scheduling.SchedulingStrategy;
import java.util.ArrayList;
import java.util.Collection;
@ -58,7 +56,7 @@ import java.util.stream.Collectors;
* one or more relationships that map the source component to the destination
* component.
*/
public final class StandardConnection implements Connection, ConnectionEventListener {
public final class StandardConnection implements Connection {
public static final long DEFAULT_Z_INDEX = 0;
private final String id;
@ -86,7 +84,7 @@ public final class StandardConnection implements Connection, ConnectionEventList
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, this, processGroup.get());
flowFileQueue = builder.flowFileQueueFactory.createFlowFileQueue(LoadBalanceStrategy.DO_NOT_LOAD_BALANCE, null, processGroup.get());
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@ -146,20 +144,6 @@ public final class StandardConnection implements Connection, ConnectionEventList
};
}
@Override
public void triggerDestinationEvent() {
if (getDestination().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
scheduler.registerEvent(getDestination());
}
}
@Override
public void triggerSourceEvent() {
if (getSource().getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
scheduler.registerEvent(getSource());
}
}
@Override
public Authorizable getSourceAuthorizable() {
final Connectable sourceConnectable = getSource();
@ -322,9 +306,7 @@ public final class StandardConnection implements Connection, ConnectionEventList
previousDestination.removeConnection(this);
this.destination.set(newDestination);
getSource().updateConnection(this);
newDestination.addConnection(this);
scheduler.registerEvent(newDestination);
} catch (final RuntimeException e) {
this.destination.set(previousDestination);
throw e;

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -38,7 +37,6 @@ public class ProcessorDetails {
private final boolean sideEffectFree;
private final boolean triggeredSerially;
private final boolean triggerWhenAnyDestinationAvailable;
private final boolean eventDrivenSupported;
private final boolean batchSupported;
private final boolean executionNodeRestricted;
private final InputRequirement.Requirement inputRequirement;
@ -56,7 +54,6 @@ public class ProcessorDetails {
this.batchSupported = procClass.isAnnotationPresent(SupportsBatching.class);
this.triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class);
this.triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class);
this.eventDrivenSupported = procClass.isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty;
this.executionNodeRestricted = procClass.isAnnotationPresent(PrimaryNodeOnly.class);
final boolean inputRequirementPresent = procClass.isAnnotationPresent(InputRequirement.class);
@ -91,10 +88,6 @@ public class ProcessorDetails {
return triggerWhenAnyDestinationAvailable;
}
public boolean isEventDrivenSupported() {
return eventDrivenSupported;
}
public boolean isBatchSupported() {
return batchSupported;
}

View File

@ -478,11 +478,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
return timeUnit.convert(schedulingNanos.get(), TimeUnit.NANOSECONDS);
}
@Override
public boolean isEventDrivenSupported() {
return processorRef.get().isEventDrivenSupported();
}
/**
* Updates the Scheduling Strategy used for this Processor
*
@ -495,15 +490,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
*/
@Override
public synchronized void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
if (schedulingStrategy == SchedulingStrategy.EVENT_DRIVEN && !processorRef.get().isEventDrivenSupported()) {
// not valid. Just ignore it. We don't throw an Exception because if
// a developer changes a Processor so that
// it no longer supports EventDriven mode, we don't want the app to
// fail to startup if it was already in Event-Driven
// Mode. Instead, we will simply leave it in Timer-Driven mode
return;
}
this.schedulingStrategy = schedulingStrategy;
}
@ -664,9 +650,8 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
throw new IllegalStateException("Cannot modify configuration of " + this + " while the Processor is running");
}
if (taskCount < 1 && getSchedulingStrategy() != SchedulingStrategy.EVENT_DRIVEN) {
throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + " for component "
+ this + " because Scheduling Strategy is not Event Driven");
if (taskCount < 1) {
throw new IllegalArgumentException("Cannot set Concurrent Tasks to " + taskCount + " for component " + this);
}
if (!isTriggeredSerially()) {
@ -1278,7 +1263,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
}
break;
case EVENT_DRIVEN:
default:
return results;
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue;
public interface ConnectionEventListener {
void triggerSourceEvent();
void triggerDestinationEvent();
ConnectionEventListener NOP_EVENT_LISTENER = new ConnectionEventListener() {
@Override
public void triggerSourceEvent() {
}
@Override
public void triggerDestinationEvent() {
}
};
}

View File

@ -20,5 +20,5 @@ package org.apache.nifi.controller.queue;
import org.apache.nifi.groups.ProcessGroup;
public interface FlowFileQueueFactory {
FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ConnectionEventListener connectionEventListener, ProcessGroup processGroup);
FlowFileQueue createFlowFileQueue(LoadBalanceStrategy loadBalanceStrategy, String partitioningAttribute, ProcessGroup processGroup);
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.controller;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.service.ControllerServiceNode;
@ -196,13 +195,6 @@ public interface ProcessScheduler {
*/
boolean isScheduled(Object scheduled);
/**
* Registers a relevant event for an Event-Driven worker
*
* @param worker to register
*/
void registerEvent(Connectable worker);
/**
* Notifies the ProcessScheduler of how many threads are available to use
* for the given {@link SchedulingStrategy}

View File

@ -73,8 +73,6 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
public abstract boolean isTriggeredSerially();
public abstract boolean isEventDrivenSupported();
public abstract boolean isExecutionNodeRestricted();
public abstract Requirement getInputRequirement();

View File

@ -30,7 +30,6 @@ import java.util.Set;
public class VersionedDataflow {
private VersionedFlowEncodingVersion encodingVersion;
private int maxTimerDrivenThreadCount;
private int maxEventDrivenThreadCount;
private List<VersionedFlowRegistryClient> registries;
private List<VersionedParameterContext> parameterContexts;
private List<VersionedParameterProvider> parameterProviders;
@ -39,8 +38,6 @@ public class VersionedDataflow {
private Set<VersionedTemplate> templates;
private VersionedProcessGroup rootGroup;
private final static int DEFAULT_MAX_EVENT_DRIVEN_THREAD_COUNT = 1;
public VersionedFlowEncodingVersion getEncodingVersion() {
return encodingVersion;
}
@ -57,14 +54,6 @@ public class VersionedDataflow {
this.maxTimerDrivenThreadCount = maxTimerDrivenThreadCount;
}
public int getMaxEventDrivenThreadCount() {
return maxEventDrivenThreadCount < 1 ? DEFAULT_MAX_EVENT_DRIVEN_THREAD_COUNT : maxEventDrivenThreadCount;
}
public void setMaxEventDrivenThreadCount(final int maxEventDrivenThreadCount) {
this.maxEventDrivenThreadCount = maxEventDrivenThreadCount;
}
public List<VersionedFlowRegistryClient> getRegistries() {
return registries;
}

View File

@ -1,331 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.Connectables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class EventDrivenWorkerQueue implements WorkerQueue {
private final Object workMonitor = new Object();
private final Map<Connectable, Worker> workerMap = new HashMap<>(); // protected by synchronizing on workMonitor
private final WorkerReadyQueue workerQueue;
public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) {
workerQueue = new WorkerReadyQueue(scheduler);
workerQueue.setClustered(clustered);
workerQueue.setPrimary(primary);
}
@Override
public void setClustered(final boolean clustered) {
workerQueue.setClustered(clustered);
}
@Override
public void setPrimary(final boolean primary) {
workerQueue.setPrimary(primary);
}
@Override
public Worker poll(final long timeout, final TimeUnit timeUnit) {
final long maxTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
while (System.currentTimeMillis() < maxTime) {
synchronized (workMonitor) {
final Worker worker = workerQueue.poll();
if (worker == null) {
// nothing to do. wait until we have something to do.
final long timeLeft = maxTime - System.currentTimeMillis();
if (timeLeft <= 0) {
return null;
}
try {
workMonitor.wait(timeLeft);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
return null;
}
} else {
// Decrement the amount of work there is to do for this worker.
final int workLeft = worker.decrementEventCount();
if (workLeft > 0) {
workerQueue.offer(worker);
}
return worker;
}
}
}
return null;
}
@Override
public void offer(final Connectable connectable) {
synchronized (workMonitor) {
Worker worker = workerMap.get(connectable);
if (worker == null) {
// if worker is null, then it has not been scheduled to run; ignore the event.
return;
}
final int countBefore = worker.incrementEventCount();
if (countBefore < 0) {
worker.setWorkCount(1);
}
if (countBefore <= 0) {
// If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient.
workerQueue.offer(worker);
}
workMonitor.notify();
}
}
private int getWorkCount(final Connectable connectable) {
int sum = 0;
for (final Connection connection : connectable.getIncomingConnections()) {
sum += connection.getFlowFileQueue().size().getObjectCount();
}
return sum;
}
@Override
public void resumeWork(final Connectable connectable) {
synchronized (workMonitor) {
final int workCount = getWorkCount(connectable);
final Worker worker = new Worker(connectable);
workerMap.put(connectable, worker);
if (workCount > 0) {
worker.setWorkCount(workCount);
workerQueue.offer(worker);
workMonitor.notify();
}
}
}
@Override
public void suspendWork(final Connectable connectable) {
synchronized (workMonitor) {
final Worker worker = this.workerMap.remove(connectable);
if (worker == null) {
return;
}
worker.resetWorkCount();
workerQueue.remove(worker);
}
}
public static class Worker implements EventBasedWorker {
private final Connectable connectable;
private final AtomicInteger workCount = new AtomicInteger(0);
public Worker(final Connectable connectable) {
this.connectable = connectable;
}
@Override
public Connectable getConnectable() {
return connectable;
}
@Override
public int decrementEventCount() {
return workCount.decrementAndGet();
}
@Override
public int incrementEventCount() {
return workCount.getAndIncrement();
}
void resetWorkCount() {
workCount.set(0);
}
void setWorkCount(final int workCount) {
this.workCount.set(workCount);
}
}
@SuppressWarnings("serial")
private static class WorkerReadyQueue extends LinkedList<Worker> {
private final ProcessScheduler scheduler;
private volatile boolean clustered = false;
private volatile boolean primary = false;
public WorkerReadyQueue(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
}
public void setClustered(final boolean clustered) {
this.clustered = clustered;
}
public void setPrimary(final boolean primary) {
this.primary = primary;
}
@Override
public Worker poll() {
final List<Worker> putBack = new ArrayList<>();
Worker worker;
try {
while ((worker = super.poll()) != null) {
final DelayProcessingReason reason = getDelayReason(worker);
if (reason == null) {
return worker;
} else {
// Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready.
switch (reason) {
case YIELDED:
case ISOLATED:
case DESTINATION_FULL:
case ALL_WORK_PENALIZED:
case NO_WORK:
case TOO_MANY_THREADS:
// there will not be an event that triggers this to happen, so we add this worker back to the queue.
putBack.add(worker);
break;
default:
case NOT_RUNNING:
// There's no need to check if this worker is available again until a another event
// occurs. Therefore, we keep him off of the queue and reset his work count
worker.resetWorkCount();
break;
}
}
}
} finally {
if (!putBack.isEmpty()) {
super.addAll(putBack);
}
}
return null;
}
private DelayProcessingReason getDelayReason(final Worker worker) {
final Connectable connectable = worker.getConnectable();
if (ScheduledState.RUNNING != connectable.getScheduledState()) {
return DelayProcessingReason.NOT_RUNNING;
}
if (connectable.getYieldExpiration() > System.currentTimeMillis()) {
return DelayProcessingReason.YIELDED;
}
// For Remote Output Ports,
int availableRelationshipCount = 0;
if (!connectable.getRelationships().isEmpty()) {
availableRelationshipCount = getAvailableRelationshipCount(connectable);
if (availableRelationshipCount == 0) {
return DelayProcessingReason.DESTINATION_FULL;
}
}
if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) {
return DelayProcessingReason.NO_WORK;
}
final int activeThreadCount = scheduler.getActiveThreadCount(worker.getConnectable());
final int maxThreadCount = worker.getConnectable().getMaxConcurrentTasks();
if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) {
return DelayProcessingReason.TOO_MANY_THREADS;
}
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
if (procNode.isIsolated() && clustered && !primary) {
return DelayProcessingReason.ISOLATED;
}
final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable();
final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size();
if (!triggerWhenAnyAvailable && !allDestinationsAvailable) {
return DelayProcessingReason.DESTINATION_FULL;
}
}
return null;
}
private int getAvailableRelationshipCount(final Connectable connectable) {
int count = 0;
for (final Relationship relationship : connectable.getRelationships()) {
final Collection<Connection> connections = connectable.getConnections(relationship);
if (connections == null || connections.isEmpty()) {
if (connectable.isAutoTerminated(relationship)) {
// If the relationship is auto-terminated, consider it available.
count++;
}
} else {
boolean available = true;
for (final Connection connection : connections) {
if (connection.getSource() == connection.getDestination()) {
// don't count self-loops
continue;
}
if (connection.getFlowFileQueue().isFull()) {
available = false;
}
}
if (available) {
count++;
}
}
}
return count;
}
}
private static enum DelayProcessingReason {
YIELDED,
DESTINATION_FULL,
NO_WORK,
ALL_WORK_PENALIZED,
ISOLATED,
NOT_RUNNING,
TOO_MANY_THREADS;
}
}

View File

@ -51,7 +51,6 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
import org.apache.nifi.controller.cluster.Heartbeater;
@ -61,7 +60,6 @@ import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
@ -102,7 +100,6 @@ import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
@ -208,7 +205,6 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
@ -271,10 +267,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
public static final double DEFAULT_POSITION_SCALE_FACTOR_Y = 1.34;
private final AtomicInteger maxTimerDrivenThreads;
private final AtomicInteger maxEventDrivenThreads;
private final AtomicReference<FlowEngine> timerDrivenEngineRef;
private final AtomicReference<FlowEngine> eventDrivenEngineRef;
private final EventDrivenSchedulingAgent eventDrivenSchedulingAgent;
private final ContentRepository contentRepository;
private final FlowFileRepository flowFileRepository;
@ -295,7 +288,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private final StandardControllerServiceResolver controllerServiceResolver;
private final Authorizer authorizer;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
private final StatusHistoryRepository statusHistoryRepository;
private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
@ -487,7 +479,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
final StatusHistoryRepository statusHistoryRepository) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(1);
this.encryptor = encryptor;
this.nifiProperties = nifiProperties;
@ -513,7 +504,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
this.sensitiveValueEncoder = new StandardSensitiveValueEncoder(nifiProperties);
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, extensionManager, resourceClaimManager);
flowFileRepository = flowFileRepo;
@ -554,7 +544,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
parameterContextManager = new StandardParameterContextManager();
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
@ -580,11 +569,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
flowManager.initialize(controllerServiceProvider, pythonBridge);
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue,
repositoryContextFactory, maxEventDrivenThreads.get(), extensionManager, this);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
@ -1420,11 +1404,9 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
if (kill) {
this.timerDrivenEngineRef.get().shutdownNow();
this.eventDrivenEngineRef.get().shutdownNow();
LOG.info("Initiated immediate shutdown of flow controller...");
} else {
this.timerDrivenEngineRef.get().shutdown();
this.eventDrivenEngineRef.get().shutdown();
LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds");
}
@ -1456,8 +1438,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
try {
this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
LOG.info("Interrupted while waiting for controller termination.");
}
@ -1468,7 +1449,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
LOG.warn("Unable to shut down FlowFileRepository", t);
}
if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
if (this.timerDrivenEngineRef.get().isTerminated()) {
LOG.info("Controller has been terminated successfully.");
} else {
LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that "
@ -1670,14 +1651,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return maxTimerDrivenThreads.get();
}
public int getMaxEventDrivenThreadCount() {
return maxEventDrivenThreads.get();
}
public int getActiveEventDrivenThreadCount() {
return eventDrivenEngineRef.get().getActiveCount();
}
public int getActiveTimerDrivenThreadCount() {
return timerDrivenEngineRef.get().getActiveCount();
}
@ -1691,16 +1664,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}
public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
writeLock.lock();
try {
setMaxThreadCount(maxThreadCount, "Event Driven", this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount);
} finally {
writeLock.unlock("setMaxEventDrivenThreadCount");
}
}
/**
* Updates the number of threads that can be simultaneously used for executing processors.
* This method must be called while holding the write lock!
@ -1779,14 +1742,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return provenanceRepository.getContainerFileStoreName(containerName);
}
//
// ProcessGroup access
//
private Position toPosition(final PositionDTO dto) {
return new Position(dto.getX(), dto.getY());
}
//
// Snippet
//
@ -2156,16 +2111,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
@Override
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener,
final ProcessGroup processGroup) {
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ProcessGroup processGroup) {
final FlowFileQueue flowFileQueue;
if (clusterCoordinator == null) {
flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
flowFileQueue = new StandardFlowFileQueue(id, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
eventReporter, nifiProperties.getQueueSwapThreshold(),
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
} else {
flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
flowFileQueue.setFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
@ -2332,9 +2286,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
public int getActiveThreadCount() {
final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount();
return timerDrivenCount + eventDrivenCount;
return timerDrivenEngineRef.get().getActiveCount();
}
@ -2574,7 +2526,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
// mark the new cluster status
this.clustered = clustered;
eventDrivenWorkerQueue.setClustered(clustered);
if (clusterInstanceId != null) {
this.instanceId = clusterInstanceId;
@ -2658,9 +2609,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
processScheduler.submitFrameworkTask(() -> processScheduler.notifyPrimaryNodeStateChange(reportingTaskNode, nodeState) );
}
// update primary
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary));
@ -3268,7 +3216,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
result.setOpenFileHandlers(systemDiagnostics.getOpenFileHandles());
result.setProcessorLoadAverage(systemDiagnostics.getProcessorLoadAverage());
result.setTotalThreads(systemDiagnostics.getTotalThreads());
result.setEventDrivenThreads(getActiveEventDrivenThreadCount());
result.setTimerDrivenThreads(getActiveTimerDrivenThreadCount());
result.setFlowFileRepositoryFreeSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getFreeSpace());
result.setFlowFileRepositoryUsedSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getUsedSpace());

View File

@ -255,7 +255,6 @@ public class TemplateUtils {
processorDTO.setInputRequirement(null);
processorDTO.setPersistsState(null);
processorDTO.setSupportsBatching(null);
processorDTO.setSupportsEventDriven(null);
processorDTO.setSupportsParallelProcessing(null);
processorDTO.setSupportsSensitiveDynamicProperties(null);
}

View File

@ -397,10 +397,8 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
if (maxThreadCount == null) {
controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount"));
controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount"));
} else {
controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
}
// get the root group XML element
@ -661,10 +659,8 @@ public class XmlFlowSynchronizer implements FlowSynchronizer {
final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
if (maxThreadCount == null) {
controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount"));
controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount"));
} else {
controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
}
}

View File

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue;
public class NopConnectionEventListener implements ConnectionEventListener {
@Override
public void triggerSourceEvent() {
}
@Override
public void triggerDestinationEvent() {
}
}

View File

@ -45,14 +45,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StandardFlowFileQueue extends AbstractFlowFileQueue implements FlowFileQueue {
private final SwappablePriorityQueue queue;
private final ConnectionEventListener eventListener;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final FlowFileSwapManager swapManager;
private final TimedLock writeLock;
public StandardFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
public StandardFlowFileQueue(final String identifier, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter,
final int swapThreshold, final String expirationPeriod, final long defaultBackPressureObjectThreshold, final String defaultBackPressureDataSizeThreshold) {
@ -60,7 +59,6 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
super.setFlowFileExpiration(expirationPeriod);
this.swapManager = swapManager;
this.queue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, this, this::drop, null);
this.eventListener = eventListener;
writeLock = new TimedLock(this.lock.writeLock(), getIdentifier() + " Write Lock", 100);
@ -112,15 +110,11 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
@Override
public void put(final FlowFileRecord file) {
queue.put(file);
eventListener.triggerDestinationEvent();
}
@Override
public void putAll(final Collection<FlowFileRecord> files) {
queue.putAll(files);
eventListener.triggerDestinationEvent();
}
@ -142,15 +136,11 @@ public class StandardFlowFileQueue extends AbstractFlowFileQueue implements Flow
@Override
public void acknowledge(final FlowFileRecord flowFile) {
queue.acknowledge(flowFile);
eventListener.triggerSourceEvent();
}
@Override
public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
queue.acknowledge(flowFiles);
eventListener.triggerSourceEvent();
}
@Override

View File

@ -24,7 +24,6 @@ import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.status.FlowFileAvailability;
@ -103,7 +102,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
.thenComparing(NodeIdentifier::getLoadBalancePort);
private final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
private final ConnectionEventListener eventListener;
private final AtomicReference<QueueSize> totalSize = new AtomicReference<>(new QueueSize(0, 0L));
private final LocalQueuePartition localPartition;
private final RebalancingPartition rebalancingPartition;
@ -126,13 +124,12 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
private volatile boolean offloaded = false;
public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo,
public SocketLoadBalancedFlowFileQueue(final String identifier, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo,
final ProvenanceEventRepository provRepo, final ContentRepository contentRepo, final ResourceClaimManager resourceClaimManager,
final ClusterCoordinator clusterCoordinator, final AsyncLoadBalanceClientRegistry clientRegistry, final FlowFileSwapManager swapManager,
final int swapThreshold, final EventReporter eventReporter) {
super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager);
this.eventListener = eventListener;
this.eventReporter = eventReporter;
this.swapManager = swapManager;
this.flowFileRepo = flowFileRepo;
@ -827,7 +824,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
partitionReadLock.unlock();
}
eventListener.triggerDestinationEvent();
return partition;
}
@ -893,8 +889,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
return partitionMap;
} finally {
partitionReadLock.unlock();
eventListener.triggerDestinationEvent();
}
}
@ -984,7 +978,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
localPartition.acknowledge(flowFile);
adjustSize(-1, -flowFile.getSize());
eventListener.triggerSourceEvent();
}
@Override
@ -995,8 +988,6 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
final long bytes = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum();
adjustSize(-flowFiles.size(), -bytes);
}
eventListener.triggerSourceEvent();
}
@Override

View File

@ -31,7 +31,6 @@ import java.util.concurrent.Future;
* {@link LifecycleState#setScheduled(boolean)} with value 'true' will be
* invoked.
*
* @see EventDrivenSchedulingAgent
* @see TimerDrivenSchedulingAgent
* @see QuartzSchedulingAgent
*/

View File

@ -1,392 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.EventBasedWorker;
import org.apache.nifi.controller.EventDrivenWorkerQueue;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.logging.StandardLoggingContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
private final ControllerServiceProvider serviceProvider;
private final StateManagerProvider stateManagerProvider;
private final EventDrivenWorkerQueue workerQueue;
private final RepositoryContextFactory contextFactory;
private final AtomicInteger maxThreadCount;
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final ExtensionManager extensionManager;
private final NodeTypeProvider nodeTypeProvider;
private volatile String adminYieldDuration = "1 sec";
private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>();
private final ConcurrentMap<Connectable, LifecycleState> scheduleStates = new ConcurrentHashMap<>();
public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider,
final EventDrivenWorkerQueue workerQueue, final RepositoryContextFactory contextFactory, final int maxThreadCount,
final ExtensionManager extensionManager, final NodeTypeProvider nodeTypeProvider) {
super(flowEngine);
this.serviceProvider = serviceProvider;
this.stateManagerProvider = stateManagerProvider;
this.workerQueue = workerQueue;
this.contextFactory = contextFactory;
this.maxThreadCount = new AtomicInteger(maxThreadCount);
this.extensionManager = extensionManager;
this.nodeTypeProvider = nodeTypeProvider;
for (int i = 0; i < maxThreadCount; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
}
}
public int getActiveThreadCount() {
return activeThreadCount.get();
}
private StateManager getStateManager(final String componentId) {
return stateManagerProvider.getStateManager(componentId);
}
@Override
public void shutdown() {
flowEngine.shutdown();
}
@Override
public void doSchedule(final ReportingTaskNode taskNode, LifecycleState scheduleState) {
throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
}
@Override
public void doUnschedule(ReportingTaskNode taskNode, LifecycleState scheduleState) {
throw new UnsupportedOperationException("ReportingTasks cannot be scheduled in Event-Driven Mode");
}
@Override
public void doSchedule(final Connectable connectable, final LifecycleState scheduleState) {
workerQueue.resumeWork(connectable);
logger.info("Scheduled {} to run in Event-Driven mode", connectable);
scheduleStates.put(connectable, scheduleState);
}
@Override
protected void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable<Future<Void>> stopCallback) {
throw new UnsupportedOperationException();
}
@Override
public void doUnschedule(final Connectable connectable, final LifecycleState scheduleState) {
workerQueue.suspendWork(connectable);
logger.info("Stopped scheduling {} to run", connectable);
}
@Override
public void onEvent(final Connectable connectable) {
workerQueue.offer(connectable);
}
@Override
public void setMaxThreadCount(final int maxThreadCount) {
final int oldMax = this.maxThreadCount.getAndSet(maxThreadCount);
if (maxThreadCount > oldMax) {
// if more threads have been allocated, add more tasks to the work queue
final int tasksToAdd = maxThreadCount - oldMax;
for (int i = 0; i < tasksToAdd; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
}
}
}
@Override
public void incrementMaxThreadCount(int toAdd) {
}
@Override
public void setAdministrativeYieldDuration(final String yieldDuration) {
this.adminYieldDuration = yieldDuration;
}
@Override
public String getAdministrativeYieldDuration() {
return adminYieldDuration;
}
@Override
public long getAdministrativeYieldDuration(final TimeUnit timeUnit) {
return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit);
}
private class EventDrivenTask implements Runnable {
private final EventDrivenWorkerQueue workerQueue;
private final AtomicInteger activeThreadCount;
public EventDrivenTask(final EventDrivenWorkerQueue workerQueue, final AtomicInteger activeThreadCount) {
this.workerQueue = workerQueue;
this.activeThreadCount = activeThreadCount;
}
@Override
public void run() {
while (!flowEngine.isShutdown()) {
final EventBasedWorker worker = workerQueue.poll(1, TimeUnit.SECONDS);
if (worker == null) {
continue;
}
final Connectable connectable = worker.getConnectable();
final LifecycleState scheduleState = scheduleStates.get(connectable);
if (scheduleState == null) {
// Component not yet scheduled to run but has received events
continue;
}
activeThreadCount.incrementAndGet();
try {
// get the connection index for this worker
AtomicLong connectionIndex = connectionIndexMap.get(connectable);
if (connectionIndex == null) {
connectionIndex = new AtomicLong(0L);
final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex);
if (existingConnectionIndex != null) {
connectionIndex = existingConnectionIndex;
}
}
final RepositoryContext context = contextFactory.newProcessContext(connectable, connectionIndex);
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StateManager stateManager = new TaskTerminationAwareStateManager(getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
final StandardProcessContext standardProcessContext = new StandardProcessContext(
procNode, serviceProvider, stateManager, scheduleState::isTerminated, nodeTypeProvider);
final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
final StandardProcessSession rawSession;
final boolean batch;
if (procNode.isSessionBatchingSupported() && runNanos > 0L) {
rawSession = new StandardProcessSession(context, scheduleState::isTerminated, new NopPerformanceTracker());
sessionFactory = new BatchingSessionFactory(rawSession);
batch = true;
} else {
rawSession = null;
sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker());
batch = false;
}
final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);
final long startNanos = System.nanoTime();
final long finishNanos = startNanos + runNanos;
int invocationCount = 0;
boolean shouldRun = true;
try {
while (shouldRun) {
trigger(procNode, context, scheduleState, standardProcessContext, activeSessionFactory);
invocationCount++;
if (!batch) {
break;
}
if (System.nanoTime() > finishNanos) {
break;
}
if (!scheduleState.isScheduled()) {
break;
}
final int eventCount = worker.decrementEventCount();
if (eventCount < 0) {
worker.incrementEventCount();
}
shouldRun = (eventCount > 0);
}
} finally {
if (batch && rawSession != null) {
try {
rawSession.commitAsync();
} catch (final RuntimeException re) {
logger.error("Unable to commit process session", re);
}
}
try {
final long processingNanos = System.nanoTime() - startNanos;
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
context.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
logger.error("", e);
}
}
// If the Processor has FlowFiles, go ahead and register it to run again.
// We do this because it's possible (and fairly common) for a Processor to be triggered and then determine,
// for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything
// off of its input queue.
// In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and
// confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this
// point is to register the Processor to run again.
if (Connectables.flowFilesQueued(procNode)) {
onEvent(procNode);
}
} else {
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker());
final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);
final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, getStateManager(connectable.getIdentifier()));
trigger(connectable, scheduleState, connectableProcessContext, activeSessionFactory);
// See explanation above for the ProcessorNode as to why we do this.
if (Connectables.flowFilesQueued(connectable)) {
onEvent(connectable);
}
}
} finally {
activeThreadCount.decrementAndGet();
}
}
}
private void trigger(final Connectable worker, final LifecycleState scheduleState, final ConnectableProcessContext processContext, final ActiveProcessSessionFactory sessionFactory) {
final int newThreadCount = scheduleState.incrementActiveThreadCount(sessionFactory);
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
// its possible that the worker queue could give us a worker node that is eligible to run based
// on the number of threads but another thread has already incremented the thread count, result in
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
scheduleState.decrementActiveThreadCount();
return;
}
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(extensionManager, worker.getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
logger.error("{} failed to process session due to {}", worker, pe.toString());
} catch (final Throwable t) {
logger.error("{} failed to process session due to {}", worker, t.toString());
logger.error("", t);
logger.warn("{} Administratively Pausing for {} due to processing failure: {}", worker, getAdministrativeYieldDuration(), t.toString());
logger.warn("", t);
try {
Thread.sleep(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.MILLISECONDS));
} catch (final InterruptedException e) {
}
}
} finally {
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, worker.getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext);
}
}
scheduleState.decrementActiveThreadCount();
}
}
private void trigger(final ProcessorNode worker, final RepositoryContext context, final LifecycleState scheduleState,
final StandardProcessContext processContext, final ActiveProcessSessionFactory sessionFactory) {
final int newThreadCount = scheduleState.incrementActiveThreadCount(sessionFactory);
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
// its possible that the worker queue could give us a worker node that is eligible to run based
// on the number of threads but another thread has already incremented the thread count, result in
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
scheduleState.decrementActiveThreadCount();
return;
}
try {
try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(extensionManager, worker.getProcessor().getClass(), worker.getIdentifier())) {
worker.onTrigger(processContext, sessionFactory);
} catch (final ProcessException pe) {
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor(), new StandardLoggingContext(worker));
procLog.error("Failed to process session due to {}", new Object[]{pe});
} catch (final Throwable t) {
// Use ComponentLog to log the event so that a bulletin will be created for this processor
final ComponentLog procLog = new SimpleProcessLogger(worker.getIdentifier(), worker.getProcessor(), new StandardLoggingContext(worker));
procLog.error("{} failed to process session due to {}", new Object[]{worker.getProcessor(), t});
procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{adminYieldDuration});
logger.warn("Administratively Yielding {} due to uncaught Exception: ", worker.getProcessor());
logger.warn("", t);
worker.yield(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
}
} finally {
// if the processor is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(extensionManager, worker.getProcessor().getClass(), worker.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker.getProcessor(), processContext);
}
}
scheduleState.decrementActiveThreadCount();
}
}
}
}

View File

@ -516,11 +516,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
// that gave very bad results.
}
@Override
public void registerEvent(final Connectable worker) {
getSchedulingAgent(worker).onEvent(worker);
}
@Override
public int getActiveThreadCount(final Object scheduled) {
return getLifecycleState(scheduled, false).getActiveThreadCount();

View File

@ -98,7 +98,6 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
rootNode.setAttribute("encoding-version", MAX_ENCODING_VERSION);
doc.appendChild(rootNode);
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
final Element registriesElement = doc.createElement("registries");
rootNode.appendChild(registriesElement);

View File

@ -82,7 +82,6 @@ public class VersionedDataflowMapper {
final VersionedDataflow dataflow = new VersionedDataflow();
dataflow.setEncodingVersion(ENCODING_VERSION);
dataflow.setMaxTimerDrivenThreadCount(flowController.getMaxTimerDrivenThreadCount());
dataflow.setMaxEventDrivenThreadCount(flowController.getMaxEventDrivenThreadCount());
dataflow.setControllerServices(mapControllerServices());
dataflow.setParameterContexts(mapParameterContexts());
dataflow.setRegistries(mapRegistries());

View File

@ -371,7 +371,6 @@ public class VersionedFlowSynchronizer implements FlowSynchronizer {
if (versionedFlow != null) {
controller.setMaxTimerDrivenThreadCount(versionedFlow.getMaxTimerDrivenThreadCount());
controller.setMaxEventDrivenThreadCount(versionedFlow.getMaxEventDrivenThreadCount());
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
final Map<String, VersionedParameterContext> versionedParameterContextMap = new HashMap<>();

View File

@ -92,12 +92,6 @@ public enum NodeStatusDescriptor {
"The current number of live threads in the Java virtual machine (both daemon and non-daemon threads).",
MetricDescriptor.Formatter.COUNT,
s -> s.getTotalThreads()),
EVENT_DRIVEN_THREADS(
"eventDrivenThreads",
"Number of event driven threads",
"The current number of active threads in the event driven thread pool.",
MetricDescriptor.Formatter.COUNT,
s -> s.getEventDrivenThreads()),
TIME_DRIVEN_THREADS(
"timeDrivenThreads",
"Number of time driven threads",

View File

@ -69,8 +69,7 @@ public class QuestDbNodeStatusStorage implements NodeStatusStorage {
METRICS.put(6, NodeStatusDescriptor.OPEN_FILE_HANDLES.getDescriptor());
METRICS.put(7, NodeStatusDescriptor.PROCESSOR_LOAD_AVERAGE.getDescriptor());
METRICS.put(8, NodeStatusDescriptor.TOTAL_THREADS.getDescriptor());
METRICS.put(9, NodeStatusDescriptor.EVENT_DRIVEN_THREADS.getDescriptor());
METRICS.put(10, NodeStatusDescriptor.TIME_DRIVEN_THREADS.getDescriptor());
METRICS.put(9, NodeStatusDescriptor.TIME_DRIVEN_THREADS.getDescriptor());
}
private static final QuestDbEntityWritingTemplate<NodeStatus> WRITING_TEMPLATE

View File

@ -27,7 +27,6 @@ import org.apache.nifi.diagnostics.DiagnosticTask;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.scheduling.SchedulingStrategy;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
@ -102,11 +101,6 @@ public class DiagnosticAnalysisTask implements DiagnosticTask {
+ ", which is very high. Under most circumstances, this value should not be set above 12-15. This processor is currently " + procNode.getScheduledState().name());
}
if (procNode.getSchedulingStrategy() == SchedulingStrategy.EVENT_DRIVEN) {
details.add(procNode + " is configured with a Scheduling Strategy of Event-Driven. The Event-Driven Scheduling Strategy is experimental and may trigger unexpected behavior, such as " +
"a Processor \"hanging\" or becoming unresponsive.");
}
if (isHighMemoryUtilizer(procNode)) {
final String processorType = procNode.getComponentType();
final int currentCount = highMemTypesToCounts.computeIfAbsent(processorType, k -> 0);

View File

@ -1078,7 +1078,6 @@ public class TestFlowController {
processorDTO.setDescription("description");
processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
processorDTO.setSupportsBatching(processorNode.isSessionBatchingSupported());
ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
@ -1138,7 +1137,6 @@ public class TestFlowController {
processorDTO.setDescription("description");
processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
processorDTO.setSupportsBatching(processorNode.isSessionBatchingSupported());
ProcessorConfigDTO configDTO = new ProcessorConfigDTO();
@ -1198,7 +1196,6 @@ public class TestFlowController {
processorDTO.setDescription("description");
processorDTO.setSupportsParallelProcessing(!processorNode.isTriggeredSerially());
processorDTO.setSupportsEventDriven(processorNode.isEventDrivenSupported());
processorDTO.setSupportsBatching(processorNode.isSessionBatchingSupported());
ProcessorConfigDTO configDTO = new ProcessorConfigDTO();

View File

@ -23,7 +23,6 @@ import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
@ -99,7 +98,7 @@ public class TestStandardFlowFileQueue {
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
queue = new StandardFlowFileQueue("id", flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
MockFlowFileRecord.resetIdGenerator();
}
@ -348,7 +347,7 @@ public class TestStandardFlowFileQueue {
@Test
public void testSwapInWhenThresholdIsLessThanSwapSize() {
// create a queue where the swap threshold is less than 10k
queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B");
queue = new StandardFlowFileQueue("id", flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, "0 sec", 0L, "0 B");
for (int i = 1; i <= 20000; i++) {
queue.put(new MockFlowFileRecord());

View File

@ -30,7 +30,6 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
@ -237,7 +236,7 @@ public class LoadBalancedQueueIT {
final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(clientRegistry, clusterCoordinator, eventReporter);
final Thread clientThread = new Thread(clientTask);
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -346,7 +345,7 @@ public class LoadBalancedQueueIT {
final Thread clientThread = new Thread(clientTask);
clientThread.setDaemon(true);
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -445,7 +444,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -534,7 +533,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -606,7 +605,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_ONLY);
@ -697,7 +696,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
flowFileQueue.setLoadBalanceCompression(LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
@ -786,7 +785,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -874,7 +873,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -962,7 +961,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -1079,7 +1078,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new FlowFilePartitioner() {
@Override
@ -1185,7 +1184,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());
@ -1254,7 +1253,7 @@ public class LoadBalancedQueueIT {
clientThread.setDaemon(true);
clientThread.start();
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, new NopConnectionEventListener(), processScheduler, clientFlowFileRepo, clientProvRepo,
final SocketLoadBalancedFlowFileQueue flowFileQueue = new SocketLoadBalancedFlowFileQueue(queueId, processScheduler, clientFlowFileRepo, clientProvRepo,
clientContentRepo, resourceClaimManager, clusterCoordinator, clientRegistry, flowFileSwapManager, swapThreshold, eventReporter);
flowFileQueue.setFlowFilePartitioner(new RoundRobinPartitioner());

View File

@ -26,7 +26,6 @@ import org.apache.nifi.controller.MockFlowFileRecord;
import org.apache.nifi.controller.MockSwapManager;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
@ -128,7 +127,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
final ProcessScheduler scheduler = mock(ProcessScheduler.class);
final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
queue = new SocketLoadBalancedFlowFileQueue("unit-test", scheduler, flowFileRepo, provRepo,
contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
}
@ -223,7 +222,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
queue = new SocketLoadBalancedFlowFileQueue("unit-test", scheduler, flowFileRepo, provRepo,
contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
queue.setPriorities(Collections.singletonList(iValuePrioritizer));
@ -563,7 +562,7 @@ public class TestSocketLoadBalancedFlowFileQueue {
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);
queue = new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, provRepo,
queue = new SocketLoadBalancedFlowFileQueue("unit-test", mock(ProcessScheduler.class), flowFileRepo, provRepo,
contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
queue.setFlowFilePartitioner(new RoundRobinPartitioner());

View File

@ -28,7 +28,6 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
@ -240,7 +239,7 @@ public class StandardProcessSessionIT {
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", new NopConnectionEventListener(), flowFileRepo, provenanceRepo, null,
final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", flowFileRepo, provenanceRepo, null,
processScheduler, swapManager, null, 10000, "0 sec", 0L, "0 B");
return Mockito.spy(actualQueue);
}

View File

@ -25,7 +25,6 @@ import org.apache.nifi.controller.queue.FlowFileQueueSize;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
@ -538,7 +537,7 @@ public class TestWriteAheadFlowFileRepository {
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
final FlowFileQueue queue = new StandardFlowFileQueue("1234", new NopConnectionEventListener(), null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
final FlowFileQueue queue = new StandardFlowFileQueue("1234", null, null, claimManager, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
when(connection.getFlowFileQueue()).thenReturn(queue);
queueProvider.addConnection(connection);

View File

@ -297,7 +297,6 @@ public abstract class AbstractStatusHistoryRepositoryTest {
result.setOpenFileHandlers(6 + number);
result.setProcessorLoadAverage(7.1d + number);
result.setTotalThreads(9 + number);
result.setEventDrivenThreads(20 + number);
result.setTimerDrivenThreads(21 + number);
result.setFlowFileRepositoryFreeSpace(10 + number);
result.setFlowFileRepositoryUsedSpace(11 + number);
@ -323,7 +322,6 @@ public abstract class AbstractStatusHistoryRepositoryTest {
status.setOpenFileHandlers(16);
status.setProcessorLoadAverage(17);
status.setTotalThreads(18);
status.setEventDrivenThreads(19);
status.setTimerDrivenThreads(20);
status.setContentRepositories(Arrays.asList(
givenStorageStatus("c1", 21, 22),
@ -365,7 +363,6 @@ public abstract class AbstractStatusHistoryRepositoryTest {
assertEquals(16, snapshot.getStatusMetric(NodeStatusDescriptor.OPEN_FILE_HANDLES.getDescriptor()).longValue());
assertEquals(17000000, snapshot.getStatusMetric(NodeStatusDescriptor.PROCESSOR_LOAD_AVERAGE.getDescriptor()).longValue());
assertEquals(18, snapshot.getStatusMetric(NodeStatusDescriptor.TOTAL_THREADS.getDescriptor()).longValue());
assertEquals(19, snapshot.getStatusMetric(NodeStatusDescriptor.EVENT_DRIVEN_THREADS.getDescriptor()).longValue());
assertEquals(20, snapshot.getStatusMetric(NodeStatusDescriptor.TIME_DRIVEN_THREADS.getDescriptor()).longValue());
// Storage metrics

View File

@ -69,7 +69,6 @@ public class VolatileComponentStatusRepositoryForNodeTest extends AbstractStatus
Double.valueOf(nodeStatus.getProcessorLoadAverage() * MetricDescriptor.FRACTION_MULTIPLIER).longValue(),
snapshot.getStatusMetric(NodeStatusDescriptor.PROCESSOR_LOAD_AVERAGE.getDescriptor()).longValue());
assertEquals(nodeStatus.getTotalThreads(), snapshot.getStatusMetric(NodeStatusDescriptor.TOTAL_THREADS.getDescriptor()).longValue());
assertEquals(nodeStatus.getEventDrivenThreads(), snapshot.getStatusMetric(NodeStatusDescriptor.EVENT_DRIVEN_THREADS.getDescriptor()).longValue());
assertEquals(nodeStatus.getTimerDrivenThreads(), snapshot.getStatusMetric(NodeStatusDescriptor.TIME_DRIVEN_THREADS.getDescriptor()).longValue());
assertEquals(nodeStatus.getFlowFileRepositoryFreeSpace(), snapshot.getStatusMetric(NodeStatusDescriptor.FLOW_FILE_REPOSITORY_FREE_SPACE.getDescriptor()).longValue());
assertEquals(nodeStatus.getFlowFileRepositoryUsedSpace(), snapshot.getStatusMetric(NodeStatusDescriptor.FLOW_FILE_REPOSITORY_USED_SPACE.getDescriptor()).longValue());
@ -87,26 +86,26 @@ public class VolatileComponentStatusRepositoryForNodeTest extends AbstractStatus
snapshot.getStatusMetric(NodeStatusDescriptor.PROVENANCE_REPOSITORY_USED_SPACE.getDescriptor()).longValue());
// metrics based on repositories
assertEquals(12 + i, getMetricAtOrdinal(snapshot, 17)); // c1 used
assertEquals(13 + i, getMetricAtOrdinal(snapshot, 16)); // c1 free
assertEquals(14 + i, getMetricAtOrdinal(snapshot, 19)); // c2 used
assertEquals(15 + i, getMetricAtOrdinal(snapshot, 18)); // c2 free
assertEquals(12 + i, getMetricAtOrdinal(snapshot, 16)); // c1 used
assertEquals(13 + i, getMetricAtOrdinal(snapshot, 15)); // c1 free
assertEquals(14 + i, getMetricAtOrdinal(snapshot, 18)); // c2 used
assertEquals(15 + i, getMetricAtOrdinal(snapshot, 17)); // c2 free
assertEquals(16 + i, getMetricAtOrdinal(snapshot, 21)); // p1 used
assertEquals(17 + i, getMetricAtOrdinal(snapshot, 20)); // p1 free
assertEquals(18 + i, getMetricAtOrdinal(snapshot, 23)); // p2 used
assertEquals(19 + i, getMetricAtOrdinal(snapshot, 22)); // p2 free
assertEquals(16 + i, getMetricAtOrdinal(snapshot, 20)); // p1 used
assertEquals(17 + i, getMetricAtOrdinal(snapshot, 19)); // p1 free
assertEquals(18 + i, getMetricAtOrdinal(snapshot, 22)); // p2 used
assertEquals(19 + i, getMetricAtOrdinal(snapshot, 21)); // p2 free
}
// metrics based on GarbageCollectionStatus (The ordinal numbers are true for setup, in production it might differ)
final int g0TimeOrdinal = 24;
final int g0CountOrdinal = 25;
final int g0TimeDiffOrdinal = 26;
final int g0CountDiffOrdinal = 27;
final int g1TimeOrdinal = 28;
final int g1CountOrdinal = 29;
final int g1TimeDiffOrdinal = 30;
final int g1CountDiffOrdinal = 31;
final int g0TimeOrdinal = 23;
final int g0CountOrdinal = 24;
final int g0TimeDiffOrdinal = 25;
final int g0CountDiffOrdinal = 26;
final int g1TimeOrdinal = 27;
final int g1CountOrdinal = 28;
final int g1TimeDiffOrdinal = 29;
final int g1CountDiffOrdinal = 30;
final StatusSnapshot snapshot1 = result.getStatusSnapshots().get(0);
final StatusSnapshot snapshot2 = result.getStatusSnapshots().get(1);

View File

@ -42,7 +42,6 @@ import org.apache.nifi.controller.XmlFlowSynchronizer;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.framework.cluster.leader.zookeeper.CuratorLeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
@ -370,7 +369,7 @@ public class FrameworkIntegrationTest {
protected FlowFileQueue createFlowFileQueue(final String uuid, final ProcessGroup processGroup) {
final RepositoryContext repoContext = getRepositoryContext();
return new StandardFlowFileQueue(uuid, ConnectionEventListener.NOP_EVENT_LISTENER, repoContext.getFlowFileRepository(), repoContext.getProvenanceRepository(),
return new StandardFlowFileQueue(uuid, repoContext.getFlowFileRepository(), repoContext.getProvenanceRepository(),
resourceClaimManager, processScheduler, flowFileSwapManager, flowController.createEventReporter(), 20000,
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
}
@ -483,7 +482,7 @@ public class FrameworkIntegrationTest {
.relationships(relationships)
.id(id)
.clustered(false)
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, eventListener, processGroup1) -> createFlowFileQueue(id, processGroup))
.flowFileQueueFactory((loadBalanceStrategy, partitioningAttribute, processGroup1) -> createFlowFileQueue(id, processGroup))
.build();
source.addConnection(connection);

View File

@ -540,9 +540,6 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
throw new RequestExpiredException();
}
// Trigger this port to run.
scheduler.registerEvent(this);
// Get a response from the response queue but don't wait forever if the port is stopped
ProcessingResult result = null;
@ -610,9 +607,6 @@ public class StandardPublicPort extends AbstractPort implements PublicPort {
throw new RequestExpiredException();
}
// Trigger this port to run
scheduler.registerEvent(this);
// Get a response from the response queue but don't wait forever if the port is stopped
ProcessingResult result;

View File

@ -94,56 +94,4 @@ public class ControllerAuditor extends NiFiAuditor {
}
}
/**
* Audits updating the max number of event driven threads for the controller.
*
* @param proceedingJoinPoint join point
* @param maxEventDrivenThreadCount thread count
* @param controllerFacade facade
* @throws java.lang.Throwable ex
*/
@Around("within(org.apache.nifi.web.controller.ControllerFacade) && "
+ "execution(void setMaxEventDrivenThreadCount(int)) && "
+ "args(maxEventDrivenThreadCount) && "
+ "target(controllerFacade)")
public void updateControllerEventDrivenThreadsAdvice(ProceedingJoinPoint proceedingJoinPoint, int maxEventDrivenThreadCount, ControllerFacade controllerFacade) throws Throwable {
// get the current max thread count
int previousMaxEventDrivenThreadCount = controllerFacade.getMaxEventDrivenThreadCount();
// update the processors state
proceedingJoinPoint.proceed();
// if no exception were thrown, add the configuration action...
// ensure the value changed
if (previousMaxEventDrivenThreadCount != maxEventDrivenThreadCount) {
// get the current user
NiFiUser user = NiFiUserUtils.getNiFiUser();
// ensure the user was found
if (user != null) {
Collection<Action> actions = new ArrayList<>();
// create the configure details
FlowChangeConfigureDetails configDetails = new FlowChangeConfigureDetails();
configDetails.setName("Controller Max Event Driven Thread Count");
configDetails.setValue(String.valueOf(maxEventDrivenThreadCount));
configDetails.setPreviousValue(String.valueOf(previousMaxEventDrivenThreadCount));
// create the config action
FlowChangeAction configAction = new FlowChangeAction();
configAction.setUserIdentity(user.getIdentity());
configAction.setOperation(Operation.Configure);
configAction.setTimestamp(new Date());
configAction.setSourceId("Flow Controller");
configAction.setSourceName("Flow Controller");
configAction.setSourceType(Component.Controller);
configAction.setActionDetails(configDetails);
actions.add(configAction);
// record the action
saveActions(actions, logger);
}
}
}
}

View File

@ -1744,9 +1744,6 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
if (controllerConfigurationDTO.getMaxTimerDrivenThreadCount() != null) {
controllerFacade.setMaxTimerDrivenThreadCount(controllerConfigurationDTO.getMaxTimerDrivenThreadCount());
}
if (controllerConfigurationDTO.getMaxEventDrivenThreadCount() != null) {
controllerFacade.setMaxEventDrivenThreadCount(controllerConfigurationDTO.getMaxEventDrivenThreadCount());
}
return controllerConfigurationDTO;
},

View File

@ -299,7 +299,6 @@ public final class DtoFactory {
public ControllerConfigurationDTO createControllerConfigurationDto(final ControllerFacade controllerFacade) {
final ControllerConfigurationDTO dto = new ControllerConfigurationDTO();
dto.setMaxTimerDrivenThreadCount(controllerFacade.getMaxTimerDrivenThreadCount());
dto.setMaxEventDrivenThreadCount(controllerFacade.getMaxEventDrivenThreadCount());
return dto;
}
@ -3423,7 +3422,6 @@ public final class DtoFactory {
dto.setDescription(getCapabilityDescription(node.getClass()));
dto.setSupportsParallelProcessing(!node.isTriggeredSerially());
dto.setSupportsEventDriven(node.isEventDrivenSupported());
dto.setSupportsBatching(node.isSessionBatchingSupported());
dto.setConfig(createProcessorConfigDto(node, uiOnly));
@ -4038,7 +4036,6 @@ public final class DtoFactory {
.map(this::createBundleDto)
.collect(Collectors.toCollection(LinkedHashSet::new));
flowDiagnosticsDto.setActiveEventDrivenThreads(flowController.getActiveEventDrivenThreadCount());
flowDiagnosticsDto.setActiveTimerDrivenThreads(flowController.getActiveTimerDrivenThreadCount());
flowDiagnosticsDto.setBundlesLoaded(bundlesLoaded);
flowDiagnosticsDto.setTimeZone(System.getProperty("user.timezone"));
@ -4047,7 +4044,6 @@ public final class DtoFactory {
// controller-related information
controllerDiagnosticsDto.setClusterCoordinator(flowController.isClusterCoordinator());
controllerDiagnosticsDto.setPrimaryNode(flowController.isPrimary());
controllerDiagnosticsDto.setMaxEventDrivenThreads(flowController.getMaxEventDrivenThreadCount());
controllerDiagnosticsDto.setMaxTimerDrivenThreads(flowController.getMaxTimerDrivenThreadCount());
// system-related information
@ -4225,7 +4221,6 @@ public final class DtoFactory {
// set up the default values for concurrent tasks and scheduling period
final Map<String, String> defaultConcurrentTasks = new HashMap<>();
defaultConcurrentTasks.put(SchedulingStrategy.TIMER_DRIVEN.name(), String.valueOf(SchedulingStrategy.TIMER_DRIVEN.getDefaultConcurrentTasks()));
defaultConcurrentTasks.put(SchedulingStrategy.EVENT_DRIVEN.name(), String.valueOf(SchedulingStrategy.EVENT_DRIVEN.getDefaultConcurrentTasks()));
defaultConcurrentTasks.put(SchedulingStrategy.CRON_DRIVEN.name(), String.valueOf(SchedulingStrategy.CRON_DRIVEN.getDefaultConcurrentTasks()));
dto.setDefaultConcurrentTasks(defaultConcurrentTasks);
@ -4451,7 +4446,6 @@ public final class DtoFactory {
copy.setType(original.getType());
copy.setBundle(copy(original.getBundle()));
copy.setSupportsParallelProcessing(original.getSupportsParallelProcessing());
copy.setSupportsEventDriven(original.getSupportsEventDriven());
copy.setSupportsBatching(original.getSupportsBatching());
copy.setSupportsSensitiveDynamicProperties(original.getSupportsSensitiveDynamicProperties());
copy.setPersistsState(original.getPersistsState());

View File

@ -260,15 +260,6 @@ public class ControllerFacade implements Authorizable {
flowController.setMaxTimerDrivenThreadCount(maxTimerDrivenThreadCount);
}
/**
* Sets the max event driven thread count of this controller.
*
* @param maxEventDrivenThreadCount count
*/
public void setMaxEventDrivenThreadCount(int maxEventDrivenThreadCount) {
flowController.setMaxEventDrivenThreadCount(maxEventDrivenThreadCount);
}
/**
* Gets the root group id.
*
@ -466,15 +457,6 @@ public class ControllerFacade implements Authorizable {
return flowController.getMaxTimerDrivenThreadCount();
}
/**
* Gets the max event driven thread count of this controller.
*
* @return count
*/
public int getMaxEventDrivenThreadCount() {
return flowController.getMaxEventDrivenThreadCount();
}
/**
* Gets the FlowFileProcessor types that this controller supports.
*

View File

@ -297,11 +297,6 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
validationErrors.add("Concurrent tasks must be greater than 0.");
}
break;
case EVENT_DRIVEN:
if (config.getConcurrentlySchedulableTaskCount() < 0) {
validationErrors.add("Concurrent tasks must be greater or equal to 0.");
}
break;
}
}

View File

@ -23,17 +23,14 @@ import org.apache.nifi.web.search.query.SearchQuery;
import java.util.List;
import static org.apache.nifi.scheduling.SchedulingStrategy.EVENT_DRIVEN;
import static org.apache.nifi.scheduling.SchedulingStrategy.PRIMARY_NODE_ONLY;
import static org.apache.nifi.scheduling.SchedulingStrategy.TIMER_DRIVEN;
public class SchedulingMatcher implements AttributeMatcher<ProcessorNode> {
private static final String SEARCH_TERM_EVENT = "event";
private static final String SEARCH_TERM_TIMER = "timer";
private static final String SEARCH_TERM_PRIMARY = "primary";
private static final String MATCH_PREFIX = "Scheduling strategy: ";
private static final String MATCH_EVENT = "Event driven";
private static final String MATCH_TIMER = "Timer driven";
private static final String MATCH_PRIMARY = "On primary node";
@ -42,9 +39,7 @@ public class SchedulingMatcher implements AttributeMatcher<ProcessorNode> {
final String searchTerm = query.getTerm();
final SchedulingStrategy schedulingStrategy = component.getSchedulingStrategy();
if (EVENT_DRIVEN.equals(schedulingStrategy) && StringUtils.containsIgnoreCase(SEARCH_TERM_EVENT, searchTerm)) {
matches.add(MATCH_PREFIX + MATCH_EVENT);
} else if (TIMER_DRIVEN.equals(schedulingStrategy) && StringUtils.containsIgnoreCase(SEARCH_TERM_TIMER, searchTerm)) {
if (TIMER_DRIVEN.equals(schedulingStrategy) && StringUtils.containsIgnoreCase(SEARCH_TERM_TIMER, searchTerm)) {
matches.add(MATCH_PREFIX + MATCH_TIMER);
} else if (PRIMARY_NODE_ONLY.equals(schedulingStrategy) && StringUtils.containsIgnoreCase(SEARCH_TERM_PRIMARY, searchTerm)) {
// PRIMARY_NODE_ONLY has been deprecated as a SchedulingStrategy and replaced by PRIMARY as an ExecutionNode.

View File

@ -142,9 +142,6 @@ public class ControllerSearchServiceIntegrationTest extends AbstractControllerSe
public void testSearchBasedOnScheduling() {
// given
givenRootProcessGroup()
.withProcessor(getProcessorNode("processor1", "processor1name", SchedulingStrategy.EVENT_DRIVEN, ExecutionNode.ALL, ScheduledState.RUNNING, ValidationStatus.VALID, AUTHORIZED))
.withProcessor(getProcessorNode("processor2", "processor2name", SchedulingStrategy.EVENT_DRIVEN, ExecutionNode.ALL, ScheduledState.DISABLED, ValidationStatus.INVALID, AUTHORIZED))
.withProcessor(getProcessorNode("processor3", "processor3name", SchedulingStrategy.EVENT_DRIVEN, ExecutionNode.ALL, ScheduledState.RUNNING, ValidationStatus.VALID, NOT_AUTHORIZED))
.withProcessor(getProcessorNode("processor4", "processor4name", SchedulingStrategy.TIMER_DRIVEN, ExecutionNode.ALL, ScheduledState.STOPPED, ValidationStatus.VALID, AUTHORIZED))
.withProcessor(getProcessorNode("processor5", "eventHandlerProcessor", SchedulingStrategy.CRON_DRIVEN, ExecutionNode.PRIMARY, ScheduledState.RUNNING, ValidationStatus.VALID,
AUTHORIZED));
@ -154,8 +151,6 @@ public class ControllerSearchServiceIntegrationTest extends AbstractControllerSe
// then
thenResultConsists()
.ofProcessor(getSimpleResultFromRoot("processor1", "processor1name", "Scheduling strategy: Event driven"))
.ofProcessor(getSimpleResultFromRoot("processor2", "processor2name", "Scheduling strategy: Event driven"))
.ofProcessor(getSimpleResultFromRoot("processor5", "eventHandlerProcessor", "Name: eventHandlerProcessor"))
.validate(results);

View File

@ -27,20 +27,6 @@ public class SchedulingMatcherTest extends AbstractAttributeMatcherTest {
@Mock
private ProcessorNode component;
@Test
public void testWhenKeywordAppearsAndEvent() {
// given
final SchedulingMatcher testSubject = new SchedulingMatcher();
givenSchedulingStrategy(SchedulingStrategy.EVENT_DRIVEN);
givenSearchTerm("event");
// when
testSubject.match(component, searchQuery, matches);
// then
thenMatchConsistsOf("Scheduling strategy: Event driven");
}
@Test
public void testWhenKeywordAppearsAndNotEvent() {
// given

View File

@ -38,18 +38,6 @@
<span id="read-only-maximum-timer-driven-thread-count-field"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">
Maximum event driven thread count
<div class="fa fa-question-circle" alt="Info" title="The maximum number of threads for event driven processors available to the system."></div>
</div>
<div class="editable setting-field">
<input type="text" id="maximum-event-driven-thread-count-field" class="setting-input"/>
</div>
<div class="read-only setting-field">
<span id="read-only-maximum-event-driven-thread-count-field"></span>
</div>
</div>
<div class="editable settings-buttons">
<div id="settings-save" class="button">Apply</div>
<div class="clear"></div>

View File

@ -100,24 +100,7 @@
description: 'Processor will be scheduled to run on an interval defined by the run schedule.'
}];
// conditionally support event driven based on processor
if (processor.supportsEventDriven === true) {
strategies.push({
text: 'Event driven',
value: 'EVENT_DRIVEN',
description: 'Processor will be scheduled to run when triggered by an event (e.g. a FlowFile enters an incoming queue). This scheduling strategy is experimental.'
});
} else if (processor.config['schedulingStrategy'] === 'EVENT_DRIVEN') {
// the processor was once configured for event driven but no longer supports it
strategies.push({
text: 'Event driven',
value: 'EVENT_DRIVEN',
description: 'Processor will be scheduled to run when triggered by an event (e.g. a FlowFile enters an incoming queue). This scheduling strategy is experimental.',
disabled: true
});
}
// conditionally support event driven
// conditionally support primary node only - to be removed - deprecated
if (processor.config['schedulingStrategy'] === 'PRIMARY_NODE_ONLY') {
strategies.push({
text: 'On primary node',
@ -258,9 +241,7 @@
if (details.supportsParallelProcessing === true) {
// get the appropriate concurrent tasks field
var concurrentTasks;
if (schedulingStrategy === 'EVENT_DRIVEN') {
concurrentTasks = $('#event-driven-concurrently-schedulable-tasks');
} else if (schedulingStrategy === 'CRON_DRIVEN') {
if (schedulingStrategy === 'CRON_DRIVEN') {
concurrentTasks = $('#cron-driven-concurrently-schedulable-tasks');
} else {
concurrentTasks = $('#timer-driven-concurrently-schedulable-tasks');
@ -276,8 +257,6 @@
var schedulingPeriod;
if (schedulingStrategy === 'CRON_DRIVEN') {
schedulingPeriod = $('#cron-driven-scheduling-period');
} else if (schedulingStrategy !== 'EVENT_DRIVEN') {
schedulingPeriod = $('#timer-driven-scheduling-period');
}
// check the scheduling period
@ -327,9 +306,7 @@
// get the appropriate concurrent tasks field
var concurrentTasks;
if (schedulingStrategy === 'EVENT_DRIVEN') {
concurrentTasks = $('#event-driven-concurrently-schedulable-tasks');
} else if (schedulingStrategy === 'CRON_DRIVEN') {
if (schedulingStrategy === 'CRON_DRIVEN') {
concurrentTasks = $('#cron-driven-concurrently-schedulable-tasks');
} else {
concurrentTasks = $('#timer-driven-concurrently-schedulable-tasks');
@ -344,8 +321,6 @@
var schedulingPeriod;
if (schedulingStrategy === 'CRON_DRIVEN') {
schedulingPeriod = $('#cron-driven-scheduling-period');
} else if (schedulingStrategy !== 'EVENT_DRIVEN') {
schedulingPeriod = $('#timer-driven-scheduling-period');
}
// get the scheduling period if appropriate
@ -836,24 +811,12 @@
},
select: function (selectedOption) {
// show the appropriate panel
if (selectedOption.value === 'EVENT_DRIVEN') {
$('#event-driven-warning').show();
if (selectedOption.value === 'CRON_DRIVEN') {
$('#timer-driven-options').hide();
$('#event-driven-options').show();
$('#cron-driven-options').hide();
$('#cron-driven-options').show();
} else {
$('#event-driven-warning').hide();
if (selectedOption.value === 'CRON_DRIVEN') {
$('#timer-driven-options').hide();
$('#event-driven-options').hide();
$('#cron-driven-options').show();
} else {
$('#timer-driven-options').show();
$('#event-driven-options').hide();
$('#cron-driven-options').hide();
}
$('#timer-driven-options').show();
$('#cron-driven-options').hide();
}
}
});
@ -873,14 +836,11 @@
// initialize the concurrentTasks
var defaultConcurrentTasks = processor.config['defaultConcurrentTasks'];
$('#timer-driven-concurrently-schedulable-tasks').val(defaultConcurrentTasks['TIMER_DRIVEN']);
$('#event-driven-concurrently-schedulable-tasks').val(defaultConcurrentTasks['EVENT_DRIVEN']);
$('#cron-driven-concurrently-schedulable-tasks').val(defaultConcurrentTasks['CRON_DRIVEN']);
// get the appropriate concurrent tasks field
var concurrentTasks;
if (schedulingStrategy === 'EVENT_DRIVEN') {
concurrentTasks = $('#event-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']);
} else if (schedulingStrategy === 'CRON_DRIVEN') {
if (schedulingStrategy === 'CRON_DRIVEN') {
concurrentTasks = $('#cron-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']);
} else {
concurrentTasks = $('#timer-driven-concurrently-schedulable-tasks').val(processor.config['concurrentlySchedulableTaskCount']);
@ -903,7 +863,7 @@
// set the scheduling period as appropriate
if (processor.config['schedulingStrategy'] === 'CRON_DRIVEN') {
$('#cron-driven-scheduling-period').val(processor.config['schedulingPeriod']);
} else if (processor.config['schedulingStrategy'] !== 'EVENT_DRIVEN') {
} else {
$('#timer-driven-scheduling-period').val(processor.config['schedulingPeriod']);
}

View File

@ -127,9 +127,6 @@
if (nfCommon.isDefinedAndNotNull(configuration['maxTimerDrivenThreadCount']) && !$.isNumeric(configuration['maxTimerDrivenThreadCount'])) {
errors.push('Maximum Timer Driven Thread Count must be an integer value');
}
if (nfCommon.isDefinedAndNotNull(configuration['maxEventDrivenThreadCount']) && !$.isNumeric(configuration['maxEventDrivenThreadCount'])) {
errors.push('Maximum Event Driven Thread Count must be an integer value');
}
if (errors.length > 0) {
nfDialog.showOkDialog({
@ -197,7 +194,6 @@
// create the configuration
var configuration = {};
configuration['maxTimerDrivenThreadCount'] = $('#maximum-timer-driven-thread-count-field').val();
configuration['maxEventDrivenThreadCount'] = $('#maximum-event-driven-thread-count-field').val();
return configuration;
};
@ -2499,7 +2495,6 @@
var loadSettings = function () {
var setUnauthorizedText = function () {
$('#read-only-maximum-timer-driven-thread-count-field').addClass('unset').text('Unauthorized');
$('#read-only-maximum-event-driven-thread-count-field').addClass('unset').text('Unauthorized');
};
var setEditable = function (editable) {
@ -2523,7 +2518,6 @@
if (response.permissions.canWrite) {
// populate the settings
$('#maximum-timer-driven-thread-count-field').removeClass('unset').val(response.component.maxTimerDrivenThreadCount);
$('#maximum-event-driven-thread-count-field').removeClass('unset').val(response.component.maxEventDrivenThreadCount);
setEditable(true);
@ -2535,7 +2529,6 @@
if (response.permissions.canRead) {
// populate the settings
$('#read-only-maximum-timer-driven-thread-count-field').removeClass('unset').text(response.component.maxTimerDrivenThreadCount);
$('#read-only-maximum-event-driven-thread-count-field').removeClass('unset').text(response.component.maxEventDrivenThreadCount);
} else {
setUnauthorizedText();
}

View File

@ -239,10 +239,7 @@
var schedulingStrategy = details.config['schedulingStrategy'];
// make the scheduling strategy human readable
if (schedulingStrategy === 'EVENT_DRIVEN') {
showRunSchedule = false;
schedulingStrategy = 'Event driven';
} else if (schedulingStrategy === 'CRON_DRIVEN') {
if (schedulingStrategy === 'CRON_DRIVEN') {
schedulingStrategy = 'CRON driven';
} else if (schedulingStrategy === 'TIMER_DRIVEN') {
schedulingStrategy = "Timer driven";

View File

@ -47,7 +47,6 @@ import com.google.protobuf.DynamicMessage;
import io.grpc.Status;
import java.time.LocalTime;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -87,7 +86,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@TriggerSerially
@EventDriven
@Tags({"google", "google cloud", "bq", "bigquery"})
@CapabilityDescription("Unified processor for batch and stream flow files content to a Google BigQuery table via the Storage Write API." +
"The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schema" +

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.geohash;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -67,7 +66,6 @@ import java.util.Map;
import java.util.HashMap;
import java.util.Optional;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"geo", "geohash", "record"})

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.graph;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -45,7 +44,6 @@ import java.util.Map;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags({"cypher", "neo4j", "graph", "network", "insert", "update", "delete", "put", "get",
"node", "relationship", "connection", "executor", "gremlin", "tinkerpop"})

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.groovyx;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
@ -66,7 +65,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({"script", "groovy", "groovyx"})
@CapabilityDescription(

View File

@ -22,7 +22,6 @@ import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContext;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -59,7 +58,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@EventDriven
@SupportsBatching
@Tags({"grpc", "rpc", "client"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
@ -43,7 +42,6 @@ import java.util.Set;
import static org.apache.nifi.hbase.util.VisibilityUtil.pickVisibilityString;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase"})

View File

@ -20,7 +20,6 @@ package org.apache.nifi.hbase;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -52,7 +51,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.nifi.hbase.util.VisibilityUtil.pickVisibilityString;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase", "put", "json"})

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.hbase;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -58,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"hadoop", "hbase", "put", "record"})

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.hive;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -68,7 +67,6 @@ import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE;
import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY;
import static org.apache.nifi.util.hive.HiveJdbcCommon.NORMALIZE_NAMES_FOR_AVRO;
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"hive", "sql", "select", "jdbc", "query", "database"})
@CapabilityDescription("Execute provided HiveQL SELECT query against a Hive database connection. Query result will be converted to Avro or CSV format."

View File

@ -28,7 +28,6 @@ import java.util.Set;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@ -63,7 +62,6 @@ import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.PipeParser;
import ca.uhn.hl7v2.validation.impl.ValidationContextFactory;
@EventDriven
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -57,7 +56,6 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.influxdb;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -48,7 +47,6 @@ import java.util.List;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@SupportsBatching
@Tags({"influxdb", "measurement","insert", "write", "put", "timeseries"})
@CapabilityDescription("Processor to write the content of a FlowFile in 'line protocol'. Please check details of the 'line protocol' in InfluxDB documentation (https://www.influxdb.com/). "

View File

@ -22,7 +22,6 @@ import com.bazaarvoice.jolt.JsonUtils;
import com.bazaarvoice.jolt.Transform;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -84,7 +83,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@EventDriven
@SideEffectFree
@SupportsBatching
@Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"})

View File

@ -26,7 +26,6 @@ import org.apache.kudu.client.Operation;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -81,7 +80,6 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
@SystemResourceConsideration(resource = SystemResource.MEMORY)
@EventDriven
@SupportsBatching
@RequiresInstanceClassLoading // Because of calls to UserGroupInformation.setConfiguration
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)

View File

@ -37,7 +37,6 @@ import javax.imageio.ImageIO;
import javax.imageio.ImageReader;
import javax.imageio.stream.ImageInputStream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -55,7 +54,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "resize", "image", "jpg", "jpeg", "png", "bmp", "wbmp", "gif" })

View File

@ -22,7 +22,6 @@ package org.apache.nifi.processors.mongodb;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.DeleteResult;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -45,7 +44,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
@EventDriven
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({ "delete", "mongo", "mongodb" })
@CapabilityDescription(

View File

@ -21,7 +21,6 @@ import com.mongodb.WriteConcern;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SystemResource;
@ -55,7 +54,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
@EventDriven
@Tags({ "mongodb", "insert", "update", "write", "put" })
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Writes the contents of a FlowFile to MongoDB")

Some files were not shown because too many files have changed in this diff Show More