NIFI-7429 Adding status history for system level metrics

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4420.
This commit is contained in:
Bence Simon 2020-07-22 13:40:51 +02:00 committed by Pierre Villard
parent 73b7ff8fd4
commit 0dff3bc065
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
20 changed files with 1173 additions and 29 deletions

View File

@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status;
import java.util.ArrayList;
import java.util.List;
/**
* The status of a NiFi node.
*/
public class NodeStatus implements Cloneable {
private long createdAtInMs;
private long freeHeap;
private long usedHeap;
private long heapUtilization;
private long freeNonHeap;
private long usedNonHeap;
private long openFileHandlers;
private double processorLoadAverage;
private long totalThreads;
private long eventDrivenThreads;
private long timerDrivenThreads;
private long flowFileRepositoryFreeSpace;
private long flowFileRepositoryUsedSpace;
private List<StorageStatus> contentRepositories = new ArrayList<>();
private List<StorageStatus> provenanceRepositories = new ArrayList<>();
public long getCreatedAtInMs() {
return createdAtInMs;
}
public void setCreatedAtInMs(final long createdAtInMs) {
this.createdAtInMs = createdAtInMs;
}
public long getFreeHeap() {
return freeHeap;
}
public void setFreeHeap(final long freeHeap) {
this.freeHeap = freeHeap;
}
public long getUsedHeap() {
return usedHeap;
}
public void setUsedHeap(final long usedHeap) {
this.usedHeap = usedHeap;
}
public long getHeapUtilization() {
return heapUtilization;
}
public void setHeapUtilization(final long heapUtilization) {
this.heapUtilization = heapUtilization;
}
public long getFreeNonHeap() {
return freeNonHeap;
}
public void setFreeNonHeap(final long freeNonHeap) {
this.freeNonHeap = freeNonHeap;
}
public long getUsedNonHeap() {
return usedNonHeap;
}
public void setUsedNonHeap(final long usedNonHeap) {
this.usedNonHeap = usedNonHeap;
}
public long getOpenFileHandlers() {
return openFileHandlers;
}
public void setOpenFileHandlers(final long openFileHandlers) {
this.openFileHandlers = openFileHandlers;
}
public double getProcessorLoadAverage() {
return processorLoadAverage;
}
public void setProcessorLoadAverage(final double processorLoadAverage) {
this.processorLoadAverage = processorLoadAverage;
}
public long getTotalThreads() {
return totalThreads;
}
public void setTotalThreads(final long totalThreads) {
this.totalThreads = totalThreads;
}
public long getEventDrivenThreads() {
return eventDrivenThreads;
}
public void setEventDrivenThreads(final long eventDrivenThreads) {
this.eventDrivenThreads = eventDrivenThreads;
}
public long getTimerDrivenThreads() {
return timerDrivenThreads;
}
public void setTimerDrivenThreads(final long timerDrivenThreads) {
this.timerDrivenThreads = timerDrivenThreads;
}
public long getFlowFileRepositoryFreeSpace() {
return flowFileRepositoryFreeSpace;
}
public void setFlowFileRepositoryFreeSpace(final long flowFileRepositoryFreeSpace) {
this.flowFileRepositoryFreeSpace = flowFileRepositoryFreeSpace;
}
public long getFlowFileRepositoryUsedSpace() {
return flowFileRepositoryUsedSpace;
}
public void setFlowFileRepositoryUsedSpace(final long flowFileRepositoryUsedSpace) {
this.flowFileRepositoryUsedSpace = flowFileRepositoryUsedSpace;
}
public List<StorageStatus> getContentRepositories() {
return contentRepositories;
}
public void setContentRepositories(final List<StorageStatus> contentRepositories) {
this.contentRepositories = new ArrayList<>();
this.contentRepositories.addAll(contentRepositories);
}
public List<StorageStatus> getProvenanceRepositories() {
return provenanceRepositories;
}
public void setProvenanceRepositories(final List<StorageStatus> provenanceRepositories) {
this.provenanceRepositories = new ArrayList<>();
this.provenanceRepositories.addAll(provenanceRepositories);
}
@Override
protected NodeStatus clone() {
final NodeStatus clonedObj = new NodeStatus();
clonedObj.createdAtInMs = createdAtInMs;
clonedObj.freeHeap = freeHeap;
clonedObj.usedHeap = usedHeap;
clonedObj.heapUtilization = heapUtilization;
clonedObj.freeNonHeap = freeNonHeap;
clonedObj.usedNonHeap = usedNonHeap;
clonedObj.openFileHandlers = openFileHandlers;
clonedObj.processorLoadAverage = processorLoadAverage;
clonedObj.totalThreads = totalThreads;
clonedObj.eventDrivenThreads = eventDrivenThreads;
clonedObj.timerDrivenThreads = timerDrivenThreads;
clonedObj.flowFileRepositoryFreeSpace = flowFileRepositoryFreeSpace;
clonedObj.flowFileRepositoryUsedSpace = flowFileRepositoryUsedSpace;
final List<StorageStatus> clonedContentRepositories = new ArrayList<>();
contentRepositories.stream().map(r -> r.clone()).forEach(r -> clonedContentRepositories.add(r));
clonedObj.contentRepositories = clonedContentRepositories;
final List<StorageStatus> clonedProvenanceRepositories = new ArrayList<>();
provenanceRepositories.stream().map(r -> r.clone()).forEach(r -> clonedProvenanceRepositories.add(r));
clonedObj.provenanceRepositories = clonedProvenanceRepositories;
return clonedObj;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("NodeStatus{");
sb.append("createdAtInMs=").append(createdAtInMs);
sb.append(", freeHeap=").append(freeHeap);
sb.append(", usedHeap=").append(usedHeap);
sb.append(", heapUtilization=").append(heapUtilization);
sb.append(", freeNonHeap=").append(freeNonHeap);
sb.append(", usedNonHeap=").append(usedNonHeap);
sb.append(", openFileHandlers=").append(openFileHandlers);
sb.append(", processorLoadAverage=").append(processorLoadAverage);
sb.append(", totalThreads=").append(totalThreads);
sb.append(", eventDrivenThreads=").append(eventDrivenThreads);
sb.append(", timerDrivenThreads=").append(timerDrivenThreads);
sb.append(", flowFileRepositoryFreeSpace=").append(flowFileRepositoryFreeSpace);
sb.append(", flowFileRepositoryUsedSpace=").append(flowFileRepositoryUsedSpace);
sb.append(", contentRepositories=").append(contentRepositories);
sb.append(", provenanceRepositories=").append(provenanceRepositories);
sb.append('}');
return sb.toString();
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status;
/**
* The status of a storage repository.
*/
public class StorageStatus implements Cloneable {
private String name;
private long freeSpace;
private long usedSpace;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public long getFreeSpace() {
return freeSpace;
}
public void setFreeSpace(final long freeSpace) {
this.freeSpace = freeSpace;
}
public long getUsedSpace() {
return usedSpace;
}
public void setUsedSpace(final long usedSpace) {
this.usedSpace = usedSpace;
}
@Override
public String toString() {
final StringBuilder builder = new StringBuilder("StorageStatus{");
builder.append("name='").append(name).append('\'');
builder.append(", freeSpace=").append(freeSpace);
builder.append(", usedSpace=").append(usedSpace);
builder.append('}');
return builder.toString();
}
@Override
public StorageStatus clone() {
final StorageStatus clonedObj = new StorageStatus();
clonedObj.name = name;
clonedObj.freeSpace = freeSpace;
clonedObj.usedSpace = usedSpace;
return clonedObj;
}
}

View File

@ -3099,6 +3099,10 @@ recent change to the dataflow has caused a problem and needs to be fixed. The DF
adjust the flow as needed to fix the problem. While NiFi does not have an "undo" feature, the DFM can make new changes to the
dataflow that will fix the problem.
Select Node Status History to view instance specific metrics from the last 24 hours or if the instance runs for less time, then
since it has been started. The status history can help the DFM in troubleshooting performance issues and provides a general
view on the health of the node. The status history includes information about the memory usage and disk usage among other things.
Two other tools in the Global Menu are Controller Settings and Users. The Controller Settings page provides the ability to change
the name of the NiFi instance, add comments describing the NiFi instance, and set the maximum number of threads that are available
to the application. It also provides tabs where DFMs may add and configure <<Controller_Services>> and <<Reporting_Tasks>>. The Users page is used to manage user access, which is described in

View File

@ -19,6 +19,7 @@ package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.List;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
/**
@ -38,21 +39,23 @@ public interface ComponentStatusRepository {
/**
* Captures the status information provided in the given report
*
* @param rootGroupStatus status of root group
* @param nodeStatus status of the node
* @param rootGroupStatus status of root group and it's content
* @param garbageCollectionStatus status of garbage collection
*/
void capture(ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus);
void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus);
/**
* Captures the status information provided in the given report, providing a
* timestamp that indicates the time at which the status report was
* generated. This can be used to replay historical values.
*
* @param rootGroupStatus status
* @param nodeStatus status of the node
* @param rootGroupStatus status of the root group and it's content
* @param timestamp timestamp of capture
* @param garbageCollectionStatus status of garbage collection
*/
void capture(ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus, Date timestamp);
void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus, Date timestamp);
/**
* @return the Date at which the latest capture was performed
@ -131,5 +134,13 @@ public interface ComponentStatusRepository {
*/
StatusHistory getRemoteProcessGroupStatusHistory(String remoteGroupId, Date start, Date end, int preferredDataPoints);
/**
* Returns the status history of the actual node
*
* @return a {@link StatusHistory} that provides the status information
* about the NiFi node from the period stored in the status repository.
*/
StatusHistory getNodeStatusHistory();
GarbageCollectionHistory getGarbageCollectionHistory(Date start, Date end);
}

