NIFI-8629: Implemented the LogComponentStatuses task that runs periodically in stateless.

Also noticed a typo in the ControllerStatusReportingTask and found, when comparing outputs,
that it had a bug that caused it to log only the counters generated by processors at the root level, so fixed that as well.

This closes #5101

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2021-05-25 08:27:25 -04:00 committed by exceptionfactory
parent 46b1f6755c
commit 08edc33eb7
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 293 additions and 12 deletions

View File

@ -119,7 +119,7 @@ public class ControllerStatusReportingTask extends AbstractReportingTask {
"Flow Files Out", "Bytes Read", "Bytes Written", "Tasks", "Proc Time");
processorBorderLine = createLine(processorHeader);
counterHeader = String.format(COUNTER_LINE_FORMAT, "Context Context", "Counter Name", "Counter Value");
counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value");
counterBorderLine = createLine(counterHeader);
}
@ -228,6 +228,10 @@ public class ControllerStatusReportingTask extends AbstractReportingTask {
}
}
}
for (final ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
printCounterStatus(childGroupStatus, builder, showDeltas, divisor);
}
}

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.reporting;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * A task that, each time it is run, writes two tables to the log at INFO level: one with per-processor
 * throughput and processing-time figures, and one with the current value and rate of increase of every counter.
 * <p>
 * The per-processor rates are averaged over {@link #METRIC_CACHE_SECONDS}; the counter rates are computed
 * against the values seen on the previous invocation of this task.
 */
public class LogComponentStatuses implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(LogComponentStatuses.class);

    private static final int METRIC_CACHE_SECONDS = 300; // FlowFileEvent Repository holds 300 seconds' worth of metrics/events

    // Every specifier must carry an explicit "N$" argument index. A specifier written without the '$'
    // (e.g. "%714.14s") is parsed as "width 714, next implicit argument", which silently prints the wrong column.
    private static final String PROCESSOR_LINE_FORMAT = "| %1$-30.30s | %2$-36.36s | %3$-30.30s | %4$28.28s | %5$30.30s | %6$14.14s | %7$14.14s | %8$28.28s |\n";
    private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$28.28s | %4$28.28s |\n";

    private final FlowFileEventRepository flowFileEventRepository;
    private final CounterRepository counterRepository;
    private final FlowManager flowManager;

    private final String processorHeader;
    private final String processorBorderLine;
    private final String counterHeader;
    private final String counterBorderLine;

    // Counter identifier -> value observed on the previous run; used to compute the per-second increase.
    private final Map<String, Long> previousCounterValues = new ConcurrentHashMap<>();
    private volatile long lastTriggerTime = System.currentTimeMillis();

    /**
     * @param flowFileEventRepository source of the per-processor FlowFile events that are reported
     * @param counterRepository source of the counters that are reported
     * @param flowManager used to locate the root group and, through it, all processors
     */
    public LogComponentStatuses(final FlowFileEventRepository flowFileEventRepository, final CounterRepository counterRepository, final FlowManager flowManager) {
        this.flowFileEventRepository = flowFileEventRepository;
        this.counterRepository = counterRepository;
        this.flowManager = flowManager;

        processorHeader = String.format(PROCESSOR_LINE_FORMAT, "Processor Name", "Processor ID", "Processor Type", "Bytes Read/sec", "Bytes Written/sec", "Tasks/sec", "Nanos/Task",
            "Percent of Processing Time");
        processorBorderLine = createLine(processorHeader);

        counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value", "Increase/sec");
        counterBorderLine = createLine(counterHeader);
    }

    /**
     * Creates a line of '-' characters that is exactly as long as the given text.
     *
     * @param valueToUnderscore the text whose length determines the length of the line
     * @return a String consisting solely of '-' characters
     */
    private static String createLine(final String valueToUnderscore) {
        final int length = valueToUnderscore.length();
        final StringBuilder borderBuilder = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            borderBuilder.append('-');
        }

        return borderBuilder.toString();
    }

    /**
     * Logs the processor table followed by the counter table. Does nothing if INFO logging is disabled.
     * Any Exception is logged rather than propagated to the caller.
     */
    @Override
    public void run() {
        try {
            if (!logger.isInfoEnabled()) {
                return;
            }

            logFlowFileEvents();
            logCounters();
        } catch (final Exception e) {
            logger.error("Failed to log component statuses", e);
        }
    }

    /**
     * Gathers the FlowFile event of every processor under the root group and logs them as a table,
     * ordered from the most to the least processing time consumed.
     */
    private void logFlowFileEvents() {
        final long timestamp = System.currentTimeMillis();
        final ProcessGroup rootGroup = flowManager.getRootGroup();
        final List<ProcessorNode> allProcessors = rootGroup.findAllProcessors();

        long totalNanos = 0L;
        final List<ProcessorAndEvent> processorsAndEvents = new ArrayList<>();
        for (final ProcessorNode processorNode : allProcessors) {
            final FlowFileEvent flowFileEvent = flowFileEventRepository.reportTransferEvents(processorNode.getIdentifier(), timestamp);
            if (flowFileEvent == null) {
                continue;
            }

            processorsAndEvents.add(new ProcessorAndEvent(processorNode, flowFileEvent));
            totalNanos += flowFileEvent.getProcessingNanoseconds();
        }

        // comparingLong avoids boxing each value while sorting
        final Comparator<ProcessorAndEvent> comparator = Comparator.comparingLong(procAndEvent -> procAndEvent.getEvent().getProcessingNanoseconds());
        processorsAndEvents.sort(comparator.reversed());

        final StringBuilder builder = new StringBuilder();
        builder.append("Processor Statuses:\n");
        builder.append(processorBorderLine);
        builder.append("\n");
        builder.append(processorHeader);
        builder.append(processorBorderLine);
        builder.append("\n");

        for (final ProcessorAndEvent processorAndEvent : processorsAndEvents) {
            addStatus(processorAndEvent, builder, METRIC_CACHE_SECONDS, totalNanos);
        }

        builder.append(processorBorderLine);
        logger.info(builder.toString());
    }

    /**
     * Appends one table row describing the given processor.
     *
     * @param processorAndEvent the processor and the event holding its statistics
     * @param builder the builder to append the row to
     * @param secondsInEvent the number of seconds that the event's totals are averaged over
     * @param totalNanos the sum of processing nanoseconds across all reported processors
     */
    private void addStatus(final ProcessorAndEvent processorAndEvent, final StringBuilder builder, final int secondsInEvent, final long totalNanos) {
        final ProcessorNode processorNode = processorAndEvent.getProcessorNode();
        final FlowFileEvent flowFileEvent = processorAndEvent.getEvent();

        final long bytesReadPerSecond = flowFileEvent.getBytesRead() / secondsInEvent;
        final long bytesWrittenPerSecond = flowFileEvent.getBytesWritten() / secondsInEvent;

        final long invocationCount = flowFileEvent.getInvocations();
        final double invocationsPerSecond = (double) invocationCount / (double) secondsInEvent;

        final long nanos = flowFileEvent.getProcessingNanoseconds();

        // Nanos per task is total nanos over the total number of invocations (not the per-second rate).
        // Guard the divisions so that idle processors are reported as 0.00 rather than NaN/Infinity.
        final double nanosPer = invocationCount == 0 ? 0D : (double) nanos / (double) invocationCount;
        final double processingPercent = totalNanos == 0 ? 0D : ((double) nanos / (double) totalNanos) * 100D;

        final String processingPercentTwoDecimals = String.format("%.2f %%", processingPercent);
        final String bytesRead = FormatUtils.formatDataSize(bytesReadPerSecond);
        final String bytesWritten = FormatUtils.formatDataSize(bytesWrittenPerSecond);
        final String invocationsPerSec = String.format("%.2f", invocationsPerSecond);
        final String nanosPerInvocation = String.format("%.2f", nanosPer);

        builder.append(String.format(PROCESSOR_LINE_FORMAT,
            processorNode.getName(),
            processorNode.getIdentifier(),
            processorNode.getComponentType(),
            bytesRead,
            bytesWritten,
            invocationsPerSec,
            nanosPerInvocation,
            processingPercentTwoDecimals));
    }

    /**
     * Logs a table of all counters, ordered by context and then name, showing each counter's current value
     * and its average increase per second since the previous run of this task.
     */
    private void logCounters() {
        final StringBuilder builder = new StringBuilder();
        builder.append("Counters:\n");
        builder.append(counterBorderLine);
        builder.append("\n");
        builder.append(counterHeader);
        builder.append(counterBorderLine);
        builder.append("\n");

        final long now = System.currentTimeMillis();
        final long millisSinceLastTrigger = now - lastTriggerTime;
        final double secondsSinceLastTrigger = (double) millisSinceLastTrigger / 1000D;
        lastTriggerTime = now;

        // Sort a private copy so that this task neither reorders nor depends on the mutability of the repository's list
        final List<Counter> counters = new ArrayList<>(counterRepository.getCounters());
        counters.sort(Comparator.comparing(Counter::getContext).thenComparing(Counter::getName));

        for (final Counter counter : counters) {
            final String counterId = counter.getIdentifier();

            // Read the value once so that the value shown, the delta, and the remembered value all agree
            final long currentValue = counter.getValue();
            final Long previousValue = previousCounterValues.put(counterId, currentValue);
            final long lastValue = previousValue == null ? 0L : previousValue;

            final long increaseSinceLast = currentValue - lastValue;
            final double increasePerSecond = secondsSinceLastTrigger > 0D ? (double) increaseSinceLast / secondsSinceLastTrigger : 0D;
            final String increase = String.format("%.2f", increasePerSecond);

            builder.append(String.format(COUNTER_LINE_FORMAT, counter.getContext(), counter.getName(), currentValue, increase));
        }

        builder.append(counterBorderLine);
        logger.info(builder.toString());
    }

    /**
     * Pairs a processor with the FlowFile event that holds its statistics.
     */
    private static class ProcessorAndEvent {
        private final ProcessorNode processorNode;
        private final FlowFileEvent event;

        public ProcessorAndEvent(final ProcessorNode processorNode, final FlowFileEvent event) {
            this.processorNode = processorNode;
            this.event = event;
        }

        public ProcessorNode getProcessorNode() {
            return processorNode;
        }

        public FlowFileEvent getEvent() {
            return event;
        }
    }
}

