From 65b7b377e3072b83f75e63b053a4f080726d824d Mon Sep 17 00:00:00 2001 From: Mathias Tiberghien Date: Wed, 5 Oct 2016 17:50:08 +0200 Subject: [PATCH] NIFI-1526: DefaultSchedule annotation should be use on Custom Processor to set the default scheduling strategy, scheduling period or max number of concurrent task for each instance of the processor DefaultSettings annotation should be use on Custom Processor to set the default penalty period, the yield duration or the bulletin log level for each instance of the processor --- .../configuration/DefaultSchedule.java | 45 +++++++++++++++++++ .../configuration/DefaultSettings.java | 42 +++++++++++++++++ .../nifi/controller/FlowController.java | 27 +++++++++++ .../controller/StandardProcessorNode.java | 27 ++++++++++- .../controller/DummyScheduledProcessor.java | 37 +++++++++++++++ .../controller/DummySettingsProcessor.java | 36 +++++++++++++++ .../nifi/controller/TestFlowController.java | 30 +++++++++++++ 7 files changed, 243 insertions(+), 1 deletion(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java create mode 100644 nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java new file mode 100644 index 0000000000..ff9125cd70 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.configuration; + +import org.apache.nifi.scheduling.SchedulingStrategy; + +import java.lang.annotation.Documented; +import java.lang.annotation.Target; +import java.lang.annotation.Retention; +import java.lang.annotation.ElementType; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Inherited; + +/** + *

+ * Marker interface that a Processor can use to configure the schedule strategy, the period and the number of concurrent tasks. + * Note that the number of Concurrent tasks will be ignored if the annotion @TriggerSerialy is used + *

+ */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface DefaultSchedule { + + SchedulingStrategy strategy() default SchedulingStrategy.TIMER_DRIVEN; + String period() default "0 sec"; + int concurrentTasks() default 1; + +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java new file mode 100644 index 0000000000..d01972c8c7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSettings.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.annotation.configuration; + +import java.lang.annotation.Documented; +import java.lang.annotation.Target; +import java.lang.annotation.Retention; +import java.lang.annotation.ElementType; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Inherited; +import org.apache.nifi.logging.LogLevel; + +/** + *

+ * Marker interface that a Processor can use to configure the yield duration, the penalty duration and the bulletin log level. + * Note that the number of Concurrent tasks will be ignored if the annotation @TriggerSerialy is used + *

+ */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface DefaultSettings { + String yieldDuration() default "1 sec"; + String penaltyDuration() default "30 sec"; + LogLevel bulletinLevel() default LogLevel.WARN; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index b790526ca3..a5789e99bf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -21,6 +21,7 @@ import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; +import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; import org.apache.nifi.annotation.lifecycle.OnRemoved; @@ -1060,6 +1061,32 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R final LogRepository logRepository = LogRepositoryFactory.getRepository(id); logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode)); + try { + final Class procClass = processor.getClass(); + if(procClass.isAnnotationPresent(DefaultSettings.class)) { + DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class); + try { + procNode.setYieldPeriod(ds.yieldDuration()); + } catch(Throwable ex) { + LOG.error(String.format("Error while setting yield period from DefaultSettings annotation:%s",ex.getMessage()),ex); + } + try { + + procNode.setPenalizationPeriod(ds.penaltyDuration()); + } catch(Throwable ex) { + LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex); + } + try { + procNode.setBulletinLevel(ds.bulletinLevel()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex); + } + + } + } catch (Throwable ex) { + LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex); + } + if (firstTimeAdded) { try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 27fdc0f705..aede8256f6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; @@ -152,7 +153,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final VariableRegistry variableRegistry, final ComponentLog logger) { super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger); - this.processor = processor; identifier = new AtomicReference<>(uuid); destinations = new HashMap<>(); @@ -191,6 +191,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; executionNode = ExecutionNode.ALL; + try { + if (procClass.isAnnotationPresent(DefaultSchedule.class)) { + DefaultSchedule dsc = procClass.getAnnotation(DefaultSchedule.class); + try { + this.setSchedulingStrategy(dsc.strategy()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + try { + this.setScheduldingPeriod(dsc.period()); + } catch (Throwable ex) { + this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN); + LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + if (!triggeredSerially) { + try { + setMaxConcurrentTasks(dsc.concurrentTasks()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting max concurrent tasks from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + } + } + } catch (Throwable ex) { + LOG.error(String.format("Error while setting default schedule from DefaultSchedule annotation: %s",ex.getMessage()),ex); + } } /** diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java new file mode 100644 index 0000000000..b8e469b2e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledProcessor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.scheduling.SchedulingStrategy; + + +/** + * Dummy processor to test @DefaultSchedule annotation + */ +@DefaultSchedule(concurrentTasks = 5, strategy = SchedulingStrategy.CRON_DRIVEN, period = "0 0 0 1/1 * ?") +public class DummyScheduledProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java new file mode 100644 index 0000000000..34c16afda3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummySettingsProcessor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import org.apache.nifi.annotation.configuration.DefaultSettings; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * Dummy Processor to test @DefaultSettings annotation + */ +@DefaultSettings(yieldDuration = "5 sec", penaltyDuration = "1 min", bulletinLevel = LogLevel.DEBUG) +public class DummySettingsProcessor extends AbstractProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index bbcdc3b7b8..32f8135846 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -46,10 +46,12 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException; import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.logging.LogLevel; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.MockProvenanceRepository; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FileBasedVariableRegistry; import org.apache.nifi.util.NiFiProperties; import org.junit.After; @@ -57,6 +59,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; + public class TestFlowController { private FlowController controller; @@ -327,4 +330,31 @@ public class TestFlowController { assertFalse(service.equals(serviceNode)); } + @Test + public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException { + ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor"); + assertEquals(5,p_scheduled.getMaxConcurrentTasks()); + assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy()); + assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod()); + assertEquals("1 sec",p_scheduled.getYieldPeriod()); + assertEquals("30 sec",p_scheduled.getPenalizationPeriod()); + assertEquals(LogLevel.WARN,p_scheduled.getBulletinLevel()); + } + + @Test + public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException { + + ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor"); + assertEquals("5 sec",p_settings.getYieldPeriod()); + assertEquals("1 min",p_settings.getPenalizationPeriod()); + assertEquals(LogLevel.DEBUG,p_settings.getBulletinLevel()); + assertEquals(1,p_settings.getMaxConcurrentTasks()); + assertEquals(SchedulingStrategy.TIMER_DRIVEN,p_settings.getSchedulingStrategy()); + assertEquals("0 sec",p_settings.getSchedulingPeriod()); + } + + + + + }