diff --git a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java index f00a29419d..e620475c79 100644 --- a/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java +++ b/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java @@ -330,6 +330,12 @@ public class NiFiProperties extends ApplicationProperties { public static final int DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_FILE_COUNT = 10; public static final String DEFAULT_DIAGNOSTICS_ON_SHUTDOWN_MAX_DIRECTORY_SIZE = "10 MB"; + // performance tracking + public static final String TRACK_PERFORMANCE_PERCENTAGE = "nifi.performance.tracking.percentage"; + + // performance tracking defaults + public static final int DEFAULT_TRACK_PERFORMANCE_PERCENTAGE = 0; + // defaults public static final Boolean DEFAULT_AUTO_RESUME_STATE = true; public static final String DEFAULT_AUTHORIZER_CONFIGURATION_FILE = "conf/authorizers.xml"; @@ -510,7 +516,7 @@ public class NiFiProperties extends ApplicationProperties { try { return Integer.parseInt(value.trim()); } catch (final Exception e) { - logger.warn("Configured value is invalid, falling back to default value", e); + logger.warn("Configured value for property {} in nifi.properties is invalid, falling back to default value", propertyName, e); return defaultValue; } } @@ -1563,6 +1569,18 @@ public class NiFiProperties extends ApplicationProperties { return getProperty(STATE_MANAGEMENT_CLUSTER_PROVIDER_ID); } + public int getPerformanceMetricTrackingPercentage() { + final int percentage = getIntegerProperty(TRACK_PERFORMANCE_PERCENTAGE, DEFAULT_TRACK_PERFORMANCE_PERCENTAGE); + if (percentage < 0) { + return 0; + } + if (percentage > 100) { + return 100; + } + + return percentage; + } + public File getEmbeddedZooKeeperPropertiesFile() { final String filename = getProperty(STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES); return filename == null ? null : new File(filename); diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index fd8a31ed3c..e3ba327476 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -4131,9 +4131,36 @@ To enable it, both `nifi.monitor.long.running.task.schedule` and `nifi.monitor.l |*Property*|*Description* |`nifi.monitor.long.running.task.schedule`|The time period between successive executions of the Long-Running Task Monitor (e.g. `1 min`). |`nifi.monitor.long.running.task.threshold`|The time period beyond which a task is considered long-running, i.e. stuck / hanging (e.g. `5 mins`). - |==== + +[[performance_tracking_properties]] +=== Performance Tracking Properties + +NiFi exposes a very significant number of metrics by default through the User Interface. However, there are sometimes additional metrics that may add in diagnosing bottlenecks +and improving the performance of the NiFi dataflow. + +The `nifi.performance.tracking.percentage` property can be used to enable the tracking of additional metrics. Gathering these metrics, however, require system calls, which can be +expensive on some systems. As a result, this property defaults to a value of `0`, indicating that the metrics should be captured 0% of the time. I.e., the feature is disabled by +default. To enable this feature, set the value of this property to an integer value in the range of 0 to 100, inclusive. This represents what percentage of the time NiFi should +gather these metrics. + +For example, if the value is set to 20, then NiFi will gather these metrics for each processor approximately 20% of the times that the Processor is run. The remainder of the time, +it will use the values that it has already captured in order to extrapolate the metrics to additional runs. + +The metrics that are gathered include what percentage of the time the processor is utilizing the CPU (versus waiting for I/O to complete or blocking due to monitor/lock contention), +what percentage of time the Processor spends reading from the Content Repository, writing to the Content Repository, blocked due to Garbage Collection, etc. + +So, continuing our example, if we set the value of the `nifi.performance.tracking.percentage` and a processor is triggered to run 1,000 times, then NiFi will measure how much CPU +time was consumed over the 200 iterations during which it was measured (i.e., 20% of 1,000). Let's say that this amounts to 500 milliseconds of CPU time. Additionally, let's consider +that the Processor took 5,000 milliseconds to complete those 200 invocations because most of the time was spent blocking on Socket I/O. From this, NiFi will calculate that the CPU +is used approximately 10% of the time (500 / 5,000 * 100%). Now, let's consider that in order to complete all 1,000 invocations the Processor took 35 seconds. NiFi will calculate, +then, that the Processor has used approximately 3.5 seconds (or 3500 milliseconds) of CPU time. + +As a result, if we set the value of this property higher, up to a value of `100`, we will get more accurate results. However, it may be more expensive to monitor. + +In order to view these metrics, we can gather diagnostics by running the command `nifi.sh diagnostics ` and inspecting the generated file. See <> for more information. + [[custom_properties]] === Custom Properties @@ -4539,8 +4566,9 @@ Example configuration: nifi.nar.library.provider.hdfs2.resources=/etc/hadoop/core-site.xml nifi.nar.library.provider.hdfs2.source.directory=/other/dir/for/customNars -NOTE: For backward compatibility reasons, `org.apache.nifi.nar.hadoop.HDFSNarProvider` is still an accepted implementation, but it is suggested to use 'org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider' instead. Features introduced with the External Resource Provider might not work with this implementation! +NOTE: For backward compatibility reasons, `org.apache.nifi.nar.hadoop.HDFSNarProvider` is still an accepted implementation, but it is suggested to use 'org.apache.nifi.flow.resource.hadoop.HDFSExternalResourceProvider' instead.Features introduced with the External Resource Provider might not work with this implementation! +[[nifi_diagnostics]] == NiFi diagnostics It is possible to get diagnostics data from a NiFi node by executing the below command: diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java index 96fe4bcaa6..bc0cc8eb19 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/RepositoryContext.java @@ -21,6 +21,7 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.Relationship; import org.apache.nifi.provenance.InternalProvenanceReporter; @@ -48,7 +49,7 @@ public interface RepositoryContext { boolean isRelationshipAvailabilitySatisfied(int requiredNumber); - ContentClaimWriteCache createContentClaimWriteCache(); + ContentClaimWriteCache createContentClaimWriteCache(PerformanceTracker performanceTracker); InternalProvenanceReporter createProvenanceReporter(Predicate flowfileKnownCheck, ProvenanceEventEnricher eventEnricher); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index af79fabb9b..e969d029e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -38,6 +38,8 @@ import org.apache.nifi.controller.repository.io.FlowFileAccessOutputStream; import org.apache.nifi.controller.repository.io.LimitedInputStream; import org.apache.nifi.controller.repository.io.TaskTerminationInputStream; import org.apache.nifi.controller.repository.io.TaskTerminationOutputStream; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; +import org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.state.StandardStateMap; import org.apache.nifi.flowfile.FlowFile; @@ -137,6 +139,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn private final Map deleteOnCommit = new HashMap<>(); private final long sessionId; private final String connectableDescription; + private final PerformanceTracker performanceTracker; private Map countersOnCommit; private Map immediateCounters; @@ -180,14 +183,15 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn private final String retryAttribute; private final FlowFileLinkage flowFileLinkage = new FlowFileLinkage(); - public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination) { + public StandardProcessSession(final RepositoryContext context, final TaskTermination taskTermination, final PerformanceTracker performanceTracker) { this.context = context; this.taskTermination = taskTermination; + this.performanceTracker = performanceTracker; this.provenanceReporter = context.createProvenanceReporter(this::isFlowFileKnown, this); this.sessionId = idGenerator.getAndIncrement(); this.connectableDescription = context.getConnectableDescription(); - this.claimCache = context.createContentClaimWriteCache(); + this.claimCache = context.createContentClaimWriteCache(performanceTracker); LOG.trace("Session {} created for {}", this, connectableDescription); processingStartTime = System.nanoTime(); retryAttribute = "retryCount." + context.getConnectable().getIdentifier(); @@ -564,6 +568,7 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn @SuppressWarnings({"unchecked", "rawtypes"}) protected void commit(final Checkpoint checkpoint, final boolean asynchronous) { try { + performanceTracker.beginSessionCommit(); final long commitStartNanos = System.nanoTime(); resetReadClaim(); @@ -727,6 +732,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } else { throw new ProcessException(e); } + } finally { + performanceTracker.endSessionCommit(); } } @@ -2597,7 +2604,8 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize()); - final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream); + final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, + contentClaimOffset, limitingInputStream, performanceTracker); return contentClaimInputStream; } } @@ -2609,8 +2617,13 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn } currentReadClaim = claim.getResourceClaim(); + + performanceTracker.beginContentRead(); final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim()); - StreamUtils.skip(contentRepoStream, claim.getOffset() + contentClaimOffset); + performanceTracker.endContentRead(); + + final InputStream performanceTrackInputStream = new PerformanceTrackingInputStream(contentRepoStream, performanceTracker); + StreamUtils.skip(performanceTrackInputStream, claim.getOffset() + contentClaimOffset); final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream); final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset() + contentClaimOffset); currentReadClaimStream = byteCountingInputStream; @@ -2621,12 +2634,12 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn // Finally, we need to wrap the InputStream in a ContentClaimInputStream so that if mark/reset is used, we can provide that capability // without buffering data in memory. final InputStream limitingInputStream = new LimitingInputStream(new DisableOnCloseInputStream(currentReadClaimStream), flowFile.getSize()); - final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream); + final ContentClaimInputStream contentClaimInputStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, limitingInputStream, performanceTracker); return contentClaimInputStream; } else { claimCache.flush(claim); - final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset); + final InputStream rawInStream = new ContentClaimInputStream(context.getContentRepository(), claim, contentClaimOffset, performanceTracker); return rawInStream; } } catch (final ContentNotFoundException cnfe) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java index 7e4d5b047f..0c5e3fd5b2 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSessionFactory.java @@ -17,20 +17,23 @@ package org.apache.nifi.controller.repository; import org.apache.nifi.controller.lifecycle.TaskTermination; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; import org.apache.nifi.processor.ProcessSessionFactory; public class StandardProcessSessionFactory implements ProcessSessionFactory { private final RepositoryContext context; private final TaskTermination taskTermination; + private final PerformanceTracker performanceTracker; - public StandardProcessSessionFactory(final RepositoryContext context, final TaskTermination taskTermination) { + public StandardProcessSessionFactory(final RepositoryContext context, final TaskTermination taskTermination, final PerformanceTracker performanceTracker) { this.context = context; this.taskTermination = taskTermination; + this.performanceTracker = performanceTracker; } @Override public StandardProcessSession createSession() { - return new StandardProcessSession(context, taskTermination); + return new StandardProcessSession(context, taskTermination, performanceTracker); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java index 169f0e2b24..74696167cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java @@ -18,6 +18,8 @@ package org.apache.nifi.controller.repository.io; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; +import org.apache.nifi.controller.repository.metrics.PerformanceTrackingInputStream; import org.apache.nifi.stream.io.StreamUtils; import java.io.IOException; @@ -32,20 +34,23 @@ public class ContentClaimInputStream extends InputStream { private final ContentRepository contentRepository; private final ContentClaim contentClaim; private final long claimOffset; + private final PerformanceTracker performanceTracker; private InputStream delegate; private long bytesConsumed; private long currentOffset; // offset into the Content Claim; will differ from bytesRead if reset() is called after reading at least one byte or if claimOffset > 0 private long markOffset; - public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset) { - this(contentRepository, contentClaim, claimOffset, null); + public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final PerformanceTracker performanceTracker) { + this(contentRepository, contentClaim, claimOffset, null, performanceTracker); } - public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate) { + public ContentClaimInputStream(final ContentRepository contentRepository, final ContentClaim contentClaim, final long claimOffset, final InputStream initialDelegate, + final PerformanceTracker performanceTracker) { this.contentRepository = contentRepository; this.contentClaim = contentClaim; this.claimOffset = claimOffset; + this.performanceTracker = performanceTracker; this.currentOffset = claimOffset; this.delegate = initialDelegate; @@ -142,7 +147,14 @@ public class ContentClaimInputStream extends InputStream { } formDelegate(); - StreamUtils.skip(delegate, markOffset - claimOffset); + + performanceTracker.beginContentRead(); + try { + StreamUtils.skip(delegate, markOffset - claimOffset); + } finally { + performanceTracker.endContentRead(); + } + currentOffset = markOffset; } } @@ -159,8 +171,13 @@ public class ContentClaimInputStream extends InputStream { delegate.close(); } - delegate = contentRepository.read(contentClaim); - StreamUtils.skip(delegate, claimOffset); - currentOffset = claimOffset; + performanceTracker.beginContentRead(); + try { + delegate = new PerformanceTrackingInputStream(contentRepository.read(contentClaim), performanceTracker); + StreamUtils.skip(delegate, claimOffset); + currentOffset = claimOffset; + } finally { + performanceTracker.endContentRead();; + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java index 3c141401e0..1ec5d12e1e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EmptyFlowFileEvent.java @@ -72,6 +72,31 @@ public class EmptyFlowFileEvent implements FlowFileEvent { return 0; } + @Override + public long getCpuNanoseconds() { + return 0; + } + + @Override + public long getContentReadNanoseconds() { + return 0; + } + + @Override + public long getContentWriteNanoseconds() { + return 0; + } + + @Override + public long getSessionCommitNanoseconds() { + return 0; + } + + @Override + public long getGargeCollectionMillis() { + return 0; + } + @Override public long getAverageLineageMillis() { return 0; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java index a618812bf2..316623d118 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/EventSumValue.java @@ -41,6 +41,11 @@ public class EventSumValue { private long bytesReceived = 0; private long bytesSent = 0; private long processingNanos = 0; + private long cpuNanos = 0; + private long contentReadNanos = 0; + private long contentWriteNanos = 0; + private long sessionCommitNanos = 0; + private long gcMillis = 0; private long aggregateLineageMillis = 0; private int invocations = 0; private Map counters; @@ -70,6 +75,11 @@ public class EventSumValue { this.flowFilesSent += flowFileEvent.getFlowFilesSent(); this.invocations += flowFileEvent.getInvocations(); this.processingNanos += flowFileEvent.getProcessingNanoseconds(); + this.cpuNanos += flowFileEvent.getCpuNanoseconds(); + this.contentReadNanos += flowFileEvent.getContentReadNanoseconds(); + this.contentWriteNanos += flowFileEvent.getContentWriteNanoseconds(); + this.gcMillis += flowFileEvent.getGargeCollectionMillis(); + this.sessionCommitNanos += flowFileEvent.getSessionCommitNanoseconds(); final Map eventCounters = flowFileEvent.getCounters(); if (eventCounters != null) { @@ -106,6 +116,11 @@ public class EventSumValue { event.setFlowFilesSent(flowFilesSent); event.setInvocations(invocations); event.setProcessingNanos(processingNanos); + event.setCpuNanoseconds(cpuNanos); + event.setContentReadNanoseconds(contentReadNanos); + event.setContentWriteNanoseconds(contentWriteNanos); + event.setSessionCommitNanos(sessionCommitNanos); + event.setGarbageCollectionMillis(gcMillis); event.setCounters(this.counters == null ? Collections.emptyMap() : Collections.unmodifiableMap(this.counters)); return event; } @@ -131,6 +146,10 @@ public class EventSumValue { this.flowFilesSent += other.flowFilesSent; this.invocations += other.invocations; this.processingNanos += other.processingNanos; + this.cpuNanos += other.cpuNanos; + this.contentReadNanos += other.contentReadNanos; + this.contentWriteNanos += other.contentWriteNanos; + this.sessionCommitNanos += other.sessionCommitNanos; final Map eventCounters = other.counters; if (eventCounters != null) { @@ -169,6 +188,10 @@ public class EventSumValue { this.flowFilesSent -= other.flowFilesSent; this.invocations -= other.invocations; this.processingNanos -= other.processingNanos; + this.cpuNanos -= other.cpuNanos; + this.contentReadNanos -= other.contentReadNanos; + this.contentWriteNanos -= other.contentWriteNanos; + this.sessionCommitNanos -= other.sessionCommitNanos; final Map eventCounters = other.counters; if (eventCounters != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java new file mode 100644 index 0000000000..8e9c8cc762 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NanoTimePerformanceTracker.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +public class NanoTimePerformanceTracker implements PerformanceTracker { + private final Timer contentReadTimer = new Timer(); + private final Timer contentWriteTimer = new Timer(); + private final Timer sessionCommitTimer = new Timer(); + + @Override + public void beginContentRead() { + contentReadTimer.start(); + } + + @Override + public void endContentRead() { + contentReadTimer.stop(); + } + + @Override + public long getContentReadNanos() { + return contentReadTimer.get(); + } + + @Override + public void beginContentWrite() { + contentWriteTimer.start(); + } + + @Override + public void endContentWrite() { + contentWriteTimer.stop(); + } + + @Override + public long getContentWriteNanos() { + return contentWriteTimer.get(); + } + + @Override + public void beginSessionCommit() { + sessionCommitTimer.start(); + } + + @Override + public void endSessionCommit() { + sessionCommitTimer.stop(); + } + + @Override + public long getSessionCommitNanos() { + return sessionCommitTimer.get(); + } + + private static class Timer { + private long start = -1L; + private long total; + + public void start() { + start = System.nanoTime(); + } + + public void stop() { + if (start == -1) { + return; + } + + total += (System.nanoTime() - start); + start = -1L; + } + + public long get() { + return total; + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NopPerformanceTracker.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NopPerformanceTracker.java new file mode 100644 index 0000000000..2a8ea02dc9 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/NopPerformanceTracker.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +public class NopPerformanceTracker implements PerformanceTracker { + @Override + public void beginContentRead() { + + } + + @Override + public void endContentRead() { + + } + + @Override + public long getContentReadNanos() { + return 0; + } + + @Override + public void beginContentWrite() { + + } + + @Override + public void endContentWrite() { + + } + + @Override + public long getContentWriteNanos() { + return 0; + } + + @Override + public void beginSessionCommit() { + + } + + @Override + public void endSessionCommit() { + + } + + @Override + public long getSessionCommitNanos() { + return 0; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTracker.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTracker.java new file mode 100644 index 0000000000..822baab41f --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTracker.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +public interface PerformanceTracker { + void beginContentRead(); + + void endContentRead(); + + long getContentReadNanos(); + + void beginContentWrite(); + + void endContentWrite(); + + long getContentWriteNanos(); + + void beginSessionCommit(); + + void endSessionCommit(); + + long getSessionCommitNanos(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTrackingInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTrackingInputStream.java new file mode 100644 index 0000000000..a1a1595004 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTrackingInputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class PerformanceTrackingInputStream extends FilterInputStream { + private final PerformanceTracker performanceTracker; + + public PerformanceTrackingInputStream(final InputStream in, final PerformanceTracker performanceTracker) { + super(in); + this.performanceTracker = performanceTracker; + } + + @Override + public long skip(final long n) throws IOException { + performanceTracker.beginContentRead(); + try { + return super.skip(n); + } finally { + performanceTracker.endContentRead(); + } + } + + @Override + public int read() throws IOException { + performanceTracker.beginContentRead(); + try { + return super.read(); + } finally { + performanceTracker.endContentRead(); + } + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + performanceTracker.beginContentRead(); + try { + return super.read(b, off, len); + } finally { + performanceTracker.endContentRead(); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTrackingOutputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTrackingOutputStream.java new file mode 100644 index 0000000000..d2c08bf625 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/PerformanceTrackingOutputStream.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.metrics; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +public class PerformanceTrackingOutputStream extends FilterOutputStream { + private final PerformanceTracker performanceTracker; + + public PerformanceTrackingOutputStream(final OutputStream out, final PerformanceTracker performanceTracker) { + super(out); + this.performanceTracker = performanceTracker; + } + + @Override + public void write(final int b) throws IOException { + performanceTracker.beginContentWrite(); + try { + out.write(b); + } finally { + performanceTracker.endContentWrite(); + } + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + performanceTracker.beginContentWrite(); + try { + out.write(b, off, len); + } finally { + performanceTracker.endContentWrite(); + } + } + + @Override + public void close() throws IOException { + performanceTracker.beginContentWrite(); + try { + super.close(); + } finally { + performanceTracker.endContentWrite(); + } + } + + @Override + public void flush() throws IOException { + performanceTracker.beginContentWrite(); + try { + super.flush(); + } finally { + performanceTracker.endContentWrite(); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java index 084a86eef4..d43d5afb1c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java @@ -32,6 +32,11 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { private long bytesRead; private long bytesWritten; private long processingNanos; + private long cpuNanos; + private long contentReadNanos; + private long contentWriteNanos; + private long sessionCommitNanos; + private long gcMillis; private long aggregateLineageMillis; private int flowFilesReceived; private long bytesReceived; @@ -120,6 +125,50 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { return processingNanos; } + @Override + public long getCpuNanoseconds() { + return cpuNanos; + } + + public void setCpuNanoseconds(final long nanos) { + this.cpuNanos = nanos; + } + + @Override + public long getContentReadNanoseconds() { + return contentReadNanos; + } + + public void setContentReadNanoseconds(final long nanos) { + this.contentReadNanos = nanos; + } + + @Override + public long getContentWriteNanoseconds() { + return contentWriteNanos; + } + + public void setContentWriteNanoseconds(final long nanos) { + this.contentWriteNanos = nanos; + } + + @Override + public long getSessionCommitNanoseconds() { + return sessionCommitNanos; + } + + public void setSessionCommitNanos(final long nanos) { + this.sessionCommitNanos = nanos; + } + + public long getGargeCollectionMillis() { + return gcMillis; + } + + public void setGarbageCollectionMillis(final long gcMillis) { + this.gcMillis = gcMillis; + } + public void setProcessingNanos(final long processingNanos) { this.processingNanos = processingNanos; } @@ -203,6 +252,11 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable { bytesRead += event.getBytesRead(); bytesWritten += event.getBytesWritten(); processingNanos += event.getProcessingNanoseconds(); + cpuNanos += event.getCpuNanoseconds(); + contentReadNanos += event.getContentReadNanoseconds(); + contentWriteNanos += event.getContentWriteNanoseconds(); + sessionCommitNanos += event.getSessionCommitNanoseconds(); + gcMillis += event.getGargeCollectionMillis(); aggregateLineageMillis += event.getAggregateLineageMillis(); flowFilesReceived += event.getFlowFilesReceived(); bytesReceived += event.getBytesReceived(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java index f7712711a4..cb5d8dd8e0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/controller/repository/TestRingBufferEventRepository.java @@ -123,6 +123,31 @@ public class TestRingBufferEventRepository { return 234782; } + @Override + public long getCpuNanoseconds() { + return 0; + } + + @Override + public long getContentReadNanoseconds() { + return 0; + } + + @Override + public long getContentWriteNanoseconds() { + return 0; + } + + @Override + public long getSessionCommitNanoseconds() { + return 0; + } + + @Override + public long getGargeCollectionMillis() { + return 0; + } + @Override public int getInvocations() { return 1; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java index 7b131ccf2d..37440c789f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEvent.java @@ -38,6 +38,16 @@ public interface FlowFileEvent { long getProcessingNanoseconds(); + long getCpuNanoseconds(); + + long getContentReadNanoseconds(); + + long getContentWriteNanoseconds(); + + long getSessionCommitNanoseconds(); + + long getGargeCollectionMillis(); + long getAverageLineageMillis(); long getAggregateLineageMillis(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 32616e8c8e..7ea5e82455 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3084,6 +3084,15 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node } } + /** + * Returns a number between 0 (inclusive) and 100 (inclusive) that indicates the percentage of time that processors should + * track detailed performance, such as CPU seconds used and time reading from/writing to content repo, etc. + * @return the percentage of time that detailed performance metrics should be tracked. A value of 0 indicates that these metrics + * should never be tracked; a value of 100 indicates that these metrics should always be tracked. + */ + public int getPerformanceTrackingPercentage() { + return nifiProperties.getPerformanceMetricTrackingPercentage(); + } public Integer getRemoteSiteListeningPort() { return remoteInputSocketPort; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/GarbageCollectionLog.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/GarbageCollectionLog.java index 0a8fa90c3d..fa3e42a081 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/GarbageCollectionLog.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/GarbageCollectionLog.java @@ -29,4 +29,6 @@ public interface GarbageCollectionLog { Map getAverageGarbageCollectionDurations(); GarbageCollectionEvent getLongestGarbageCollectionEvent(); + + long getTotalGarbageCollectionMillis(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/RingBufferGarbageCollectionLog.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/RingBufferGarbageCollectionLog.java index 817ed6a6ad..585388108a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/RingBufferGarbageCollectionLog.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/RingBufferGarbageCollectionLog.java @@ -32,12 +32,14 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, NotificationListener { private static final Logger logger = LoggerFactory.getLogger(RingBufferGarbageCollectionLog.class); private final RingBuffer events; private final long minDurationThreshold; private final long jvmStartTime; + private final AtomicLong totalGcMillis = new AtomicLong(0L); // guarded by synchronizing on this private GarbageCollectionEvent maxDurationEvent; @@ -54,6 +56,11 @@ public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, Not return minDurationThreshold; } + @Override + public long getTotalGarbageCollectionMillis() { + return totalGcMillis.get(); + } + @Override public List getGarbageCollectionEvents() { return events.asList(); @@ -88,6 +95,8 @@ public class RingBufferGarbageCollectionLog implements GarbageCollectionLog, Not final GarbageCollectionNotificationInfo gcNotification = GarbageCollectionNotificationInfo.from(compositeData); final GcInfo gcInfo = gcNotification.getGcInfo(); + totalGcMillis.addAndGet(gcInfo.getDuration()); + final String gcName = gcNotification.getGcName(); final String action = gcNotification.getGcAction(); final String cause = gcNotification.getGcCause(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java index 990fd12c5f..2ae985999a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryContext.java @@ -20,6 +20,7 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; import org.apache.nifi.controller.repository.claim.StandardContentClaimWriteCache; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; import org.apache.nifi.provenance.ProvenanceEventRepository; import java.util.concurrent.atomic.AtomicLong; @@ -33,7 +34,7 @@ public class StandardRepositoryContext extends AbstractRepositoryContext impleme } @Override - public ContentClaimWriteCache createContentClaimWriteCache() { - return new StandardContentClaimWriteCache(getContentRepository()); + public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) { + return new StandardContentClaimWriteCache(getContentRepository(), performanceTracker); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java index 8e8e7c671e..b4d6d72098 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java @@ -17,6 +17,10 @@ package org.apache.nifi.controller.repository.claim; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; +import org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream; + import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -25,20 +29,20 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; -import org.apache.nifi.controller.repository.ContentRepository; - public class StandardContentClaimWriteCache implements ContentClaimWriteCache { private final ContentRepository contentRepo; private final Map streamMap = new ConcurrentHashMap<>(); private final Queue queue = new LinkedList<>(); + private final PerformanceTracker performanceTracker; private final int bufferSize; - public StandardContentClaimWriteCache(final ContentRepository contentRepo) { - this(contentRepo, 8192); + public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker) { + this(contentRepo, performanceTracker, 8192); } - public StandardContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) { + public StandardContentClaimWriteCache(final ContentRepository contentRepo, final PerformanceTracker performanceTracker, final int bufferSize) { this.contentRepo = contentRepo; + this.performanceTracker = performanceTracker; this.bufferSize = bufferSize; } @@ -67,7 +71,8 @@ public class StandardContentClaimWriteCache implements ContentClaimWriteCache { private OutputStream registerStream(final ContentClaim contentClaim) throws IOException { final OutputStream out = contentRepo.write(contentClaim); - final OutputStream buffered = new BufferedOutputStream(out, bufferSize); + final OutputStream performanceTrackingOut = new PerformanceTrackingOutputStream(out, performanceTracker); + final OutputStream buffered = new BufferedOutputStream(performanceTrackingOut, bufferSize); streamMap.put(contentClaim.getResourceClaim(), buffered); return buffered; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 6751663ea6..15ad015e4c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -32,6 +32,7 @@ import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext; import org.apache.nifi.controller.service.ControllerServiceProvider; @@ -224,12 +225,12 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { final StandardProcessSession rawSession; final boolean batch; if (procNode.isSessionBatchingSupported() && runNanos > 0L) { - rawSession = new StandardProcessSession(context, scheduleState::isTerminated); + rawSession = new StandardProcessSession(context, scheduleState::isTerminated, new NopPerformanceTracker()); sessionFactory = new BatchingSessionFactory(rawSession); batch = true; } else { rawSession = null; - sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated); + sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker()); batch = false; } @@ -292,7 +293,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { onEvent(procNode); } } else { - final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated); + final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, scheduleState::isTerminated, new NopPerformanceTracker()); final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory); final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); trigger(connectable, scheduleState, connectableProcessContext, activeSessionFactory); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java index be1b1ebeb8..573536f834 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java @@ -31,6 +31,9 @@ import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory; +import org.apache.nifi.controller.repository.metrics.NanoTimePerformanceTracker; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext; import org.apache.nifi.controller.scheduling.LifecycleState; @@ -50,6 +53,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -68,6 +73,10 @@ public class ConnectableTask { private final ProcessContext processContext; private final FlowController flowController; private final int numRelationships; + private final ThreadMXBean threadMXBean; + private final AtomicLong invocations = new AtomicLong(0L); + private volatile SampledMetrics sampledMetrics = new SampledMetrics(); + private final int perfTrackingNthIteration; public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable, final FlowController flowController, final RepositoryContextFactory contextFactory, final LifecycleState scheduleState) { @@ -77,6 +86,7 @@ public class ConnectableTask { this.scheduleState = scheduleState; this.numRelationships = connectable.getRelationships().size(); this.flowController = flowController; + this.threadMXBean = ManagementFactory.getThreadMXBean(); final PropertyEncryptor encryptor = flowController.getEncryptor(); @@ -89,6 +99,13 @@ public class ConnectableTask { } repositoryContext = contextFactory.newProcessContext(connectable, new AtomicLong(0L)); + + final int perfTrackingPercentage = flowController.getPerformanceTrackingPercentage(); + if (perfTrackingPercentage == 0) { + perfTrackingNthIteration = 0; + } else { + perfTrackingNthIteration = 100 / perfTrackingPercentage; + } } public Connectable getConnectable() { @@ -182,18 +199,33 @@ public class ConnectableTask { } logger.debug("Triggering {}", connectable); + final long totalInvocationCount = invocations.getAndIncrement(); + + final boolean measureExpensiveMetrics = isMeasureExpensiveMetrics(totalInvocationCount); + final boolean measureCpuTime = measureExpensiveMetrics && threadMXBean.isCurrentThreadCpuTimeSupported(); + final long startCpuTime; + final long startGcMillis; + if (measureCpuTime) { + startCpuTime = threadMXBean.getCurrentThreadCpuTime(); + startGcMillis = flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis(); + } else { + startCpuTime = 0L; + startGcMillis = 0L; + } + + final PerformanceTracker performanceTracker = measureExpensiveMetrics ? new NanoTimePerformanceTracker() : new NopPerformanceTracker(); final long batchNanos = connectable.getRunDuration(TimeUnit.NANOSECONDS); final ProcessSessionFactory sessionFactory; final StandardProcessSession rawSession; final boolean batch; if (connectable.isSessionBatchingSupported() && batchNanos > 0L) { - rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated); + rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated, performanceTracker); sessionFactory = new BatchingSessionFactory(rawSession); batch = true; } else { rawSession = null; - sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated); + sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated, performanceTracker); batch = false; } @@ -268,13 +300,8 @@ public class ConnectableTask { } } - final long processingNanos = System.nanoTime() - startNanos; - try { - final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(); - procEvent.setProcessingNanos(processingNanos); - procEvent.setInvocations(invocationCount); - repositoryContext.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier()); + updateEventRepo(startNanos, startCpuTime, startGcMillis, invocationCount, measureCpuTime, performanceTracker); } catch (final IOException e) { logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString()); logger.error("", e); @@ -288,7 +315,128 @@ public class ConnectableTask { return InvocationResult.DO_NOT_YIELD; } + private void updateEventRepo(final long startNanoTime, final long startCpuTime, final long startGcMillis, final int invocationCount, final boolean measureCpuTime, + final PerformanceTracker performanceTracker) + throws IOException { + final long processingNanos = System.nanoTime() - startNanoTime; + final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(); + flowFileEvent.setProcessingNanos(processingNanos); + flowFileEvent.setInvocations(invocationCount); + + // We won't always measure CPU time because it's expensive to calculate. So when we do measure it, we keep track of + // total CPU nanos measured as well as total processing time for those iterations. This gives us a ratio of CPU time vs. total time. + // We can then use that to extrapolate an approximate CPU Time. + if (measureCpuTime) { + updatePerformanceTrackingMetrics(flowFileEvent, performanceTracker, startCpuTime, startGcMillis, processingNanos); + } else { + estimatePerformanceTrackingMetrics(flowFileEvent, processingNanos); + } + + repositoryContext.getFlowFileEventRepository().updateRepository(flowFileEvent, connectable.getIdentifier()); + } + + private void estimatePerformanceTrackingMetrics(final StandardFlowFileEvent flowFileEvent, final long processingNanos) { + // Use ratio of measured CPU time vs. total time for that those iterations, to estimate the CPU time for this iteration. + final SampledMetrics currentMetrics = sampledMetrics; + final double processingRatio = (double) processingNanos / (double) Math.max(1, currentMetrics.getProcessingNanosSampled()); + flowFileEvent.setCpuNanoseconds((long) (processingRatio * currentMetrics.getTotalCpuNanos())); + flowFileEvent.setContentReadNanoseconds((long) (processingRatio * currentMetrics.getReadNanos())); + flowFileEvent.setContentWriteNanoseconds((long) (processingRatio * currentMetrics.getWriteNanos())); + flowFileEvent.setSessionCommitNanos((long) (processingRatio * currentMetrics.getSessionCommitNanos())); + flowFileEvent.setGarbageCollectionMillis((long) (processingRatio * currentMetrics.getGcMillis())); + } + + private void updatePerformanceTrackingMetrics(final StandardFlowFileEvent flowFileEvent, final PerformanceTracker performanceTracker, final long startCpuTime, + final long startGcMillis, final long processingNanos) { + final long cpuTime = threadMXBean.getCurrentThreadCpuTime(); + final long cpuNanos = cpuTime - startCpuTime; + + final long endGcMillis = flowController.getGarbageCollectionLog().getTotalGarbageCollectionMillis(); + final long gcMillis = endGcMillis - startGcMillis; + + flowFileEvent.setCpuNanoseconds(cpuNanos); + flowFileEvent.setContentWriteNanoseconds(performanceTracker.getContentWriteNanos()); + flowFileEvent.setContentReadNanoseconds(performanceTracker.getContentReadNanos()); + flowFileEvent.setSessionCommitNanos(performanceTracker.getSessionCommitNanos()); + flowFileEvent.setGarbageCollectionMillis(gcMillis); + + final SampledMetrics previousMetrics = sampledMetrics; + final SampledMetrics updatedMetrics = new SampledMetrics(); + updatedMetrics.setProcessingNanosSampled(previousMetrics.getProcessingNanosSampled() + processingNanos); + updatedMetrics.setTotalCpuNanos(previousMetrics.getTotalCpuNanos() + cpuNanos); + updatedMetrics.setReadNanos(previousMetrics.getReadNanos() + performanceTracker.getContentReadNanos()); + updatedMetrics.setWriteNanos(previousMetrics.getWriteNanos() + performanceTracker.getContentWriteNanos()); + updatedMetrics.setSessionCommitNanos(previousMetrics.getSessionCommitNanos() + performanceTracker.getSessionCommitNanos()); + updatedMetrics.setGcMillis(gcMillis); + this.sampledMetrics = updatedMetrics; + } + + private boolean isMeasureExpensiveMetrics(final long invocationCount) { + if (perfTrackingNthIteration == 0) { // A value of 0 indicates we should never track performance metrics. + return false; + } + + return invocationCount % perfTrackingNthIteration == 0; + } + private ComponentLog getComponentLog() { return new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent()); } + + private static class SampledMetrics { + private long processingNanosSampled = 0L; + private long totalCpuNanos = 0L; + private long readNanos = 0L; + private long writeNanos = 0L; + private long sessionCommitNanos = 0L; + private long gcMillis = 0L; + + public long getProcessingNanosSampled() { + return processingNanosSampled; + } + + public void setProcessingNanosSampled(final long processingNanosSampled) { + this.processingNanosSampled = processingNanosSampled; + } + + public long getTotalCpuNanos() { + return totalCpuNanos; + } + + public void setTotalCpuNanos(final long totalCpuNanos) { + this.totalCpuNanos = totalCpuNanos; + } + + public long getReadNanos() { + return readNanos; + } + + public void setReadNanos(final long readNanos) { + this.readNanos = readNanos; + } + + public long getWriteNanos() { + return writeNanos; + } + + public void setWriteNanos(final long writeNanos) { + this.writeNanos = writeNanos; + } + + public long getSessionCommitNanos() { + return sessionCommitNanos; + } + + public void setSessionCommitNanos(final long sessionCommitNanos) { + this.sessionCommitNanos = sessionCommitNanos; + } + + public long getGcMillis() { + return gcMillis; + } + + public void setGcMillis(final long gcMillis) { + this.gcMillis = gcMillis; + } + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java index 35ab15e1e7..ee9f550efe 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java @@ -25,6 +25,7 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.StandardProcessSession; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; @@ -61,7 +62,7 @@ public class ExpireFlowFiles implements Runnable { private StandardProcessSession createSession(final Connectable connectable) { final RepositoryContext context = contextFactory.newProcessContext(connectable, new AtomicLong(0L)); - final StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, () -> false); + final StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context, () -> false, new NopPerformanceTracker()); return sessionFactory.createSession(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java index 759512bdcc..ce6c8f9424 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/BootstrapDiagnosticsFactory.java @@ -22,6 +22,7 @@ import org.apache.nifi.diagnostics.DiagnosticsDump; import org.apache.nifi.diagnostics.DiagnosticsDumpElement; import org.apache.nifi.diagnostics.DiagnosticsFactory; import org.apache.nifi.diagnostics.StandardDiagnosticsDump; +import org.apache.nifi.diagnostics.ThreadDumpTask; import org.apache.nifi.diagnostics.bootstrap.tasks.ClusterDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.ComponentCountTask; import org.apache.nifi.diagnostics.bootstrap.tasks.ContentRepositoryScanTask; @@ -35,8 +36,8 @@ import org.apache.nifi.diagnostics.bootstrap.tasks.MemoryPoolPeakUsageTask; import org.apache.nifi.diagnostics.bootstrap.tasks.NarsDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.NiFiPropertiesDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.OperatingSystemDiagnosticTask; +import org.apache.nifi.diagnostics.bootstrap.tasks.ProcessorTimingDiagnosticTask; import org.apache.nifi.diagnostics.bootstrap.tasks.RepositoryDiagnosticTask; -import org.apache.nifi.diagnostics.ThreadDumpTask; import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,6 +75,7 @@ public class BootstrapDiagnosticsFactory implements DiagnosticsFactory { tasks.add(new OperatingSystemDiagnosticTask()); tasks.add(new NarsDiagnosticTask(flowController.getExtensionManager())); tasks.add(new FlowConfigurationDiagnosticTask(flowController)); + tasks.add(new ProcessorTimingDiagnosticTask(flowController.getFlowFileEventRepository(), flowController.getFlowManager())); tasks.add(new LongRunningProcessorTask(flowController)); tasks.add(new ClusterDiagnosticTask(flowController)); tasks.add(new GarbageCollectionDiagnosticTask(flowController)); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DataValveDiagnosticsTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DataValveDiagnosticsTask.java index 09124cde1a..1db318c5b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DataValveDiagnosticsTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/DataValveDiagnosticsTask.java @@ -37,6 +37,12 @@ public class DataValveDiagnosticsTask implements DiagnosticTask { @Override public DiagnosticsDumpElement captureDump(final boolean verbose) { + if (!verbose) { + // This task is very verbose and can make the diagnostics output difficult to read as a result. + // As such, it will not run at all if verbose output is disabled. + return null; + } + final ProcessGroup rootGroup = flowManager.getRootGroup(); final List allGroups = rootGroup.findAllProcessGroups(); allGroups.add(rootGroup); @@ -47,18 +53,18 @@ public class DataValveDiagnosticsTask implements DiagnosticTask { details.add("Process Group " + group.getIdentifier() + ", Name = " + group.getName()); details.add("Currently Have Data Flowing In: " + valveDiagnostics.getGroupsWithDataFlowingIn()); - details.add("Currently Have Data Flowing out: " + valveDiagnostics.getGroupsWithDataFlowingOut()); + details.add("Currently Have Data Flowing Out: " + valveDiagnostics.getGroupsWithDataFlowingOut()); details.add("Reason for Not allowing data to flow in:"); for (final Map.Entry> entry : valveDiagnostics.getReasonForInputNotAllowed().entrySet()) { details.add(" " + entry.getKey() + ":"); - entry.getValue().forEach(gr -> details.add(" " + gr)); + entry.getValue().forEach(pg -> details.add(" " + pg)); } details.add("Reason for Not allowing data to flow out:"); for (final Map.Entry> entry : valveDiagnostics.getReasonForOutputNotAllowed().entrySet()) { details.add(" " + entry.getKey() + ":"); - entry.getValue().forEach(gr -> details.add(" " + gr)); + entry.getValue().forEach(pg -> details.add(" " + pg)); } details.add(""); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ProcessorTimingDiagnosticTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ProcessorTimingDiagnosticTask.java new file mode 100644 index 0000000000..c319fb7247 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/diagnostics/bootstrap/tasks/ProcessorTimingDiagnosticTask.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.diagnostics.bootstrap.tasks; + +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.repository.FlowFileEvent; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.repository.RepositoryStatusReport; +import org.apache.nifi.diagnostics.DiagnosticTask; +import org.apache.nifi.diagnostics.DiagnosticsDumpElement; +import org.apache.nifi.diagnostics.StandardDiagnosticsDumpElement; +import org.apache.nifi.processor.DataUnit; + +import java.text.DecimalFormat; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +public class ProcessorTimingDiagnosticTask implements DiagnosticTask { + private final FlowFileEventRepository eventRepo; + private final FlowManager flowManager; + + // | Proc ID | Proc Name | Proc Type | Group Name | Proc Secs | CPU Secs | %CPU used by Proc | + private static final String PROCESSOR_TIMING_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$-36.36s | %4$-36.36s | %5$15.15s | %6$27.27s | %7$25.25s | " + + // Read Secs | Write Secs| Commit Sec | GC millis | MB Read | MB Write | + "%8$16.16s | %9$16.16s | %10$20.20s | %11$13.13s | %12$11.11s | %13$11.11s |"; + + public ProcessorTimingDiagnosticTask(final FlowFileEventRepository flowFileEventRepository, final FlowManager flowManager) { + this.eventRepo = flowFileEventRepository; + this.flowManager = flowManager; + } + + @Override + public DiagnosticsDumpElement captureDump(final boolean verbose) { + final List details = new ArrayList<>(); + + final RepositoryStatusReport statusReport = eventRepo.reportTransferEvents(System.currentTimeMillis()); + final Map eventsByComponentId = statusReport.getReportEntries(); + + final List timings = new ArrayList<>(); + eventsByComponentId.entrySet().stream() + .map(entry -> getTiming(entry.getKey(), entry.getValue())) + .filter(Objects::nonNull) + .forEach(timings::add); // create ArrayList and add here instead of .collect(toList()) because arraylist allows us to sort + + // Sort based on the Processor CPU time, highest CPU usage first + timings.sort(Comparator.comparing(ProcessorTiming::getCpuNanos).reversed()); + + final DecimalFormat dataSizeFormat = new DecimalFormat("#,###,###.##"); + final DecimalFormat percentageFormat = new DecimalFormat("##.##"); + final NumberFormat secondsFormat = NumberFormat.getInstance(); + + long totalCpuNanos = 0L; + long totalProcNanos = 0L; + long totalReadNanos = 0L; + long totalWriteNanos = 0L; + long totalSessionCommitNanos = 0L; + long totalBytesRead = 0L; + long totalBytesWritten = 0L; + long totalGcNanos = 0L; + + // Tally totals for all timing elements + for (final ProcessorTiming timing : timings) { + totalCpuNanos += timing.getCpuNanos(); + totalProcNanos += timing.getProcessingNanos(); + totalReadNanos += timing.getReadNanos(); + totalWriteNanos += timing.getWriteNanos(); + totalSessionCommitNanos += timing.getSessionCommitNanos(); + totalBytesRead += timing.getBytesRead(); + totalBytesWritten += timing.getBytesWritten(); + totalGcNanos += timing.getGarbageCollectionNanos(); + } + + if (totalCpuNanos < 1) { + details.add("No Processor Timing Diagnostic information has been gathered."); + return new StandardDiagnosticsDumpElement("Processor Timing Diagnostics (Stats over last 5 minutes)", details); + } + + details.add(String.format(PROCESSOR_TIMING_FORMAT, "Processor ID", "Processor Name", "Processor Type", "Process Group Name", "Processing Secs", + "CPU Secs (% time using CPU)", "Pct CPU Time Used by Proc", "Disk Read Secs", "Disk Write Secs", "Session Commit Secs", "GC Millis", "MB Read", "MB Written")); + + for (final ProcessorTiming timing : timings) { + final long procNanos = timing.getProcessingNanos(); + if (procNanos < 1) { + continue; + } + + final String cpuTime = nanosToPercentTime(timing.getCpuNanos(), procNanos, secondsFormat); + final String cpuPct = percentageFormat.format(timing.getCpuNanos() * 100 / totalCpuNanos); + final String readTime = nanosToPercentTime(timing.getReadNanos(), procNanos, secondsFormat); + final String writeTime = nanosToPercentTime(timing.getWriteNanos(), procNanos, secondsFormat); + final String commitTime = nanosToPercentTime(timing.getSessionCommitNanos(), procNanos, secondsFormat); + final String gcTime = nanosToPercentTime(timing.getGarbageCollectionNanos(), procNanos, secondsFormat); + + final String formatted = String.format(PROCESSOR_TIMING_FORMAT, timing.getId(), timing.getName(), timing.getType(), timing.getGroupName(), + secondsFormat.format(TimeUnit.NANOSECONDS.toSeconds(timing.getProcessingNanos())), + cpuTime, + cpuPct, + readTime, + writeTime, + commitTime, + gcTime, + dataSizeFormat.format(DataUnit.B.toMB(timing.getBytesRead())), + dataSizeFormat.format(DataUnit.B.toMB(timing.getBytesWritten()))); + details.add(formatted); + } + + final String formatted = String.format(PROCESSOR_TIMING_FORMAT, "Total", "--", "--", "--", + secondsFormat.format(TimeUnit.NANOSECONDS.toSeconds(totalProcNanos)), + nanosToPercentTime(totalCpuNanos, totalProcNanos, secondsFormat), + "100.00", // Always represents 100% of CPU time used + nanosToPercentTime(totalReadNanos, totalProcNanos, secondsFormat), + nanosToPercentTime(totalWriteNanos, totalProcNanos, secondsFormat), + nanosToPercentTime(totalSessionCommitNanos, totalProcNanos, secondsFormat), + nanosToPercentTime(totalGcNanos, totalProcNanos, secondsFormat), + dataSizeFormat.format(DataUnit.B.toMB(totalBytesRead)), + dataSizeFormat.format(DataUnit.B.toMB(totalBytesWritten))); + details.add(formatted); + + final double mbReadPerSecond = DataUnit.B.toMB(totalBytesRead) / 300; + final double mbWrittenPerSecond = DataUnit.B.toMB(totalBytesWritten) / 300; + details.add(""); + details.add("Average MB/sec read from Content Repo in last 5 mins: " + dataSizeFormat.format(mbReadPerSecond)); + details.add("Average MB/sec written to Content Repo in last 5 mins: " + dataSizeFormat.format(mbWrittenPerSecond)); + + return new StandardDiagnosticsDumpElement("Processor Timing Diagnostics (Stats over last 5 minutes)", details); + } + + private String nanosToPercentTime(final long nanos, final long processingNanos, final NumberFormat secondsFormat) { + return secondsFormat.format(TimeUnit.NANOSECONDS.toSeconds(nanos)) + " (" + Math.min(100, ((int) (nanos * 100 / processingNanos))) + "%)"; + } + + private ProcessorTiming getTiming(final String processorId, final FlowFileEvent flowFileEvent) { + final ProcessorNode processorNode = flowManager.getProcessorNode(processorId); + + // Processor may be null because the event may not be for a processor, or the processor may already have been removed. + if (processorNode == null) { + return null; + } + + final ProcessorTiming timing = new ProcessorTiming(processorNode, flowFileEvent); + return timing; + } + + private static class ProcessorTiming { + private final String id; + private final String name; + private final String type; + private final String groupName; + private final FlowFileEvent flowFileEvent; + + public ProcessorTiming(final ProcessorNode processor, final FlowFileEvent flowFileEvent) { + this.id = processor.getIdentifier(); + this.name = processor.getName(); + this.type = processor.getComponentType(); + this.groupName = processor.getProcessGroup().getName(); + this.flowFileEvent = flowFileEvent; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public String getGroupName() { + return groupName; + } + + public long getProcessingNanos() { + return flowFileEvent.getProcessingNanoseconds(); + } + + public long getCpuNanos() { + return flowFileEvent.getCpuNanoseconds(); + } + + public long getReadNanos() { + return flowFileEvent.getContentReadNanoseconds(); + } + + public long getWriteNanos() { + return flowFileEvent.getContentWriteNanoseconds(); + } + + public long getSessionCommitNanos() { + return flowFileEvent.getSessionCommitNanoseconds(); + } + + public long getBytesRead() { + return flowFileEvent.getBytesRead(); + } + + public long getBytesWritten() { + return flowFileEvent.getBytesWritten(); + } + + public long getGarbageCollectionNanos() { + return TimeUnit.MILLISECONDS.toNanos(flowFileEvent.getGargeCollectionMillis()); + } + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java index 4e300d16de..81bbc7f003 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java @@ -35,6 +35,7 @@ import org.apache.nifi.controller.repository.claim.ResourceClaim; import org.apache.nifi.controller.repository.claim.ResourceClaimManager; import org.apache.nifi.controller.repository.claim.StandardContentClaim; import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository; import org.apache.nifi.events.EventReporter; import org.apache.nifi.flowfile.FlowFile; @@ -221,7 +222,7 @@ public class StandardProcessSessionIT { stateManager.setIgnoreAnnotations(true); context = new StandardRepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo, stateManager); - session = new StandardProcessSession(context, () -> false); + session = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); } private Connection createConnection() { @@ -340,7 +341,7 @@ public class StandardProcessSessionIT { children.add(child); } - final ProcessSession secondSession = new StandardProcessSession(context, () -> false); + final ProcessSession secondSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); try { session.migrate(secondSession, children); Assert.fail("Expected a FlowFileHandlingException to be thrown because a child FlowFile was migrated while its parent was not"); @@ -383,7 +384,7 @@ public class StandardProcessSessionIT { FlowFile flowFile = session.get(); assertNotNull(flowFile); - final ProcessSession secondSession = new StandardProcessSession(context, () -> false); + final ProcessSession secondSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); FlowFile clone = session.clone(flowFile); session.migrate(secondSession, Collections.singletonList(clone)); @@ -1743,7 +1744,7 @@ public class StandardProcessSessionIT { session.transfer(ffb, relationship); session.commit(); - final ProcessSession newSession = new StandardProcessSession(context, () -> false); + final ProcessSession newSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); FlowFile toUpdate = newSession.get(); newSession.append(toUpdate, out -> out.write('C')); @@ -1816,7 +1817,7 @@ public class StandardProcessSessionIT { StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000]; for (int i = 0; i < 70000; i++) { - standardProcessSessions[i] = new StandardProcessSession(context, () -> false); + standardProcessSessions[i] = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); FlowFile flowFile = standardProcessSessions[i].create(); final byte[] buff = new byte["Hello".getBytes().length]; @@ -2427,7 +2428,7 @@ public class StandardProcessSessionIT { flowFile = session.append(flowFile, out -> out.write("1".getBytes())); flowFile = session.append(flowFile, out -> out.write("2".getBytes())); - final StandardProcessSession newSession = new StandardProcessSession(context, () -> false); + final StandardProcessSession newSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); assertTrue(session.isFlowFileKnown(flowFile)); assertFalse(newSession.isFlowFileKnown(flowFile)); @@ -2459,7 +2460,7 @@ public class StandardProcessSessionIT { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8))); - final StandardProcessSession newSession = new StandardProcessSession(context, () -> false); + final StandardProcessSession newSession = new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet()); when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true); @@ -3241,7 +3242,7 @@ public class StandardProcessSessionIT { counterRepository, provenanceRepo, stateManager); - return new StandardProcessSession(context, () -> false); + return new StandardProcessSession(context, () -> false, new NopPerformanceTracker()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java index 0d2dd6e861..e3e75c3731 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java @@ -17,16 +17,10 @@ package org.apache.nifi.controller.repository.claim; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import org.apache.nifi.controller.repository.FileSystemRepository; import org.apache.nifi.controller.repository.StandardContentRepositoryContext; import org.apache.nifi.controller.repository.TestFileSystemRepository; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.controller.repository.util.DiskUtils; import org.apache.nifi.events.EventReporter; import org.apache.nifi.stream.io.StreamUtils; @@ -36,6 +30,14 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + public class TestContentClaimWriteCache { private FileSystemRepository repository = null; @@ -62,7 +64,7 @@ public class TestContentClaimWriteCache { @Test public void testFlushWriteCorrectData() throws IOException { - final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, 4); + final ContentClaimWriteCache cache = new StandardContentClaimWriteCache(repository, new NopPerformanceTracker(), 4); final ContentClaim claim1 = cache.getContentClaim(); assertNotNull(claim1); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java index 4cba0b33d6..e7d7f559b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/io/TestContentClaimInputStream.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository.io; import org.apache.nifi.controller.repository.ContentRepository; import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.stream.io.StreamUtils; import org.junit.Before; import org.junit.Test; @@ -56,7 +57,7 @@ public class TestContentClaimInputStream { @Test public void testStreamCreatedFromRepository() throws IOException { - final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L); + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker()); final byte[] buff = new byte[5]; StreamUtils.fillBuffer(in, buff); @@ -78,7 +79,7 @@ public class TestContentClaimInputStream { @Test public void testThatContentIsSkipped() throws IOException { - final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L); + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L, new NopPerformanceTracker()); final byte[] buff = new byte[2]; StreamUtils.fillBuffer(in, buff); @@ -100,7 +101,7 @@ public class TestContentClaimInputStream { @Test public void testRereadEntireClaim() throws IOException { - final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L); + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker()); final byte[] buff = new byte[5]; @@ -131,7 +132,7 @@ public class TestContentClaimInputStream { @Test public void testMultipleResetCallsAfterMark() throws IOException { - final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L); + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 0L, new NopPerformanceTracker()); final byte[] buff = new byte[5]; @@ -162,7 +163,7 @@ public class TestContentClaimInputStream { @Test public void testRereadWithOffset() throws IOException { - final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L); + final ContentClaimInputStream in = new ContentClaimInputStream(repo, contentClaim, 3L, new NopPerformanceTracker()); final byte[] buff = new byte[2]; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml index de5a0b5a71..0029695116 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml @@ -254,6 +254,8 @@ 1 min 5 mins + + 0 diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties index 87a3338364..7b66af721b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties @@ -343,6 +343,14 @@ nifi.diagnostics.on.shutdown.max.filecount=10 # The diagnostics folder's maximum permitted size in bytes. If the limit is exceeded, the oldest files are deleted. nifi.diagnostics.on.shutdown.max.directory.size=10 MB +# Performance tracking properties +## Specifies what percentage of the time we should track the amount of time processors are using CPU, reading from/writing to content repo, etc. +## This can be useful to understand which components are the most expensive and to understand where system bottlenecks may be occurring. +## The value must be in the range of 0 (inclusive) to 100 (inclusive). A larger value will produce more accurate results, while a smaller value may be +## less expensive to compute. +## Results can be obtained by running "nifi.sh diagnostics " and then inspecting the produced file. +nifi.performance.tracking.percentage=${nifi.performance.tracking.percentage} + # NAR Provider Properties # # These properties allow configuring one or more NAR providers. A NAR provider retrieves NARs from an external source # and copies them to the directory specified by nifi.nar.library.autoload.directory. diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java index c335ae7a3a..f1cd88e1ab 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java @@ -41,6 +41,7 @@ import org.apache.nifi.controller.repository.CounterRepository; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; @@ -603,7 +604,7 @@ public class StandardStatelessFlow implements StatelessDataflow { } final RepositoryContext repositoryContext = repositoryContextFactory.createRepositoryContext(inputPort); - final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false); + final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false, new NopPerformanceTracker()); final ProcessSession session = sessionFactory.createSession(); try { // Get one of the outgoing connections for the Input Port so that we can return QueueSize for it. diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java index bb37eff7a7..841c029ddf 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessRepositoryContext.java @@ -26,6 +26,7 @@ import org.apache.nifi.controller.repository.FlowFileEventRepository; import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.RepositoryContext; import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache; +import org.apache.nifi.controller.repository.metrics.PerformanceTracker; import org.apache.nifi.provenance.ProvenanceEventRepository; import java.util.concurrent.atomic.AtomicLong; @@ -41,7 +42,7 @@ public class StatelessRepositoryContext extends AbstractRepositoryContext implem } @Override - public ContentClaimWriteCache createContentClaimWriteCache() { + public ContentClaimWriteCache createContentClaimWriteCache(final PerformanceTracker performanceTracker) { return writeCache; } } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java index 1002ab75dd..b30390c0d8 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/session/StatelessProcessSession.java @@ -22,6 +22,7 @@ import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.repository.FlowFileRecord; import org.apache.nifi.controller.repository.StandardProcessSession; +import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker; import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; @@ -55,7 +56,7 @@ public class StatelessProcessSession extends StandardProcessSession { public StatelessProcessSession(final Connectable connectable, final RepositoryContextFactory repositoryContextFactory, final ProcessContextFactory processContextFactory, final ExecutionProgress progress, final boolean requireSynchronousCommits, final AsynchronousCommitTracker tracker) { - super(repositoryContextFactory.createRepositoryContext(connectable), progress::isCanceled); + super(repositoryContextFactory.createRepositoryContext(connectable), progress::isCanceled, new NopPerformanceTracker()); this.connectable = connectable; this.repositoryContextFactory = repositoryContextFactory; this.processContextFactory = processContextFactory;