View File

@ -31,6 +31,8 @@ import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.reporting.LogComponentStatuses;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@ -73,6 +75,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@ -93,6 +96,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private final FlowFileEventRepository flowFileEventRepository;
private final ProvenanceRepository provenanceRepository;
private final ExtensionRepository extensionRepository;
private final CounterRepository counterRepository;
// Member Variables created/managed internally
private final ReloadComponent reloadComponent;
@ -118,6 +122,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided");
this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided");
this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided");
this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided");
this.reloadComponent = new StatelessReloadComponent(this);
this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true);
@ -170,6 +175,10 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition);
final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory,
repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler);
final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager);
dataflow.scheduleBackgroundTask(logComponentStatuses, 1, TimeUnit.MINUTES);
return dataflow;
}
@ -491,6 +500,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return flowManager;
}
/**
 * @return the Counter Repository held by this engine; it is taken from the Builder at construction time
 * and is never null because the constructor rejects a null value
 */
@Override
public CounterRepository getCounterRepository() {
return counterRepository;
}
public static class Builder {
private ExtensionManager extensionManager = null;
private BulletinRepository bulletinRepository = null;
@ -503,6 +517,7 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
private FlowFileEventRepository flowFileEventRepository = null;
private ProvenanceRepository provenanceRepository = null;
private ExtensionRepository extensionRepository = null;
private CounterRepository counterRepository = null;
public Builder extensionManager(final ExtensionManager extensionManager) {
this.extensionManager = extensionManager;
@ -559,6 +574,11 @@ public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSna
return this;
}
/**
 * Sets the Counter Repository that the engine will use. A value must be supplied before the engine is
 * constructed, because the engine's constructor requires it to be non-null.
 *
 * @param counterRepository the Counter Repository to use
 * @return this Builder, so that calls can be chained
 */
public Builder counterRepository(final CounterRepository counterRepository) {
this.counterRepository = counterRepository;
return this;
}
/**
 * Creates the engine from the values currently configured on this Builder.
 *
 * @return a new StandardStatelessEngine
 */
public StandardStatelessEngine build() {
return new StandardStatelessEngine(this);
}

View File

@ -23,6 +23,7 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReloadComponent;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
@ -68,4 +69,6 @@ public interface StatelessEngine<T> {
ProvenanceRepository getProvenanceRepository();
FlowFileEventRepository getFlowFileEventRepository();
CounterRepository getCounterRepository();
}

View File

@ -171,6 +171,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
}
};
final CounterRepository counterRepo = new StandardCounterRepository();
final File krb5File = engineConfiguration.getKrb5File();
final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
@ -188,6 +190,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
.flowFileEventRepository(flowFileEventRepo)
.provenanceRepository(provenanceRepo)
.extensionRepository(extensionRepository)
.counterRepository(counterRepo)
.build();
final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
@ -197,7 +200,6 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor
final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
contentRepo = new ByteArrayContentRepository();
flowFileRepo = new StatelessFlowFileRepository();
final CounterRepository counterRepo = new StandardCounterRepository();
final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo,
counterRepo, provenanceRepo, stateManagerProvider);

