NIFI-10167: Added advanced timing metrics for processors, such as CPU Usage, time reading/writing content repo, process session commit time, etc. Exposed via nifi.sh diagnostics and made configurable via nifi.properties

This closes #6156

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-04-20 11:11:52 -04:00 committed by exceptionfactory
parent d5b626f0e4
commit 5e83bda9e9
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
35 changed files with 1049 additions and 67 deletions

View File

@ -330,6 +330,12 @@ public class NiFiProperties extends ApplicationProperties {
public static final int DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT = 10; public static final int DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT = 10;
public static final String DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE = "10 MB"; public static final String DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE = "10 MB";
// performance tracking
public static final String TRACK_PERFORMANCE_PERCENTAGE = "nifi.performance.tracking.percentage";
// performance tracking defaults
public static final int DEFAULT_TRACK_PERFORMANCE_PERCENTAGE = 0;
// defaults // defaults
public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; public static final Boolean DEFAULT_AUTO_RESUME_STATE = true;
public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml"; public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml";
@ -510,7 +516,7 @@ public class NiFiProperties extends ApplicationProperties {
try { try {
return Integer.parseInt(value.trim()); return Integer.parseInt(value.trim());
} catch (final Exception e) { } catch (final Exception e) {
logger.warn("Configured value is invalid, falling back to default value", e); logger.warn("Configured value for property {} in nifi.properties is invalid, falling back to default value", propertyName, e);
return defaultValue; return defaultValue;
} }
} }
@ -1563,6 +1569,18 @@ public class NiFiProperties extends ApplicationProperties {
return getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID); return getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID);
} }
public int getPerformanceMetricTrackingPercentage() {
final int percentage = getIntegerProperty(TRACK_PERFORMANCE_PERCENTAGE, DEFAULT_TRACK_PERFORMANCE_PERCENTAGE);
if (percentage < 0) {
return 0;
}
if (percentage > 100) {
return 100;
}
return percentage;
}
public File getEmbeddedZooKeeperPropertiesFile() { public File getEmbeddedZooKeeperPropertiesFile() {
final String filename = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES); final String filename = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES);
return filename == null ? null : new File(filename); return filename == null ? null : new File(filename);

View File

