mirror of https://github.com/apache/nifi.git
NIFI-11816 - Removed deprecated Primary Node Scheduling Strategy
This closes #7501 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
0a774f231a
commit
20b19992d2
|
@ -70,7 +70,7 @@ public class ProcessorSchemaV1Test {
|
|||
public void setup() {
|
||||
testName = "testName";
|
||||
testProcessorClass = "testProcessorClass";
|
||||
testSchedulingStrategy = SchedulingStrategy.PRIMARY_NODE_ONLY.toString();
|
||||
testSchedulingStrategy = SchedulingStrategy.TIMER_DRIVEN.toString();
|
||||
testSchedulingPeriod = "testSchedulingPeriod";
|
||||
testMaxConcurrentTasks = 55;
|
||||
testPenalizationPeriod = "testPenalizationPeriod";
|
||||
|
|
|
@ -41,7 +41,7 @@ public class ProcessorSchemaTest extends BaseSchemaTester<ProcessorSchema, Proce
|
|||
private final String testName = "testName";
|
||||
private final String testId = UUID.nameUUIDFromBytes("testId".getBytes(StandardCharsets.UTF_8)).toString();
|
||||
private final String testProcessorClass = "testProcessorClass";
|
||||
private final String testSchedulingStrategy = SchedulingStrategy.PRIMARY_NODE_ONLY.name();
|
||||
private final String testSchedulingStrategy = SchedulingStrategy.TIMER_DRIVEN.name();
|
||||
private final String testSchedulingPeriod = "10 s";
|
||||
private final int testMaxConcurrentTasks = 101;
|
||||
private final String testYieldDuration = "5 s";
|
||||
|
|
|
@ -28,18 +28,6 @@ public enum SchedulingStrategy {
|
|||
* Components support Timer-Driven mode.
|
||||
*/
|
||||
TIMER_DRIVEN(1, "0 sec"),
|
||||
/**
|
||||
* NOTE: This option has been deprecated with the addition of the
|
||||
* execution-node combo box. It still exists for backward compatibility
|
||||
* with existing flows that still have this value for schedulingStrategy.
|
||||
**
|
||||
* Indicates that the component will be scheduled via timer only on the
|
||||
* Primary Node. If the instance is not part of a cluster and this
|
||||
* Scheduling Strategy is used, the component will be scheduled in the same
|
||||
* manner as if {@link TIMER_DRIVEN} were used.
|
||||
*/
|
||||
@Deprecated
|
||||
PRIMARY_NODE_ONLY(1, "0 sec"),
|
||||
/**
|
||||
* Indicates that the component will be scheduled to run according to a
|
||||
* Cron-style expression
|
||||
|
|
|
@ -360,7 +360,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public boolean isIsolated() {
|
||||
return schedulingStrategy == SchedulingStrategy.PRIMARY_NODE_ONLY || executionNode == ExecutionNode.PRIMARY;
|
||||
return executionNode == ExecutionNode.PRIMARY;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1236,7 +1236,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
|
|||
}
|
||||
}
|
||||
break;
|
||||
case PRIMARY_NODE_ONLY:
|
||||
case TIMER_DRIVEN: {
|
||||
try {
|
||||
final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(evaluatedSchedulingPeriod),
|
||||
|
|
|
@ -572,8 +572,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
|
|||
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory);
|
||||
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties);
|
||||
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
|
||||
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY).
|
||||
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
|
||||
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
|
||||
|
||||
startConnectablesAfterInitialization = new HashSet<>();
|
||||
|
|
|
@ -292,7 +292,6 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
if (isNotNull(config.getConcurrentlySchedulableTaskCount())) {
|
||||
switch (schedulingStrategy) {
|
||||
case TIMER_DRIVEN:
|
||||
case PRIMARY_NODE_ONLY:
|
||||
if (config.getConcurrentlySchedulableTaskCount() <= 0) {
|
||||
validationErrors.add("Concurrent tasks must be greater than 0.");
|
||||
}
|
||||
|
@ -307,7 +306,6 @@ public class StandardProcessorDAO extends ComponentDAO implements ProcessorDAO {
|
|||
if (isNotNull(schedulingPeriod) && isNotNull(evaluatedSchedulingPeriod)) {
|
||||
switch (schedulingStrategy) {
|
||||
case TIMER_DRIVEN:
|
||||
case PRIMARY_NODE_ONLY:
|
||||
final Matcher schedulingMatcher = FormatUtils.TIME_DURATION_PATTERN.matcher(evaluatedSchedulingPeriod);
|
||||
if (!schedulingMatcher.matches()) {
|
||||
validationErrors.add("Scheduling period is not a valid time duration (ie 30 sec, 5 min)");
|
||||
|
|
|
@ -23,16 +23,13 @@ import org.apache.nifi.web.search.query.SearchQuery;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.nifi.scheduling.SchedulingStrategy.PRIMARY_NODE_ONLY;
|
||||
import static org.apache.nifi.scheduling.SchedulingStrategy.TIMER_DRIVEN;
|
||||
|
||||
public class SchedulingMatcher implements AttributeMatcher<ProcessorNode> {
|
||||
private static final String SEARCH_TERM_TIMER = "timer";
|
||||
private static final String SEARCH_TERM_PRIMARY = "primary";
|
||||
|
||||
private static final String MATCH_PREFIX = "Scheduling strategy: ";
|
||||
private static final String MATCH_TIMER = "Timer driven";
|
||||
private static final String MATCH_PRIMARY = "On primary node";
|
||||
|
||||
@Override
|
||||
public void match(final ProcessorNode component, final SearchQuery query, final List<String> matches) {
|
||||
|
@ -41,9 +38,6 @@ public class SchedulingMatcher implements AttributeMatcher<ProcessorNode> {
|
|||
|
||||
if (TIMER_DRIVEN.equals(schedulingStrategy) && StringUtils.containsIgnoreCase(SEARCH_TERM_TIMER, searchTerm)) {
|
||||
matches.add(MATCH_PREFIX + MATCH_TIMER);
|
||||
} else if (PRIMARY_NODE_ONLY.equals(schedulingStrategy) && StringUtils.containsIgnoreCase(SEARCH_TERM_PRIMARY, searchTerm)) {
|
||||
// PRIMARY_NODE_ONLY has been deprecated as a SchedulingStrategy and replaced by PRIMARY as an ExecutionNode.
|
||||
matches.add(MATCH_PREFIX + MATCH_PRIMARY);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -177,7 +177,7 @@ public class ControllerSearchServiceIntegrationTest extends AbstractControllerSe
|
|||
public void testSearchBasedOnExecution() {
|
||||
// given
|
||||
givenRootProcessGroup()
|
||||
.withProcessor(getProcessorNode("processor1", "processor1name", SchedulingStrategy.PRIMARY_NODE_ONLY, ExecutionNode.PRIMARY, ScheduledState.RUNNING, ValidationStatus.VALID,
|
||||
.withProcessor(getProcessorNode("processor1", "processor1name", SchedulingStrategy.TIMER_DRIVEN, ExecutionNode.PRIMARY, ScheduledState.RUNNING, ValidationStatus.VALID,
|
||||
AUTHORIZED));
|
||||
|
||||
// when
|
||||
|
@ -185,7 +185,7 @@ public class ControllerSearchServiceIntegrationTest extends AbstractControllerSe
|
|||
|
||||
// then
|
||||
thenResultConsists()
|
||||
.ofProcessor(getSimpleResultFromRoot("processor1", "processor1name", "Execution node: primary", "Scheduling strategy: On primary node"))
|
||||
.ofProcessor(getSimpleResultFromRoot("processor1", "processor1name", "Execution node: primary"))
|
||||
.validate(results);
|
||||
}
|
||||
|
||||
|
|
|
@ -69,20 +69,6 @@ public class SchedulingMatcherTest extends AbstractAttributeMatcherTest {
|
|||
thenMatchConsistsOf("Scheduling strategy: Timer driven");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWhenKeywordAppearsAndPrimaryNodeOnly() {
|
||||
// given
|
||||
final SchedulingMatcher testSubject = new SchedulingMatcher();
|
||||
givenSchedulingStrategy(SchedulingStrategy.PRIMARY_NODE_ONLY);
|
||||
givenSearchTerm("primary");
|
||||
|
||||
// when
|
||||
testSubject.match(component, searchQuery, matches);
|
||||
|
||||
// then
|
||||
thenMatchConsistsOf("Scheduling strategy: On primary node");
|
||||
}
|
||||
|
||||
private void givenSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
|
||||
Mockito.when(component.getSchedulingStrategy()).thenReturn(schedulingStrategy);
|
||||
}
|
||||
|
|
|
@ -100,16 +100,6 @@
|
|||
description: 'Processor will be scheduled to run on an interval defined by the run schedule.'
|
||||
}];
|
||||
|
||||
// conditionally support primary node only - to be removed - deprecated
|
||||
if (processor.config['schedulingStrategy'] === 'PRIMARY_NODE_ONLY') {
|
||||
strategies.push({
|
||||
text: 'On primary node',
|
||||
value: 'PRIMARY_NODE_ONLY',
|
||||
description: 'Processor will be scheduled on the primary node on an interval defined by the run schedule. This option has been deprecated, please use the Execution setting below.',
|
||||
disabled: true
|
||||
});
|
||||
}
|
||||
|
||||
// add an option for cron driven
|
||||
strategies.push({
|
||||
text: 'CRON driven',
|
||||
|
|
|
@ -241,10 +241,9 @@
|
|||
// make the scheduling strategy human readable
|
||||
if (schedulingStrategy === 'CRON_DRIVEN') {
|
||||
schedulingStrategy = 'CRON driven';
|
||||
} else if (schedulingStrategy === 'TIMER_DRIVEN') {
|
||||
schedulingStrategy = "Timer driven";
|
||||
} else {
|
||||
schedulingStrategy = "On primary node";
|
||||
// the only other option schedulingStrategy === 'TIMER_DRIVEN'
|
||||
schedulingStrategy = "Timer driven";
|
||||
}
|
||||
nfCommon.populateField('read-only-scheduling-strategy', schedulingStrategy);
|
||||
|
||||
|
|
Loading…
Reference in New Issue