View File

@ -74,6 +74,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -97,8 +99,10 @@ public class StandardStatelessFlow implements StatelessDataflow {
private final ProcessScheduler processScheduler;
private final AsynchronousCommitTracker tracker = new AsynchronousCommitTracker();
private final TransactionThresholdMeter transactionThresholdMeter;
private final List<BackgroundTask> backgroundTasks = new ArrayList<>();
private volatile ExecutorService runDataflowExecutor;
private volatile ScheduledExecutorService backgroundTaskExecutor;
private volatile boolean initialized = false;
public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider,
@ -212,23 +216,39 @@ public class StandardStatelessFlow implements StatelessDataflow {
logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)",
initializationMillis, validationMillis, serviceEnableMillis);
runDataflowExecutor = Executors.newFixedThreadPool(1, r -> {
final Thread thread = Executors.defaultThreadFactory().newThread(r);
final String flowName = dataflowDefinition.getFlowName();
if (flowName == null) {
thread.setName("Run Dataflow");
} else {
thread.setName("Run Dataflow " + flowName);
}
// Create executor for dataflow
final String flowName = dataflowDefinition.getFlowName();
final String threadName = (flowName == null) ? "Run Dataflow" : "Run Dataflow " + flowName;
runDataflowExecutor = Executors.newFixedThreadPool(1, createNamedThreadFactory(threadName, false));
return thread;
});
// Periodically log component statuses
backgroundTaskExecutor = Executors.newScheduledThreadPool(1, createNamedThreadFactory("Background Tasks", true));
backgroundTasks.forEach(task -> backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(), task.getSchedulingPeriod(), task.getSchedulingPeriod(), task.getSchedulingUnit()));
} catch (final Throwable t) {
processScheduler.shutdown();
throw t;
}
}
/**
 * Builds a ThreadFactory whose threads are obtained from the JDK's default thread factory and then
 * given the supplied name and daemon flag. Every thread created by the returned factory has the same name.
 *
 * @param name the name to assign to each thread
 * @param daemon whether each thread should be marked as a daemon thread
 * @return the ThreadFactory
 */