@ -4131,9 +4131,36 @@ To enable it, both `nifi.monitor.long.running.task.schedule` and `nifi.monitor.l
|*Property*|*Description* |*Property*|*Description*
|`nifi.monitor.long.running.task.schedule`|The time period between successive executions of the Long-Running Task Monitor (e.g. `1 min`). |`nifi.monitor.long.running.task.schedule`|The time period between successive executions of the Long-Running Task Monitor (e.g. `1 min`).
|`nifi.monitor.long.running.task.threshold`|The time period beyond which a task is considered long-running, i.e. stuck / hanging (e.g. `5 mins`). |`nifi.monitor.long.running.task.threshold`|The time period beyond which a task is considered long-running, i.e. stuck / hanging (e.g. `5 mins`).
|==== |====
[[performance_tracking_properties]]
=== Performance Tracking Properties
NiFi exposes a very significant number of metrics by default through the User Interface. However, there are sometimes additional metrics that may add in diagnosing bottlenecks
and improving the performance of the NiFi dataflow.
The `nifi.performance.tracking.percentage` property can be used to enable the tracking of additional metrics. Gathering these metrics, however, require system calls, which can be
expensive on some systems. As a result, this property defaults to a value of `0`, indicating that the metrics should be captured 0% of the time. I.e., the feature is disabled by
default. To enable this feature, set the value of this property to an integer value in the range of 0 to 100, inclusive. This represents what percentage of the time NiFi should
gather these metrics.
For example, if the value is set to 20, then NiFi will gather these metrics for each processor approximately 20% of the times that the Processor is run. The remainder of the time,
it will use the values that it has already captured in order to extrapolate the metrics to additional runs.
The metrics that are gathered include what percentage of the time the processor is utilizing the CPU (versus waiting for I/O to complete or blocking due to monitor/lock contention),
what percentage of time the Processor spends reading from the Content Repository, writing to the Content Repository, blocked due to Garbage Collection, etc.
So, continuing our example, if we set the value of the `nifi.performance.tracking.percentage` and a processor is triggered to run 1,000 times, then NiFi will measure how much CPU
time was consumed over the 200 iterations during which it was measured (i.e., 20% of 1,000). Let's say that this amounts to 500 milliseconds of CPU time. Additionally, let's consider
that the Processor took 5,000 milliseconds to complete those 200 invocations because most of the time was spent blocking on Socket I/O. From this, NiFi will calculate that the CPU
is used approximately 10% of the time (500 / 5,000 * 100%). Now, let's consider that in order to complete all 1,000 invocations the Processor took 35 seconds. NiFi will calculate,
then, that the Processor has used approximately 3.5 seconds (or 3500 milliseconds) of CPU time.
As a result, if we set the value of this property higher, up to a value of `100`, we will get more accurate results. However, it may be more expensive to monitor.
In order to view these metrics, we can gather diagnostics by running the command `nifi.sh diagnostics <filename>` and inspecting the generated file. See <<nifi_diagnostics>> for more information.
[[custom_properties]] [[custom_properties]]
=== Custom Properties === Custom Properties
@ -4539,8 +4566,9 @@ Example configuration:
nifi.nar.library.provider.hdfs2.resources=/etc/hadoop/core-site.xml nifi.nar.library.provider.hdfs2.resources=/etc/hadoop/core-site.xml
nifi.nar.library.provider.hdfs2.source.directory=/other/dir/for/customNars nifi.nar.library.provider.hdfs2.source.directory=/other/dir/for/customNars
NOTE: For backward compatibility reasons, `org.apache.nifi.nar.hadoop.HDFSNarProvider` is still an accepted implementation, but it is suggested to use 'org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider' instead. Features introduced with the External Resource Provider might not work with this implementation! NOTE: For backward compatibility reasons, `org.apache.nifi.nar.hadoop.HDFSNarProvider` is still an accepted implementation, but it is suggested to use 'org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider' instead.Features introduced with the External Resource Provider might not work with this implementation!
[[nifi_diagnostics]]
== NiFi diagnostics == NiFi diagnostics
It is possible to get diagnostics data from a NiFi node by executing the below command: It is possible to get diagnostics data from a NiFi node by executing the below command:

View File

@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.InternalProvenanceReporter; import org.apache.nifi.provenance.InternalProvenanceReporter;
@ -48,7 +49,7 @@ public interface RepositoryContext {
boolean isRelationshipAvailabilitySatisfied(int requiredNumber); boolean isRelationshipAvailabilitySatisfied(int requiredNumber);
ContentClaimWriteCache createContentClaimWriteCache(); ContentClaimWriteCache createContentClaimWriteCache(PerformanceTracker performanceTracker);
InternalProvenanceReporter createProvenanceReporter(Predicate<FlowFile> flowfileKnownCheck, ProvenanceEventEnricher eventEnricher); InternalProvenanceReporter createProvenanceReporter(Predicate<FlowFile> flowfileKnownCheck, ProvenanceEventEnricher eventEnricher);

View File

@ -38,6 +38,8 @@ import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.TaskTerminationInputStream; import org.apache.nifi.controller.repository.io.TaskTerminationInputStream;
import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream; import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.state.StandardStateMap; import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
@ -137,6 +139,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>(); private final Map<FlowFile, Path> deleteOnCommit = new HashMap<>();
private final long sessionId; private final long sessionId;
private final String connectableDescription; private final String connectableDescription;
private final PerformanceTracker performanceTracker;
private Map<String, Long> countersOnCommit; private Map<String, Long> countersOnCommit;
private Map<String, Long> immediateCounters; private Map<String, Long> immediateCounters;
@ -180,14 +183,15 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
private final String retryAttribute; private final String retryAttribute;
private final FlowFileLinkage flowFileLinkage = new FlowFileLinkage(); private final FlowFileLinkage flowFileLinkage = new FlowFileLinkage();
public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) { public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination, final PerformanceTracker performanceTracker) {
this.context = context; this.context = context;
this.taskTermination = taskTermination; this.taskTermination = taskTermination;
this.performanceTracker = performanceTracker;
this.provenanceReporter = context.createProvenanceReporter(this::isFlowFileKnown, this); this.provenanceReporter = context.createProvenanceReporter(this::isFlowFileKnown, this);
this.sessionId = idGenerator.getAndIncrement(); this.sessionId = idGenerator.getAndIncrement();
this.connectableDescription = context.getConnectableDescription(); this.connectableDescription = context.getConnectableDescription();
this.claimCache = context.createContentClaimWriteCache(); this.claimCache = context.createContentClaimWriteCache(performanceTracker);
LOG.trace("Session {} created for {}", this, connectableDescription); LOG.trace("Session {} created for {}", this, connectableDescription);
processingStartTime = System.nanoTime(); processingStartTime = System.nanoTime();
retryAttribute = "retryCount." + context.getConnectable().getIdentifier(); retryAttribute = "retryCount." + context.getConnectable().getIdentifier();
@ -564,6 +568,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
protected void commit(final Checkpoint checkpoint, final boolean asynchronous) { protected void commit(final Checkpoint checkpoint, final boolean asynchronous) {
try { try {
performanceTracker.beginSessionCommit();
final long commitStartNanos = System.nanoTime(); final long commitStartNanos = System.nanoTime();
resetReadClaim(); resetReadClaim();
@ -727,6 +732,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
} else { } else {
throw new ProcessException(e); throw new ProcessException(e);
} }
} finally {
performanceTracker.endSessionCommit();
} }
} }
@ -2597,7 +2604,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
} }
final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize()); final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream); final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim,
contentClaimOffset, limitingInputStream, performanceTracker);
return contentClaimInputStream; return contentClaimInputStream;
} }
} }
@ -2609,8 +2617,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
} }
currentReadClaim = claim.getResourceClaim(); currentReadClaim = claim.getResourceClaim();
performanceTracker.beginContentRead();
final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim()); final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim());
StreamUtils.skip(contentRepoStream, claim.getOffset() + contentClaimOffset); performanceTracker.endContentRead();
final InputStream performanceTrackInputStream = new PerformanceTrackingInputStream(contentRepoStream, performanceTracker);
StreamUtils.skip(performanceTrackInputStream, claim.getOffset() + contentClaimOffset);
final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream); final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset() + contentClaimOffset); final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset() + contentClaimOffset);
currentReadClaimStream = byteCountingInputStream; currentReadClaimStream = byteCountingInputStream;
@ -2621,12 +2634,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability // Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability
// without buffering data in memory. // without buffering data in memory.
final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize()); final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize());
final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream); final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream, performanceTracker);
return contentClaimInputStream; return contentClaimInputStream;
} else { } else {
claimCache.flush(claim); claimCache.flush(claim);
final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset); final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, performanceTracker);
return rawInStream; return rawInStream;
} }
} catch (final ContentNotFoundException cnfe) { } catch (final ContentNotFoundException cnfe) {

View File

@ -17,20 +17,23 @@
package org.apache.nifi.controller.repository; package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.lifecycle.TaskTermination; import org.apache.nifi.controller.lifecycle.TaskTermination;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
public class StandardProcessSessionFactory implements ProcessSessionFactory { public class StandardProcessSessionFactory implements ProcessSessionFactory {
private final RepositoryContext context; private final RepositoryContext context;
private final TaskTermination taskTermination; private final TaskTermination taskTermination;
private final PerformanceTracker performanceTracker;
public StandardProcessSessionFactory(final RepositoryContext context, final TaskTermination taskTermination) { public StandardProcessSessionFactory(final RepositoryContext context, final TaskTermination taskTermination, final PerformanceTracker performanceTracker) {
this.context = context; this.context = context;
this.taskTermination = taskTermination; this.taskTermination = taskTermination;
this.performanceTracker = performanceTracker;
} }
@Override @Override
public StandardProcessSession createSession() { public StandardProcessSession createSession() {
return new StandardProcessSession(context, taskTermination); return new StandardProcessSession(context, taskTermination, performanceTracker);
} }
} }

View File

@ -18,6 +18,8 @@ package org.apache.nifi.controller.repository.io;
import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException; import java.io.IOException;
@ -32,20 +34,23 @@ public class ContentClaimInputStream extends InputStream {
private final ContentRepository contentRepository; private final ContentRepository contentRepository;
private final ContentClaim contentClaim; private final ContentClaim contentClaim;
private final long claimOffset; private final long claimOffset;
private final PerformanceTracker performanceTracker;
private InputStream delegate; private InputStream delegate;
private long bytesConsumed; private long bytesConsumed;
private long currentOffset; // offset into the Content Claim; will differ from bytesRead if reset() is called after reading at least one byte or if claimOffset > 0 private long currentOffset; // offset into the Content Claim; will differ from bytesRead if reset() is called after reading at least one byte or if claimOffset > 0
private long markOffset; private long markOffset;
public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) { public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final PerformanceTracker performanceTracker) {
this(contentRepository, contentClaim, claimOffset, null); this(contentRepository, contentClaim, claimOffset, null, performanceTracker);
} }
public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate) { public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate,
final PerformanceTracker performanceTracker) {
this.contentRepository = contentRepository; this.contentRepository = contentRepository;
this.contentClaim = contentClaim; this.contentClaim = contentClaim;
this.claimOffset = claimOffset; this.claimOffset = claimOffset;
this.performanceTracker = performanceTracker;
this.currentOffset = claimOffset; this.currentOffset = claimOffset;
this.delegate = initialDelegate; this.delegate = initialDelegate;
@ -142,7 +147,14 @@ public class ContentClaimInputStream extends InputStream {
} }
formDelegate(); formDelegate();
StreamUtils.skip(delegate, markOffset - claimOffset);
performanceTracker.beginContentRead();
try {
StreamUtils.skip(delegate, markOffset - claimOffset);
} finally {
performanceTracker.endContentRead();
}
currentOffset = markOffset; currentOffset = markOffset;
} }
} }
@ -159,8 +171,13 @@ public class ContentClaimInputStream extends InputStream {
delegate.close(); delegate.close();
} }
delegate = contentRepository.read(contentClaim); performanceTracker.beginContentRead();
StreamUtils.skip(delegate, claimOffset); try {
currentOffset = claimOffset; delegate = new PerformanceTrackingInputStream(contentRepository.read(contentClaim), performanceTracker);
StreamUtils.skip(delegate, claimOffset);
currentOffset = claimOffset;
} finally {
performanceTracker.endContentRead();;
}
} }
} }

View File

@ -72,6 +72,31 @@ public class EmptyFlowFileEvent implements FlowFileEvent {
return 0; return 0;
} }
@Override
public long getCpuNanoseconds() {
return 0;
}
@Override
public long getContentReadNanoseconds() {
return 0;
}
@Override
public long getContentWriteNanoseconds() {
return 0;
}
@Override
public long getSessionCommitNanoseconds() {
return 0;
}
@Override
public long getGargeCollectionMillis() {
return 0;
}
@Override @Override
public long getAverageLineageMillis() { public long getAverageLineageMillis() {
return 0; return 0;

View File

@ -41,6 +41,11 @@ public class EventSumValue {
private long bytesReceived = 0; private long bytesReceived = 0;
private long bytesSent = 0; private long bytesSent = 0;
private long processingNanos = 0; private long processingNanos = 0;
private long cpuNanos = 0;
private long contentReadNanos = 0;
private long contentWriteNanos = 0;
private long sessionCommitNanos = 0;
private long gcMillis = 0;
private long aggregateLineageMillis = 0; private long aggregateLineageMillis = 0;
private int invocations = 0; private int invocations = 0;
private Map<String, Long> counters; private Map<String, Long> counters;
@ -70,6 +75,11 @@ public class EventSumValue {
this.flowFilesSent += flowFileEvent.getFlowFilesSent(); this.flowFilesSent += flowFileEvent.getFlowFilesSent();
this.invocations += flowFileEvent.getInvocations(); this.invocations += flowFileEvent.getInvocations();
this.processingNanos += flowFileEvent.getProcessingNanoseconds(); this.processingNanos += flowFileEvent.getProcessingNanoseconds();
this.cpuNanos += flowFileEvent.getCpuNanoseconds();
this.contentReadNanos += flowFileEvent.getContentReadNanoseconds();
this.contentWriteNanos += flowFileEvent.getContentWriteNanoseconds();
this.gcMillis += flowFileEvent.getGargeCollectionMillis();
this.sessionCommitNanos += flowFileEvent.getSessionCommitNanoseconds();
final Map<String, Long> eventCounters = flowFileEvent.getCounters(); final Map<String, Long> eventCounters = flowFileEvent.getCounters();
if (eventCounters != null) { if (eventCounters != null) {
@ -106,6 +116,11 @@ public class EventSumValue {
event.setFlowFilesSent(flowFilesSent); event.setFlowFilesSent(flowFilesSent);
event.setInvocations(invocations); event.setInvocations(invocations);
event.setProcessingNanos(processingNanos); event.setProcessingNanos(processingNanos);
event.setCpuNanoseconds(cpuNanos);
event.setContentReadNanoseconds(contentReadNanos);
event.setContentWriteNanoseconds(contentWriteNanos);
event.setSessionCommitNanos(sessionCommitNanos);
event.setGarbageCollectionMillis(gcMillis);
event.setCounters(this.counters == null ? Collections.emptyMap() : Collections.unmodifiableMap(this.counters)); event.setCounters(this.counters == null ? Collections.emptyMap() : Collections.unmodifiableMap(this.counters));
return event; return event;
} }
@ -131,6 +146,10 @@ public class EventSumValue {
this.flowFilesSent += other.flowFilesSent; this.flowFilesSent += other.flowFilesSent;
this.invocations += other.invocations; this.invocations += other.invocations;
this.processingNanos += other.processingNanos; this.processingNanos += other.processingNanos;
this.cpuNanos += other.cpuNanos;
this.contentReadNanos += other.contentReadNanos;
this.contentWriteNanos += other.contentWriteNanos;
this.sessionCommitNanos += other.sessionCommitNanos;
final Map<String, Long> eventCounters = other.counters; final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) { if (eventCounters != null) {
@ -169,6 +188,10 @@ public class EventSumValue {
this.flowFilesSent -= other.flowFilesSent; this.flowFilesSent -= other.flowFilesSent;
this.invocations -= other.invocations; this.invocations -= other.invocations;
this.processingNanos -= other.processingNanos; this.processingNanos -= other.processingNanos;
this.cpuNanos -= other.cpuNanos;
this.contentReadNanos -= other.contentReadNanos;
this.contentWriteNanos -= other.contentWriteNanos;
this.sessionCommitNanos -= other.sessionCommitNanos;
final Map<String, Long> eventCounters = other.counters; final Map<String, Long> eventCounters = other.counters;
if (eventCounters != null) { if (eventCounters != null) {

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
public class NanoTimePerformanceTracker implements PerformanceTracker {
private final Timer contentReadTimer = new Timer();
private final Timer contentWriteTimer = new Timer();
private final Timer sessionCommitTimer = new Timer();
@Override
public void beginContentRead() {
contentReadTimer.start();
}
@Override
public void endContentRead() {
contentReadTimer.stop();
}
@Override
public long getContentReadNanos() {
return contentReadTimer.get();
}
@Override
public void beginContentWrite() {
contentWriteTimer.start();
}
@Override
public void endContentWrite() {
contentWriteTimer.stop();
}
@Override
public long getContentWriteNanos() {
return contentWriteTimer.get();
}
@Override
public void beginSessionCommit() {
sessionCommitTimer.start();
}
@Override
public void endSessionCommit() {
sessionCommitTimer.stop();
}
@Override
public long getSessionCommitNanos() {
return sessionCommitTimer.get();
}
private static class Timer {
private long start = -1L;
private long total;
public void start() {
start = System.nanoTime();
}
public void stop() {
if (start == -1) {
return;
}
total += (System.nanoTime() - start);
start = -1L;
}
public long get() {
return total;
}
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
public class NopPerformanceTracker implements PerformanceTracker {
@Override
public void beginContentRead() {
}
@Override
public void endContentRead() {
}
@Override
public long getContentReadNanos() {
return 0;
}
@Override
public void beginContentWrite() {
}
@Override
public void endContentWrite() {
}
@Override
public long getContentWriteNanos() {
return 0;
}
@Override
public void beginSessionCommit() {
}
@Override
public void endSessionCommit() {
}
@Override
public long getSessionCommitNanos() {
return 0;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
public interface PerformanceTracker {
void beginContentRead();
void endContentRead();
long getContentReadNanos();
void beginContentWrite();
void endContentWrite();
long getContentWriteNanos();
void beginSessionCommit();
void endSessionCommit();
long getSessionCommitNanos();
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
public class PerformanceTrackingInputStream extends FilterInputStream {
private final PerformanceTracker performanceTracker;
public PerformanceTrackingInputStream(final InputStream in, final PerformanceTracker performanceTracker) {
super(in);
this.performanceTracker = performanceTracker;
}
@Override
public long skip(final long n) throws IOException {
performanceTracker.beginContentRead();
try {
return super.skip(n);
} finally {
performanceTracker.endContentRead();
}
}
@Override
public int read() throws IOException {
performanceTracker.beginContentRead();
try {
return super.read();
} finally {
performanceTracker.endContentRead();
}
}
@Override
public int read(final byte[] b) throws IOException {
return read(b, 0, b.length);
}
@Override
public int read(final byte[] b, final int off, final int len) throws IOException {
performanceTracker.beginContentRead();
try {
return super.read(b, off, len);
} finally {
performanceTracker.endContentRead();
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository.metrics;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class PerformanceTrackingOutputStream extends FilterOutputStream {
private final PerformanceTracker performanceTracker;
public PerformanceTrackingOutputStream(final OutputStream out, final PerformanceTracker performanceTracker) {
super(out);
this.performanceTracker = performanceTracker;
}
@Override
public void write(final int b) throws IOException {
performanceTracker.beginContentWrite();
try {
out.write(b);
} finally {
performanceTracker.endContentWrite();
}
}
@Override
public void write(final byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
performanceTracker.beginContentWrite();
try {
out.write(b, off, len);
} finally {
performanceTracker.endContentWrite();
}
}
@Override
public void close() throws IOException {
performanceTracker.beginContentWrite();
try {
super.close();
} finally {
performanceTracker.endContentWrite();
}
}
@Override
public void flush() throws IOException {
performanceTracker.beginContentWrite();
try {
super.flush();
} finally {
performanceTracker.endContentWrite();
}
}
}

View File

@ -32,6 +32,11 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
private long bytesRead; private long bytesRead;
private long bytesWritten; private long bytesWritten;
private long processingNanos; private long processingNanos;
private long cpuNanos;
private long contentReadNanos;
private long contentWriteNanos;
private long sessionCommitNanos;
private long gcMillis;
private long aggregateLineageMillis; private long aggregateLineageMillis;
private int flowFilesReceived; private int flowFilesReceived;
private long bytesReceived; private long bytesReceived;
@ -120,6 +125,50 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
return processingNanos; return processingNanos;
} }
@Override
public long getCpuNanoseconds() {
return cpuNanos;
}
public void setCpuNanoseconds(final long nanos) {
this.cpuNanos = nanos;
}
@Override
public long getContentReadNanoseconds() {
return contentReadNanos;
}
public void setContentReadNanoseconds(final long nanos) {
this.contentReadNanos = nanos;
}
@Override
public long getContentWriteNanoseconds() {
return contentWriteNanos;
}
public void setContentWriteNanoseconds(final long nanos) {
this.contentWriteNanos = nanos;
}
@Override
public long getSessionCommitNanoseconds() {
return sessionCommitNanos;
}
public void setSessionCommitNanos(final long nanos) {
this.sessionCommitNanos = nanos;
}
public long getGargeCollectionMillis() {
return gcMillis;
}
public void setGarbageCollectionMillis(final long gcMillis) {
this.gcMillis = gcMillis;
}
public void setProcessingNanos(final long processingNanos) { public void setProcessingNanos(final long processingNanos) {
this.processingNanos = processingNanos; this.processingNanos = processingNanos;
} }
@ -203,6 +252,11 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
bytesRead += event.getBytesRead(); bytesRead += event.getBytesRead();
bytesWritten += event.getBytesWritten(); bytesWritten += event.getBytesWritten();
processingNanos += event.getProcessingNanoseconds(); processingNanos += event.getProcessingNanoseconds();
cpuNanos += event.getCpuNanoseconds();
contentReadNanos += event.getContentReadNanoseconds();
contentWriteNanos += event.getContentWriteNanoseconds();
sessionCommitNanos += event.getSessionCommitNanoseconds();
gcMillis += event.getGargeCollectionMillis();
aggregateLineageMillis += event.getAggregateLineageMillis(); aggregateLineageMillis += event.getAggregateLineageMillis();
flowFilesReceived += event.getFlowFilesReceived(); flowFilesReceived += event.getFlowFilesReceived();
bytesReceived += event.getBytesReceived(); bytesReceived += event.getBytesReceived();

View File

@ -123,6 +123,31 @@ public class TestRingBufferEventRepository {
return 234782; return 234782;
} }
@Override
public long getCpuNanoseconds() {
return 0;
}
@Override
public long getContentReadNanoseconds() {
return 0;
}
@Override
public long getContentWriteNanoseconds() {
return 0;
}
@Override
public long getSessionCommitNanoseconds() {
return 0;
}
@Override
public long getGargeCollectionMillis() {
return 0;
}
@Override @Override
public int getInvocations() { public int getInvocations() {
return 1; return 1;

View File

@ -38,6 +38,16 @@ public interface FlowFileEvent {
long getProcessingNanoseconds(); long getProcessingNanoseconds();
long getCpuNanoseconds();
long getContentReadNanoseconds();
long getContentWriteNanoseconds();
long getSessionCommitNanoseconds();
long getGargeCollectionMillis();
long getAverageLineageMillis(); long getAverageLineageMillis();
long getAggregateLineageMillis(); long getAggregateLineageMillis();

View File

@ -3084,6 +3084,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
} }
} }
/**
* Returns a number between 0 (inclusive) and 100 (inclusive) that indicates the percentage of time that processors should
* track detailed performance, such as CPU seconds used and time reading from/writing to content repo, etc.
* @return the percentage of time that detailed performance metrics should be tracked. A value of 0 indicates that these metrics
* should never be tracked; a value of 100 indicates that these metrics should always be tracked.
*/
public int getPerformanceTrackingPercentage() {
return nifiProperties.getPerformanceMetricTrackingPercentage();
}
public Integer getRemoteSiteListeningPort() { public Integer getRemoteSiteListeningPort() {
return remoteInputSocketPort; return remoteInputSocketPort;

View File

@ -29,4 +29,6 @@ public interface GarbageCollectionLog {
Map<String, Long> getAverageGarbageCollectionDurations(); Map<String, Long> getAverageGarbageCollectionDurations();
GarbageCollectionEvent getLongestGarbageCollectionEvent(); GarbageCollectionEvent getLongestGarbageCollectionEvent();
long getTotalGarbageCollectionMillis();
} }

View File

@ -32,12 +32,14 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, NotificationListener { public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, NotificationListener {
private static final Logger logger = LoggerFactory.getLogger(RingBufferGarbageCollectionLog.class); private static final Logger logger = LoggerFactory.getLogger(RingBufferGarbageCollectionLog.class);
private final RingBuffer<GarbageCollectionEvent> events; private final RingBuffer<GarbageCollectionEvent> events;
private final long minDurationThreshold; private final long minDurationThreshold;
private final long jvmStartTime; private final long jvmStartTime;
private final AtomicLong totalGcMillis = new AtomicLong(0L);
// guarded by synchronizing on this // guarded by synchronizing on this
private GarbageCollectionEvent maxDurationEvent; private GarbageCollectionEvent maxDurationEvent;
@ -54,6 +56,11 @@ public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, Not
return minDurationThreshold; return minDurationThreshold;
} }
@Override
public long getTotalGarbageCollectionMillis() {
return totalGcMillis.get();
}
@Override @Override
public List<GarbageCollectionEvent> getGarbageCollectionEvents() { public List<GarbageCollectionEvent> getGarbageCollectionEvents() {
return events.asList(); return events.asList();
@ -88,6 +95,8 @@ public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, Not
final GarbageCollectionNotificationInfo gcNotification = GarbageCollectionNotificationInfo.from(compositeData); final GarbageCollectionNotificationInfo gcNotification = GarbageCollectionNotificationInfo.from(compositeData);
final GcInfo gcInfo = gcNotification.getGcInfo(); final GcInfo gcInfo = gcNotification.getGcInfo();
totalGcMillis.addAndGet(gcInfo.getDuration());
final String gcName = gcNotification.getGcName(); final String gcName = gcNotification.getGcName();
final String action = gcNotification.getGcAction(); final String action = gcNotification.getGcAction();
final String cause = gcNotification.getGcCause(); final String cause = gcNotification.getGcCause();

View File

@ -20,6 +20,7 @@ import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.claim.StandardContentClaimWriteCache; import org.apache.nifi.controller.repository.claim.StandardContentClaimWriteCache;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -33,7 +34,7 @@ public class StandardRepositoryContext extends AbstractRepositoryContext impleme
} }
@Override @Override
public ContentClaimWriteCache createContentClaimWriteCache() { public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) {
return new StandardContentClaimWriteCache(getContentRepository()); return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker);
} }
} }

View File

@ -17,6 +17,10 @@
package org.apache.nifi.controller.repository.claim; package org.apache.nifi.controller.repository.claim;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@ -25,20 +29,20 @@ import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.nifi.controller.repository.ContentRepository;
public class StandardContentClaimWriteCache implements ContentClaimWriteCache { public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
private final ContentRepository contentRepo; private final ContentRepository contentRepo;
private final Map<ResourceClaim, OutputStream> streamMap = new ConcurrentHashMap<>(); private final Map<ResourceClaim, OutputStream> streamMap = new ConcurrentHashMap<>();
private final Queue<ContentClaim> queue = new LinkedList<>(); private final Queue<ContentClaim> queue = new LinkedList<>();
private final PerformanceTracker performanceTracker;
private final int bufferSize; private final int bufferSize;
public StandardContentClaimWriteCache(final ContentRepository contentRepo) { public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker) {
this(contentRepo, 8192); this(contentRepo, performanceTracker, 8192);
} }
public StandardContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) { public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final int bufferSize) {
this.contentRepo = contentRepo; this.contentRepo = contentRepo;
this.performanceTracker = performanceTracker;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
} }
@ -67,7 +71,8 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
private OutputStream registerStream(final ContentClaim contentClaim) throws IOException { private OutputStream registerStream(final ContentClaim contentClaim) throws IOException {
final OutputStream out = contentRepo.write(contentClaim); final OutputStream out = contentRepo.write(contentClaim);
final OutputStream buffered = new BufferedOutputStream(out, bufferSize); final OutputStream performanceTrackingOut = new PerformanceTrackingOutputStream(out, performanceTracker);
final OutputStream buffered = new BufferedOutputStream(performanceTrackingOut, bufferSize);
streamMap.put(contentClaim.getResourceClaim(), buffered); streamMap.put(contentClaim.getResourceClaim(), buffered);
return buffered; return buffered;
} }

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory; import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext; import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
@ -224,12 +225,12 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
final StandardProcessSession rawSession; final StandardProcessSession rawSession;
final boolean batch; final boolean batch;
if (procNode.isSessionBatchingSupported() && runNanos > 0L) { if (procNode.isSessionBatchingSupported() && runNanos > 0L) {
rawSession = new StandardProcessSession(context, scheduleState::isTerminated); rawSession = new StandardProcessSession(context, scheduleState::isTerminated, new NopPerformanceTracker());
sessionFactory = new BatchingSessionFactory(rawSession); sessionFactory = new BatchingSessionFactory(rawSession);
batch = true; batch = true;
} else { } else {
rawSession = null; rawSession = null;
sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated); sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker());
batch = false; batch = false;
} }
@ -292,7 +293,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
onEvent(procNode); onEvent(procNode);
} }
} else { } else {
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated); final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker());
final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory); final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);
final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
trigger(connectable, scheduleState, connectableProcessContext, activeSessionFactory); trigger(connectable, scheduleState, connectableProcessContext, activeSessionFactory);

View File

@ -31,6 +31,9 @@ import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory; import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.NanoTimePerformanceTracker;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext; import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.LifecycleState; import org.apache.nifi.controller.scheduling.LifecycleState;
@ -50,6 +53,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -68,6 +73,10 @@ public class ConnectableTask {
private final ProcessContext processContext; private final ProcessContext processContext;
private final FlowController flowController; private final FlowController flowController;
private final int numRelationships; private final int numRelationships;
private final ThreadMXBean threadMXBean;
private final AtomicLong invocations = new AtomicLong(0L);
private volatile SampledMetrics sampledMetrics = new SampledMetrics();
private final int perfTrackingNthIteration;
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable, public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
final FlowController flowController, final RepositoryContextFactory contextFactory, final LifecycleState scheduleState) { final FlowController flowController, final RepositoryContextFactory contextFactory, final LifecycleState scheduleState) {
@ -77,6 +86,7 @@ public class ConnectableTask {
this.scheduleState = scheduleState; this.scheduleState = scheduleState;
this.numRelationships = connectable.getRelationships().size(); this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController; this.flowController = flowController;
this.threadMXBean = ManagementFactory.getThreadMXBean();
final PropertyEncryptor encryptor = flowController.getEncryptor(); final PropertyEncryptor encryptor = flowController.getEncryptor();
@ -89,6 +99,13 @@ public class ConnectableTask {
} }
repositoryContext = contextFactory.newProcessContext(connectable, new AtomicLong(0L)); repositoryContext = contextFactory.newProcessContext(connectable, new AtomicLong(0L));
final int perfTrackingPercentage = flowController.getPerformanceTrackingPercentage();
if (perfTrackingPercentage == 0) {
perfTrackingNthIteration = 0;
} else {
perfTrackingNthIteration = 100 / perfTrackingPercentage;
}
} }
public Connectable getConnectable() { public Connectable getConnectable() {
@ -182,18 +199,33 @@ public class ConnectableTask {
} }
logger.debug("Triggering {}", connectable); logger.debug("Triggering {}", connectable);
final long totalInvocationCount = invocations.getAndIncrement();
final boolean measureExpensiveMetrics = isMeasureExpensiveMetrics(totalInvocationCount);
final boolean measureCpuTime = measureExpensiveMetrics && threadMXBean.isCurrentThreadCpuTimeSupported();
final long startCpuTime;
final long startGcMillis;
if (measureCpuTime) {
startCpuTime = threadMXBean.getCurrentThreadCpuTime();
startGcMillis = flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis();
} else {
startCpuTime = 0L;
startGcMillis = 0L;
}
final PerformanceTracker performanceTracker = measureExpensiveMetrics ? new NanoTimePerformanceTracker() : new NopPerformanceTracker();
final long batchNanos = connectable.getRunDuration(TimeUnit.NANOSECONDS); final long batchNanos = connectable.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory; final ProcessSessionFactory sessionFactory;
final StandardProcessSession rawSession; final StandardProcessSession rawSession;
final boolean batch; final boolean batch;
if (connectable.isSessionBatchingSupported() && batchNanos > 0L) { if (connectable.isSessionBatchingSupported() && batchNanos > 0L) {
rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated); rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated, performanceTracker);
sessionFactory = new BatchingSessionFactory(rawSession); sessionFactory = new BatchingSessionFactory(rawSession);
batch = true; batch = true;
} else { } else {
rawSession = null; rawSession = null;
sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated); sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated, performanceTracker);
batch = false; batch = false;
} }
@ -268,13 +300,8 @@ public class ConnectableTask {
} }
} }
final long processingNanos = System.nanoTime() - startNanos;
try { try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(); updateEventRepo(startNanos, startCpuTime, startGcMillis, invocationCount, measureCpuTime, performanceTracker);
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
repositoryContext.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
} catch (final IOException e) { } catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString()); logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString());
logger.error("", e); logger.error("", e);
@ -288,7 +315,128 @@ public class ConnectableTask {
return InvocationResult.DO_NOT_YIELD; return InvocationResult.DO_NOT_YIELD;
} }
private void updateEventRepo(final long startNanoTime, final long startCpuTime, final long startGcMillis, final int invocationCount, final boolean measureCpuTime,
final PerformanceTracker performanceTracker)
throws IOException {
final long processingNanos = System.nanoTime() - startNanoTime;
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent();
flowFileEvent.setProcessingNanos(processingNanos);
flowFileEvent.setInvocations(invocationCount);
// We won't always measure CPU time because it's expensive to calculate. So when we do measure it, we keep track of
// total CPU nanos measured as well as total processing time for those iterations. This gives us a ratio of CPU time vs. total time.
// We can then use that to extrapolate an approximate CPU Time.
if (measureCpuTime) {
updatePerformanceTrackingMetrics(flowFileEvent, performanceTracker, startCpuTime, startGcMillis, processingNanos);
} else {
estimatePerformanceTrackingMetrics(flowFileEvent, processingNanos);
}
repositoryContext.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier());
}
private void estimatePerformanceTrackingMetrics(final StandardFlowFileEvent flowFileEvent, final long processingNanos) {
// Use ratio of measured CPU time vs. total time for that those iterations, to estimate the CPU time for this iteration.
final SampledMetrics currentMetrics = sampledMetrics;
final double processingRatio = (double) processingNanos / (double) Math.max(1, currentMetrics.getProcessingNanosSampled());
flowFileEvent.setCpuNanoseconds((long) (processingRatio * currentMetrics.getTotalCpuNanos()));
flowFileEvent.setContentReadNanoseconds((long) (processingRatio * currentMetrics.getReadNanos()));
flowFileEvent.setContentWriteNanoseconds((long) (processingRatio * currentMetrics.getWriteNanos()));
flowFileEvent.setSessionCommitNanos((long) (processingRatio * currentMetrics.getSessionCommitNanos()));
flowFileEvent.setGarbageCollectionMillis((long) (processingRatio * currentMetrics.getGcMillis()));
}
private void updatePerformanceTrackingMetrics(final StandardFlowFileEvent flowFileEvent, final PerformanceTracker performanceTracker, final long startCpuTime,
final long startGcMillis, final long processingNanos) {
final long cpuTime = threadMXBean.getCurrentThreadCpuTime();
final long cpuNanos = cpuTime - startCpuTime;
final long endGcMillis = flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis();
final long gcMillis = endGcMillis - startGcMillis;
flowFileEvent.setCpuNanoseconds(cpuNanos);
flowFileEvent.setContentWriteNanoseconds(performanceTracker.getContentWriteNanos());
flowFileEvent.setContentReadNanoseconds(performanceTracker.getContentReadNanos());
flowFileEvent.setSessionCommitNanos(performanceTracker.getSessionCommitNanos());
flowFileEvent.setGarbageCollectionMillis(gcMillis);
final SampledMetrics previousMetrics = sampledMetrics;
final SampledMetrics updatedMetrics = new SampledMetrics();
updatedMetrics.setProcessingNanosSampled(previousMetrics.getProcessingNanosSampled() + processingNanos);
updatedMetrics.setTotalCpuNanos(previousMetrics.getTotalCpuNanos() + cpuNanos);
updatedMetrics.setReadNanos(previousMetrics.getReadNanos() + performanceTracker.getContentReadNanos());
updatedMetrics.setWriteNanos(previousMetrics.getWriteNanos() + performanceTracker.getContentWriteNanos());
updatedMetrics.setSessionCommitNanos(previousMetrics.getSessionCommitNanos() + performanceTracker.getSessionCommitNanos());
updatedMetrics.setGcMillis(gcMillis);
this.sampledMetrics = updatedMetrics;
}
private boolean isMeasureExpensiveMetrics(final long invocationCount) {
if (perfTrackingNthIteration == 0) { // A value of 0 indicates we should never track performance metrics.
return false;
}
return invocationCount % perfTrackingNthIteration == 0;
}
private ComponentLog getComponentLog() { private ComponentLog getComponentLog() {
return new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent()); return new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
} }
private static class SampledMetrics {
private long processingNanosSampled = 0L;
private long totalCpuNanos = 0L;
private long readNanos = 0L;
private long writeNanos = 0L;
private long sessionCommitNanos = 0L;
private long gcMillis = 0L;
public long getProcessingNanosSampled() {
return processingNanosSampled;
}
public void setProcessingNanosSampled(final long processingNanosSampled) {
this.processingNanosSampled = processingNanosSampled;
}
public long getTotalCpuNanos() {
return totalCpuNanos;
}
public void setTotalCpuNanos(final long totalCpuNanos) {
this.totalCpuNanos = totalCpuNanos;
}
public long getReadNanos() {
return readNanos;
}
public void setReadNanos(final long readNanos) {
this.readNanos = readNanos;
}
public long getWriteNanos() {
return writeNanos;
}
public void setWriteNanos(final long writeNanos) {
this.writeNanos = writeNanos;
}
public long getSessionCommitNanos() {
return sessionCommitNanos;
}
public void setSessionCommitNanos(final long sessionCommitNanos) {
this.sessionCommitNanos = sessionCommitNanos;
}
public long getGcMillis() {
return gcMillis;
}
public void setGcMillis(final long gcMillis) {
this.gcMillis = gcMillis;
}
}
} }

View File

@ -25,6 +25,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup;
@ -61,7 +62,7 @@ public class ExpireFlowFiles implements Runnable {
private StandardProcessSession createSession(final Connectable connectable) { private StandardProcessSession createSession(final Connectable connectable) {
final RepositoryContext context = contextFactory.newProcessContext(connectable, new AtomicLong(0L)); final RepositoryContext context = contextFactory.newProcessContext(connectable, new AtomicLong(0L));
final StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, () -> false); final StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, () -> false, new NopPerformanceTracker());
return sessionFactory.createSession(); return sessionFactory.createSession();
} }

View File

@ -22,6 +22,7 @@ import org.apache.nifi.diagnostics.DiagnosticsDump;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement; import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.DiagnosticsFactory; import org.apache.nifi.diagnostics.DiagnosticsFactory;
import org.apache.nifi.diagnostics.StandardDiagnosticsDump; import org.apache.nifi.diagnostics.StandardDiagnosticsDump;
import org.apache.nifi.diagnostics.ThreadDumpTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.ClusterDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.ClusterDiagnosticTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.ComponentCountTask; import org.apache.nifi.diagnostics.bootstrap.tasks.ComponentCountTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.ContentRepositoryScanTask; import org.apache.nifi.diagnostics.bootstrap.tasks.ContentRepositoryScanTask;
@ -35,8 +36,8 @@ import org.apache.nifi.diagnostics.bootstrap.tasks.MemoryPoolPeakUsageTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.NarsDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.NarsDiagnosticTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.NiFiPropertiesDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.NiFiPropertiesDiagnosticTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.OperatingSystemDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.OperatingSystemDiagnosticTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.ProcessorTimingDiagnosticTask;
import org.apache.nifi.diagnostics.bootstrap.tasks.RepositoryDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.RepositoryDiagnosticTask;
import org.apache.nifi.diagnostics.ThreadDumpTask;
import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -74,6 +75,7 @@ public class BootstrapDiagnosticsFactory implements DiagnosticsFactory {
tasks.add(new OperatingSystemDiagnosticTask()); tasks.add(new OperatingSystemDiagnosticTask());
tasks.add(new NarsDiagnosticTask(flowController.getExtensionManager())); tasks.add(new NarsDiagnosticTask(flowController.getExtensionManager()));
tasks.add(new FlowConfigurationDiagnosticTask(flowController)); tasks.add(new FlowConfigurationDiagnosticTask(flowController));
tasks.add(new ProcessorTimingDiagnosticTask(flowController.getFlowFileEventRepository(), flowController.getFlowManager()));
tasks.add(new LongRunningProcessorTask(flowController)); tasks.add(new LongRunningProcessorTask(flowController));
tasks.add(new ClusterDiagnosticTask(flowController)); tasks.add(new ClusterDiagnosticTask(flowController));
tasks.add(new GarbageCollectionDiagnosticTask(flowController)); tasks.add(new GarbageCollectionDiagnosticTask(flowController));

View File

@ -37,6 +37,12 @@ public class DataValveDiagnosticsTask implements DiagnosticTask {
@Override @Override
public DiagnosticsDumpElement captureDump(final boolean verbose) { public DiagnosticsDumpElement captureDump(final boolean verbose) {
if (!verbose) {
// This task is very verbose and can make the diagnostics output difficult to read as a result.
// As such, it will not run at all if verbose output is disabled.
return null;
}
final ProcessGroup rootGroup = flowManager.getRootGroup(); final ProcessGroup rootGroup = flowManager.getRootGroup();
final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups(); final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups();
allGroups.add(rootGroup); allGroups.add(rootGroup);
@ -47,18 +53,18 @@ public class DataValveDiagnosticsTask implements DiagnosticTask {
details.add("Process Group " + group.getIdentifier() + ", Name = " + group.getName()); details.add("Process Group " + group.getIdentifier() + ", Name = " + group.getName());
details.add("Currently Have Data Flowing In: " + valveDiagnostics.getGroupsWithDataFlowingIn()); details.add("Currently Have Data Flowing In: " + valveDiagnostics.getGroupsWithDataFlowingIn());
details.add("Currently Have Data Flowing out: " + valveDiagnostics.getGroupsWithDataFlowingOut()); details.add("Currently Have Data Flowing Out: " + valveDiagnostics.getGroupsWithDataFlowingOut());
details.add("Reason for Not allowing data to flow in:"); details.add("Reason for Not allowing data to flow in:");
for (final Map.Entry<String, List<ProcessGroup>> entry : valveDiagnostics.getReasonForInputNotAllowed().entrySet()) { for (final Map.Entry<String, List<ProcessGroup>> entry : valveDiagnostics.getReasonForInputNotAllowed().entrySet()) {
details.add(" " + entry.getKey() + ":"); details.add(" " + entry.getKey() + ":");
entry.getValue().forEach(gr -> details.add(" " + gr)); entry.getValue().forEach(pg -> details.add(" " + pg));
} }
details.add("Reason for Not allowing data to flow out:"); details.add("Reason for Not allowing data to flow out:");
for (final Map.Entry<String, List<ProcessGroup>> entry : valveDiagnostics.getReasonForOutputNotAllowed().entrySet()) { for (final Map.Entry<String, List<ProcessGroup>> entry : valveDiagnostics.getReasonForOutputNotAllowed().entrySet()) {
details.add(" " + entry.getKey() + ":"); details.add(" " + entry.getKey() + ":");
entry.getValue().forEach(gr -> details.add(" " + gr)); entry.getValue().forEach(pg -> details.add(" " + pg));
} }
details.add(""); details.add("");

View File

@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.diagnostics.bootstrap.tasks;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.repository.FlowFileEvent;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.diagnostics.DiagnosticTask;
import org.apache.nifi.diagnostics.DiagnosticsDumpElement;
import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement;
import org.apache.nifi.processor.DataUnit;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
public class ProcessorTimingDiagnosticTask implements DiagnosticTask {
private final FlowFileEventRepository eventRepo;
private final FlowManager flowManager;
// | Proc ID | Proc Name | Proc Type | Group Name | Proc Secs | CPU Secs | %CPU used by Proc |
private static final String PROCESSOR_TIMING_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$-36.36s | %4$-36.36s | %5$15.15s | %6$27.27s | %7$25.25s | " +
// Read Secs | Write Secs| Commit Sec | GC millis | MB Read | MB Write |
"%8$16.16s | %9$16.16s | %10$20.20s | %11$13.13s | %12$11.11s | %13$11.11s |";
public ProcessorTimingDiagnosticTask(final FlowFileEventRepository flowFileEventRepository, final FlowManager flowManager) {
this.eventRepo = flowFileEventRepository;
this.flowManager = flowManager;
}
@Override
public DiagnosticsDumpElement captureDump(final boolean verbose) {
final List<String> details = new ArrayList<>();
final RepositoryStatusReport statusReport = eventRepo.reportTransferEvents(System.currentTimeMillis());
final Map<String, FlowFileEvent> eventsByComponentId = statusReport.getReportEntries();
final List<ProcessorTiming> timings = new ArrayList<>();
eventsByComponentId.entrySet().stream()
.map(entry -> getTiming(entry.getKey(), entry.getValue()))
.filter(Objects::nonNull)
.forEach(timings::add); // create ArrayList and add here instead of .collect(toList()) because arraylist allows us to sort
// Sort based on the Processor CPU time, highest CPU usage first
timings.sort(Comparator.comparing(ProcessorTiming::getCpuNanos).reversed());
final DecimalFormat dataSizeFormat = new DecimalFormat("#,###,###.##");
final DecimalFormat percentageFormat = new DecimalFormat("##.##");
final NumberFormat secondsFormat = NumberFormat.getInstance();
long totalCpuNanos = 0L;
long totalProcNanos = 0L;
long totalReadNanos = 0L;
long totalWriteNanos = 0L;
long totalSessionCommitNanos = 0L;
long totalBytesRead = 0L;
long totalBytesWritten = 0L;
long totalGcNanos = 0L;
// Tally totals for all timing elements
for (final ProcessorTiming timing : timings) {
totalCpuNanos += timing.getCpuNanos();
totalProcNanos += timing.getProcessingNanos();
totalReadNanos += timing.getReadNanos();
totalWriteNanos += timing.getWriteNanos();
totalSessionCommitNanos += timing.getSessionCommitNanos();
totalBytesRead += timing.getBytesRead();
totalBytesWritten += timing.getBytesWritten();
totalGcNanos += timing.getGarbageCollectionNanos();
}
if (totalCpuNanos < 1) {
details.add("No Processor Timing Diagnostic information has been gathered.");
return new StandardDiagnosticsDumpElement("Processor Timing Diagnostics (Stats over last 5 minutes)", details);
}
details.add(String.format(PROCESSOR_TIMING_FORMAT, "Processor ID", "Processor Name", "Processor Type", "Process Group Name", "Processing Secs",
"CPU Secs (% time using CPU)", "Pct CPU Time Used by Proc", "Disk Read Secs", "Disk Write Secs", "Session Commit Secs", "GC Millis", "MB Read", "MB Written"));
for (final ProcessorTiming timing : timings) {
final long procNanos = timing.getProcessingNanos();
if (procNanos < 1) {
continue;
}
final String cpuTime = nanosToPercentTime(timing.getCpuNanos(), procNanos, secondsFormat);
final String cpuPct = percentageFormat.format(timing.getCpuNanos() * 100 / totalCpuNanos);
final String readTime = nanosToPercentTime(timing.getReadNanos(), procNanos, secondsFormat);
final String writeTime = nanosToPercentTime(timing.getWriteNanos(), procNanos, secondsFormat);
final String commitTime = nanosToPercentTime(timing.getSessionCommitNanos(), procNanos, secondsFormat);
final String gcTime = nanosToPercentTime(timing.getGarbageCollectionNanos(), procNanos, secondsFormat);
final String formatted = String.format(PROCESSOR_TIMING_FORMAT, timing.getId(), timing.getName(), timing.getType(), timing.getGroupName(),
secondsFormat.format(TimeUnit.NANOSECONDS.toSeconds(timing.getProcessingNanos())),
cpuTime,
cpuPct,
readTime,
writeTime,
commitTime,
gcTime,
dataSizeFormat.format(DataUnit.B.toMB(timing.getBytesRead())),
dataSizeFormat.format(DataUnit.B.toMB(timing.getBytesWritten())));
details.add(formatted);
}
final String formatted = String.format(PROCESSOR_TIMING_FORMAT, "Total", "--", "--", "--",
secondsFormat.format(TimeUnit.NANOSECONDS.toSeconds(totalProcNanos)),
nanosToPercentTime(totalCpuNanos, totalProcNanos, secondsFormat),
"100.00", // Always represents 100% of CPU time used
nanosToPercentTime(totalReadNanos, totalProcNanos, secondsFormat),
nanosToPercentTime(totalWriteNanos, totalProcNanos, secondsFormat),
nanosToPercentTime(totalSessionCommitNanos, totalProcNanos, secondsFormat),
nanosToPercentTime(totalGcNanos, totalProcNanos, secondsFormat),
dataSizeFormat.format(DataUnit.B.toMB(totalBytesRead)),
dataSizeFormat.format(DataUnit.B.toMB(totalBytesWritten)));
details.add(formatted);
final double mbReadPerSecond = DataUnit.B.toMB(totalBytesRead) / 300;
final double mbWrittenPerSecond = DataUnit.B.toMB(totalBytesWritten) / 300;
details.add("");
details.add("Average MB/sec read from Content Repo in last 5 mins: " + dataSizeFormat.format(mbReadPerSecond));
details.add("Average MB/sec written to Content Repo in last 5 mins: " + dataSizeFormat.format(mbWrittenPerSecond));
return new StandardDiagnosticsDumpElement("Processor Timing Diagnostics (Stats over last 5 minutes)", details);
}
private String nanosToPercentTime(final long nanos, final long processingNanos, final NumberFormat secondsFormat) {
return secondsFormat.format(TimeUnit.NANOSECONDS.toSeconds(nanos)) + " (" + Math.min(100, ((int) (nanos * 100 / processingNanos))) + "%)";
}
private ProcessorTiming getTiming(final String processorId, final FlowFileEvent flowFileEvent) {
final ProcessorNode processorNode = flowManager.getProcessorNode(processorId);
// Processor may be null because the event may not be for a processor, or the processor may already have been removed.
if (processorNode == null) {
return null;
}
final ProcessorTiming timing = new ProcessorTiming(processorNode, flowFileEvent);
return timing;
}
private static class ProcessorTiming {
private final String id;
private final String name;
private final String type;
private final String groupName;
private final FlowFileEvent flowFileEvent;
public ProcessorTiming(final ProcessorNode processor, final FlowFileEvent flowFileEvent) {
this.id = processor.getIdentifier();
this.name = processor.getName();
this.type = processor.getComponentType();
this.groupName = processor.getProcessGroup().getName();
this.flowFileEvent = flowFileEvent;
}
public String getId() {
return id;
}
public String getName() {
return name;
}
public String getType() {
return type;
}
public String getGroupName() {
return groupName;
}
public long getProcessingNanos() {
return flowFileEvent.getProcessingNanoseconds();
}
public long getCpuNanos() {
return flowFileEvent.getCpuNanoseconds();
}
public long getReadNanos() {
return flowFileEvent.getContentReadNanoseconds();
}
public long getWriteNanos() {
return flowFileEvent.getContentWriteNanoseconds();
}
public long getSessionCommitNanos() {
return flowFileEvent.getSessionCommitNanoseconds();
}
public long getBytesRead() {
return flowFileEvent.getBytesRead();
}
public long getBytesWritten() {
return flowFileEvent.getBytesWritten();
}
public long getGarbageCollectionNanos() {
return TimeUnit.MILLISECONDS.toNanos(flowFileEvent.getGargeCollectionMillis());
}
}
}

View File

@ -35,6 +35,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository; import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
@ -221,7 +222,7 @@ public class StandardProcessSessionIT {
stateManager.setIgnoreAnnotations(true); stateManager.setIgnoreAnnotations(true);
context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager); context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager);
session = new StandardProcessSession(context, () -> false); session = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
} }
private Connection createConnection() { private Connection createConnection() {
@ -340,7 +341,7 @@ public class StandardProcessSessionIT {
children.add(child); children.add(child);
} }
final ProcessSession secondSession = new StandardProcessSession(context, () -> false); final ProcessSession secondSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
try { try {
session.migrate(secondSession, children); session.migrate(secondSession, children);
Assert.fail("Expected a FlowFileHandlingException to be thrown because a child FlowFile was migrated while its parent was not"); Assert.fail("Expected a FlowFileHandlingException to be thrown because a child FlowFile was migrated while its parent was not");
@ -383,7 +384,7 @@ public class StandardProcessSessionIT {
FlowFile flowFile = session.get(); FlowFile flowFile = session.get();
assertNotNull(flowFile); assertNotNull(flowFile);
final ProcessSession secondSession = new StandardProcessSession(context, () -> false); final ProcessSession secondSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
FlowFile clone = session.clone(flowFile); FlowFile clone = session.clone(flowFile);
session.migrate(secondSession, Collections.singletonList(clone)); session.migrate(secondSession, Collections.singletonList(clone));
@ -1743,7 +1744,7 @@ public class StandardProcessSessionIT {
session.transfer(ffb, relationship); session.transfer(ffb, relationship);
session.commit(); session.commit();
final ProcessSession newSession = new StandardProcessSession(context, () -> false); final ProcessSession newSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
FlowFile toUpdate = newSession.get(); FlowFile toUpdate = newSession.get();
newSession.append(toUpdate, out -> out.write('C')); newSession.append(toUpdate, out -> out.write('C'));
@ -1816,7 +1817,7 @@ public class StandardProcessSessionIT {
StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000]; StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];
for (int i = 0; i < 70000; i++) { for (int i = 0; i < 70000; i++) {
standardProcessSessions[i] = new StandardProcessSession(context, () -> false); standardProcessSessions[i] = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
FlowFile flowFile = standardProcessSessions[i].create(); FlowFile flowFile = standardProcessSessions[i].create();
final byte[] buff = new byte["Hello".getBytes().length]; final byte[] buff = new byte["Hello".getBytes().length];
@ -2427,7 +2428,7 @@ public class StandardProcessSessionIT {
flowFile = session.append(flowFile, out -> out.write("1".getBytes())); flowFile = session.append(flowFile, out -> out.write("1".getBytes()));
flowFile = session.append(flowFile, out -> out.write("2".getBytes())); flowFile = session.append(flowFile, out -> out.write("2".getBytes()));
final StandardProcessSession newSession = new StandardProcessSession(context, () -> false); final StandardProcessSession newSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
assertTrue(session.isFlowFileKnown(flowFile)); assertTrue(session.isFlowFileKnown(flowFile));
assertFalse(newSession.isFlowFileKnown(flowFile)); assertFalse(newSession.isFlowFileKnown(flowFile));
@ -2459,7 +2460,7 @@ public class StandardProcessSessionIT {
FlowFile flowFile = session.create(); FlowFile flowFile = session.create();
flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8))); flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8)));
final StandardProcessSession newSession = new StandardProcessSession(context, () -> false); final StandardProcessSession newSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet()); when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet());
when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true); when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true);
@ -3241,7 +3242,7 @@ public class StandardProcessSessionIT {
counterRepository, counterRepository,
provenanceRepo, provenanceRepo,
stateManager); stateManager);
return new StandardProcessSession(context, () -> false); return new StandardProcessSession(context, () -> false, new NopPerformanceTracker());
} }

View File

@ -17,16 +17,10 @@
package org.apache.nifi.controller.repository.claim; package org.apache.nifi.controller.repository.claim;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.nifi.controller.repository.FileSystemRepository; import org.apache.nifi.controller.repository.FileSystemRepository;
import org.apache.nifi.controller.repository.StandardContentRepositoryContext; import org.apache.nifi.controller.repository.StandardContentRepositoryContext;
import org.apache.nifi.controller.repository.TestFileSystemRepository; import org.apache.nifi.controller.repository.TestFileSystemRepository;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.util.DiskUtils; import org.apache.nifi.controller.repository.util.DiskUtils;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
@ -36,6 +30,14 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestContentClaimWriteCache { public class TestContentClaimWriteCache {
private FileSystemRepository repository = null; private FileSystemRepository repository = null;
@ -62,7 +64,7 @@ public class TestContentClaimWriteCache {
@Test @Test
public void testFlushWriteCorrectData() throws IOException { public void testFlushWriteCorrectData() throws IOException {
final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, 4); final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 4);
final ContentClaim claim1 = cache.getContentClaim(); final ContentClaim claim1 = cache.getContentClaim();
assertNotNull(claim1); assertNotNull(claim1);

View File

@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository.io;
import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.StreamUtils;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -56,7 +57,7 @@ public class TestContentClaimInputStream {
@Test @Test
public void testStreamCreatedFromRepository() throws IOException { public void testStreamCreatedFromRepository() throws IOException {
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L); final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker());
final byte[] buff = new byte[5]; final byte[] buff = new byte[5];
StreamUtils.fillBuffer(in, buff); StreamUtils.fillBuffer(in, buff);
@ -78,7 +79,7 @@ public class TestContentClaimInputStream {
@Test @Test
public void testThatContentIsSkipped() throws IOException { public void testThatContentIsSkipped() throws IOException {
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L); final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L, new NopPerformanceTracker());
final byte[] buff = new byte[2]; final byte[] buff = new byte[2];
StreamUtils.fillBuffer(in, buff); StreamUtils.fillBuffer(in, buff);
@ -100,7 +101,7 @@ public class TestContentClaimInputStream {
@Test @Test
public void testRereadEntireClaim() throws IOException { public void testRereadEntireClaim() throws IOException {
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L); final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker());
final byte[] buff = new byte[5]; final byte[] buff = new byte[5];
@ -131,7 +132,7 @@ public class TestContentClaimInputStream {
@Test @Test
public void testMultipleResetCallsAfterMark() throws IOException { public void testMultipleResetCallsAfterMark() throws IOException {
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L); final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker());
final byte[] buff = new byte[5]; final byte[] buff = new byte[5];
@ -162,7 +163,7 @@ public class TestContentClaimInputStream {
@Test @Test
public void testRereadWithOffset() throws IOException { public void testRereadWithOffset() throws IOException {
final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L); final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L, new NopPerformanceTracker());
final byte[] buff = new byte[2]; final byte[] buff = new byte[2];

View File

@ -254,6 +254,8 @@
<!-- nifi.properties: runtime monitoring properties --> <!-- nifi.properties: runtime monitoring properties -->
<nifi.monitor.long.running.task.schedule>1 min</nifi.monitor.long.running.task.schedule> <nifi.monitor.long.running.task.schedule>1 min</nifi.monitor.long.running.task.schedule>
<nifi.monitor.long.running.task.threshold>5 mins</nifi.monitor.long.running.task.threshold> <nifi.monitor.long.running.task.threshold>5 mins</nifi.monitor.long.running.task.threshold>
<nifi.performance.tracking.percentage>0</nifi.performance.tracking.percentage>
</properties> </properties>
<build> <build>
<plugins> <plugins>

View File

@ -343,6 +343,14 @@ nifi.diagnostics.on.shutdown.max.filecount=10
# The diagnostics folder's maximum permitted size in bytes. If the limit is exceeded, the oldest files are deleted. # The diagnostics folder's maximum permitted size in bytes. If the limit is exceeded, the oldest files are deleted.
nifi.diagnostics.on.shutdown.max.directory.size=10 MB nifi.diagnostics.on.shutdown.max.directory.size=10 MB
# Performance tracking properties
## Specifies what percentage of the time we should track the amount of time processors are using CPU, reading from/writing to content repo, etc.
## This can be useful to understand which components are the most expensive and to understand where system bottlenecks may be occurring.
## The value must be in the range of 0 (inclusive) to 100 (inclusive). A larger value will produce more accurate results, while a smaller value may be
## less expensive to compute.
## Results can be obtained by running "nifi.sh diagnostics <filename>" and then inspecting the produced file.
nifi.performance.tracking.percentage=${nifi.performance.tracking.percentage}
# NAR Provider Properties # # NAR Provider Properties #
# These properties allow configuring one or more NAR providers. A NAR provider retrieves NARs from an external source # These properties allow configuring one or more NAR providers. A NAR provider retrieves NARs from an external source
# and copies them to the directory specified by nifi.nar.library.autoload.directory. # and copies them to the directory specified by nifi.nar.library.autoload.directory.

View File

@ -41,6 +41,7 @@ import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardConfigurationContext;
@ -603,7 +604,7 @@ public class StandardStatelessFlow implements StatelessDataflow {
} }
final RepositoryContext repositoryContext = repositoryContextFactory.createRepositoryContext(inputPort); final RepositoryContext repositoryContext = repositoryContextFactory.createRepositoryContext(inputPort);
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false); final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false, new NopPerformanceTracker());
final ProcessSession session = sessionFactory.createSession(); final ProcessSession session = sessionFactory.createSession();
try { try {
// Get one of the outgoing connections for the Input Port so that we can return QueueSize for it. // Get one of the outgoing connections for the Input Port so that we can return QueueSize for it.

View File

@ -26,6 +26,7 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventRepository;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -41,7 +42,7 @@ public class StatelessRepositoryContext extends AbstractRepositoryContext implem
} }
@Override @Override
public ContentClaimWriteCache createContentClaimWriteCache() { public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) {
return writeCache; return writeCache;
} }
} }

View File

@ -22,6 +22,7 @@ import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -55,7 +56,7 @@ public class StatelessProcessSession extends StandardProcessSession {
public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory, final ProcessContextFactory processContextFactory, public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory, final ProcessContextFactory processContextFactory,
final ExecutionProgress progress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) { final ExecutionProgress progress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) {
super(repositoryContextFactory.createRepositoryContext(connectable), progress::isCanceled); super(repositoryContextFactory.createRepositoryContext(connectable), progress::isCanceled, new NopPerformanceTracker());
this.connectable = connectable; this.connectable = connectable;
this.repositoryContextFactory = repositoryContextFactory; this.repositoryContextFactory = repositoryContextFactory;
this.processContextFactory = processContextFactory; this.processContextFactory = processContextFactory;