NIFI-1526: DefaultSchedule annotation should be use on Custom Processor to set the default scheduling strategy, scheduling period or max number of concurrent task for each instance of the processor

DefaultSettings annotation should be use on Custom Processor to set the default penalty period, the yield duration or the bulletin log level for each instance of the processor
This commit is contained in:
Mathias Tiberghien 2016-10-05 17:50:08 +02:00 committed by Mark Payne
parent 8d3177c38a
commit 65b7b377e3
7 changed files with 243 additions and 1 deletions

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.configuration;
import org.apache.nifi.scheduling.SchedulingStrategy;
import java.lang.annotation.Documented;
import java.lang.annotation.Target;
import java.lang.annotation.Retention;
import java.lang.annotation.ElementType;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Inherited;
/**
* <p>
* Marker interface that a Processor can use to configure the schedule strategy, the period and the number of concurrent tasks.
* Note that the number of Concurrent tasks will be ignored if the annotion @TriggerSerialy is used
* </p>
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface DefaultSchedule {
SchedulingStrategy strategy() default SchedulingStrategy.TIMER_DRIVEN;
String period() default "0 sec";
int concurrentTasks() default 1;
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.annotation.configuration;
import java.lang.annotation.Documented;
import java.lang.annotation.Target;
import java.lang.annotation.Retention;
import java.lang.annotation.ElementType;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Inherited;
import org.apache.nifi.logging.LogLevel;
/**
* <p>
* Marker interface that a Processor can use to configure the yield duration, the penalty duration and the bulletin log level.
* Note that the number of Concurrent tasks will be ignored if the annotation @TriggerSerialy is used
* </p>
*/
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface DefaultSettings {
String yieldDuration() default "1 sec";
String penaltyDuration() default "30 sec";
LogLevel bulletinLevel() default LogLevel.WARN;
}

View File

@ -21,6 +21,7 @@ import org.apache.commons.collections4.Predicate;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
@ -1060,6 +1061,32 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
try {
final Class<?> procClass = processor.getClass();
if(procClass.isAnnotationPresent(DefaultSettings.class)) {
DefaultSettings ds = procClass.getAnnotation(DefaultSettings.class);
try {
procNode.setYieldPeriod(ds.yieldDuration());
} catch(Throwable ex) {
LOG.error(String.format("Error while setting yield period from DefaultSettings annotation:%s",ex.getMessage()),ex);
}
try {
procNode.setPenalizationPeriod(ds.penaltyDuration());
} catch(Throwable ex) {
LOG.error(String.format("Error while setting penalty duration from DefaultSettings annotation:%s",ex.getMessage()),ex);
}
try {
procNode.setBulletinLevel(ds.bulletinLevel());
} catch (Throwable ex) {
LOG.error(String.format("Error while setting bulletin level from DefaultSettings annotation:%s",ex.getMessage()),ex);
}
}
} catch (Throwable ex) {
LOG.error(String.format("Error while setting default settings from DefaultSettings annotation: %s",ex.getMessage()),ex);
}
if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);

View File

@ -26,6 +26,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@ -152,7 +153,6 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final VariableRegistry variableRegistry, final ComponentLog logger) {
super(processor, uuid, validationContextFactory, controllerServiceProvider, componentType, componentCanonicalClass, variableRegistry, logger);
this.processor = processor;
identifier = new AtomicReference<>(uuid);
destinations = new HashMap<>();
@ -191,6 +191,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN;
executionNode = ExecutionNode.ALL;
try {
if (procClass.isAnnotationPresent(DefaultSchedule.class)) {
DefaultSchedule dsc = procClass.getAnnotation(DefaultSchedule.class);
try {
this.setSchedulingStrategy(dsc.strategy());
} catch (Throwable ex) {
LOG.error(String.format("Error while setting scheduling strategy from DefaultSchedule annotation: %s", ex.getMessage()), ex);
}
try {
this.setScheduldingPeriod(dsc.period());
} catch (Throwable ex) {
this.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN);
LOG.error(String.format("Error while setting scheduling period from DefaultSchedule annotation: %s", ex.getMessage()), ex);
}
if (!triggeredSerially) {
try {
setMaxConcurrentTasks(dsc.concurrentTasks());
} catch (Throwable ex) {
LOG.error(String.format("Error while setting max concurrent tasks from DefaultSchedule annotation: %s", ex.getMessage()), ex);
}
}
}
} catch (Throwable ex) {
LOG.error(String.format("Error while setting default schedule from DefaultSchedule annotation: %s",ex.getMessage()),ex);
}
}
/**

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.scheduling.SchedulingStrategy;
/**
* Dummy processor to test @DefaultSchedule annotation
*/
@DefaultSchedule(concurrentTasks = 5, strategy = SchedulingStrategy.CRON_DRIVEN, period = "0 0 0 1/1 * ?")
public class DummyScheduledProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.annotation.configuration.DefaultSettings;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
/**
* Dummy Processor to test @DefaultSettings annotation
*/
@DefaultSettings(yieldDuration = "5 sec", penaltyDuration = "1 min", bulletinLevel = LogLevel.DEBUG)
public class DummySettingsProcessor extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}

View File

@ -46,10 +46,12 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.MockProvenanceRepository;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FileBasedVariableRegistry;
import org.apache.nifi.util.NiFiProperties;
import org.junit.After;
@ -57,6 +59,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFlowController {
private FlowController controller;
@ -327,4 +330,31 @@ public class TestFlowController {
assertFalse(service.equals(serviceNode));
}
@Test
public void testProcessorDefaultScheduleAnnotation() throws ProcessorInstantiationException,ClassNotFoundException,InstantiationException,IllegalAccessException {
ProcessorNode p_scheduled = controller.createProcessor(DummyScheduledProcessor.class.getName(),"1234-ScheduledProcessor");
assertEquals(5,p_scheduled.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.CRON_DRIVEN,p_scheduled.getSchedulingStrategy());
assertEquals("0 0 0 1/1 * ?",p_scheduled.getSchedulingPeriod());
assertEquals("1 sec",p_scheduled.getYieldPeriod());
assertEquals("30 sec",p_scheduled.getPenalizationPeriod());
assertEquals(LogLevel.WARN,p_scheduled.getBulletinLevel());
}
@Test
public void testProcessorDefaultSettingsAnnotation() throws ProcessorInstantiationException,ClassNotFoundException {
ProcessorNode p_settings = controller.createProcessor(DummySettingsProcessor.class.getName(),"1234-SettingsProcessor");
assertEquals("5 sec",p_settings.getYieldPeriod());
assertEquals("1 min",p_settings.getPenalizationPeriod());
assertEquals(LogLevel.DEBUG,p_settings.getBulletinLevel());
assertEquals(1,p_settings.getMaxConcurrentTasks());
assertEquals(SchedulingStrategy.TIMER_DRIVEN,p_settings.getSchedulingStrategy());
assertEquals("0 sec",p_settings.getSchedulingPeriod());
}
}