private ThreadFactory createNamedThreadFactory(final String name, final boolean daemon) {
    return new ThreadFactory() {
        @Override
        public Thread newThread(final Runnable runnable) {
            final Thread namedThread = Executors.defaultThreadFactory().newThread(runnable);
            namedThread.setName(name);
            namedThread.setDaemon(daemon);
            return namedThread;
        }
    };
}
/**
* Registers the given background task so that it runs periodically after the dataflow has been initialized
* until the dataflow has been shut down.
* <p>
* This method only records the task. When the dataflow is initialized, each registered task is handed to the
* background task executor with a fixed delay, using the given period both as the initial delay and as the
* delay between the end of one run and the start of the next.
* <p>
* NOTE(review): from the code visible here, a task registered after initialization has already taken place
* does not appear to be scheduled - confirm against the full class before relying on late registration.
*
* @param task the task to run
* @param period how long to wait before the first run and between subsequent runs
* @param unit the unit for the time period
*/
public void scheduleBackgroundTask(final Runnable task, final long period, final TimeUnit unit) {
backgroundTasks.add(new BackgroundTask(task, period, unit));
}
private void waitForServicesEnabled(final ProcessGroup group) {
final long startTime = System.currentTimeMillis();
final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
@ -268,6 +288,9 @@ public class StandardStatelessFlow implements StatelessDataflow {
if (runDataflowExecutor != null) {
runDataflowExecutor.shutdown();
}
if (backgroundTaskExecutor != null) {
backgroundTaskExecutor.shutdown();
}
rootGroup.stopProcessing();
rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
@ -657,4 +680,28 @@ public class StandardStatelessFlow implements StatelessDataflow {
this.stateValues = stateValues;
}
}
/**
 * Immutable description of a task that should be run periodically: the work to perform together with
 * the period, and the unit of that period, at which it should be scheduled.
 */
private static class BackgroundTask {
    private final Runnable task;
    private final long schedulingPeriod;
    private final TimeUnit schedulingUnit;

    /**
     * @param runnable the work to perform
     * @param period how often the work should be performed
     * @param unit the time unit that the period is expressed in
     */
    public BackgroundTask(final Runnable runnable, final long period, final TimeUnit unit) {
        task = runnable;
        schedulingPeriod = period;
        schedulingUnit = unit;
    }

    /** @return the work to perform */
    public Runnable getTask() {
        return this.task;
    }

    /** @return how often the work should be performed, in units of {@link #getSchedulingUnit()} */
    public long getSchedulingPeriod() {
        return this.schedulingPeriod;
    }

    /** @return the time unit of the scheduling period */
    public TimeUnit getSchedulingUnit() {
        return this.schedulingUnit;
    }
}
}