NIFI-9386: Adding status task schedule to Stateless engine config

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5532.
This commit is contained in:
Joe Gresock 2021-11-17 20:02:48 -05:00 committed by Pierre Villard
parent f3c7537d9b
commit 429087c11d
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
15 changed files with 201 additions and 19 deletions

View File

@ -315,6 +315,11 @@ public class StatelessKafkaConnectorUtil {
return extensionClientDefinitions;
}
// Interval between periodic status task executions; this configuration always reports a fixed "1 min".
@Override
public String getStatusTaskInterval() {
return "1 min";
}
};
return engineConfiguration;

View File

@ -36,5 +36,8 @@ nifi.stateless.sensitive.props.key=
#nifi.stateless.extension.client.mvn-central.baseUrl=https://repo1.maven.org/maven2/
#nifi.stateless.extension.client.mvn-central.useSslContext=false
# Schedule for status logging task
nifi.stateless.status.task.interval=1 min
# Kerberos Properties #
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf

View File

@ -55,6 +55,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.stateless.retrieval.CachingDataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.DataflowProvider;
import org.apache.nifi.processors.stateless.retrieval.FileSystemDataflowProvider;
@ -329,6 +330,16 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
.defaultValue("1 MB")
.build();
/**
* Optional property controlling how often the Stateless engine logs the status of the dataflow's processors.
* The value is validated as a time period using bounds of 10 seconds and 24 hours, and Expression Language
* is not supported. No default value is declared; per the description, status logging is skipped when unset.
*/
public static final PropertyDescriptor STATUS_TASK_INTERVAL = new Builder()
.name("Status Task Interval")
.displayName("Status Task Interval")
.description("The Stateless engine periodically logs the status of the dataflow's processors. This property allows the interval to be changed, or the status logging " +
"to be skipped altogether if the property is not set.")
.required(false)
.addValidator(StandardValidators.createTimePeriodValidator(10, TimeUnit.SECONDS, 24, TimeUnit.HOURS))
.expressionLanguageSupported(NONE)
.build();
static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("For any incoming FlowFile that is successfully processed, the original incoming FlowFile will be transferred to this Relationship")
@ -375,7 +386,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
MAX_INGEST_FLOWFILES,
MAX_INGEST_DATA_SIZE,
STATELESS_SSL_CONTEXT_SERVICE,
KRB5_CONF);
KRB5_CONF,
STATUS_TASK_INTERVAL);
}
@Override
@ -850,6 +862,8 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
contentRepoDirectory = null;
}
final String statusTaskInterval = context.getProperty(STATUS_TASK_INTERVAL).getValue();
return new StatelessEngineConfiguration() {
@Override
public File getWorkingDirectory() {
@ -900,6 +914,11 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
public boolean isLogExtensionDiscovery() {
return false;
}
// Exposes the raw value captured from the processor's Status Task Interval property when this configuration was created.
@Override
public String getStatusTaskInterval() {
return statusTaskInterval;
}
};
}

View File