View File

@ -23,10 +23,19 @@ package org.apache.nifi.controller.status.history;
*/
public interface MetricDescriptor<T> {
/**
* Used during creation of metric instance with {@link Formatter#FRACTION} formatting. The expected way is to multiply
* the floating point metric value with it before converting into long. By this, enough of the fractional part might be
* kept for visualisation. In order to show the correct number the result must be divided with the same number before
* presenting.
*/
int FRACTION_MULTIPLIER = 1_000_000;
enum Formatter {
COUNT,
DURATION,
DATA_SIZE
DATA_SIZE,
FRACTION
};
int getMetricIdentifier();

View File

@ -24,6 +24,7 @@ import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
import org.apache.nifi.controller.status.history.CounterMetricDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
import org.apache.nifi.controller.status.history.NodeStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
@ -56,6 +57,7 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
public static final Pattern PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/status/history");
public static final Pattern REMOTE_PROCESS_GROUP_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/remote-process-groups/[a-f0-9\\-]{36}/status/history");
public static final Pattern CONNECTION_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/flow/connections/[a-f0-9\\-]{36}/status/history");
public static final Pattern NODE_STATUS_HISTORY_URI_PATTERN = Pattern.compile("/nifi-api/controller/status/history");
private final long componentStatusSnapshotMillis;
@ -84,6 +86,10 @@ public class StatusHistoryEndpointMerger implements EndpointResponseMerger {
for (final ConnectionStatusDescriptor descriptor : ConnectionStatusDescriptor.values()) {
metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
}
} else if (NODE_STATUS_HISTORY_URI_PATTERN.matcher(path).matches()) {
for (final NodeStatusDescriptor descriptor : NodeStatusDescriptor.values()) {
metricDescriptors.put(descriptor.getField(), descriptor.getDescriptor());
}
}
return metricDescriptors;

View File

@ -117,6 +117,8 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.StorageStatus;
import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
@ -127,6 +129,7 @@ import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StandardGarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.tasks.ExpireFlowFiles;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.diagnostics.SystemDiagnosticsFactory;
import org.apache.nifi.encrypt.StringEncryptor;
@ -690,7 +693,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
@Override
public void run() {
try {
componentStatusRepository.capture(eventAccess.getControllerStatus(), getGarbageCollectionStatus());
componentStatusRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus());
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
@ -2965,6 +2968,41 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startTime, endTime, preferredDataPoints));
}
public StatusHistoryDTO getNodeStatusHistory() {
return StatusHistoryUtil.createStatusHistoryDTO(componentStatusRepository.getNodeStatusHistory());
}
private NodeStatus getNodeStatusSnapshot() {
final SystemDiagnostics systemDiagnostics = getSystemDiagnostics();
final NodeStatus result = new NodeStatus();
result.setCreatedAtInMs(systemDiagnostics.getCreationTimestamp());
result.setFreeHeap(systemDiagnostics.getFreeHeap());
result.setUsedHeap(systemDiagnostics.getUsedHeap());
result.setHeapUtilization(systemDiagnostics.getHeapUtilization());
result.setFreeNonHeap(systemDiagnostics.getFreeNonHeap());
result.setUsedNonHeap(systemDiagnostics.getUsedNonHeap());
result.setOpenFileHandlers(systemDiagnostics.getOpenFileHandles());
result.setProcessorLoadAverage(systemDiagnostics.getProcessorLoadAverage());
result.setTotalThreads(systemDiagnostics.getTotalThreads());
result.setEventDrivenThreads(getActiveEventDrivenThreadCount());
result.setTimerDrivenThreads(getActiveTimerDrivenThreadCount());
result.setFlowFileRepositoryFreeSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getFreeSpace());
result.setFlowFileRepositoryUsedSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getUsedSpace());
result.setContentRepositories(systemDiagnostics.getContentRepositoryStorageUsage().entrySet().stream().map(e -> getStorageStatus(e)).collect(Collectors.toList()));
result.setProvenanceRepositories(systemDiagnostics.getProvenanceRepositoryStorageUsage().entrySet().stream().map(e -> getStorageStatus(e)).collect(Collectors.toList()));
return result;
}
private static StorageStatus getStorageStatus(final Map.Entry<String, StorageUsage> storageUsage) {
final StorageStatus result = new StorageStatus();
result.setName(storageUsage.getKey());
result.setFreeSpace(storageUsage.getValue().getFreeSpace());
result.setUsedSpace(storageUsage.getValue().getUsedSpace());
return result;
}
private static class HeartbeatBean {
private final ProcessGroup rootGroup;

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import org.apache.nifi.controller.status.NodeStatus;
import java.util.List;
import java.util.Objects;
public enum NodeStatusDescriptor {
FREE_HEAP(
"freeHeap",
"Free Heap",
"The amount of free memory in the heap that can be used by the Java virtual machine.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getFreeHeap()),
USED_HEAP(
"usedHeap",
"Used Heap",
"The amount of used memory in the heap that is used by the Java virtual machine.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getUsedHeap()),
HEAP_UTILIZATION(
"heapUtilization",
"Heap Utilization",
"The percentage of available heap currently used by the Java virtual machine.",
MetricDescriptor.Formatter.COUNT,
s -> s.getHeapUtilization(),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
return (long) values.stream()
.map(snapshot -> snapshot.getStatusMetric(HEAP_UTILIZATION.getDescriptor()))
.filter(Objects::nonNull)
.mapToLong(value -> value)
.average()
.orElse(0L);
}
}),
FREE_NON_HEAP(
"freeNonHeap",
"Free Non Heap",
"The currently available non-heap memory that can be used by the Java virtual machine.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getFreeNonHeap()),
USED_NON_HEAP(
"usedNonHeap",
"Used Non Heap",
"The current usage of non-heap memory that is used by the Java virtual machine.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getUsedNonHeap()),
OPEN_FILE_HANDLES(
"openFileHandles",
"Open File Handles",
"The current number of open file handles used by the Java virtual machine.",
MetricDescriptor.Formatter.COUNT,
s -> s.getOpenFileHandlers()),
PROCESSOR_LOAD_AVERAGE(
"processorLoadAverage",
"Processor Load Average",
"The processor load. Every measurement point represents the system load average for the last minute.",
MetricDescriptor.Formatter.FRACTION,
s -> Double.valueOf(s.getProcessorLoadAverage() * MetricDescriptor.FRACTION_MULTIPLIER).longValue(),
new ValueReducer<StatusSnapshot, Long>() {
@Override
public Long reduce(final List<StatusSnapshot> values) {
return (long) values.stream()
.map(snapshot -> snapshot.getStatusMetric(HEAP_UTILIZATION.getDescriptor()))
.filter(Objects::nonNull)
.mapToLong(value -> value)
.average()
.orElse(0L);
}
}),
TOTAL_THREADS(
"totalThreads",
"Number of total threads",
"The current number of live threads in the Java virtual machine (both daemon and non-daemon threads).",
MetricDescriptor.Formatter.COUNT,
s -> s.getTotalThreads()),
EVENT_DRIVEN_THREADS(
"eventDrivenThreads",
"Number of event driven threads",
"The current number of active threads in the event driven thread pool.",
MetricDescriptor.Formatter.COUNT,
s -> s.getEventDrivenThreads()),
TIME_DRIVEN_THREADS(
"timeDrivenThreads",
"Number of time driven threads",
"The current number of active threads in the time driven thread pool.",
MetricDescriptor.Formatter.COUNT,
s -> s.getTimerDrivenThreads()),
FLOW_FILE_REPOSITORY_FREE_SPACE(
"flowFileRepositoryFreeSpace",
"Flow File Repository Free Space",
"The usable space available for file repositories on the underlying storage mechanism",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getFlowFileRepositoryFreeSpace()),
FLOW_FILE_REPOSITORY_USED_SPACE(
"flowFileRepositoryUsedSpace",
"Flow File Repository Used Space",
"The space in use on the underlying storage mechanism.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getFlowFileRepositoryUsedSpace()),
CONTENT_REPOSITORY_FREE_SPACE(
"contentRepositoryFreeSpace",
"Sum content Repository Free Space",
"The usable space available for content repositories on the underlying storage mechanisms.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getContentRepositories().stream().mapToLong(r -> r.getFreeSpace()).sum()),
CONTENT_REPOSITORY_USED_SPACE(
"contentRepositoryUsedSpace",
"Sum content Repository Used Space",
"The space in use on the underlying storage mechanisms.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getContentRepositories().stream().mapToLong(r -> r.getUsedSpace()).sum()),
PROVENANCE_REPOSITORY_FREE_SPACE(
"provenanceRepositoryFreeSpace",
"Sum provenance Repository Free Space",
"The usable space available for use by the underlying storage mechanisms.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getProvenanceRepositories().stream().mapToLong(r -> r.getFreeSpace()).sum()),
PROVENANCE_REPOSITORY_USED_SPACE(
"provenanceRepositoryUsedSpace",
"Sum provenance Repository Used Space",
"The space in use on the underlying storage mechanisms.",
MetricDescriptor.Formatter.DATA_SIZE,
s -> s.getProvenanceRepositories().stream().mapToLong(r -> r.getUsedSpace()).sum());
private final MetricDescriptor<NodeStatus> descriptor;
NodeStatusDescriptor(
final String field,
final String label,
final String description,
final MetricDescriptor.Formatter formatter,
final ValueMapper<NodeStatus> valueFunction) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction);
}
NodeStatusDescriptor(
final String field,
final String label,
final String description,
final MetricDescriptor.Formatter formatter,
final ValueMapper<NodeStatus> valueFunction,
final ValueReducer<StatusSnapshot, Long> reducer) {
this.descriptor = new StandardMetricDescriptor<>(this::ordinal, field, label, description, formatter, valueFunction, reducer);
}
public String getField() {
return descriptor.getField();
}
public MetricDescriptor<NodeStatus> getDescriptor() {
return descriptor;
}
}

View File

@ -21,6 +21,7 @@ import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@ -75,18 +76,13 @@ public class StatusHistoryUtil {
}
public static List<StatusDescriptorDTO> createFieldDescriptorDtos(final Collection<MetricDescriptor<?>> metricDescriptors) {
final List<StatusDescriptorDTO> dtos = new ArrayList<>();
final StatusDescriptorDTO[] result = new StatusDescriptorDTO[metricDescriptors.size()];
final Set<MetricDescriptor<?>> allDescriptors = new LinkedHashSet<>();
for (final MetricDescriptor<?> metricDescriptor : metricDescriptors) {
allDescriptors.add(metricDescriptor);
result[metricDescriptor.getMetricIdentifier()] = createStatusDescriptorDto(metricDescriptor);
}
for (final MetricDescriptor<?> metricDescriptor : allDescriptors) {
dtos.add(createStatusDescriptorDto(metricDescriptor));
}
return dtos;
return Arrays.asList(result);
}
public static List<StatusDescriptorDTO> createFieldDescriptorDtos(final StatusHistory statusHistory) {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.status.history;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
@ -30,9 +31,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class VolatileComponentStatusRepository implements ComponentStatusRepository {
@ -50,7 +54,17 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
private static final Set<MetricDescriptor<?>> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values())
.map(RemoteProcessGroupStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
private static final Set<MetricDescriptor<NodeStatus>> DEFAULT_NODE_METRICS = Arrays.stream(NodeStatusDescriptor.values())
.map(NodeStatusDescriptor::getDescriptor)
.collect(Collectors.toSet());
private static final String STORAGE_FREE_DESCRIPTION = "The usable space available for use by the underlying storage mechanism.";
private static final String STORAGE_USED_DESCRIPTION = "The space in use on the underlying storage mechanism";
private static final String GC_TIME_DESCRIPTION = "The sum time the garbage collection has run since the start of the Java virtual machine.";
private static final String GC_TIME_DIFF_DESCRIPTION = "The sum time the garbage collection has run since the last measurement.";
private static final String GC_COUNT_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the start of the Java virtual machine.";
private static final String GC_COUNT_DIFF_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the last measurement.";
public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
public static final int DEFAULT_NUM_DATA_POINTS = 288; // 1 day worth of 5-minute snapshots
@ -60,6 +74,7 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
// Changed to protected to allow unit testing
protected final RingBuffer<Date> timestamps;
private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
private final RingBuffer<NodeStatus> nodeStatuses;
private final int numDataPoints;
private volatile long lastCaptureTime = 0L;
@ -70,27 +85,30 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
numDataPoints = DEFAULT_NUM_DATA_POINTS;
gcStatuses = null;
timestamps = null;
nodeStatuses = null;
}
public VolatileComponentStatusRepository(final NiFiProperties nifiProperties) {
numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS);
gcStatuses = new RingBuffer<>(numDataPoints);
timestamps = new RingBuffer<>(numDataPoints);
nodeStatuses = new RingBuffer<>(numDataPoints);
}
@Override
public void capture(final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus) {
capture(rootGroupStatus, gcStatus, new Date());
public void capture(final NodeStatus nodeStatus, final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus) {
capture(nodeStatus, rootGroupStatus, gcStatus, new Date());
}
@Override
public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus, final Date timestamp) {
public synchronized void capture(final NodeStatus nodeStatus, final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus, final Date timestamp) {
final Date evicted = timestamps.add(timestamp);
if (evicted != null) {
componentStatusHistories.values().forEach(history -> history.expireBefore(evicted));
}
capture(rootGroupStatus, timestamp);
nodeStatuses.add(nodeStatus);
gcStatuses.add(gcStatus);
logger.debug("Captured metrics for {}", this);
@ -164,6 +182,172 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
return getStatusHistory(remoteGroupId, true, DEFAULT_RPG_METRICS, start, end, preferredDataPoints);
}
@Override
public StatusHistory getNodeStatusHistory() {
final List<NodeStatus> nodeStatusList = nodeStatuses.asList();
final List<List<GarbageCollectionStatus>> gcStatusList = gcStatuses.asList();
final LinkedList<StatusSnapshot> snapshots = new LinkedList<>();
final Set<MetricDescriptor<?>> metricDescriptors = new HashSet<>();
final Set<MetricDescriptor<NodeStatus>> nodeStatusDescriptors = new HashSet<>(DEFAULT_NODE_METRICS);
final List<MetricDescriptor<List<GarbageCollectionStatus>>> gcMetricDescriptors = new LinkedList<>();
final List<MetricDescriptor<List<GarbageCollectionStatus>>> gcMetricDescriptorsDifferential = new LinkedList<>();
final AtomicInteger counter = new AtomicInteger(DEFAULT_NODE_METRICS.size() - 1);
// Uses the first measurement (if any) as reference for repository metrics descriptors. The reference will be used
// as a schema for creating descriptors. This is needed as the number of repositories are not predictable.
if (nodeStatusList.size() > 0) {
final NodeStatus referenceNodeStatus = nodeStatusList.get(0);
for (int i = 0; i < referenceNodeStatus.getContentRepositories().size(); i++) {
nodeStatusDescriptors.add(getContentStorageFree(referenceNodeStatus, i, counter.incrementAndGet()));
nodeStatusDescriptors.add(getContentStorageUsed(referenceNodeStatus, i, counter.incrementAndGet()));
}
for (int i = 0; i < referenceNodeStatus.getProvenanceRepositories().size(); i++) {
nodeStatusDescriptors.add(getProvenanceStorageFree(referenceNodeStatus, i, counter.incrementAndGet()));
nodeStatusDescriptors.add(getProvenanceStorageUsed(referenceNodeStatus, i, counter.incrementAndGet()));
}
}
// Uses the first measurement (if any) as reference for GC metrics descriptors. The reference will be used
// as a schema for creating descriptors. This is needed as the exact details of the garbage collector statuses
// are not predictable.
if (gcStatusList.size() > 0) {
final List<GarbageCollectionStatus> gcStatuses = gcStatusList.get(0);
for (int i = 0; i < gcStatuses.size(); i++) {
final String memoryManager = gcStatuses.get(i).getMemoryManagerName();
gcMetricDescriptors.add(getGarbageCollectorTime(i, memoryManager, counter.incrementAndGet()));
gcMetricDescriptors.add(getGarbageCollectorCount(i, memoryManager, counter.incrementAndGet()));
gcMetricDescriptorsDifferential.add(getGarbageCollectorTimeDifference(i, memoryManager, counter.incrementAndGet()));
gcMetricDescriptorsDifferential.add(getGarbageCollectorCountDifference(i, memoryManager, counter.incrementAndGet()));
}
}
metricDescriptors.addAll(nodeStatusDescriptors);
metricDescriptors.addAll(gcMetricDescriptors);
metricDescriptors.addAll(gcMetricDescriptorsDifferential);
// Adding measurements
for (int i = 0; i < nodeStatusList.size(); i++) {
final StandardStatusSnapshot snapshot = new StandardStatusSnapshot(metricDescriptors);
final NodeStatus nodeStatus = nodeStatusList.get(i);
final List<GarbageCollectionStatus> garbageCollectionStatuses = gcStatusList.get(i);
snapshot.setTimestamp(new Date(nodeStatus.getCreatedAtInMs()));
nodeStatusDescriptors.forEach(d -> snapshot.addStatusMetric(d, d.getValueFunction().getValue(nodeStatus)));
gcMetricDescriptors.forEach(d -> snapshot.addStatusMetric(d, d.getValueFunction().getValue(garbageCollectionStatuses)));
// Adding GC metrics uses previous measurement for generating diff
if (!snapshots.isEmpty()) {
for (int j = 0; j < gcMetricDescriptorsDifferential.size(); j++) {
long previousValue = snapshots.getLast().getStatusMetric(gcMetricDescriptors.get(j));
long currentValue = snapshot.getStatusMetric(gcMetricDescriptors.get(j));
snapshot.addStatusMetric(gcMetricDescriptorsDifferential.get(j), currentValue - previousValue);
}
} else {
for (int j = 0; j < gcMetricDescriptorsDifferential.size(); j++) {
snapshot.addStatusMetric(gcMetricDescriptorsDifferential.get(j), 0L);
}
}
snapshots.add(snapshot);
}
return new StandardStatusHistory(snapshots, new HashMap<>(), new Date());
}
// Descriptors for node status
private StandardMetricDescriptor<NodeStatus> getProvenanceStorageUsed(final NodeStatus referenceNodeStatus, final int storageNumber, final int order) {
return new StandardMetricDescriptor<>(
() -> order,
"provenanceStorage" + storageNumber + "Used",
"Provenance Repository (" + referenceNodeStatus.getProvenanceRepositories().get(storageNumber).getName() + ") Used Space",
STORAGE_USED_DESCRIPTION,
MetricDescriptor.Formatter.DATA_SIZE,
n -> n.getProvenanceRepositories().get(storageNumber).getUsedSpace()
);
}
private StandardMetricDescriptor<NodeStatus> getProvenanceStorageFree(final NodeStatus referenceNodeStatus, final int storageNumber, final int order) {
return new StandardMetricDescriptor<>(
() -> order,
"provenanceStorage" + storageNumber + "Free",
"Provenance Repository (" + referenceNodeStatus.getProvenanceRepositories().get(storageNumber).getName() + ") Free Space",
STORAGE_FREE_DESCRIPTION,
MetricDescriptor.Formatter.DATA_SIZE,
n -> n.getProvenanceRepositories().get(storageNumber).getFreeSpace()
);
}
private StandardMetricDescriptor<NodeStatus> getContentStorageUsed(NodeStatus referenceNodeStatus, int storageNumber, int order) {
return new StandardMetricDescriptor<>(
() -> order,
"contentStorage" + storageNumber + "Used",
"Content Repository (" + referenceNodeStatus.getContentRepositories().get(storageNumber).getName() + ") Used Space",
STORAGE_USED_DESCRIPTION,
MetricDescriptor.Formatter.DATA_SIZE,
n -> n.getContentRepositories().get(storageNumber).getUsedSpace()
);
}
private StandardMetricDescriptor<NodeStatus> getContentStorageFree(NodeStatus referenceNodeStatus, int storageNumber, int order) {
return new StandardMetricDescriptor<>(
() -> order,
"contentStorage" + storageNumber + "Free",
"Content Repository (" + referenceNodeStatus.getContentRepositories().get(storageNumber).getName() + ") Free Space",
STORAGE_FREE_DESCRIPTION,
MetricDescriptor.Formatter.DATA_SIZE,
n -> n.getContentRepositories().get(storageNumber).getFreeSpace()
);
}
// Descriptors for garbage collectors
private static StandardMetricDescriptor<List<GarbageCollectionStatus>> getGarbageCollectorCount(final int gcNumber, final String memoryManagerName, final int order) {
return new StandardMetricDescriptor<>(
() -> order,
"gc" + gcNumber + "Count",
memoryManagerName + " Collection Count",
GC_COUNT_DESCRIPTION,
MetricDescriptor.Formatter.COUNT,
gcs -> gcs.get(gcNumber).getCollectionCount());
}
private StandardMetricDescriptor<List<GarbageCollectionStatus>> getGarbageCollectorTime(final int gcNumber, final String memoryManagerName, final int order) {
return new StandardMetricDescriptor<>(
() -> order,
"gc" + gcNumber + "Time",
memoryManagerName + " Collection Time (milliseconds)",
GC_TIME_DESCRIPTION,
MetricDescriptor.Formatter.COUNT,
gcs -> gcs.get(gcNumber).getCollectionMillis());
}
// Descriptors for garbage collectors (difference values)
private static StandardMetricDescriptor<List<GarbageCollectionStatus>> getGarbageCollectorTimeDifference(final int gcNumber, final String memoryManagerName, final int order) {
return new StandardMetricDescriptor<>(
() -> order,
"gc" + gcNumber + "TimeDifference",
memoryManagerName + " Collection Time (5 mins, in milliseconds)",
GC_TIME_DIFF_DESCRIPTION,
MetricDescriptor.Formatter.COUNT,
gcs -> 0L); // Value function is not in use, filled below as value from previous measurement is needed
}
private static StandardMetricDescriptor<List<GarbageCollectionStatus>> getGarbageCollectorCountDifference(final int gcNumber, final String memoryManagerName, final int order) {
return new StandardMetricDescriptor<>(
() -> order,
"gc" + gcNumber + "CountDifference",
memoryManagerName + " Collection Count (5 mins)",
GC_COUNT_DIFF_DESCRIPTION,
MetricDescriptor.Formatter.COUNT,
gcs -> 0L); // Value function is not in use, filled below as value from previous measurement is needed
}
// Updated getStatusHistory to utilize the start/end/preferredDataPoints parameters passed into
// the calling methods. Although for VolatileComponentStatusRepository the timestamps buffer is

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import org.apache.nifi.web.api.dto.status.StatusDescriptorDTO;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class StatusHistoryUtilTest {
@Test
public void testCreateFieldDescriptorDtos() {
// given
final Collection<MetricDescriptor<?>> metricDescriptors = Arrays.asList(
new StandardMetricDescriptor<>(() -> 1, "field2", "Field2", "Field 2", MetricDescriptor.Formatter.COUNT, __ -> 2L),
new StandardMetricDescriptor<>(() -> 0, "field1", "Field1", "Field 1", MetricDescriptor.Formatter.COUNT, __ -> 1L)
);
final List<StatusDescriptorDTO> expected = Arrays.asList(
new StatusDescriptorDTO("field1", "Field1", "Field 1", MetricDescriptor.Formatter.COUNT.name()),
new StatusDescriptorDTO("field2", "Field2", "Field 2", MetricDescriptor.Formatter.COUNT.name())
);
// when
final List<StatusDescriptorDTO> result = StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors);
// then
Assert.assertEquals(expected, result);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testCreateFieldDescriptorDtosWhenNotConsecutive() {
// given
final Collection<MetricDescriptor<?>> metricDescriptors = Arrays.asList(
new StandardMetricDescriptor<>(() -> 3, "field2", "Field2", "Field 2", MetricDescriptor.Formatter.COUNT, __ -> 2L),
new StandardMetricDescriptor<>(() -> 0, "field1", "Field1", "Field 1", MetricDescriptor.Formatter.COUNT, __ -> 1L)
);
// when
StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors);
}
@Test
public void testCreateFieldDescriptorDtosWhenEmpty() {
// given
final Collection<MetricDescriptor<?>> metricDescriptors = new ArrayList<>();
final List<StatusDescriptorDTO> expected = new ArrayList<>();
// when
final List<StatusDescriptorDTO> result = StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors);
// then
Assert.assertEquals(expected, result);
}
}

View File

@ -16,15 +16,23 @@
*/
package org.apache.nifi.controller.status.history;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import java.util.List;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.StorageStatus;
import org.apache.nifi.util.NiFiProperties;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
import org.testng.Assert;
import org.apache.nifi.util.NiFiProperties;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import static junit.framework.TestCase.assertTrue;
import static org.apache.nifi.controller.status.history.VolatileComponentStatusRepository.DEFAULT_NUM_DATA_POINTS;
@ -228,4 +236,165 @@ public class VolatileComponentStatusRepositoryTest {
assertEquals(emptyRepo.timestamps.getNewestElement(), dates.get(dates.size() - 1));
}
}
@Test
public void testNodeStatusHistory() {
// given
final VolatileComponentStatusRepository testSubject = createRepo(BUFSIZE3);
final List<NodeStatus> nodeStatuses = Arrays.asList(
givenNodeStatus(0),
givenNodeStatus(1)
);
testSubject.capture(nodeStatuses.get(0), givenProcessGroupStatus(), givenGarbageCollectionStatus(1, 100, 2, 300));
testSubject.capture(nodeStatuses.get(1), givenProcessGroupStatus(), givenGarbageCollectionStatus(1, 100, 5, 700));
// when
final StatusHistory result = testSubject.getNodeStatusHistory();
// then
// checking on snapshots
Assert.assertEquals(nodeStatuses.size(), result.getStatusSnapshots().size());;
// metrics based on NodeStatus
for (int i = 0; i < result.getStatusSnapshots().size(); i++) {
final StatusSnapshot snapshot = result.getStatusSnapshots().get(i);
final NodeStatus nodeStatus = nodeStatuses.get(i);
Assert.assertEquals(nodeStatus.getFreeHeap(), snapshot.getStatusMetric(NodeStatusDescriptor.FREE_HEAP.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getUsedHeap(), snapshot.getStatusMetric(NodeStatusDescriptor.USED_HEAP.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getHeapUtilization(), snapshot.getStatusMetric(NodeStatusDescriptor.HEAP_UTILIZATION.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getFreeNonHeap(), snapshot.getStatusMetric(NodeStatusDescriptor.FREE_NON_HEAP.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getUsedNonHeap(), snapshot.getStatusMetric(NodeStatusDescriptor.USED_NON_HEAP.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getOpenFileHandlers(), snapshot.getStatusMetric(NodeStatusDescriptor.OPEN_FILE_HANDLES.getDescriptor()).longValue());
Assert.assertEquals(
Double.valueOf(nodeStatus.getProcessorLoadAverage() * MetricDescriptor.FRACTION_MULTIPLIER).longValue(),
snapshot.getStatusMetric(NodeStatusDescriptor.PROCESSOR_LOAD_AVERAGE.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getTotalThreads(), snapshot.getStatusMetric(NodeStatusDescriptor.TOTAL_THREADS.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getEventDrivenThreads(), snapshot.getStatusMetric(NodeStatusDescriptor.EVENT_DRIVEN_THREADS.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getTimerDrivenThreads(), snapshot.getStatusMetric(NodeStatusDescriptor.TIME_DRIVEN_THREADS.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getFlowFileRepositoryFreeSpace(), snapshot.getStatusMetric(NodeStatusDescriptor.FLOW_FILE_REPOSITORY_FREE_SPACE.getDescriptor()).longValue());
Assert.assertEquals(nodeStatus.getFlowFileRepositoryUsedSpace(), snapshot.getStatusMetric(NodeStatusDescriptor.FLOW_FILE_REPOSITORY_USED_SPACE.getDescriptor()).longValue());
Assert.assertEquals(
nodeStatus.getContentRepositories().stream().map(r -> r.getFreeSpace()).reduce(0L, (a, b) -> a + b).longValue(),
snapshot.getStatusMetric(NodeStatusDescriptor.CONTENT_REPOSITORY_FREE_SPACE.getDescriptor()).longValue());
Assert.assertEquals(
nodeStatus.getContentRepositories().stream().map(r -> r.getUsedSpace()).reduce(0L, (a, b) -> a + b).longValue(),
snapshot.getStatusMetric(NodeStatusDescriptor.CONTENT_REPOSITORY_USED_SPACE.getDescriptor()).longValue());
Assert.assertEquals(
nodeStatus.getProvenanceRepositories().stream().map(r -> r.getFreeSpace()).reduce(0L, (a, b) -> a + b).longValue(),
snapshot.getStatusMetric(NodeStatusDescriptor.PROVENANCE_REPOSITORY_FREE_SPACE.getDescriptor()).longValue());
Assert.assertEquals(
nodeStatus.getProvenanceRepositories().stream().map(r -> r.getUsedSpace()).reduce(0L, (a, b) -> a + b).longValue(),
snapshot.getStatusMetric(NodeStatusDescriptor.PROVENANCE_REPOSITORY_USED_SPACE.getDescriptor()).longValue());
// metrics based on repositories
Assert.assertEquals(12 + i, getMetricAtOrdinal(snapshot, 16)); // c1 free
Assert.assertEquals(13 + i, getMetricAtOrdinal(snapshot, 17)); // c1 used
Assert.assertEquals(14 + i, getMetricAtOrdinal(snapshot, 18)); // c2 free
Assert.assertEquals(15 + i, getMetricAtOrdinal(snapshot, 19)); // c2 used
Assert.assertEquals(16 + i, getMetricAtOrdinal(snapshot, 20)); // p1 free
Assert.assertEquals(17 + i, getMetricAtOrdinal(snapshot, 21)); // p1 used
Assert.assertEquals(18 + i, getMetricAtOrdinal(snapshot, 22)); // p2 free
Assert.assertEquals(19 + i, getMetricAtOrdinal(snapshot, 23)); // p2 used
}
// metrics based on GarbageCollectionStatus (The ordinal numbers are true for setup, in production it might differ)
final int g0TimeOrdinal = 24;
final int g0CountOrdinal = 25;
final int g0TimeDiffOrdinal = 26;
final int g0CountDiffOrdinal = 27;
final int g1TimeOrdinal = 28;
final int g1CountOrdinal = 29;
final int g1TimeDiffOrdinal = 30;
final int g1CountDiffOrdinal = 31;
final StatusSnapshot snapshot1 = result.getStatusSnapshots().get(0);
final StatusSnapshot snapshot2 = result.getStatusSnapshots().get(1);
Assert.assertEquals(100L, getMetricAtOrdinal(snapshot1, g0TimeOrdinal));
Assert.assertEquals(0L, getMetricAtOrdinal(snapshot1, g0TimeDiffOrdinal));
Assert.assertEquals(1L, getMetricAtOrdinal(snapshot1, g0CountOrdinal));
Assert.assertEquals(0L, getMetricAtOrdinal(snapshot1, g0CountDiffOrdinal));
Assert.assertEquals(300L, getMetricAtOrdinal(snapshot1, g1TimeOrdinal));
Assert.assertEquals(0L, getMetricAtOrdinal(snapshot1, g1TimeDiffOrdinal));
Assert.assertEquals(2L, getMetricAtOrdinal(snapshot1, g1CountOrdinal));
Assert.assertEquals(0L, getMetricAtOrdinal(snapshot1, g1CountDiffOrdinal));
Assert.assertEquals(100L, getMetricAtOrdinal(snapshot2, g0TimeOrdinal));
Assert.assertEquals(0L, getMetricAtOrdinal(snapshot2, g0TimeDiffOrdinal));
Assert.assertEquals(1L, getMetricAtOrdinal(snapshot2, g0CountOrdinal));
Assert.assertEquals(0L, getMetricAtOrdinal(snapshot2, g0CountDiffOrdinal));
Assert.assertEquals(700L, getMetricAtOrdinal(snapshot2, g1TimeOrdinal));
Assert.assertEquals(400L, getMetricAtOrdinal(snapshot2, g1TimeDiffOrdinal));
Assert.assertEquals(5L, getMetricAtOrdinal(snapshot2, g1CountOrdinal));
Assert.assertEquals(3L, getMetricAtOrdinal(snapshot2, g1CountDiffOrdinal));
}
private long getMetricAtOrdinal(final StatusSnapshot snapshot, final long ordinal) {
final Set<MetricDescriptor<?>> metricDescriptors = snapshot.getMetricDescriptors();
for (final MetricDescriptor<?> metricDescriptor : metricDescriptors) {
if (metricDescriptor.getMetricIdentifier() == ordinal) {
return snapshot.getStatusMetric(metricDescriptor);
}
}
Assert.fail();
return Long.MIN_VALUE;
}
private NodeStatus givenNodeStatus(final int number) {
final NodeStatus result = new NodeStatus();
result.setCreatedAtInMs(System.currentTimeMillis());
result.setFreeHeap(1 + number);
result.setUsedHeap(2 + number);
result.setHeapUtilization(3 + number);
result.setFreeNonHeap(4 + number);
result.setUsedNonHeap(5 + number);
result.setOpenFileHandlers(6 + number);
result.setProcessorLoadAverage(7.1d + number);
result.setTotalThreads(9 + number);
result.setEventDrivenThreads(20 + number);
result.setTimerDrivenThreads(21 + number);
result.setFlowFileRepositoryFreeSpace(10 + number);
result.setFlowFileRepositoryUsedSpace(11 + number);
result.setContentRepositories(Arrays.asList(
givenStorageStatus("c1", 12 + number, 13 + number),
givenStorageStatus("c2", 14 + number, 15 + number)
));
result.setProvenanceRepositories(Arrays.asList(
givenStorageStatus("p1", 16 + number, 17 + number),
givenStorageStatus("p2", 18 + number, 19 + number)
));
return result;
}
private StorageStatus givenStorageStatus(final String name, final long freeSpace, final long usedSpace) {
final StorageStatus result = new StorageStatus();
result.setName(name);
result.setFreeSpace(freeSpace);
result.setUsedSpace(usedSpace);
return result;
}
private ProcessGroupStatus givenProcessGroupStatus() {
final ProcessGroupStatus result = Mockito.mock(ProcessGroupStatus.class);
Mockito.when(result.getId()).thenReturn("rootId");
Mockito.when(result.getName()).thenReturn("rootName");
Mockito.when(result.getProcessorStatus()).thenReturn(Collections.emptyList());
Mockito.when(result.getConnectionStatus()).thenReturn(Collections.emptyList());
Mockito.when(result.getRemoteProcessGroupStatus()).thenReturn(Collections.emptyList());
Mockito.when(result.getProcessGroupStatus()).thenReturn(Collections.emptyList());
return result;
}
private List<GarbageCollectionStatus> givenGarbageCollectionStatus(long gc1Count, long gc1Millis, long gc2Count, long gc2Millis) {
final List<GarbageCollectionStatus> result = new ArrayList<>(2);
result.add(new StandardGarbageCollectionStatus("gc0", new Date(), gc1Count, gc1Millis));
result.add(new StandardGarbageCollectionStatus("gc1", new Date(), gc2Count, gc2Millis));
return result;
}
}

View File

@ -108,8 +108,8 @@ import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
import org.apache.nifi.web.api.entity.RegistryEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
@ -583,6 +583,15 @@ public interface NiFiServiceFacade {
*/
StatusHistoryEntity getProcessorStatusHistory(String id);
// ----------------------------------------
// System diagnostics history
// ----------------------------------------
/**
* @return the system diagnostics history
*/
StatusHistoryEntity getNodeStatusHistory();
/**
* Get the descriptor for the specified property of the specified processor.
*

View File

@ -269,9 +269,9 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorsRunStatusDetailsEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
import org.apache.nifi.web.api.entity.RegistryEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
@ -3469,6 +3469,13 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createStatusHistoryEntity(dto, permissions);
}
@Override
public StatusHistoryEntity getNodeStatusHistory() {
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(controllerFacade, NiFiUserUtils.getNiFiUser());
final StatusHistoryDTO dto = controllerFacade.getNodeStatusHistory();
return entityFactory.createStatusHistoryEntity(dto, permissions);
}
private boolean authorizeBulletin(final Bulletin bulletin) {
final String sourceId = bulletin.getSourceId();
final ComponentType type = bulletin.getSourceType();

View File

@ -41,6 +41,7 @@ import org.apache.nifi.web.api.dto.RegistryDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ClusterEntity;
import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.Entity;
@ -1057,6 +1058,39 @@ public class ControllerResource extends ApplicationResource {
// history
// -------
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("status/history")
@ApiOperation(
value = "Gets status history for the node",
notes = NON_GUARANTEED_ENDPOINT,
response = ComponentHistoryEntity.class,
authorizations = {
@Authorization(value = "Read - /controller")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getNodeStatusHistory() {
authorizeController(RequestAction.READ);
// replicate if cluster manager
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// generate the response
return generateOkResponse(serviceFacade.getNodeStatusHistory()).build();
}
/**
* Deletes flow history from the specified end date.
*

View File

@ -278,6 +278,22 @@ public class ControllerFacade implements Authorizable {
return flowController.getFlowManager().getPublicOutputPorts();
}
/**
* Returns the status history for the node.
*
* @return status history
*/
public StatusHistoryDTO getNodeStatusHistory() {
final boolean authorized = isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
final StatusHistoryDTO statusHistory = flowController.getNodeStatusHistory();
if (!authorized) {
statusHistory.getComponentDetails().put(ComponentStatusRepository.COMPONENT_DETAIL_TYPE, "Node");
}
return statusHistory;
}
/**
* Returns the status history for the specified processor.
*

View File

@ -52,10 +52,13 @@ import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.ActionEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.security.token.NiFiAuthenticationToken;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -63,6 +66,7 @@ import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContextHolder;
import java.util.Arrays;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
@ -244,6 +248,25 @@ public class StandardNiFiServiceFacadeTest {
}
}
@Test
public void testGetStatusHistory() {
// given
final Date generated = new Date();
final StatusHistoryDTO dto = new StatusHistoryDTO();
dto.setGenerated(generated);
final ControllerFacade controllerFacade = mock(ControllerFacade.class);
Mockito.when(controllerFacade.getNodeStatusHistory()).thenReturn(dto);
serviceFacade.setControllerFacade(controllerFacade);
// when
final StatusHistoryEntity result = serviceFacade.getNodeStatusHistory();
// then
Mockito.verify(controllerFacade).getNodeStatusHistory();
Assert.assertNotNull(result);
Assert.assertEquals(generated, result.getStatusHistory().getGenerated());
}
@Test
public void testGetActionApprovedThroughController() throws Exception {
// set the user

View File

@ -152,6 +152,12 @@
<i class="fa fa-history"></i>Flow Configuration History
</a>
</md-menu-item>
<md-menu-item layout-align="space-around center">
<a id="status-history-link"
ng-click="appCtrl.serviceProvider.headerCtrl.globalMenuCtrl.nodeStatusHistory.shell.launch();">
<i class="fa fa-area-chart"></i>Node Status History
</a>
</md-menu-item>
<md-menu-divider ng-if="appCtrl.nf.CanvasUtils.isManagedAuthorizer()"></md-menu-divider>
<md-menu-item layout-align="space-around center" ng-if="appCtrl.nf.CanvasUtils.isManagedAuthorizer()">
<a id="users-link" layout="row"

View File

@ -26,11 +26,12 @@
'nf.ParameterContexts',
'nf.PolicyManagement',
'nf.ClusterSummary',
'nf.StatusHistory',
'nf.ErrorHandler',
'nf.Settings',
'nf.CanvasUtils'],
function ($, nfCommon, nfQueueListing, nfShell, nfParameterContexts, nfPolicyManagement, nfClusterSummary, nfErrorHandler, nfSettings, nfCanvasUtils) {
return (nf.ng.Canvas.GlobalMenuCtrl = factory($, nfCommon, nfQueueListing, nfShell, nfPolicyManagement, nfClusterSummary, nfErrorHandler, nfSettings, nfCanvasUtils));
function ($, nfCommon, nfQueueListing, nfShell, nfParameterContexts, nfPolicyManagement, nfClusterSummary, nfStatusHistory, nfErrorHandler, nfSettings, nfCanvasUtils) {
return (nf.ng.Canvas.GlobalMenuCtrl = factory($, nfCommon, nfQueueListing, nfShell, nfPolicyManagement, nfClusterSummary, nfStatusHistory, nfErrorHandler, nfSettings, nfCanvasUtils));
});
} else if (typeof exports === 'object' && typeof module === 'object') {
module.exports = (nf.ng.Canvas.GlobalMenuCtrl =
@ -41,6 +42,7 @@
require('nf.ParameterContexts'),
require('nf.PolicyManagement'),
require('nf.ClusterSummary'),
require('nf.StatusHistory'),
require('nf.ErrorHandler'),
require('nf.Settings'),
require('nf.CanvasUtils')));
@ -52,11 +54,12 @@
root.nf.ParameterContexts,
root.nf.PolicyManagement,
root.nf.ClusterSummary,
root.nf.StatusHistory,
root.nf.ErrorHandler,
root.nf.Settings,
root.nf.CanvasUtils);
}
}(this, function ($, nfCommon, nfQueueListing, nfShell, nfParameterContexts, nfPolicyManagement, nfClusterSummary, nfErrorHandler, nfSettings, nfCanvasUtils) {
}(this, function ($, nfCommon, nfQueueListing, nfShell, nfParameterContexts, nfPolicyManagement, nfClusterSummary, nfStatusHistory, nfErrorHandler, nfSettings, nfCanvasUtils) {
'use strict';
return function (serviceProvider) {
@ -238,6 +241,26 @@
}
};
/**
* The node status history menu item controller.
*/
this.nodeStatusHistory = {
/**
* The node status history menu item's shell controller.
*/
shell: {
/**
* Launch the history shell.
*/
launch: function () {
nfStatusHistory.showNodeChart();
}
}
};
/**
* The users menu item controller.
*/

View File

@ -53,7 +53,8 @@
connection: 'Connection',
funnel: 'Funnel',
template: 'Template',
label: 'Label'
label: 'Label',
node: 'Node'
},
urls: {
api: '../nifi-api'
@ -78,6 +79,9 @@
},
'DATA_SIZE': function (d) {
return nfCommon.formatDataSize(d);
},
'FRACTION': function (d) {
return nfCommon.formatFloat(d / 1000000);
}
};
@ -163,6 +167,62 @@
}
};
/**
* Handles the status history response for node status history.
*
* @param {object} componentStatusHistory
*/
var handleNodeStatusHistoryResponse = function (componentStatusHistory) {
// update the last refreshed
$('#status-history-last-refreshed').text(componentStatusHistory.generated);
// initialize the status history
var statusHistory = {
type: 'Node',
instances: []
};
// get the descriptors
var descriptors = componentStatusHistory.fieldDescriptors;
statusHistory.details = componentStatusHistory.componentDetails;
statusHistory.selectedDescriptor = descriptors[0];
// ensure enough status snapshots
if (nfCommon.isDefinedAndNotNull(componentStatusHistory.aggregateSnapshots) && componentStatusHistory.aggregateSnapshots.length > 1) {
statusHistory.instances.push({
id: config.nifiInstanceId,
label: config.nifiInstanceLabel,
snapshots: componentStatusHistory.aggregateSnapshots
});
} else {
insufficientHistory();
return;
}
// get the status for each node in the cluster if applicable
$.each(componentStatusHistory.nodeSnapshots, function (_, nodeSnapshots) {
// ensure enough status snapshots
if (nfCommon.isDefinedAndNotNull(nodeSnapshots.statusSnapshots) && nodeSnapshots.statusSnapshots.length > 1) {
statusHistory.instances.push({
id: nodeSnapshots.nodeId,
label: nodeSnapshots.address + ':' + nodeSnapshots.apiPort,
snapshots: nodeSnapshots.statusSnapshots
});
}
});
// ensure we found eligible status history
if (statusHistory.instances.length > 0) {
// store the status history
$('#status-history-dialog').data('status-history', statusHistory);
// chart the status history
chart(statusHistory, descriptors);
} else {
insufficientHistory();
}
};
/**
* Shows an error message stating there is insufficient history available.
*/
@ -1024,6 +1084,8 @@
nfStatusHistory.showProcessGroupChart(statusHistory.groupId, statusHistory.id, statusHistory.selectedDescriptor);
} else if (statusHistory.type === config.type.remoteProcessGroup) {
nfStatusHistory.showRemoteProcessGroupChart(statusHistory.groupId, statusHistory.id, statusHistory.selectedDescriptor);
} else if (statusHistory.type === config.type.node) {
nfStatusHistory.showNodeChart();
} else {
nfStatusHistory.showConnectionChart(statusHistory.groupId, statusHistory.id, statusHistory.selectedDescriptor);
}
@ -1108,6 +1170,19 @@
}).fail(nfErrorHandler.handleAjaxError);
},
/**
* Shows the status history for the node.
*/
showNodeChart: function () {
$.ajax({
type: 'GET',
url: config.urls.api + '/controller/status/history',
dataType: 'json'
}).done(function (response) {
handleNodeStatusHistoryResponse(response.statusHistory);
}).fail(nfErrorHandler.handleAjaxError);
},
/**
* Shows the status history for the specified process group in this instance.
*