NIFI-7539: When capturing diagnostics information, capture a thread dump once and then provide this information to ProcessorNode when capturing active threads. Previously, each processor captured a thread dump itself. When this is done thousands of times it can result in a very long delay.

This commit is contained in:
Mark Payne 2020-06-16 14:21:44 -04:00 committed by markap14
parent 9828e7dd14
commit 8b1a23a99c
5 changed files with 61 additions and 13 deletions

View File

@ -73,12 +73,12 @@ public abstract class ProcessorNode extends AbstractComponentNode implements Con
public abstract Requirement getInputRequirement();
public abstract List<ActiveThreadInfo> getActiveThreads();
public abstract List<ActiveThreadInfo> getActiveThreads(ThreadDetails threadDetails);
/**
* Returns the number of threads that are still 'active' in this Processor but have been terminated
* via {@link #terminate()}. To understand more about these threads, such as their stack traces and
* how long they have been active, one can use {@link #getActiveThreads()} and then filter the results
* how long they have been active, one can use {@link #getActiveThreads(ThreadDetails)} and then filter the results
* to include only those {@link ActiveThreadInfo} objects for which the thread is terminated. For example:
* {@code getActiveThreads().stream().filter(ActiveThreadInfo::isTerminated).collect(Collectors.toList());}
*

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
public class ThreadDetails {
private final ThreadInfo[] threadInfos;
private final long[] deadlockedThreadIds;
private final long[] monitorDeadlockThreadIds;
private ThreadDetails(final ThreadMXBean mbean) {
threadInfos = mbean.dumpAllThreads(true, true);
deadlockedThreadIds = mbean.findDeadlockedThreads();
monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
}
public ThreadInfo[] getThreadInfos() {
return threadInfos;
}
public long[] getDeadlockedThreadIds() {
return deadlockedThreadIds;
}
public long[] getMonitorDeadlockThreadIds() {
return monitorDeadlockThreadIds;
}
public static ThreadDetails capture() {
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
return new ThreadDetails(mbean);
}
}

View File

@ -76,9 +76,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
@ -1401,14 +1399,10 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
@Override
public synchronized List<ActiveThreadInfo> getActiveThreads() {
public synchronized List<ActiveThreadInfo> getActiveThreads(final ThreadDetails threadDetails) {
final long now = System.currentTimeMillis();
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(infos)
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(threadDetails.getThreadInfos())
.collect(Collectors.toMap(ThreadInfo::getThreadId, Function.identity(), (a, b) -> a));
final List<ActiveThreadInfo> threadList = new ArrayList<>(activeThreads.size());
@ -1419,7 +1413,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final long activeMillis = now - timestamp;
final ThreadInfo threadInfo = threadInfoMap.get(thread.getId());
final String stackTrace = ThreadUtils.createStackTrace(thread, threadInfo, deadlockedThreadIds, monitorDeadlockThreadIds, activeMillis);
final String stackTrace = ThreadUtils.createStackTrace(thread, threadInfo, threadDetails.getDeadlockedThreadIds(), threadDetails.getMonitorDeadlockThreadIds(), activeMillis);
final ActiveThreadInfo activeThreadInfo = new ActiveThreadInfo(thread.getName(), stackTrace, activeMillis, activeTask.isTerminated());
threadList.add(activeThreadInfo);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.diagnostics.bootstrap.tasks;
import org.apache.nifi.controller.ActiveThreadInfo;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ThreadDetails;
import org.apache.nifi.diagnostics.DiagnosticTask;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
@ -40,9 +41,10 @@ public class LongRunningProcessorTask implements DiagnosticTask {
@Override
public DiagnosticsDumpElement captureDump(final boolean verbose) {
final List<String> details = new ArrayList<>();
final ThreadDetails threadDetails = ThreadDetails.capture();
for (final ProcessorNode processorNode : flowController.getFlowManager().getRootGroup().findAllProcessors()) {
final List<ActiveThreadInfo> activeThreads = processorNode.getActiveThreads();
final List<ActiveThreadInfo> activeThreads = processorNode.getActiveThreads(threadDetails);
for (final ActiveThreadInfo activeThread : activeThreads) {
if (activeThread.getActiveMillis() > MIN_ACTIVE_MILLIS) {

View File

@ -82,6 +82,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.ThreadDetails;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileState;
@ -3873,7 +3874,7 @@ public final class DtoFactory {
private List<ThreadDumpDTO> createThreadDumpDtos(final ProcessorNode procNode) {
final List<ThreadDumpDTO> threadDumps = new ArrayList<>();
final List<ActiveThreadInfo> activeThreads = procNode.getActiveThreads();
final List<ActiveThreadInfo> activeThreads = procNode.getActiveThreads(ThreadDetails.capture());
for (final ActiveThreadInfo threadInfo : activeThreads) {
final ThreadDumpDTO dto = new ThreadDumpDTO();
dto.setStackTrace(threadInfo.getStackTrace());