@ -51,6 +51,7 @@ public class PropertiesFileEngineConfigurationParser {
private static final String READONLY_EXTENSIONS_DIRECTORY = PREFIX + "readonly.extensions.directory.";
private static final String WORKING_DIRECTORY = PREFIX + "working.directory";
private static final String CONTENT_REPO_DIRECTORY = PREFIX + "content.repository.directory";
private static final String STATUS_TASK_INTERVAL = PREFIX + "status.task.interval";
private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore";
private static final String TRUSTSTORE_TYPE = PREFIX + "security.truststoreType";
@ -108,6 +109,8 @@ public class PropertiesFileEngineConfigurationParser {
final List<ExtensionClientDefinition> extensionClients = parseExtensionClients(properties);
final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min");
return new StatelessEngineConfiguration() {
@Override
public File getWorkingDirectory() {
@ -153,6 +156,11 @@ public class PropertiesFileEngineConfigurationParser {
public List<ExtensionClientDefinition> getExtensionClients() {
return extensionClients;
}
// Exposes the status task interval read from the engine properties ("1 min" when the property is absent from the file).
@Override
public String getStatusTaskInterval() {
return statusTaskInterval;
}
};
}
@ -166,7 +174,6 @@ public class PropertiesFileEngineConfigurationParser {
.collect(Collectors.toList());
}
private List<ExtensionClientDefinition> parseExtensionClients(final Properties properties) {
final Map<String, ExtensionClientDefinition> extensionClientDefinitions = new LinkedHashMap<>();

View File

@ -83,4 +83,10 @@ public interface StatelessEngineConfiguration {
default boolean isLogExtensionDiscovery() {
return true;
}
/**
* Provides the configured interval between executions of the engine's periodic status task.
*
* @return a String representing the interval between periodic status task executions (e.g., 1 min).
* A <code>null</code> value indicates that no status tasks are scheduled.
*/
String getStatusTaskInterval();
}

View File

@ -114,6 +114,31 @@ public class PropertiesFileEngineConfigurationParserTest {
assertEquals(0, readOnlyExtensionsDirs.size());
}
/**
 * Verifies that an explicitly configured status task interval is returned unchanged by the parsed configuration.
 */
@Test
public void testStatusTaskSchedule() throws IOException, StatelessConfigurationException {
    final Properties engineProperties = getRequiredProperties();
    engineProperties.setProperty("nifi.stateless.status.task.interval", "15 secs");

    final StatelessEngineConfiguration parsedConfiguration = parser.parseEngineConfiguration(getPropertiesFile(engineProperties));

    assertNotNull(parsedConfiguration);
    assertEquals("15 secs", parsedConfiguration.getStatusTaskInterval());
}
/**
 * Verifies that an empty status task interval is preserved as an empty String rather than replaced by a default.
 */
@Test
public void testStatusTaskScheduleEmpty() throws IOException, StatelessConfigurationException {
    final Properties engineProperties = getRequiredProperties();
    engineProperties.setProperty("nifi.stateless.status.task.interval", "");

    final StatelessEngineConfiguration parsedConfiguration = parser.parseEngineConfiguration(getPropertiesFile(engineProperties));

    assertNotNull(parsedConfiguration);
    assertEquals("", parsedConfiguration.getStatusTaskInterval());
}
private Properties getRequiredProperties() {
final Properties properties = new Properties();

View File

@ -265,6 +265,12 @@ nifi.stateless.extension.client.mvn-central.useSslContext=false
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
```
Other configuration properties for the Engine Configuration include:
| Property Name | Description | Example Value |
|---------------|-------------|---------------|
| nifi.stateless.status.task.interval | The Stateless Engine can periodically log the status of all processors. This property configures the interval between status logs; setting it to an empty value disables the status logging. | 1 min |
A minimum configuration of the Engine Configuration may look as follows:
```
@ -272,7 +278,6 @@ nifi.stateless.nar.directory=/var/lib/nifi/lib
nifi.stateless.working.directory=/var/lib/nifi/work/stateless
```
#### Dataflow Configuration
While the Engine Configuration above gives Stateless NiFi the necessary information for how to run the flow, the dataflow

View File

@ -26,3 +26,5 @@ nifi.stateless.security.truststorePasswd=
nifi.stateless.sensitive.props.key=nifi-stateless
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf
nifi.stateless.status.task.interval=1 min

View File

@ -74,9 +74,12 @@ import org.apache.nifi.stateless.parameter.CompositeParameterValueProvider;
import org.apache.nifi.stateless.parameter.ParameterValueProvider;
import org.apache.nifi.stateless.parameter.ParameterValueProviderInitializationContext;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -98,6 +101,7 @@ import static java.util.Objects.requireNonNull;
public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class);
private static final int CONCURRENT_EXTENSION_DOWNLOADS = 8;
public static final Duration DEFAULT_STATUS_TASK_PERIOD = Duration.of(1, ChronoUnit.MINUTES);
// Member Variables injected via Builder
private final ExtensionManager extensionManager;
@ -112,6 +116,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
private final Duration statusTaskInterval;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
@ -137,6 +142,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
this.statusTaskInterval = parseDuration(builder.statusTaskInterval);
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@ -191,8 +197,10 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository);
if (statusTaskInterval != null) {
final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
dataflow.scheduleBackgroundTask(logComponentStatuses, statusTaskInterval.toMillis(), TimeUnit.MILLISECONDS);
}
return dataflow;
}
@ -653,6 +661,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return counterRepository;
}
// Returns the interval parsed from the builder's value at construction; null when no interval was configured.
@Override
public Duration getStatusTaskInterval() {
return statusTaskInterval;
}
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
@ -666,6 +679,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
private CounterRepository counterRepository = null;
private String statusTaskInterval = null;
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
@ -727,8 +741,33 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return this;
}
/**
* Sets the raw status task interval (e.g., "1 min"). The value is stored as-is here and
* is parsed into a Duration when the engine is constructed.
*
* @param statusTaskInterval the interval as a String; may be <code>null</code>
* @return this Builder, to allow call chaining
*/
public Builder statusTaskInterval(final String statusTaskInterval) {
this.statusTaskInterval = statusTaskInterval;
return this;
}
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}
}
/**
 * Converts the configured status task interval into a {@link Duration}.
 *
 * @param durationValue the interval as a time period String (e.g., "1 min"); may be <code>null</code>
 * @return <code>null</code> if the value is <code>null</code> or blank, meaning that no status task is to be scheduled;
 *         the parsed duration (whole seconds) if the value is valid and at least one second;
 *         otherwise {@link #DEFAULT_STATUS_TASK_PERIOD}
 */
static Duration parseDuration(final String durationValue) {
    if (durationValue == null) {
        return null;
    }

    final String trimmedValue = durationValue.trim();
    if (trimmedValue.isEmpty()) {
        return null;
    }

    final long taskScheduleSeconds;
    try {
        taskScheduleSeconds = FormatUtils.getTimeDuration(trimmedValue, TimeUnit.SECONDS);
    } catch (final IllegalArgumentException e) {
        // An unparseable value must not prevent the engine from starting, so the default period is used instead
        logger.warn("Encountered invalid status task schedule: <{}>. Will use the default of {} instead.", durationValue, DEFAULT_STATUS_TASK_PERIOD, e);
        return DEFAULT_STATUS_TASK_PERIOD;
    }

    // The value is parsed in whole seconds, so anything below one second is rejected
    if (taskScheduleSeconds < 1) {
        logger.warn("Status task schedule period [{}] must be at least one second. Will use the default of {} instead.", durationValue, DEFAULT_STATUS_TASK_PERIOD);
        return DEFAULT_STATUS_TASK_PERIOD;
    }

    return Duration.ofSeconds(taskScheduleSeconds);
}
}

View File

@ -35,6 +35,8 @@ import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import java.time.Duration;
public interface StatelessEngine<T> {
void initialize(StatelessEngineInitializationContext initializationContext);
@ -70,4 +72,6 @@ public interface StatelessEngine<T> {
FlowFileEventRepository getFlowFileEventRepository();
CounterRepository getCounterRepository();
Duration getStatusTaskInterval();
}

View File

@ -193,6 +193,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
.provenanceRepository(provenanceRepo)
.extensionRepository(extensionRepository)
.counterRepository(counterRepo)
.statusTaskInterval(engineConfiguration.getStatusTaskInterval())
.build();
final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository);

View File

@ -123,6 +123,11 @@ public class TestPropertiesFileFlowDefinitionParser {
public List<ExtensionClientDefinition> getExtensionClients() {
return Collections.emptyList();
}
// This test configuration supplies no status task interval.
@Override
public String getStatusTaskInterval() {
return null;
}
};
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.junit.Test;
import java.time.Duration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
 * Unit tests for {@link StandardStatelessEngine#parseDuration(String)}.
 */
public class TestStandardStatelessEngine {
    private static final Duration ONE_MINUTE = Duration.ofMinutes(1);

    @Test
    public void parseDurationTooSmall() {
        // A period shorter than one second is replaced by the one-minute fallback
        final Duration parsed = StandardStatelessEngine.parseDuration("999 ms");
        assertEquals(ONE_MINUTE, parsed);
    }

    @Test
    public void parseDurationInvalid() {
        // A value that is not a valid time period is replaced by the one-minute fallback
        final Duration parsed = StandardStatelessEngine.parseDuration("1 nonsense");
        assertEquals(ONE_MINUTE, parsed);
    }

    @Test
    public void parseDurationValid() {
        final Duration oneSecond = StandardStatelessEngine.parseDuration("1 sec");
        assertEquals(Duration.ofSeconds(1), oneSecond);

        final Duration oneDay = StandardStatelessEngine.parseDuration("24 hours");
        assertEquals(Duration.ofHours(24), oneDay);

        // Surrounding whitespace is tolerated
        final Duration fiveSeconds = StandardStatelessEngine.parseDuration(" 5 secs ");
        assertEquals(Duration.ofSeconds(5), fiveSeconds);
    }

    @Test
    public void parseDurationNull() {
        // Empty, blank and null inputs all yield null
        final Duration fromEmpty = StandardStatelessEngine.parseDuration("");
        assertNull(fromEmpty);

        final Duration fromBlank = StandardStatelessEngine.parseDuration(" ");
        assertNull(fromBlank);

        final Duration fromNull = StandardStatelessEngine.parseDuration(null);
        assertNull(fromNull);
    }
}

View File

@ -40,5 +40,8 @@ nifi.stateless.extension.client.mvn-central.timeout=30 sec
nifi.stateless.extension.client.mvn-central.baseUrl=https://repo1.maven.org/maven2/
nifi.stateless.extension.client.mvn-central.useSslContext=false
# Schedule for status logging task
nifi.stateless.status.task.interval=1 min
# Kerberos Properties #
nifi.stateless.kerberos.krb5.file=/etc/krb5.conf

View File

@ -121,6 +121,11 @@ public class StatelessSystemIT {
public List<ExtensionClientDefinition> getExtensionClients() {
return Collections.emptyList();
}
// This system-test configuration supplies no status task interval.
@Override
public String getStatusTaskInterval() {
return null;
}
};
}