From 89f1bd3189970f9930bfdd4861fc2043dfd30c90 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Thu, 5 Jan 2017 23:14:37 +0100 Subject: [PATCH] NIFI-957 Added the possibility to use DefaultSchedule annotation in reporting tasks This closes #1400 --- .../configuration/DefaultSchedule.java | 7 ++-- .../reporting/AbstractReportingTaskNode.java | 24 +++++++++++++ .../DummyScheduledReportingTask.java | 34 +++++++++++++++++++ .../nifi/controller/TestFlowController.java | 10 ++++-- 4 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledReportingTask.java diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java index ff9125cd70..8635a74513 100644 --- a/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/configuration/DefaultSchedule.java @@ -28,8 +28,9 @@ import java.lang.annotation.Inherited; /** *

- * Marker interface that a Processor can use to configure the schedule strategy, the period and the number of concurrent tasks. - * Note that the number of Concurrent tasks will be ignored if the annotion @TriggerSerialy is used + * Marker interface that a Processor can use to configure default settings for the schedule strategy, the period and the number of concurrent tasks. + * Marker interface that a ReportingTask can use to configure default settings the schedule strategy and the period. + * Note that the number of Concurrent tasks will be ignored if the annotation @TriggerSerialy is used *

*/ @Documented @@ -42,4 +43,4 @@ public @interface DefaultSchedule { String period() default "0 sec"; int concurrentTasks() default 1; -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index deca385a86..d7ae309e6f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.controller.reporting; +import org.apache.nifi.annotation.configuration.DefaultSchedule; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractConfiguredComponent; import org.apache.nifi.controller.ConfigurationContext; @@ -40,8 +41,14 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.annotation.AnnotationUtils; + public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode { + private static final Logger LOG = LoggerFactory.getLogger(AbstractReportingTaskNode.class); + private final ReportingTask reportingTask; private final ProcessScheduler processScheduler; private final ControllerServiceLookup serviceLookup; @@ -72,6 +79,23 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon this.reportingTask = reportingTask; this.processScheduler = processScheduler; this.serviceLookup = controllerServiceProvider; + + final Class reportingClass = reportingTask.getClass(); + + DefaultSchedule dsc = AnnotationUtils.findAnnotation(reportingClass, DefaultSchedule.class); + if(dsc != null) { + try { + this.setSchedulingStrategy(dsc.strategy()); + } catch (Throwable ex) { + LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + try { + this.setSchedulingPeriod(dsc.period()); + } catch (Throwable ex) { + this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN); + LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex); + } + } } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledReportingTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledReportingTask.java new file mode 100644 index 0000000000..9dd19e3029 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/DummyScheduledReportingTask.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.reporting.AbstractReportingTask; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.scheduling.SchedulingStrategy; + +/** + * Dummy reporting task to test @DefaultSchedule annotation + */ +@DefaultSchedule(strategy = SchedulingStrategy.CRON_DRIVEN, period = "0 0 0 1/1 * ?") +public class DummyScheduledReportingTask extends AbstractReportingTask { + @Override + public void onTrigger(ReportingContext context) { + + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java index 85d3491cd7..f770f5ef56 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFlowController.java @@ -333,7 +333,7 @@ public class TestFlowController { } @Test - public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException { + public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException { ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor"); assertEquals(5,p_scheduled.getMaxConcurrentTasks()); assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy()); @@ -344,8 +344,14 @@ public class TestFlowController { } @Test - public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException { + public void testReportingTaskDefaultScheduleAnnotation() throws ReportingTaskInstantiationException { + ReportingTaskNode p_scheduled = controller.createReportingTask(DummyScheduledReportingTask.class.getName()); + assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy()); + assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod()); + } + @Test + public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException { ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor"); assertEquals("5 sec",p_settings.getYieldPeriod()); assertEquals("1 min",p_settings.getPenalizationPeriod());