From 28fc030ef57b2d54f0bdca44ade763c25c2cacd8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lehel=20Bo=C3=A9r?= Date: Tue, 30 Aug 2022 20:38:56 +0200 Subject: [PATCH] NIFI-10366: Make Default Run Duration configurable (#6310) NIFI-10366: Make Default Run Duration configurable --- .../behavior/DefaultRunDuration.java | 40 +++++++++++++++++++ .../annotation/behavior/SupportsBatching.java | 4 +- .../nifi/controller/ExtensionBuilder.java | 17 ++++++++ 3 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 nifi-api/src/main/java/org/apache/nifi/annotation/behavior/DefaultRunDuration.java diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/DefaultRunDuration.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/DefaultRunDuration.java new file mode 100644 index 0000000000..2ef6aa8c8a --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/DefaultRunDuration.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.time.Duration; + +public enum DefaultRunDuration { + NO_BATCHING(Duration.ZERO), + TWENTY_FIVE_MILLIS(Duration.ofMillis(25)), + FIFTY_MILLIS(Duration.ofMillis(50)), + ONE_HUNDRED_MILLIS(Duration.ofMillis(100)), + TWO_HUNDRED_FIFTY_MILLIS(Duration.ofMillis(250)), + FIVE_HUNDRED_MILLIS(Duration.ofMillis(500)), + ONE_SECOND(Duration.ofSeconds(1)), + TWO_SECONDS(Duration.ofSeconds(2)); + + private final Duration duration; + + DefaultRunDuration(final Duration duration) { + this.duration = duration; + } + + public Duration getDuration() { + return duration; + } +} diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java index 3ffe2d6308..49136ade7c 100644 --- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java @@ -41,11 +41,13 @@ import java.lang.annotation.Target; * ProcessSession.commit() to ensure data is persisted before deleting the data * from a remote source. * + * When the defaultDuration parameter is set, the processor is created with the supplied duration time, which can be adjusted afterwards. + * The supplied values can be selected from {@link org.apache.nifi.annotation.behavior.DefaultRunDuration}. */ @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface SupportsBatching { - + DefaultRunDuration defaultDuration() default DefaultRunDuration.NO_BATCHING; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java index 883c352677..c59a6a5be6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -16,9 +16,11 @@ */ package org.apache.nifi.controller; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.configuration.DefaultSettings; import org.apache.nifi.bundle.Bundle; import org.apache.nifi.bundle.BundleCoordinate; @@ -320,6 +322,8 @@ public class ExtensionBuilder { } applyDefaultSettings(procNode); + applyDefaultRunDuration(procNode); + return procNode; } @@ -359,6 +363,19 @@ public class ExtensionBuilder { } } + private void applyDefaultRunDuration(final ProcessorNode processorNode) { + try { + final Class procClass = processorNode.getProcessor().getClass(); + + final SupportsBatching sb = procClass.getAnnotation(SupportsBatching.class); + if (sb != null) { + processorNode.setRunDuration(sb.defaultDuration().getDuration().toMillis(), TimeUnit.MILLISECONDS); + } + } catch (final Exception ex) { + logger.error("Set Default Run Duration failed", ex); + } + } + private ControllerServiceNode createControllerServiceNode() throws ClassNotFoundException, IllegalAccessException, InstantiationException, InitializationException { final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader(); try {