diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java new file mode 100644 index 0000000000..ff9125cd70 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.configuration; + +import org.apache.nifi.scheduling.SchedulingStrategy; + +import java.lang.annotation.Documented; +import java.lang.annotation.Target; +import java.lang.annotation.Retention; +import java.lang.annotation.ElementType; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Inherited; + +/** + *
+ * Marker interface that a Processor can use to configure the schedule strategy, the period and the number of concurrent tasks. + * Note that the number of Concurrent tasks will be ignored if the annotion @TriggerSerialy is used + *
+ */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface DefaultSchedule { + + SchedulingStrategy strategy() default SchedulingStrategy.TIMER_DRIVEN; + String period() default "0 sec"; + int concurrentTasks() default 1; + +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java new file mode 100644 index 0000000000..d01972c8c7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.configuration; + +import java.lang.annotation.Documented; +import java.lang.annotation.Target; +import java.lang.annotation.Retention; +import java.lang.annotation.ElementType; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Inherited; +import org.apache.nifi.logging.LogLevel; + +/** + *+ * Marker interface that a Processor can use to configure the yield duration, the penalty duration and the bulletin log level. + * Note that the number of Concurrent tasks will be ignored if the annotation @TriggerSerialy is used + *
+ */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface DefaultSettings { + String yieldDuration() default "1 sec"; + String penaltyDuration() default "30 sec"; + LogLevel bulletinLevel() default LogLevel.WARN; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b790526ca3..a5789e99bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -21,6 +21,7 @@ import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnRemoved; @@ -1060,6 +1061,32 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); + try { + final Class> procClass = processor.getClass(); + if(procClass.isAnnotationPresent(DefaultSettings.class)) { + DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class); + try { + procNode.setYieldPeriod(ds.yieldDuration()); + } catch(Throwable ex) { + LOG.error(String.format("Error while setting yield period from DefaultSettings annotation:%s",ex.getMessage()),ex); + } + try { + + procNode.setPenalizationPeriod(ds.penaltyDuration()); + } catch(Throwable ex) { + LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex); + } + try { + procNode.setBulletinLevel(ds.bulletinLevel()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex); + } + + } + } catch (Throwable ex) { + LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex); + } + if (firstTimeAdded) { try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 27fdc0f705..aede8256f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -152,7 +153,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final VariableRegistry variableRegistry, final ComponentLog logger) { super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger); - this.processor = processor; identifier = new AtomicReference<>(uuid); destinations = new HashMap<>(); @@ -191,6 +191,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; executionNode = ExecutionNode.ALL; + try { + if (procClass.isAnnotationPresent(DefaultSchedule.class)) { + DefaultSchedule dsc = procClass.getAnnotation(DefaultSchedule.class); + try { + this.setSchedulingStrategy(dsc.strategy()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + try { + this.setScheduldingPeriod(dsc.period()); + } catch (Throwable ex) { + this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN); + LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + if (!triggeredSerially) { + try { + setMaxConcurrentTasks(dsc.concurrentTasks()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting max concurrent tasks from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + } + } + } catch (Throwable ex) { + LOG.error(String.format("Error while setting default schedule from DefaultSchedule annotation: %s",ex.getMessage()),ex); + } } /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java new file mode 100644 index 0000000000..b8e469b2e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.scheduling.SchedulingStrategy; + + +/** + * Dummy processor to test @DefaultSchedule annotation + */ +@DefaultSchedule(concurrentTasks = 5, strategy = SchedulingStrategy.CRON_DRIVEN, period = "0 0 0 1/1 * ?") +public class DummyScheduledProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java new file mode 100644 index 0000000000..34c16afda3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * Dummy Processor to test @DefaultSettings annotation + */ +@DefaultSettings(yieldDuration = "5 sec", penaltyDuration = "1 min", bulletinLevel = LogLevel.DEBUG) +public class DummySettingsProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index bbcdc3b7b8..32f8135846 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -46,10 +46,12 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.After; @@ -57,6 +59,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; + public class TestFlowController { private FlowController controller; @@ -327,4 +330,31 @@ public class TestFlowController { assertFalse(service.equals(serviceNode)); } + @Test + public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException { + ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor"); + assertEquals(5,p_scheduled.getMaxConcurrentTasks()); + assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy()); + assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod()); + assertEquals("1 sec",p_scheduled.getYieldPeriod()); + assertEquals("30 sec",p_scheduled.getPenalizationPeriod()); + assertEquals(LogLevel.WARN,p_scheduled.getBulletinLevel()); + } + + @Test + public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException { + + ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor"); + assertEquals("5 sec",p_settings.getYieldPeriod()); + assertEquals("1 min",p_settings.getPenalizationPeriod()); + assertEquals(LogLevel.DEBUG,p_settings.getBulletinLevel()); + assertEquals(1,p_settings.getMaxConcurrentTasks()); + assertEquals(SchedulingStrategy.TIMER_DRIVEN,p_settings.getSchedulingStrategy()); + assertEquals("0 sec",p_settings.getSchedulingPeriod()); + } + + + + + }