NIFI-4849: Implemented REST Endpoint and associated backend code to generate a Diagnostics Report for a Processor

Implemented review feedback. Refactored the data model to make the API cleaner and to delineate more clearly which permissions are required in order to see which details
Implementing review feedback
Removed sensitive information from the diagnostics reports
Fixed bug in merging logic for GCDiagnosticsSnapshots
This closes #2468
Authored by Mark Payne on 2018-02-02 12:16:36 -05:00; committed by Matt Gilman
parent 867ffdb52e
commit 844da06344
55 changed files with 3048 additions and 37 deletions


@@ -59,6 +59,8 @@ public interface FlowFileQueue {
*/
void purgeSwapFiles();
int getSwapFileCount();
/**
* Resets the comparator used by this queue to maintain order.
*
@@ -115,6 +117,10 @@ public interface FlowFileQueue {
*/
QueueSize getUnacknowledgedQueueSize();
QueueSize getActiveQueueSize();
QueueSize getSwapQueueSize();
void acknowledge(FlowFileRecord flowFile);
void acknowledge(Collection<FlowFileRecord> flowFiles);
@@ -125,6 +131,10 @@ public interface FlowFileQueue {
*/
boolean isFull();
boolean isAnyActiveFlowFilePenalized();
boolean isAllActiveFlowFilesPenalized();
/**
* places the given file into the queue
*


@@ -76,6 +76,15 @@ public interface ContentRepository {
*/
long getContainerUsableSpace(String containerName) throws IOException;
/**
* Returns the name of the FileStore that the given container is stored on, or <code>null</code>
* if not applicable or unable to determine the file store name
*
* @param containerName the name of the container
* @return the name of the FileStore
*/
String getContainerFileStoreName(String containerName);
/**
* Creates a new content claim
*


@@ -56,6 +56,14 @@ public interface FlowFileRepository extends Closeable {
*/
long getUsableStorageSpace() throws IOException;
/**
* Returns the name of the FileStore that the repository is stored on, or <code>null</code>
* if not applicable or unable to determine the file store name
*
* @return the name of the FileStore
*/
String getFileStoreName();
/**
* Updates the repository with the given RepositoryRecords.
*

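The getFileStoreName() method added above can plausibly be backed by java.nio.file; a minimal sketch, assuming the repository exposes its storage directory as a path (the helper class and its parameter are illustrative and not part of this commit):

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; the real repository implementations are not shown in this diff.
public class FileStoreNames {
    /**
     * Resolves the name of the FileStore backing the given repository directory,
     * returning null if it cannot be determined (mirroring the contract above).
     */
    public static String getFileStoreName(final String repositoryDirectory) {
        final Path path = Paths.get(repositoryDirectory);
        try {
            final FileStore store = Files.getFileStore(path);
            return store.name();
        } catch (final IOException e) {
            return null;
        }
    }
}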

@@ -16,9 +16,10 @@
*/
package org.apache.nifi.controller.status.history;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import java.util.Date;
import java.util.List;
import org.apache.nifi.controller.status.ProcessGroupStatus;
/**
* A repository for storing and retrieving components' historical status
@@ -38,8 +39,9 @@ public interface ComponentStatusRepository {
* Captures the status information provided in the given report
*
* @param rootGroupStatus status of root group
* @param garbageCollectionStatus status of garbage collection
*/
void capture(ProcessGroupStatus rootGroupStatus);
void capture(ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus);
/**
* Captures the status information provided in the given report, providing a
@@ -48,8 +50,9 @@ public interface ComponentStatusRepository {
*
* @param rootGroupStatus status
* @param timestamp timestamp of capture
* @param garbageCollectionStatus status of garbage collection
*/
void capture(ProcessGroupStatus rootGroupStatus, Date timestamp);
void capture(ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus, Date timestamp);
/**
* @return the Date at which the latest capture was performed
@@ -127,4 +130,6 @@ public interface ComponentStatusRepository {
* period
*/
StatusHistory getRemoteProcessGroupStatusHistory(String remoteGroupId, Date start, Date end, int preferredDataPoints);
GarbageCollectionHistory getGarbageCollectionHistory(Date start, Date end);
}


@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.List;
import java.util.Set;
public interface GarbageCollectionHistory {
Set<String> getMemoryManagerNames();
List<GarbageCollectionStatus> getGarbageCollectionStatuses(String memoryManagerName);
}
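A minimal in-memory sketch of how this interface could be satisfied; the class name, backing map, and addStatus() method are hypothetical and not part of this commit, and the sketch assumes it lives in the same package as the interface:

package org.apache.nifi.controller.status.history;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory implementation, keyed by memory manager name.
public class InMemoryGarbageCollectionHistory implements GarbageCollectionHistory {
    private final Map<String, List<GarbageCollectionStatus>> statusesByManager = new ConcurrentHashMap<>();

    /** Records one status snapshot under the name of its memory manager. */
    public void addStatus(final GarbageCollectionStatus status) {
        statusesByManager.computeIfAbsent(status.getMemoryManagerName(),
                name -> Collections.synchronizedList(new ArrayList<>()))
            .add(status);
    }

    @Override
    public Set<String> getMemoryManagerNames() {
        return statusesByManager.keySet();
    }

    @Override
    public List<GarbageCollectionStatus> getGarbageCollectionStatuses(final String memoryManagerName) {
        final List<GarbageCollectionStatus> statuses = statusesByManager.get(memoryManagerName);
        return statuses == null ? Collections.emptyList() : new ArrayList<>(statuses);
    }
}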


@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.Date;
public interface GarbageCollectionStatus {
/**
* @return the name of the JVM memory manager that this status pertains to
*/
String getMemoryManagerName();
/**
* @return timestamp of when the status was created
*/
Date getTimestamp();
/**
* @return the number of times that garbage collection has occurred
*/
long getCollectionCount();
/**
* @return the number of milliseconds spent performing garbage collection
*/
long getCollectionMillis();
}
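The JVM's GarbageCollectorMXBeans expose exactly the counters this interface models; a hedged sketch of taking one status per memory manager (the sampler class and the anonymous implementation are illustrative, assumed to live in the same package as the interface):

package org.apache.nifi.controller.status.history;

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

public class GarbageCollectionStatusSampler {
    /** Takes one GarbageCollectionStatus per JVM memory manager at the current instant. */
    public static List<GarbageCollectionStatus> sample() {
        final Date now = new Date();
        final List<GarbageCollectionStatus> statuses = new ArrayList<>();
        for (final GarbageCollectorMXBean gcBean : ManagementFactory.getGarbageCollectorMXBeans()) {
            final String managerName = gcBean.getName();
            final long count = gcBean.getCollectionCount();
            final long millis = gcBean.getCollectionTime();
            statuses.add(new GarbageCollectionStatus() {
                @Override
                public String getMemoryManagerName() { return managerName; }
                @Override
                public Date getTimestamp() { return now; }
                @Override
                public long getCollectionCount() { return count; }
                @Override
                public long getCollectionMillis() { return millis; }
            });
        }
        return statuses;
    }
}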


@@ -199,6 +199,15 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
*/
long getContainerCapacity(String containerName) throws IOException;
/**
* Returns the name of the FileStore that the given container is stored on, or <code>null</code>
* if not applicable or unable to determine the file store name
*
* @param containerName the name of the container
* @return the name of the FileStore
*/
String getContainerFileStoreName(String containerName);
/**
* @param containerName to check space on
* @return the number of bytes available to be used by the storage


@@ -159,9 +159,14 @@ public class MockProvenanceRepository implements ProvenanceRepository {
return 0;
}
@Override
public String getContainerFileStoreName(String containerName) {
return null;
}
@Override
public Set<String> getContainerNames() {
return new HashSet<String>();
return new HashSet<>();
}
@Override


@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.dto.BundleDTO;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "classLoaderDiagnostics")
public class ClassLoaderDiagnosticsDTO {
private BundleDTO bundle;
private ClassLoaderDiagnosticsDTO parentClassLoader;
@ApiModelProperty("Information about the Bundle that the ClassLoader belongs to, if any")
public BundleDTO getBundle() {
return bundle;
}
public void setBundle(BundleDTO bundle) {
this.bundle = bundle;
}
@ApiModelProperty("A ClassLoaderDiagnosticsDTO that provides information about the parent ClassLoader")
public ClassLoaderDiagnosticsDTO getParentClassLoader() {
return parentClassLoader;
}
public void setParentClassLoader(ClassLoaderDiagnosticsDTO parentClassLoader) {
this.parentClassLoader = parentClassLoader;
}
}
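A hedged sketch of how the parentClassLoader chain might be assembled by walking ClassLoader.getParent(); the bundle lookup is left as a comment because the mapping from a ClassLoader to its BundleDTO lives in NiFi internals that are not shown in this commit:

import org.apache.nifi.web.api.dto.diagnostics.ClassLoaderDiagnosticsDTO;

public class ClassLoaderDiagnosticsBuilder {
    /** Builds a DTO for the given ClassLoader and, recursively, for each of its parents. */
    public static ClassLoaderDiagnosticsDTO build(final ClassLoader classLoader) {
        if (classLoader == null) {
            return null;
        }
        final ClassLoaderDiagnosticsDTO dto = new ClassLoaderDiagnosticsDTO();
        // dto.setBundle(...) would be populated from NiFi's extension/bundle registry (omitted here).
        dto.setParentClassLoader(build(classLoader.getParent()));
        return dto;
    }
}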


@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "connectionDiagnostics")
public class ConnectionDiagnosticsDTO {
private ConnectionDTO connection;
private int totalFlowFileCount;
private long totalByteCount;
private int activeQueueFlowFileCount;
private long activeQueueByteCount;
private int swapFlowFileCount;
private long swapByteCount;
private int swapFiles;
private int inFlightFlowFileCount;
private long inFlightByteCount;
private Boolean allActiveQueueFlowFilesPenalized;
private Boolean anyActiveQueueFlowFilesPenalized;
@ApiModelProperty("Information about the Connection")
public ConnectionDTO getConnection() {
return connection;
}
public void setConnection(ConnectionDTO connection) {
this.connection = connection;
}
@ApiModelProperty("Total number of FlowFiles owned by the Connection")
public int getTotalFlowFileCount() {
return totalFlowFileCount;
}
public void setTotalFlowFileCount(int totalFlowFileCount) {
this.totalFlowFileCount = totalFlowFileCount;
}
@ApiModelProperty("Total number of bytes that make up the content for the FlowFiles owned by this Connection")
public long getTotalByteCount() {
return totalByteCount;
}
public void setTotalByteCount(long totalByteCount) {
this.totalByteCount = totalByteCount;
}
@ApiModelProperty("Total number of FlowFiles that exist in the Connection's Active Queue, immediately available to be offered up to a component")
public int getActiveQueueFlowFileCount() {
return activeQueueFlowFileCount;
}
public void setActiveQueueFlowFileCount(int activeQueueFlowFileCount) {
this.activeQueueFlowFileCount = activeQueueFlowFileCount;
}
@ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are present in the Connection's Active Queue")
public long getActiveQueueByteCount() {
return activeQueueByteCount;
}
public void setActiveQueueByteCount(long activeQueueByteCount) {
this.activeQueueByteCount = activeQueueByteCount;
}
@ApiModelProperty("The total number of FlowFiles that are swapped out for this Connection")
public int getSwapFlowFileCount() {
return swapFlowFileCount;
}
public void setSwapFlowFileCount(int swapFlowFileCount) {
this.swapFlowFileCount = swapFlowFileCount;
}
@ApiModelProperty("Total number of bytes that make up the content for the FlowFiles that are swapped out to disk for the Connection")
public long getSwapByteCount() {
return swapByteCount;
}
public void setSwapByteCount(long swapByteCount) {
this.swapByteCount = swapByteCount;
}
@ApiModelProperty("The number of Swap Files that exist for this Connection")
public int getSwapFiles() {
return swapFiles;
}
public void setSwapFiles(int swapFiles) {
this.swapFiles = swapFiles;
}
@ApiModelProperty("The number of In-Flight FlowFiles for this Connection. These are FlowFiles that belong to the connection but are currently being operated on by a Processor, Port, etc.")
public int getInFlightFlowFileCount() {
return inFlightFlowFileCount;
}
public void setInFlightFlowFileCount(int inFlightFlowFileCount) {
this.inFlightFlowFileCount = inFlightFlowFileCount;
}
@ApiModelProperty("The number bytes that make up the content of the FlowFiles that are In-Flight")
public long getInFlightByteCount() {
return inFlightByteCount;
}
public void setInFlightByteCount(long inFlightByteCount) {
this.inFlightByteCount = inFlightByteCount;
}
@ApiModelProperty("Whether or not all of the FlowFiles in the Active Queue are penalized")
public Boolean getAllActiveQueueFlowFilesPenalized() {
return allActiveQueueFlowFilesPenalized;
}
public void setAllActiveQueueFlowFilesPenalized(Boolean allFlowFilesPenalized) {
this.allActiveQueueFlowFilesPenalized = allFlowFilesPenalized;
}
@ApiModelProperty("Whether or not any of the FlowFiles in the Active Queue are penalized")
public Boolean getAnyActiveQueueFlowFilesPenalized() {
return anyActiveQueueFlowFilesPenalized;
}
public void setAnyActiveQueueFlowFilesPenalized(Boolean anyFlowFilesPenalized) {
this.anyActiveQueueFlowFilesPenalized = anyFlowFilesPenalized;
}
}
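The FlowFileQueue accessors added earlier in this commit (getActiveQueueSize(), getSwapQueueSize(), getUnacknowledgedQueueSize(), getSwapFileCount(), and the penalty checks) map directly onto this DTO's queue fields. A hedged sketch of that mapping, assuming the usual org.apache.nifi.controller.queue package and that QueueSize exposes getObjectCount()/getByteCount():

import org.apache.nifi.controller.queue.FlowFileQueue;   // package assumed, not shown in this diff
import org.apache.nifi.controller.queue.QueueSize;        // accessor names assumed
import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;

public class ConnectionDiagnosticsFactory {
    /** Populates the queue-related fields of the DTO from a FlowFileQueue; the ConnectionDTO itself is set elsewhere. */
    public static ConnectionDiagnosticsDTO createDiagnostics(final FlowFileQueue queue) {
        final QueueSize active = queue.getActiveQueueSize();
        final QueueSize swap = queue.getSwapQueueSize();
        final QueueSize inFlight = queue.getUnacknowledgedQueueSize();

        final ConnectionDiagnosticsDTO dto = new ConnectionDiagnosticsDTO();
        dto.setActiveQueueFlowFileCount(active.getObjectCount());
        dto.setActiveQueueByteCount(active.getByteCount());
        dto.setSwapFlowFileCount(swap.getObjectCount());
        dto.setSwapByteCount(swap.getByteCount());
        dto.setSwapFiles(queue.getSwapFileCount());
        dto.setInFlightFlowFileCount(inFlight.getObjectCount());
        dto.setInFlightByteCount(inFlight.getByteCount());
        // Total counts are derived as active + swapped + in-flight.
        dto.setTotalFlowFileCount(active.getObjectCount() + swap.getObjectCount() + inFlight.getObjectCount());
        dto.setTotalByteCount(active.getByteCount() + swap.getByteCount() + inFlight.getByteCount());
        dto.setAnyActiveQueueFlowFilesPenalized(queue.isAnyActiveFlowFilePenalized());
        dto.setAllActiveQueueFlowFilesPenalized(queue.isAllActiveFlowFilesPenalized());
        return dto;
    }
}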


@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "controllerServiceDiagnostics")
public class ControllerServiceDiagnosticsDTO {
private ControllerServiceEntity controllerService;
private ClassLoaderDiagnosticsDTO classLoaderDiagnostics;
public void setControllerService(final ControllerServiceEntity controllerService) {
this.controllerService = controllerService;
}
@ApiModelProperty("The Controller Service")
public ControllerServiceEntity getControllerService() {
return controllerService;
}
public void setClassLoaderDiagnostics(ClassLoaderDiagnosticsDTO classLoaderDiagnostics) {
this.classLoaderDiagnostics = classLoaderDiagnostics;
}
@ApiModelProperty("Information about the Controller Service's Class Loader")
public ClassLoaderDiagnosticsDTO getClassLoaderDiagnostics() {
return classLoaderDiagnostics;
}
}


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import java.util.Date;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "gcDiagnosticsSnapshot")
public class GCDiagnosticsSnapshotDTO implements Cloneable {
private Date timestamp;
private Long collectionCount;
private Long collectionMillis;
@ApiModelProperty("The timestamp of when the Snapshot was taken")
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
@ApiModelProperty("The number of times that Garbage Collection has occurred")
public Long getCollectionCount() {
return collectionCount;
}
public void setCollectionCount(Long collectionCount) {
this.collectionCount = collectionCount;
}
@ApiModelProperty("The number of milliseconds that the Garbage Collector spent performing Garbage Collection duties")
public Long getCollectionMillis() {
return collectionMillis;
}
public void setCollectionMillis(Long collectionMillis) {
this.collectionMillis = collectionMillis;
}
@Override
public GCDiagnosticsSnapshotDTO clone() {
final GCDiagnosticsSnapshotDTO clone = new GCDiagnosticsSnapshotDTO();
clone.timestamp = timestamp;
clone.collectionCount = collectionCount;
clone.collectionMillis = collectionMillis;
return clone;
}
}


@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import java.util.List;
import java.util.stream.Collectors;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "garbageCollectionDiagnostics")
public class GarbageCollectionDiagnosticsDTO implements Cloneable {
private String memoryManagerName;
private List<GCDiagnosticsSnapshotDTO> snapshots;
@ApiModelProperty("The name of the Memory Manager that this Garbage Collection information pertains to")
public String getMemoryManagerName() {
return memoryManagerName;
}
public void setMemoryManagerName(String memoryManagerName) {
this.memoryManagerName = memoryManagerName;
}
@ApiModelProperty("A list of snapshots that have been taken to determine the health of the JVM's heap")
public List<GCDiagnosticsSnapshotDTO> getSnapshots() {
return snapshots;
}
public void setSnapshots(List<GCDiagnosticsSnapshotDTO> snapshots) {
this.snapshots = snapshots;
}
@Override
protected GarbageCollectionDiagnosticsDTO clone() {
final GarbageCollectionDiagnosticsDTO clone = new GarbageCollectionDiagnosticsDTO();
clone.memoryManagerName = memoryManagerName;
if (snapshots != null) {
clone.snapshots = snapshots.stream()
.map(GCDiagnosticsSnapshotDTO::clone)
.collect(Collectors.toList());
}
return clone;
}
}


@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "jvmControllerDiagnosticsSnapshot")
public class JVMControllerDiagnosticsSnapshotDTO implements Cloneable {
private Boolean primaryNode;
private Boolean clusterCoordinator;
private Integer maxTimerDrivenThreads;
private Integer maxEventDrivenThreads;
@ApiModelProperty("Whether or not this node is primary node")
public Boolean getPrimaryNode() {
return primaryNode;
}
public void setPrimaryNode(Boolean primaryNode) {
this.primaryNode = primaryNode;
}
@ApiModelProperty("Whether or not this node is cluster coordinator")
public Boolean getClusterCoordinator() {
return clusterCoordinator;
}
public void setClusterCoordinator(Boolean clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
}
@ApiModelProperty("The maximum number of timer-driven threads")
public Integer getMaxTimerDrivenThreads() {
return maxTimerDrivenThreads;
}
public void setMaxTimerDrivenThreads(Integer maxTimerDrivenThreads) {
this.maxTimerDrivenThreads = maxTimerDrivenThreads;
}
@ApiModelProperty("The maximum number of event-driven threads")
public Integer getMaxEventDrivenThreads() {
return maxEventDrivenThreads;
}
public void setMaxEventDrivenThreads(Integer maxEventDrivenThreads) {
this.maxEventDrivenThreads = maxEventDrivenThreads;
}
@Override
public JVMControllerDiagnosticsSnapshotDTO clone() {
final JVMControllerDiagnosticsSnapshotDTO clone = new JVMControllerDiagnosticsSnapshotDTO();
clone.clusterCoordinator = clusterCoordinator;
clone.primaryNode = primaryNode;
clone.maxEventDrivenThreads = maxEventDrivenThreads;
clone.maxTimerDrivenThreads = maxTimerDrivenThreads;
return clone;
}
}


@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import java.util.List;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "jvmDiagnostics")
public class JVMDiagnosticsDTO {
private Boolean clustered;
private Boolean connected;
private JVMDiagnosticsSnapshotDTO aggregateSnapshot;
private List<NodeJVMDiagnosticsSnapshotDTO> nodeSnapshots;
@ApiModelProperty("Whether or not the NiFi instance is clustered")
public Boolean getClustered() {
return clustered;
}
public void setClustered(Boolean clustered) {
this.clustered = clustered;
}
@ApiModelProperty("Whether or not the node is connected to the cluster")
public Boolean getConnected() {
return connected;
}
public void setConnected(Boolean connected) {
this.connected = connected;
}
@ApiModelProperty("Aggregate JVM diagnostic information about the entire cluster")
public JVMDiagnosticsSnapshotDTO getAggregateSnapshot() {
return aggregateSnapshot;
}
public void setAggregateSnapshot(JVMDiagnosticsSnapshotDTO aggregateSnapshot) {
this.aggregateSnapshot = aggregateSnapshot;
}
@ApiModelProperty("Node-wise breakdown of JVM diagnostic information")
public List<NodeJVMDiagnosticsSnapshotDTO> getNodeSnapshots() {
return nodeSnapshots;
}
public void setNodeSnapshots(List<NodeJVMDiagnosticsSnapshotDTO> nodeSnapshots) {
this.nodeSnapshots = nodeSnapshots;
}
}


@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "jvmDiagnosticsSnapshot")
public class JVMDiagnosticsSnapshotDTO implements Cloneable {
private JVMSystemDiagnosticsSnapshotDTO systemDiagnosticsDto;
private JVMFlowDiagnosticsSnapshotDTO flowDiagnosticsDto;
private JVMControllerDiagnosticsSnapshotDTO controllerDiagnosticsDto;
@ApiModelProperty("System-related diagnostics information")
public JVMSystemDiagnosticsSnapshotDTO getSystemDiagnosticsDto() {
return systemDiagnosticsDto;
}
public void setSystemDiagnosticsDto(JVMSystemDiagnosticsSnapshotDTO systemDiagnosticsDto) {
this.systemDiagnosticsDto = systemDiagnosticsDto;
}
@ApiModelProperty("Flow-related diagnostics information")
public JVMFlowDiagnosticsSnapshotDTO getFlowDiagnosticsDto() {
return flowDiagnosticsDto;
}
public void setFlowDiagnosticsDto(JVMFlowDiagnosticsSnapshotDTO flowDiagnosticsDto) {
this.flowDiagnosticsDto = flowDiagnosticsDto;
}
@ApiModelProperty("Controller-related diagnostics information")
public JVMControllerDiagnosticsSnapshotDTO getControllerDiagnostics() {
return controllerDiagnosticsDto;
}
public void setControllerDiagnostics(JVMControllerDiagnosticsSnapshotDTO controllerDiagnostics) {
this.controllerDiagnosticsDto = controllerDiagnostics;
}
@Override
public JVMDiagnosticsSnapshotDTO clone() {
final JVMDiagnosticsSnapshotDTO clone = new JVMDiagnosticsSnapshotDTO();
clone.systemDiagnosticsDto = systemDiagnosticsDto == null ? null : systemDiagnosticsDto.clone();
clone.flowDiagnosticsDto = flowDiagnosticsDto == null ? null : flowDiagnosticsDto.clone();
clone.controllerDiagnosticsDto = controllerDiagnosticsDto == null ? null : controllerDiagnosticsDto.clone();
return clone;
}
@XmlType(name = "versionInfo")
public static class VersionInfoDTO implements Cloneable {
private String niFiVersion;
private String javaVendor;
private String javaVersion;
private String javaVmVendor;
private String osName;
private String osVersion;
private String osArchitecture;
@ApiModelProperty("The version of this NiFi.")
public String getNiFiVersion() {
return niFiVersion;
}
public void setNiFiVersion(String niFiVersion) {
this.niFiVersion = niFiVersion;
}
@ApiModelProperty("Java vendor")
public String getJavaVendor() {
return javaVendor;
}
public void setJavaVendor(String javaVendor) {
this.javaVendor = javaVendor;
}
@ApiModelProperty("Java VM Vendor")
public String getJavaVmVendor() {
return javaVmVendor;
}
public void setJavaVmVendor(String javaVmVendor) {
this.javaVmVendor = javaVmVendor;
}
@ApiModelProperty("Java version")
public String getJavaVersion() {
return javaVersion;
}
public void setJavaVersion(String javaVersion) {
this.javaVersion = javaVersion;
}
@ApiModelProperty("Host operating system name")
public String getOsName() {
return osName;
}
public void setOsName(String osName) {
this.osName = osName;
}
@ApiModelProperty("Host operating system version")
public String getOsVersion() {
return osVersion;
}
public void setOsVersion(String osVersion) {
this.osVersion = osVersion;
}
@ApiModelProperty("Host operating system architecture")
public String getOsArchitecture() {
return osArchitecture;
}
public void setOsArchitecture(String osArchitecture) {
this.osArchitecture = osArchitecture;
}
@Override
public VersionInfoDTO clone() {
final VersionInfoDTO other = new VersionInfoDTO();
other.setNiFiVersion(getNiFiVersion());
other.setJavaVendor(getJavaVendor());
other.setJavaVersion(getJavaVersion());
other.setOsName(getOsName());
other.setOsVersion(getOsVersion());
other.setOsArchitecture(getOsArchitecture());
return other;
}
}
}
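A hedged sketch of populating the nested VersionInfoDTO from standard JVM system properties; the NiFi version is passed in as a parameter because the real value comes from NiFi's build properties, which are not shown in this commit:

import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO.VersionInfoDTO;

public class VersionInfoFactory {
    /** Fills the Java and OS fields from system properties; the NiFi version is supplied by the caller. */
    public static VersionInfoDTO createVersionInfo(final String niFiVersion) {
        final VersionInfoDTO versionInfo = new VersionInfoDTO();
        versionInfo.setNiFiVersion(niFiVersion);
        versionInfo.setJavaVendor(System.getProperty("java.vendor"));
        versionInfo.setJavaVersion(System.getProperty("java.version"));
        versionInfo.setJavaVmVendor(System.getProperty("java.vm.vendor"));
        versionInfo.setOsName(System.getProperty("os.name"));
        versionInfo.setOsVersion(System.getProperty("os.version"));
        versionInfo.setOsArchitecture(System.getProperty("os.arch"));
        return versionInfo;
    }
}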


@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import java.util.HashSet;
import java.util.Set;
import javax.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.dto.BundleDTO;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "jvmFlowDiagnosticsSnapshot")
public class JVMFlowDiagnosticsSnapshotDTO implements Cloneable {
private String uptime;
private String timeZone;
private Integer activeTimerDrivenThreads;
private Integer activeEventDrivenThreads;
private Set<BundleDTO> bundlesLoaded;
@ApiModelProperty("How long this node has been running, formatted as hours:minutes:seconds.milliseconds")
public String getUptime() {
return uptime;
}
public void setUptime(String uptime) {
this.uptime = uptime;
}
@ApiModelProperty("The name of the Time Zone that is configured, if available")
public String getTimeZone() {
return timeZone;
}
public void setTimeZone(String timeZone) {
this.timeZone = timeZone;
}
@ApiModelProperty("The number of timer-driven threads that are active")
public Integer getActiveTimerDrivenThreads() {
return activeTimerDrivenThreads;
}
public void setActiveTimerDrivenThreads(Integer activeTimerDrivenThreads) {
this.activeTimerDrivenThreads = activeTimerDrivenThreads;
}
@ApiModelProperty("The number of event-driven threads that are active")
public Integer getActiveEventDrivenThreads() {
return activeEventDrivenThreads;
}
public void setActiveEventDrivenThreads(Integer activeEventDrivenThreads) {
this.activeEventDrivenThreads = activeEventDrivenThreads;
}
@ApiModelProperty("The NiFi Bundles (NARs) that are loaded by NiFi")
public Set<BundleDTO> getBundlesLoaded() {
return bundlesLoaded;
}
public void setBundlesLoaded(Set<BundleDTO> bundlesLoaded) {
this.bundlesLoaded = bundlesLoaded;
}
@Override
public JVMFlowDiagnosticsSnapshotDTO clone() {
final JVMFlowDiagnosticsSnapshotDTO clone = new JVMFlowDiagnosticsSnapshotDTO();
clone.activeEventDrivenThreads = activeEventDrivenThreads;
clone.activeTimerDrivenThreads = activeTimerDrivenThreads;
clone.bundlesLoaded = bundlesLoaded == null ? null : new HashSet<>(bundlesLoaded);
clone.timeZone = timeZone;
clone.uptime = uptime;
return clone;
}
}
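The uptime field above is documented as hours:minutes:seconds.milliseconds; a hedged sketch of producing that format from the JVM's RuntimeMXBean (the formatter class is illustrative):

import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;

public class UptimeFormatter {
    /** Formats the JVM uptime as hours:minutes:seconds.milliseconds, e.g. "13:02:51.040". */
    public static String formatUptime() {
        final long uptimeMillis = ManagementFactory.getRuntimeMXBean().getUptime();
        final long hours = TimeUnit.MILLISECONDS.toHours(uptimeMillis);
        final long minutes = TimeUnit.MILLISECONDS.toMinutes(uptimeMillis) % 60;
        final long seconds = TimeUnit.MILLISECONDS.toSeconds(uptimeMillis) % 60;
        final long millis = uptimeMillis % 1000;
        return String.format("%d:%02d:%02d.%03d", hours, minutes, seconds, millis);
    }
}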


@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "jvmSystemDiagnosticsSnapshot")
public class JVMSystemDiagnosticsSnapshotDTO implements Cloneable {
private RepositoryUsageDTO flowFileRepositoryStorageUsage;
private Set<RepositoryUsageDTO> contentRepositoryStorageUsage;
private Set<RepositoryUsageDTO> provenanceRepositoryStorageUsage;
private Long maxHeapBytes;
private String maxHeap;
private List<GarbageCollectionDiagnosticsDTO> garbageCollectionDiagnostics;
private Integer cpuCores;
private Double cpuLoadAverage;
private Long physicalMemoryBytes;
private String physicalMemory;
// Only available if we get OS MXBean and can create class com.sun.management.UnixOperatingSystemMXBean and the OS MXBean
// is of this type.
private Long openFileDescriptors;
private Long maxOpenFileDescriptors;
@ApiModelProperty("Information about the FlowFile Repository's usage")
public RepositoryUsageDTO getFlowFileRepositoryStorageUsage() {
return flowFileRepositoryStorageUsage;
}
public void setFlowFileRepositoryStorageUsage(RepositoryUsageDTO flowFileRepositoryStorageUsage) {
this.flowFileRepositoryStorageUsage = flowFileRepositoryStorageUsage;
}
@ApiModelProperty("Information about the Content Repository's usage")
public Set<RepositoryUsageDTO> getContentRepositoryStorageUsage() {
return contentRepositoryStorageUsage;
}
public void setContentRepositoryStorageUsage(Set<RepositoryUsageDTO> contentRepositoryStorageUsage) {
this.contentRepositoryStorageUsage = contentRepositoryStorageUsage;
}
@ApiModelProperty("Information about the Provenance Repository's usage")
public Set<RepositoryUsageDTO> getProvenanceRepositoryStorageUsage() {
return provenanceRepositoryStorageUsage;
}
public void setProvenanceRepositoryStorageUsage(Set<RepositoryUsageDTO> provenanceRepositoryStorageUsage) {
this.provenanceRepositoryStorageUsage = provenanceRepositoryStorageUsage;
}
@ApiModelProperty("The maximum number of bytes that the JVM heap is configured to use for heap")
public Long getMaxHeapBytes() {
return maxHeapBytes;
}
public void setMaxHeapBytes(Long heapBytes) {
this.maxHeapBytes = heapBytes;
}
@ApiModelProperty("The maximum number of bytes that the JVM heap is configured to use, as a human-readable value")
public String getMaxHeap() {
return maxHeap;
}
public void setMaxHeap(String maxHeap) {
this.maxHeap = maxHeap;
}
@ApiModelProperty("The number of CPU Cores available on the system")
public Integer getCpuCores() {
return cpuCores;
}
public void setCpuCores(Integer cpuCores) {
this.cpuCores = cpuCores;
}
@ApiModelProperty("The 1-minute CPU Load Average")
public Double getCpuLoadAverage() {
return cpuLoadAverage;
}
public void setCpuLoadAverage(Double cpuLoadAverage) {
this.cpuLoadAverage = cpuLoadAverage;
}
@ApiModelProperty("The number of bytes of RAM available on the system")
public Long getPhysicalMemoryBytes() {
return physicalMemoryBytes;
}
public void setPhysicalMemoryBytes(Long memoryBytes) {
this.physicalMemoryBytes = memoryBytes;
}
@ApiModelProperty("The number of bytes of RAM available on the system as a human-readable value")
public String getPhysicalMemory() {
return physicalMemory;
}
public void setPhysicalMemory(String memory) {
this.physicalMemory = memory;
}
@ApiModelProperty("The number of files that are open by the NiFi process")
public Long getOpenFileDescriptors() {
return openFileDescriptors;
}
public void setOpenFileDescriptors(Long openFileDescriptors) {
this.openFileDescriptors = openFileDescriptors;
}
@ApiModelProperty("The maximum number of open file descriptors that are available to each process")
public Long getMaxOpenFileDescriptors() {
return maxOpenFileDescriptors;
}
public void setMaxOpenFileDescriptors(Long maxOpenFileDescriptors) {
this.maxOpenFileDescriptors = maxOpenFileDescriptors;
}
@ApiModelProperty("Diagnostic information about the JVM's garbage collections")
public List<GarbageCollectionDiagnosticsDTO> getGarbageCollectionDiagnostics() {
return garbageCollectionDiagnostics;
}
public void setGarbageCollectionDiagnostics(List<GarbageCollectionDiagnosticsDTO> garbageCollectionDiagnostics) {
this.garbageCollectionDiagnostics = garbageCollectionDiagnostics;
}
@Override
public JVMSystemDiagnosticsSnapshotDTO clone() {
final JVMSystemDiagnosticsSnapshotDTO clone = new JVMSystemDiagnosticsSnapshotDTO();
clone.contentRepositoryStorageUsage = cloneRepoUsage(contentRepositoryStorageUsage);
clone.cpuCores = cpuCores;
clone.cpuLoadAverage = cpuLoadAverage;
clone.flowFileRepositoryStorageUsage = flowFileRepositoryStorageUsage == null ? null : flowFileRepositoryStorageUsage.clone();
clone.maxHeap = maxHeap;
clone.maxHeapBytes = maxHeapBytes;
clone.maxOpenFileDescriptors = maxOpenFileDescriptors;
clone.openFileDescriptors = openFileDescriptors;
clone.physicalMemory = physicalMemory;
clone.physicalMemoryBytes = physicalMemoryBytes;
clone.provenanceRepositoryStorageUsage = cloneRepoUsage(provenanceRepositoryStorageUsage);
if (garbageCollectionDiagnostics != null) {
clone.garbageCollectionDiagnostics = garbageCollectionDiagnostics.stream()
.map(gcDiag -> gcDiag.clone())
.collect(Collectors.toList());
}
return clone;
}
private static Set<RepositoryUsageDTO> cloneRepoUsage(final Set<RepositoryUsageDTO> repoUsage) {
if (repoUsage == null) {
return null;
}
return repoUsage.stream()
.map(usage -> usage.clone())
.collect(Collectors.toSet());
}
}
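The comment above notes that the file-descriptor fields are only available when the OS MXBean is a com.sun.management.UnixOperatingSystemMXBean. A hedged sketch of that check; NiFi's own code may guard this differently (for example via reflection, per the comment), so the direct instanceof below is only for illustration:

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;

import com.sun.management.UnixOperatingSystemMXBean;

public class FileDescriptorMetrics {
    /** Copies the open/max file descriptor counts into the snapshot when the platform exposes them. */
    public static void populate(final JVMSystemDiagnosticsSnapshotDTO snapshot) {
        final OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean();
        if (osBean instanceof UnixOperatingSystemMXBean) {
            final UnixOperatingSystemMXBean unixBean = (UnixOperatingSystemMXBean) osBean;
            snapshot.setOpenFileDescriptors(unixBean.getOpenFileDescriptorCount());
            snapshot.setMaxOpenFileDescriptors(unixBean.getMaxFileDescriptorCount());
        }
        // Otherwise the fields stay null, matching the "only available if ..." note above.
    }
}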


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "nodeGCDiagnosticsSnapshot")
public class NodeGCDiagnosticsSnapshotDTO {
private String nodeId;
private String address;
private Integer apiPort;
private GCDiagnosticsSnapshotDTO snapshot;
@ApiModelProperty("The unique ID that identifies the node")
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
@ApiModelProperty("The API address of the node")
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@ApiModelProperty("The API port used to communicate with the node")
public Integer getApiPort() {
return apiPort;
}
public void setApiPort(Integer apiPort) {
this.apiPort = apiPort;
}
@ApiModelProperty("The Garbage Collection Diagnostics Snapshot")
public GCDiagnosticsSnapshotDTO getSnapshot() {
return snapshot;
}
public void setSnapshot(GCDiagnosticsSnapshotDTO snapshot) {
this.snapshot = snapshot;
}
}


@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "nodeJVMDiagnosticsSnapshot")
public class NodeJVMDiagnosticsSnapshotDTO {
private String nodeId;
private String address;
private Integer apiPort;
private JVMDiagnosticsSnapshotDTO snapshot;
@ApiModelProperty("The unique ID that identifies the node")
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
@ApiModelProperty("The API address of the node")
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
@ApiModelProperty("The API port used to communicate with the node")
public Integer getApiPort() {
return apiPort;
}
public void setApiPort(Integer apiPort) {
this.apiPort = apiPort;
}
@ApiModelProperty("The JVM Diagnostics Snapshot")
public JVMDiagnosticsSnapshotDTO getSnapshot() {
return snapshot;
}
public void setSnapshot(JVMDiagnosticsSnapshotDTO snapshot) {
this.snapshot = snapshot;
}
}


@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import java.util.List;
import java.util.Set;
import javax.xml.bind.annotation.XmlType;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.status.ProcessorStatusDTO;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "processorDiagnostics")
public class ProcessorDiagnosticsDTO {
private ProcessorDTO processor;
private ProcessorStatusDTO processorStatus;
private Set<ControllerServiceDiagnosticsDTO> referencedControllerServices;
private Set<ConnectionDiagnosticsDTO> incomingConnections;
private Set<ConnectionDiagnosticsDTO> outgoingConnections;
private JVMDiagnosticsDTO jvmDiagnostics;
private List<ThreadDumpDTO> threadDumps;
private ClassLoaderDiagnosticsDTO classLoaderDiagnostics;
@ApiModelProperty("Information about the Processor for which the Diagnostic Report is generated")
public ProcessorDTO getProcessor() {
return processor;
}
public void setProcessor(ProcessorDTO processor) {
this.processor = processor;
}
@ApiModelProperty("The Status for the Processor for which the Diagnostic Report is generated")
public ProcessorStatusDTO getProcessorStatus() {
return processorStatus;
}
public void setProcessorStatus(ProcessorStatusDTO processorStatus) {
this.processorStatus = processorStatus;
}
@ApiModelProperty("Diagnostic Information about all Controller Services that the Processor is referencing")
public Set<ControllerServiceDiagnosticsDTO> getReferencedControllerServices() {
return referencedControllerServices;
}
public void setReferencedControllerServices(Set<ControllerServiceDiagnosticsDTO> referencedControllerServices) {
this.referencedControllerServices = referencedControllerServices;
}
@ApiModelProperty("Diagnostic Information about all incoming Connections")
public Set<ConnectionDiagnosticsDTO> getIncomingConnections() {
return incomingConnections;
}
public void setIncomingConnections(Set<ConnectionDiagnosticsDTO> incomingConnections) {
this.incomingConnections = incomingConnections;
}
@ApiModelProperty("Diagnostic Information about all outgoing Connections")
public Set<ConnectionDiagnosticsDTO> getOutgoingConnections() {
return outgoingConnections;
}
public void setOutgoingConnections(Set<ConnectionDiagnosticsDTO> outgoingConnections) {
this.outgoingConnections = outgoingConnections;
}
@ApiModelProperty("Diagnostic Information about the JVM and system-level diagnostics")
public JVMDiagnosticsDTO getJvmDiagnostics() {
return jvmDiagnostics;
}
public void setJvmDiagnostics(JVMDiagnosticsDTO jvmDiagnostics) {
this.jvmDiagnostics = jvmDiagnostics;
}
@ApiModelProperty("Thread Dumps that were taken of the threads that are active in the Processor")
public List<ThreadDumpDTO> getThreadDumps() {
return threadDumps;
}
public void setThreadDumps(List<ThreadDumpDTO> threadDumps) {
this.threadDumps = threadDumps;
}
@ApiModelProperty("Information about the Controller Service's Class Loader")
public ClassLoaderDiagnosticsDTO getClassLoaderDiagnostics() {
return classLoaderDiagnostics;
}
public void setClassLoaderDiagnostics(ClassLoaderDiagnosticsDTO classLoaderDiagnostics) {
this.classLoaderDiagnostics = classLoaderDiagnostics;
}
}


@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "repositoryUsage")
public class RepositoryUsageDTO implements Cloneable {
private String name;
private String fileStoreHash;
private String freeSpace;
private String totalSpace;
private Long freeSpaceBytes;
private Long totalSpaceBytes;
private String utilization;
@ApiModelProperty("The name of the repository")
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@ApiModelProperty("A SHA-256 hash of the File Store name/path that is used to store the repository's data. This information is exposed as a hash in order to avoid "
+ "exposing potentially sensitive information that is not generally relevant. What is typically relevant is whether or not multiple repositories on the same node are "
+ "using the same File Store, as this indicates that the repositories are competing for the resources of the backing disk/storage mechanism.")
public String getFileStoreHash() {
return fileStoreHash;
}
public void setFileStoreHash(String fileStore) {
this.fileStoreHash = fileStore;
}
@ApiModelProperty("Amount of free space.")
public String getFreeSpace() {
return freeSpace;
}
public void setFreeSpace(String freeSpace) {
this.freeSpace = freeSpace;
}
@ApiModelProperty("Amount of total space.")
public String getTotalSpace() {
return totalSpace;
}
public void setTotalSpace(String totalSpace) {
this.totalSpace = totalSpace;
}
@ApiModelProperty("Utilization of this storage location.")
public String getUtilization() {
return utilization;
}
public void setUtilization(String utilization) {
this.utilization = utilization;
}
@ApiModelProperty("The number of bytes of free space.")
public Long getFreeSpaceBytes() {
return freeSpaceBytes;
}
public void setFreeSpaceBytes(Long freeSpaceBytes) {
this.freeSpaceBytes = freeSpaceBytes;
}
@ApiModelProperty("The number of bytes of total space.")
public Long getTotalSpaceBytes() {
return totalSpaceBytes;
}
public void setTotalSpaceBytes(Long totalSpaceBytes) {
this.totalSpaceBytes = totalSpaceBytes;
}
@Override
public RepositoryUsageDTO clone() {
final RepositoryUsageDTO clone = new RepositoryUsageDTO();
clone.fileStoreHash = fileStoreHash;
clone.freeSpace = freeSpace;
clone.freeSpaceBytes = freeSpaceBytes;
clone.name = name;
clone.totalSpace = totalSpace;
clone.totalSpaceBytes = totalSpaceBytes;
clone.utilization = utilization;
return clone;
}
}
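The fileStoreHash description above states that the File Store name is exposed only as a SHA-256 hash. A hedged sketch of computing such a hash; the lowercase-hex encoding is an assumption, since the exact encoding is not shown in this commit:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class FileStoreHasher {
    /** Returns a lowercase hex SHA-256 digest of the file store name, or null when the name is unknown. */
    public static String hashFileStoreName(final String fileStoreName) {
        if (fileStoreName == null) {
            return null;
        }
        try {
            final MessageDigest digest = MessageDigest.getInstance("SHA-256");
            final byte[] hashed = digest.digest(fileStoreName.getBytes(StandardCharsets.UTF_8));
            final StringBuilder sb = new StringBuilder(hashed.length * 2);
            for (final byte b : hashed) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (final NoSuchAlgorithmException e) {
            throw new AssertionError("SHA-256 is required to be available on every JVM", e);
        }
    }
}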


@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.diagnostics;
import javax.xml.bind.annotation.XmlType;
import io.swagger.annotations.ApiModelProperty;
@XmlType(name = "threadDump")
public class ThreadDumpDTO {
private String nodeId;
private String nodeAddress;
private Integer apiPort;
private String stackTrace;
private String threadName;
private long threadActiveMillis;
@ApiModelProperty("The ID of the node in the cluster")
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
@ApiModelProperty("The address of the node in the cluster")
public String getNodeAddress() {
return nodeAddress;
}
public void setNodeAddress(String nodeAddress) {
this.nodeAddress = nodeAddress;
}
@ApiModelProperty("The port the node is listening for API requests.")
public Integer getApiPort() {
return apiPort;
}
public void setApiPort(Integer port) {
this.apiPort = port;
}
@ApiModelProperty("The stack trace for the thread")
public String getStackTrace() {
return stackTrace;
}
public void setStackTrace(String stackTrace) {
this.stackTrace = stackTrace;
}
@ApiModelProperty("The name of the thread")
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
@ApiModelProperty("The number of milliseconds that the thread has been executing in the Processor")
public long getThreadActiveMillis() {
return threadActiveMillis;
}
public void setThreadActiveMillis(long threadActiveMillis) {
this.threadActiveMillis = threadActiveMillis;
}
}
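A hedged sketch of filling the thread-name and stack-trace fields with the JVM's ThreadMXBean; the node identity fields and threadActiveMillis come from NiFi's own per-Processor thread tracking, which is not shown here:

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.List;

import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO;

public class ThreadDumpSampler {
    /** Builds one ThreadDumpDTO per live JVM thread, filling in the thread name and stack trace only. */
    public static List<ThreadDumpDTO> dumpAllThreads() {
        final List<ThreadDumpDTO> dumps = new ArrayList<>();
        for (final ThreadInfo threadInfo : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            final StringBuilder trace = new StringBuilder();
            for (final StackTraceElement element : threadInfo.getStackTrace()) {
                trace.append("    at ").append(element).append('\n');
            }
            final ThreadDumpDTO dto = new ThreadDumpDTO();
            dto.setThreadName(threadInfo.getThreadName());
            dto.setStackTrace(trace.toString());
            dumps.add(dto);
        }
        return dumps;
    }
}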


@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import io.swagger.annotations.ApiModelProperty;
@XmlRootElement(name = "processorDiagnosticsEntity")
public class ProcessorDiagnosticsEntity extends ComponentEntity implements Permissible<ProcessorDiagnosticsDTO> {
private ProcessorDiagnosticsDTO processorDiagnostics;
@Override
@ApiModelProperty("The Processor Diagnostics")
public ProcessorDiagnosticsDTO getComponent() {
return processorDiagnostics;
}
@Override
public void setComponent(ProcessorDiagnosticsDTO processorDiagnostics) {
this.processorDiagnostics = processorDiagnostics;
}
}
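The entity above is what the new diagnostics endpoint returns. A hedged sketch of requesting it over HTTP, using the Java 11+ HttpClient purely for illustration; the base URL, processor id, and absence of authentication are placeholders, and the JSON body is returned as a raw string rather than deserialized:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ProcessorDiagnosticsClient {
    /** Fetches the diagnostics JSON for a Processor, e.g. baseUrl = "http://localhost:8080" and the Processor's UUID. */
    public static String fetchDiagnosticsJson(final String baseUrl, final String processorId) throws Exception {
        final HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + "/nifi-api/processors/" + processorId + "/diagnostics"))
            .header("Accept", "application/json")
            .GET()
            .build();
        final HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}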


@@ -49,6 +49,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.PortStatusEndpointMer
import org.apache.nifi.cluster.coordination.http.endpoints.PrioritizerTypesEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessGroupsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorDiagnosticsEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorStatusEndpointMerger;
import org.apache.nifi.cluster.coordination.http.endpoints.ProcessorTypesEndpointMerger;
@@ -155,6 +156,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
endpointMergers.add(new AccessPolicyEndpointMerger());
endpointMergers.add(new SearchUsersEndpointMerger());
endpointMergers.add(new VariableRegistryEndpointMerger());
endpointMergers.add(new ProcessorDiagnosticsEndpointMerger(snapshotMillis));
}
@Override


@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.coordination.http.endpoints;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.ProcessorDiagnosticsEntityMerger;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
public class ProcessorDiagnosticsEndpointMerger implements EndpointResponseMerger {
public static final Pattern PROCESSOR_DIAGNOSTICS_URI_PATTERN = Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/diagnostics");
private final ProcessorDiagnosticsEntityMerger diagnosticsEntityMerger;
public ProcessorDiagnosticsEndpointMerger(final long componentStatusSnapshotMillis) {
diagnosticsEntityMerger = new ProcessorDiagnosticsEntityMerger(componentStatusSnapshotMillis);
}
@Override
public boolean canHandle(final URI uri, final String method) {
if (!"GET".equalsIgnoreCase(method)) {
return false;
}
return PROCESSOR_DIAGNOSTICS_URI_PATTERN.matcher(uri.getPath()).matches();
}
@Override
public NodeResponse merge(URI uri, String method, Set<NodeResponse> successfulResponses, Set<NodeResponse> problematicResponses, NodeResponse clientResponse) {
final ProcessorDiagnosticsEntity clientEntity = clientResponse.getClientResponse().readEntity(ProcessorDiagnosticsEntity.class);
// Unmarshall each response into an entity.
final Map<NodeIdentifier, ProcessorDiagnosticsEntity> entityMap = new HashMap<>();
for (final NodeResponse nodeResponse : successfulResponses) {
final ProcessorDiagnosticsEntity nodeResponseEntity = nodeResponse == clientResponse ? clientEntity : nodeResponse.getClientResponse().readEntity(ProcessorDiagnosticsEntity.class);
entityMap.put(nodeResponse.getNodeId(), nodeResponseEntity);
}
diagnosticsEntityMerger.merge(clientEntity, entityMap);
return new NodeResponse(clientResponse, clientEntity);
}
}
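A minimal usage sketch (not part of the commit) may help show how the URI pattern and HTTP method gate this merger; the hostname, port, and processor UUID below are invented, and the 60,000 ms snapshot interval is just an example value.

import java.net.URI;

public class EndpointMergerSketch {
    public static void main(final String[] args) {
        // Assumes ProcessorDiagnosticsEndpointMerger (above) is on the classpath.
        final ProcessorDiagnosticsEndpointMerger merger = new ProcessorDiagnosticsEndpointMerger(60_000L);
        final URI uri = URI.create("https://nifi.example.com:8443/nifi-api/processors/"
            + "0c7f8e3a-1b2c-4d5e-8f90-112233445566/diagnostics");

        System.out.println(merger.canHandle(uri, "GET"));  // true: GET on a matching diagnostics path
        System.out.println(merger.canHandle(uri, "POST")); // false: only GET responses are merged
    }
}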

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.manager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.NodeJVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
public class ProcessorDiagnosticsEntityMerger implements ComponentEntityMerger<ProcessorDiagnosticsEntity> {
private final long componentStatusSnapshotMillis;
public ProcessorDiagnosticsEntityMerger(final long componentStatusSnapshotMillis) {
this.componentStatusSnapshotMillis = componentStatusSnapshotMillis;
}
@Override
public void mergeComponents(final ProcessorDiagnosticsEntity clientEntity, final Map<NodeIdentifier, ProcessorDiagnosticsEntity> entityMap) {
final ProcessorDiagnosticsDTO clientDto = clientEntity.getComponent();
final List<NodeJVMDiagnosticsSnapshotDTO> nodeJvmDiagnosticsSnapshots = new ArrayList<>(entityMap.size());
// Merge the Processor Statuses and create a separate NodeJVMDiagnosticsSnapshotDTO for each. We do both in the same
// pass because we are already iterating over the entityMap, and the node-specific JVM diagnostics snapshots must be
// created before the values are merged in the second pass over the map.
for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
final NodeIdentifier nodeId = entry.getKey();
final ProcessorDiagnosticsEntity diagnosticsEntity = entry.getValue();
final ProcessorDiagnosticsDTO diagnosticsDto = diagnosticsEntity.getComponent();
StatusMerger.merge(clientDto.getProcessorStatus(), clientEntity.getPermissions().getCanRead(),
diagnosticsDto.getProcessorStatus(), diagnosticsEntity.getPermissions().getCanRead(),
nodeId.getId(), nodeId.getApiAddress(), nodeId.getApiPort());
final NodeJVMDiagnosticsSnapshotDTO nodeJvmDiagnosticsSnapshot = new NodeJVMDiagnosticsSnapshotDTO();
nodeJvmDiagnosticsSnapshot.setAddress(nodeId.getApiAddress());
nodeJvmDiagnosticsSnapshot.setApiPort(nodeId.getApiPort());
nodeJvmDiagnosticsSnapshot.setNodeId(nodeId.getId());
nodeJvmDiagnosticsSnapshot.setSnapshot(diagnosticsDto.getJvmDiagnostics().getAggregateSnapshot());
nodeJvmDiagnosticsSnapshots.add(nodeJvmDiagnosticsSnapshot);
}
clientDto.getJvmDiagnostics().setNodeSnapshots(nodeJvmDiagnosticsSnapshots);
// Merge JVM Diagnostics and thread dumps
final JVMDiagnosticsSnapshotDTO mergedJvmDiagnosticsSnapshot = clientDto.getJvmDiagnostics().getAggregateSnapshot().clone();
for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
final NodeIdentifier nodeId = entry.getKey();
final ProcessorDiagnosticsEntity diagnosticsEntity = entry.getValue();
if (diagnosticsEntity == clientEntity) {
for (final ThreadDumpDTO threadDump : clientDto.getThreadDumps()) {
threadDump.setNodeAddress(nodeId.getApiAddress());
threadDump.setApiPort(nodeId.getApiPort());
threadDump.setNodeId(nodeId.getId());
}
continue;
}
final ProcessorDiagnosticsDTO diagnosticsDto = diagnosticsEntity.getComponent();
final JVMDiagnosticsSnapshotDTO snapshot = diagnosticsDto.getJvmDiagnostics().getAggregateSnapshot();
StatusMerger.merge(mergedJvmDiagnosticsSnapshot, snapshot, componentStatusSnapshotMillis);
final List<ThreadDumpDTO> threadDumps = diagnosticsEntity.getComponent().getThreadDumps();
for (final ThreadDumpDTO threadDump : threadDumps) {
threadDump.setNodeAddress(nodeId.getApiAddress());
threadDump.setApiPort(nodeId.getApiPort());
threadDump.setNodeId(nodeId.getId());
clientDto.getThreadDumps().add(threadDump);
}
}
clientDto.getJvmDiagnostics().setAggregateSnapshot(mergedJvmDiagnosticsSnapshot);
// Merge permissions on referenced controller services
final Map<String, ControllerServiceEntity> serviceEntityById = clientDto.getReferencedControllerServices().stream()
.map(diagnosticsDto -> diagnosticsDto.getControllerService())
.collect(Collectors.toMap(ControllerServiceEntity::getId, Function.identity()));
for (final Map.Entry<NodeIdentifier, ProcessorDiagnosticsEntity> entry : entityMap.entrySet()) {
final ProcessorDiagnosticsEntity procDiagnostics = entry.getValue();
final Set<ControllerServiceDiagnosticsDTO> serviceDtos = procDiagnostics.getComponent().getReferencedControllerServices();
for (final ControllerServiceDiagnosticsDTO serviceDto : serviceDtos) {
final ControllerServiceEntity serviceEntity = serviceDto.getControllerService();
final ControllerServiceEntity targetEntity = serviceEntityById.get(serviceEntity.getId());
if (targetEntity != null) {
PermissionsDtoMerger.mergePermissions(targetEntity.getPermissions(), serviceEntity.getPermissions());
}
}
}
}
}

View File

@ -17,6 +17,17 @@
package org.apache.nifi.cluster.manager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.registry.flow.VersionedFlowState;
@ -30,6 +41,12 @@ import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.GarbageCollectionDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsSnapshotDTO.StorageUsageDTO;
import org.apache.nifi.web.api.dto.diagnostics.GCDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.GarbageCollectionDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMControllerDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMFlowDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
@ -52,15 +69,6 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class StatusMerger {
private static final String ZERO_COUNT = "0";
private static final String ZERO_BYTES = "0 bytes";
@ -626,6 +634,155 @@ public class StatusMerger {
updatePrettyPrintedFields(target);
}
public static void merge(final JVMDiagnosticsSnapshotDTO target, final JVMDiagnosticsSnapshotDTO toMerge, final long numMillis) {
if (target == null || toMerge == null) {
return;
}
if (toMerge.getControllerDiagnostics() == null) {
target.setControllerDiagnostics(null);
} else {
merge(target.getControllerDiagnostics(), toMerge.getControllerDiagnostics());
}
if (toMerge.getFlowDiagnosticsDto() == null) {
target.setFlowDiagnosticsDto(null);
} else {
merge(target.getFlowDiagnosticsDto(), toMerge.getFlowDiagnosticsDto());
}
if (toMerge.getSystemDiagnosticsDto() == null) {
target.setSystemDiagnosticsDto(null);
} else {
merge(target.getSystemDiagnosticsDto(), toMerge.getSystemDiagnosticsDto(), numMillis);
}
}
private static void merge(final JVMControllerDiagnosticsSnapshotDTO target, final JVMControllerDiagnosticsSnapshotDTO toMerge) {
if (toMerge == null || target == null) {
return;
}
target.setMaxEventDrivenThreads(add(target.getMaxEventDrivenThreads(), toMerge.getMaxEventDrivenThreads()));
target.setMaxTimerDrivenThreads(add(target.getMaxTimerDrivenThreads(), toMerge.getMaxTimerDrivenThreads()));
target.setClusterCoordinator(null);
target.setPrimaryNode(null);
}
private static void merge(final JVMFlowDiagnosticsSnapshotDTO target, final JVMFlowDiagnosticsSnapshotDTO toMerge) {
if (toMerge == null || target == null) {
return;
}
target.setActiveEventDrivenThreads(add(target.getActiveEventDrivenThreads(), toMerge.getActiveEventDrivenThreads()));
target.setActiveTimerDrivenThreads(add(target.getActiveTimerDrivenThreads(), toMerge.getActiveTimerDrivenThreads()));
target.setBundlesLoaded(null);
target.setUptime(null);
if (!Objects.equals(target.getTimeZone(), toMerge.getTimeZone())) {
target.setTimeZone(null);
}
}
private static void merge(final JVMSystemDiagnosticsSnapshotDTO target, final JVMSystemDiagnosticsSnapshotDTO toMerge, final long numMillis) {
if (toMerge == null || target == null) {
return;
}
target.setCpuCores(add(target.getCpuCores(), toMerge.getCpuCores()));
target.setCpuLoadAverage(add(target.getCpuLoadAverage(), toMerge.getCpuLoadAverage()));
target.setOpenFileDescriptors(add(target.getOpenFileDescriptors(), toMerge.getOpenFileDescriptors()));
target.setMaxOpenFileDescriptors(add(target.getMaxOpenFileDescriptors(), toMerge.getMaxOpenFileDescriptors()));
target.setPhysicalMemoryBytes(add(target.getPhysicalMemoryBytes(), toMerge.getPhysicalMemoryBytes()));
target.setPhysicalMemory(FormatUtils.formatDataSize(target.getPhysicalMemoryBytes()));
target.setContentRepositoryStorageUsage(null);
target.setFlowFileRepositoryStorageUsage(null);
target.setProvenanceRepositoryStorageUsage(null);
target.setMaxHeapBytes(add(target.getMaxHeapBytes(), toMerge.getMaxHeapBytes()));
target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes()));
final List<GarbageCollectionDiagnosticsDTO> mergedGcDiagnosticsDtos = mergeGarbageCollectionDiagnostics(target.getGarbageCollectionDiagnostics(),
toMerge.getGarbageCollectionDiagnostics(), numMillis);
target.setGarbageCollectionDiagnostics(mergedGcDiagnosticsDtos);
}
private static List<GarbageCollectionDiagnosticsDTO> mergeGarbageCollectionDiagnostics(final List<GarbageCollectionDiagnosticsDTO> target,
final List<GarbageCollectionDiagnosticsDTO> toMerge, final long numMillis) {
final Map<String, Map<Date, GCDiagnosticsSnapshotDTO>> metricsByMemoryMgr = new HashMap<>();
merge(target, metricsByMemoryMgr, numMillis);
merge(toMerge, metricsByMemoryMgr, numMillis);
final List<GarbageCollectionDiagnosticsDTO> gcDiagnosticsDtos = new ArrayList<>();
for (final Map.Entry<String, Map<Date, GCDiagnosticsSnapshotDTO>> entry : metricsByMemoryMgr.entrySet()) {
final String memoryManagerName = entry.getKey();
final Map<Date, GCDiagnosticsSnapshotDTO> snapshotMap = entry.getValue();
final GarbageCollectionDiagnosticsDTO gcDiagnosticsDto = new GarbageCollectionDiagnosticsDTO();
gcDiagnosticsDto.setMemoryManagerName(memoryManagerName);
final List<GCDiagnosticsSnapshotDTO> gcDiagnosticsSnapshots = new ArrayList<>(snapshotMap.values());
Collections.sort(gcDiagnosticsSnapshots, (a, b) -> a.getTimestamp().compareTo(b.getTimestamp()));
gcDiagnosticsDto.setSnapshots(gcDiagnosticsSnapshots);
gcDiagnosticsDtos.add(gcDiagnosticsDto);
}
return gcDiagnosticsDtos;
}
private static void merge(final List<GarbageCollectionDiagnosticsDTO> toMerge, final Map<String, Map<Date, GCDiagnosticsSnapshotDTO>> metricsByMemoryMgr, final long numMillis) {
for (final GarbageCollectionDiagnosticsDTO gcDiagnostics : toMerge) {
final String memoryManagerName = gcDiagnostics.getMemoryManagerName();
final Map<Date, GCDiagnosticsSnapshotDTO> metricsByDate = metricsByMemoryMgr.computeIfAbsent(memoryManagerName, key -> new HashMap<>());
for (final GCDiagnosticsSnapshotDTO snapshot : gcDiagnostics.getSnapshots()) {
final long timestamp = snapshot.getTimestamp().getTime();
final Date normalized = new Date(timestamp - timestamp % numMillis);
final GCDiagnosticsSnapshotDTO aggregate = metricsByDate.computeIfAbsent(normalized, key -> new GCDiagnosticsSnapshotDTO());
aggregate.setCollectionCount(add(aggregate.getCollectionCount(), snapshot.getCollectionCount()));
aggregate.setCollectionMillis(add(aggregate.getCollectionMillis(), snapshot.getCollectionMillis()));
aggregate.setTimestamp(normalized);
}
}
}
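The normalization above aligns each node's GC snapshots to common numMillis-wide time buckets so that counts and durations reported by different nodes can be summed. A tiny worked example (values invented):

long numMillis = 60_000L;                    // e.g. a one-minute snapshot window (assumed)
long timestamp = 1_517_500_123_456L;         // arbitrary epoch millis
long normalized = timestamp - timestamp % numMillis;
// 1_517_500_123_456 % 60_000 == 43_456, so normalized == 1_517_500_080_000:
// every snapshot captured within that same minute maps to the same aggregate bucket key.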
private static Integer add(final Integer a, final Integer b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return a + b;
}
private static Double add(final Double a, final Double b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return a + b;
}
private static Long add(final Long a, final Long b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return a + b;
}
public static void updatePrettyPrintedFields(final SystemDiagnosticsSnapshotDTO target) {
// heap
target.setMaxHeap(FormatUtils.formatDataSize(target.getMaxHeapBytes()));

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
public class ActiveThreadInfo {
private final String threadName;
private final String stackTrace;
private final long activeMillis;
public ActiveThreadInfo(final String threadName, final String stackTrace, final long activeMillis) {
this.threadName = threadName;
this.stackTrace = stackTrace;
this.activeMillis = activeMillis;
}
public String getThreadName() {
return threadName;
}
public String getStackTrace() {
return stackTrace;
}
public long getActiveMillis() {
return activeMillis;
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -68,6 +69,8 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
public abstract Requirement getInputRequirement();
public abstract List<ActiveThreadInfo> getActiveThreads();
@Override
public abstract boolean isValid();

View File

@ -122,6 +122,9 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StandardGarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.tasks.ExpireFlowFiles;
import org.apache.nifi.diagnostics.SystemDiagnostics;
@ -230,6 +233,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@ -612,7 +617,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
componentStatusRepository.capture(getControllerStatus());
componentStatusRepository.capture(getControllerStatus(), getGarbageCollectionStatus());
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
@ -1639,6 +1644,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return maxEventDrivenThreads.get();
}
public int getActiveEventDrivenThreadCount() {
return eventDrivenEngineRef.get().getActiveCount();
}
public int getActiveTimerDrivenThreadCount() {
return timerDrivenEngineRef.get().getActiveCount();
}
public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
writeLock.lock();
try {
@ -1718,6 +1731,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return factory.create(flowFileRepository, contentRepository, provenanceRepository);
}
public String getContentRepoFileStoreName(final String containerName) {
return contentRepository.getContainerFileStoreName(containerName);
}
public String getFlowRepoFileStoreName() {
return flowFileRepository.getFileStoreName();
}
public String getProvenanceRepoFileStoreName(final String containerName) {
return provenanceRepository.getContainerFileStoreName(containerName);
}
//
// ProcessGroup access
//
@ -2641,6 +2666,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public Funnel getFunnel(final String id) {
return allFunnels.get(id);
}
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
final List<GarbageCollectionStatus> statuses = new ArrayList<>();
final Date now = new Date();
for (final GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
final String managerName = mbean.getName();
final long count = mbean.getCollectionCount();
final long millis = mbean.getCollectionTime();
final GarbageCollectionStatus status = new StandardGarbageCollectionStatus(managerName, now, count, millis);
statuses.add(status);
}
return statuses;
}
public GarbageCollectionHistory getGarbageCollectionHistory() {
return componentStatusRepository.getGarbageCollectionHistory(new Date(0L), new Date());
}
/**
* Returns the status of all components in the controller. This request is
* not in the context of a user so the results will be unfiltered.
@ -4063,6 +4108,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return isClustered() && leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.PRIMARY_NODE);
}
public boolean isClusterCoordinator() {
return isClustered() && leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.CLUSTER_COORDINATOR);
}
public void setPrimary(final boolean primary) {
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = getGroup(getRootGroupId());

View File

@ -218,10 +218,56 @@ public class StandardFlowFileQueue implements FlowFileQueue {
return queueSize.activeQueueCount == 0 && queueSize.swappedCount == 0;
}
@Override
public QueueSize getActiveQueueSize() {
return size.get().activeQueueSize();
}
@Override
public QueueSize getSwapQueueSize() {
return size.get().swapQueueSize();
}
@Override
public int getSwapFileCount() {
readLock.lock();
try {
return this.swapLocations.size();
} finally {
readLock.unlock("getSwapFileCount");
}
}
@Override
public boolean isAllActiveFlowFilesPenalized() {
readLock.lock();
try {
// If there are no elements then we return false
if (activeQueue.isEmpty()) {
return false;
}
// If the first element on the queue is penalized, then we know they all are,
// because our Comparator will put Penalized FlowFiles at the end. If the first
// FlowFile is not penalized, then we also know that they are not all penalized,
// so we can simplify this by looking solely at the first FlowFile in the queue.
final FlowFileRecord first = activeQueue.peek();
return first.isPenalized();
} finally {
readLock.unlock("isAllActiveFlowFilesPenalized");
}
}
@Override
public boolean isAnyActiveFlowFilePenalized() {
readLock.lock();
try {
return activeQueue.stream().anyMatch(FlowFileRecord::isPenalized);
} finally {
readLock.unlock("isAnyActiveFlowFilePenalized");
}
}
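The short-circuit in isAllActiveFlowFilesPenalized relies on the queue's ordering placing penalized FlowFiles last. A simplified comparator sketch, included only for illustration (an assumption, not NiFi's actual prioritizer wrapper):

final java.util.Comparator<FlowFileRecord> penalizedLast = (f1, f2) -> {
    final boolean p1 = f1.isPenalized();
    final boolean p2 = f2.isPenalized();
    if (p1 != p2) {
        return p1 ? 1 : -1;   // penalized records sink toward the tail of the queue
    }
    return 0;                 // otherwise fall through to the configured prioritizers (omitted here)
};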
@Override
public void acknowledge(final FlowFileRecord flowFile) {
incrementUnacknowledgedQueueSize(-1, -flowFile.getSize());
@ -1369,7 +1415,6 @@ public class StandardFlowFileQueue implements FlowFileQueue {
return size.get().unacknowledgedQueueSize();
}
private void incrementActiveQueueSize(final int count, final long bytes) {
boolean updated = false;
while (!updated) {

View File

@ -18,6 +18,9 @@ package org.apache.nifi.controller;
import static java.util.Objects.requireNonNull;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
@ -38,6 +41,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
@ -87,6 +93,7 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.apache.nifi.util.ThreadUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -135,6 +142,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// ??????? NOT any more
private ExecutionNode executionNode;
private final long onScheduleTimeoutMillis;
private final Map<Thread, Long> activeThreads = new HashMap<>(48);
public StandardProcessorNode(final LoggableComponent<Processor> processor, final String uuid,
final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler,
@ -1127,8 +1135,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) {
final Processor processor = processorRef.get().getProcessor();
activateThread();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
processor.onTrigger(context, sessionFactory);
} finally {
deactivateThread();
}
}
@ -1329,6 +1341,44 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
}
}
private synchronized void activateThread() {
final Thread thread = Thread.currentThread();
final Long timestamp = System.currentTimeMillis();
activeThreads.put(thread, timestamp);
}
private synchronized void deactivateThread() {
activeThreads.remove(Thread.currentThread());
}
@Override
public synchronized List<ActiveThreadInfo> getActiveThreads() {
final long now = System.currentTimeMillis();
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
final Map<Long, ThreadInfo> threadInfoMap = Stream.of(infos)
.collect(Collectors.toMap(info -> info.getThreadId(), Function.identity(), (a, b) -> a));
final List<ActiveThreadInfo> threadList = new ArrayList<>(activeThreads.size());
for (final Map.Entry<Thread, Long> entry : activeThreads.entrySet()) {
final Thread thread = entry.getKey();
final Long timestamp = entry.getValue();
final long activeMillis = now - timestamp;
final ThreadInfo threadInfo = threadInfoMap.get(thread.getId());
final String stackTrace = ThreadUtils.createStackTrace(thread, threadInfo, deadlockedThreadIds, monitorDeadlockThreadIds, activeMillis);
final ActiveThreadInfo activeThreadInfo = new ActiveThreadInfo(thread.getName(), stackTrace, activeMillis);
threadList.add(activeThreadInfo);
}
return threadList;
}
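A hypothetical caller-side helper (not part of the commit) showing how the returned ActiveThreadInfo list might be rendered into a readable dump, e.g. for logging:

static String dumpActiveThreads(final ProcessorNode procNode) {
    final StringBuilder sb = new StringBuilder();
    for (final ActiveThreadInfo activeThread : procNode.getActiveThreads()) {
        sb.append(String.format("%s (active for %,d ms)%n", activeThread.getThreadName(), activeThread.getActiveMillis()));
        sb.append(activeThread.getStackTrace()).append(System.lineSeparator());
    }
    return sb.toString();
}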
private void initiateStart(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis,
final ProcessContext processContext, final SchedulingAgentCallback schedulingAgentCallback) {
@ -1343,7 +1393,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
try {
activateThread();
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, processor, processContext);
} finally {
deactivateThread();
}
if (scheduledState.compareAndSet(ScheduledState.STARTING, ScheduledState.RUNNING)) {
LOG.debug("Successfully completed the @OnScheduled methods of {}; will now start triggering processor to run", processor);
@ -1352,10 +1407,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
LOG.debug("Successfully invoked @OnScheduled methods of {} but scheduled state is no longer STARTING so will stop processor now", processor);
// can only happen if stopProcessor was called before service was transitioned to RUNNING state
activateThread();
try {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
} finally {
deactivateThread();
}
scheduledState.set(ScheduledState.STOPPED);
@ -1369,10 +1426,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// If processor's task completed Exceptionally, then we want to retry initiating the start (if Processor is still scheduled to run).
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
activateThread();
try {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
} finally {
deactivateThread();
}
}
@ -1460,8 +1519,12 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
try {
if (scheduleState.isScheduled()) {
schedulingAgent.unschedule(StandardProcessorNode.this, scheduleState);
activateThread();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, processor, processContext);
} finally {
deactivateThread();
}
}
@ -1469,8 +1532,11 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
// performing the lifecycle actions counts as 1 thread.
final boolean allThreadsComplete = scheduleState.getActiveThreadCount() == 1;
if (allThreadsComplete) {
activateThread();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processor, processContext);
} finally {
deactivateThread();
}
scheduleState.decrementActiveThreadCount();

View File

@ -430,6 +430,16 @@ public class FileSystemRepository implements ContentRepository {
return FileUtils.getContainerUsableSpace(path);
}
@Override
public String getContainerFileStoreName(final String containerName) {
final Path path = containers.get(containerName);
try {
return Files.getFileStore(path).name();
} catch (IOException e) {
return null;
}
}
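A standalone sketch of the same lookup (the path is an example and must exist on disk), mirroring the null-on-failure behavior above:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileStoreNameSketch {
    public static void main(final String[] args) {
        final Path repoPath = Paths.get("content_repository");        // example directory
        try {
            System.out.println(Files.getFileStore(repoPath).name());  // e.g. "/dev/sda1" on Linux
        } catch (final IOException e) {
            System.out.println("file store unknown: " + e);           // same spirit as returning null above
        }
    }
}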
@Override
public void cleanup() {
for (final Map.Entry<String, Path> entry : containers.entrySet()) {

View File

@ -196,6 +196,11 @@ public class VolatileContentRepository implements ContentRepository {
return maxBytes - repoSize.get();
}
@Override
public String getContainerFileStoreName(String containerName) {
return null;
}
@Override
public ContentClaim create(boolean lossTolerant) throws IOException {
if (lossTolerant) {

View File

@ -56,6 +56,11 @@ public class VolatileFlowFileRepository implements FlowFileRepository {
return 0L;
}
@Override
public String getFileStoreName() {
return null;
}
@Override
public void close() throws IOException {
}

View File

@ -241,6 +241,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
return usableSpace;
}
@Override
public String getFileStoreName() {
final Path path = flowFileRepositoryPaths.iterator().next().toPath();
try {
return Files.getFileStore(path).name();
} catch (IOException e) {
return null;
}
}
@Override
public void updateRepository(final Collection<RepositoryRecord> records) throws IOException {
updateRepository(records, alwaysSync);

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class StandardGarbageCollectionHistory implements GarbageCollectionHistory {
private final Map<String, List<GarbageCollectionStatus>> statusesByManagerName = new HashMap<>();
@Override
public Set<String> getMemoryManagerNames() {
return statusesByManagerName.keySet();
}
@Override
public List<GarbageCollectionStatus> getGarbageCollectionStatuses(final String memoryManagerName) {
final List<GarbageCollectionStatus> statuses = statusesByManagerName.get(memoryManagerName);
if (statuses == null) {
return Collections.emptyList();
}
return Collections.unmodifiableList(statuses);
}
public void addGarbageCollectionStatus(final GarbageCollectionStatus status) {
final String managerName = status.getMemoryManagerName();
final List<GarbageCollectionStatus> statuses = statusesByManagerName.computeIfAbsent(managerName, key -> new ArrayList<>());
statuses.add(status);
}
}
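A brief usage sketch (manager names and figures are invented; the java.util.Date import is assumed, and StandardGarbageCollectionStatus is the implementation added later in this commit):

final StandardGarbageCollectionHistory history = new StandardGarbageCollectionHistory();
history.addGarbageCollectionStatus(new StandardGarbageCollectionStatus("G1 Young Generation", new Date(), 42, 310));
history.addGarbageCollectionStatus(new StandardGarbageCollectionStatus("G1 Old Generation", new Date(), 3, 95));
for (final String manager : history.getMemoryManagerNames()) {
    System.out.println(manager + " -> " + history.getGarbageCollectionStatuses(manager).size() + " status(es)");
}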

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.status.history;
import java.util.Date;
public class StandardGarbageCollectionStatus implements GarbageCollectionStatus {
private final String managerName;
private final Date timestamp;
private final long collectionCount;
private final long collectionMillis;
public StandardGarbageCollectionStatus(final String managerName, final Date timestamp, final long collectionCount, final long collectionMillis) {
this.managerName = managerName;
this.timestamp = timestamp;
this.collectionCount = collectionCount;
this.collectionMillis = collectionMillis;
}
@Override
public Date getTimestamp() {
return timestamp;
}
@Override
public String getMemoryManagerName() {
return managerName;
}
@Override
public long getCollectionCount() {
return collectionCount;
}
@Override
public long getCollectionMillis() {
return collectionMillis;
}
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.nifi.controller.status.history;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
@ -29,9 +33,6 @@ import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Map;
public class VolatileComponentStatusRepository implements ComponentStatusRepository {
public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
@ -51,19 +52,20 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
public VolatileComponentStatusRepository(final NiFiProperties nifiProperties) {
final int numDataPoints = nifiProperties.getIntegerProperty(NUM_DATA_POINTS_PROPERTY, DEFAULT_NUM_DATA_POINTS);
captures = new RingBuffer<>(numDataPoints);
}
@Override
public void capture(final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus) {
capture(rootGroupStatus, gcStatus, new Date());
}
@Override
public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final List<GarbageCollectionStatus> gcStatus, final Date timestamp) {
final ComponentStatusReport statusReport = ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR,
ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP);
captures.add(new Capture(timestamp, statusReport, gcStatus));
logger.debug("Captured metrics for {}", this);
lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime());
}
@ -221,15 +223,41 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
return history;
}
@Override
public GarbageCollectionHistory getGarbageCollectionHistory(final Date start, final Date end) {
final StandardGarbageCollectionHistory history = new StandardGarbageCollectionHistory();
captures.forEach(new ForEachEvaluator<Capture>() {
@Override
public boolean evaluate(final Capture capture) {
if (capture.getCaptureDate().before(start)) {
return true;
}
if (capture.getCaptureDate().after(end)) {
return false;
}
final List<GarbageCollectionStatus> statuses = capture.getGarbageCollectionStatus();
if (statuses != null) {
statuses.stream().forEach(history::addGarbageCollectionStatus);
}
return true;
}
});
return history;
}
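A usage sketch for the method above (the one-hour window, the statusRepository variable, and the Date/TimeUnit imports are assumptions for illustration):

final Date end = new Date();
final Date start = new Date(end.getTime() - TimeUnit.HOURS.toMillis(1));
final GarbageCollectionHistory history = statusRepository.getGarbageCollectionHistory(start, end);
for (final String managerName : history.getMemoryManagerNames()) {
    System.out.println(managerName + ": " + history.getGarbageCollectionStatuses(managerName).size() + " capture(s)");
}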
private static class Capture {
private final Date captureDate;
private final ComponentStatusReport statusReport;
private final List<GarbageCollectionStatus> gcStatus;
public Capture(final Date date, final ComponentStatusReport statusReport, final List<GarbageCollectionStatus> gcStatus) {
this.captureDate = date;
this.statusReport = statusReport;
this.gcStatus = gcStatus;
}
public Date getCaptureDate() {
@ -239,5 +267,9 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
public ComponentStatusReport getStatusReport() {
return statusReport;
}
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
return gcStatus;
}
}
}

View File

@ -39,6 +39,10 @@ public class SystemDiagnostics implements Cloneable {
private int totalThreads;
private int daemonThreads;
private Long totalPhysicalMemory;
private Long maxOpenFileHandles;
private Long openFileHandles;
private long uptime;
private StorageUsage flowFileRepositoryStorageUsage;
@ -200,6 +204,30 @@ public class SystemDiagnostics implements Cloneable {
this.uptime = uptime;
}
public long getTotalPhysicalMemory() {
return totalPhysicalMemory;
}
public void setTotalPhysicalMemory(long totalPhysicalMemory) {
this.totalPhysicalMemory = totalPhysicalMemory;
}
public long getMaxOpenFileHandles() {
return maxOpenFileHandles;
}
public void setMaxOpenFileHandles(long maxOpenFileHandles) {
this.maxOpenFileHandles = maxOpenFileHandles;
}
public long getOpenFileHandles() {
return openFileHandles;
}
public void setOpenFileHandles(long openFileHandles) {
this.openFileHandles = openFileHandles;
}
@Override
public SystemDiagnostics clone() {
final SystemDiagnostics clonedObj = new SystemDiagnostics();
@ -239,6 +267,9 @@ public class SystemDiagnostics implements Cloneable {
clonedObj.usedNonHeap = usedNonHeap;
clonedObj.creationTimestamp = creationTimestamp;
clonedObj.uptime = uptime;
clonedObj.totalPhysicalMemory = totalPhysicalMemory;
clonedObj.openFileHandles = openFileHandles;
clonedObj.maxOpenFileHandles = maxOpenFileHandles;
return clonedObj;
}

View File

@ -24,6 +24,7 @@ import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
@ -154,6 +155,30 @@ public class SystemDiagnosticsFactory {
}
systemDiagnostics.setGarbageCollection(garbageCollection);
// This information is available only for *nix systems.
final OperatingSystemMXBean osStats = ManagementFactory.getOperatingSystemMXBean();
try {
final Class<?> unixOsMxBeanClass = Class.forName("com.sun.management.UnixOperatingSystemMXBean");
if (unixOsMxBeanClass.isAssignableFrom(osStats.getClass())) {
final Method totalPhysicalMemory = unixOsMxBeanClass.getMethod("getTotalPhysicalMemorySize");
totalPhysicalMemory.setAccessible(true);
final Long ramBytes = (Long) totalPhysicalMemory.invoke(osStats);
systemDiagnostics.setTotalPhysicalMemory(ramBytes);
final Method maxFileDescriptors = unixOsMxBeanClass.getMethod("getMaxFileDescriptorCount");
maxFileDescriptors.setAccessible(true);
final Long maxOpenFileDescriptors = (Long) maxFileDescriptors.invoke(osStats);
systemDiagnostics.setMaxOpenFileHandles(maxOpenFileDescriptors);
final Method openFileDescriptors = unixOsMxBeanClass.getMethod("getOpenFileDescriptorCount");
openFileDescriptors.setAccessible(true);
final Long openDescriptorCount = (Long) openFileDescriptors.invoke(osStats);
systemDiagnostics.setOpenFileHandles(openDescriptorCount);
}
} catch (final Throwable t) {
// Ignore. Class.forName throws ClassNotFoundException (or a NoClassDefFoundError may occur) if this API is unavailable in the running JVM.
}
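For comparison, the same values could be read with a direct cast, at the cost of compiling against the non-portable com.sun.management API; that trade-off is presumably why the block above goes through reflection. A sketch:

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class UnixOsStatsSketch {
    public static void main(final String[] args) {
        final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            final com.sun.management.UnixOperatingSystemMXBean unixOs = (com.sun.management.UnixOperatingSystemMXBean) os;
            System.out.println("total physical memory: " + unixOs.getTotalPhysicalMemorySize());
            System.out.println("open file descriptors: " + unixOs.getOpenFileDescriptorCount()
                + " / " + unixOs.getMaxFileDescriptorCount());
        }
    }
}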
// set the creation timestamp
systemDiagnostics.setCreationTimestamp(new Date().getTime());

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;
import java.lang.management.LockInfo;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
public class ThreadUtils {
public static String createStackTrace(final Thread thread, final ThreadInfo threadInfo, final long[] deadlockedThreadIds, final long[] monitorDeadlockThreadIds, final long activeMillis) {
final StringBuilder sb = new StringBuilder();
sb.append("\"").append(threadInfo.getThreadName()).append("\" Id=");
sb.append(threadInfo.getThreadId()).append(" ");
sb.append(threadInfo.getThreadState().toString()).append(" ");
switch (threadInfo.getThreadState()) {
case BLOCKED:
case TIMED_WAITING:
case WAITING:
sb.append(" on ");
sb.append(threadInfo.getLockInfo());
break;
default:
break;
}
if (threadInfo.isSuspended()) {
sb.append(" (suspended)");
}
if (threadInfo.isInNative()) {
sb.append(" (in native code)");
}
if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
for (final long id : deadlockedThreadIds) {
if (id == threadInfo.getThreadId()) {
sb.append(" ** DEADLOCKED THREAD **");
}
}
}
if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
for (final long id : monitorDeadlockThreadIds) {
if (id == threadInfo.getThreadId()) {
sb.append(" ** MONITOR-DEADLOCKED THREAD **");
}
}
}
final StackTraceElement[] stackTraces = threadInfo.getStackTrace();
for (final StackTraceElement element : stackTraces) {
sb.append("\n\tat ").append(element);
final MonitorInfo[] monitors = threadInfo.getLockedMonitors();
for (final MonitorInfo monitor : monitors) {
if (monitor.getLockedStackFrame().equals(element)) {
sb.append("\n\t- waiting on ").append(monitor);
}
}
}
final LockInfo[] lockInfos = threadInfo.getLockedSynchronizers();
if (lockInfos.length > 0) {
sb.append("\n\t");
sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
for (final LockInfo lockInfo : lockInfos) {
sb.append("\n\t- ").append(lockInfo.toString());
}
}
sb.append("\n");
return sb.toString();
}
}
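A usage sketch (not part of the commit): dumping every live thread with the helper above. The Thread reference and active time are not tracked in this context, so placeholders are passed; the shown implementation only reads from the ThreadInfo.

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpSketch {
    public static void main(final String[] args) {
        final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
        final long[] deadlocked = mbean.findDeadlockedThreads();
        final long[] monitorDeadlocked = mbean.findMonitorDeadlockedThreads();
        for (final ThreadInfo info : mbean.dumpAllThreads(true, true)) {
            // null Thread and 0 ms active time are placeholders for this standalone example.
            System.out.println(ThreadUtils.createStackTrace(null, info, deadlocked, monitorDeadlocked, 0L));
        }
    }
}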

View File

@ -1971,6 +1971,11 @@ public class TestStandardProcessSession {
return 0;
}
@Override
public String getFileStoreName() {
return null;
}
@Override
public boolean isVolatile() {
return false;
@ -2090,6 +2095,11 @@ public class TestStandardProcessSession {
return 0;
}
@Override
public String getContainerFileStoreName(String containerName) {
return null;
}
@Override
public int decrementClaimantCount(ContentClaim claim) {
if (claim == null) {

View File

@ -112,6 +112,11 @@ public class TestWriteAheadFlowFileRepository {
public void purgeSwapFiles() {
}
@Override
public int getSwapFileCount() {
return 0;
}
@Override
public void setPriorities(List<FlowFilePrioritizer> newPriorities) {
}
@ -154,6 +159,16 @@ public class TestWriteAheadFlowFileRepository {
return null;
}
@Override
public QueueSize getActiveQueueSize() {
return size();
}
@Override
public QueueSize getSwapQueueSize() {
return null;
}
@Override
public void acknowledge(FlowFileRecord flowFile) {
}
@ -162,6 +177,16 @@ public class TestWriteAheadFlowFileRepository {
public void acknowledge(Collection<FlowFileRecord> flowFiles) {
}
@Override
public boolean isAllActiveFlowFilesPenalized() {
return false;
}
@Override
public boolean isAnyActiveFlowFilePenalized() {
return false;
}
@Override
public boolean isFull() {
return false;

View File

@ -95,6 +95,12 @@ public class ExtensionManager {
definitionMap.put(StateProvider.class, new HashSet<>());
}
public static Set<Bundle> getAllBundles() {
return classNameBundleLookup.values().stream()
.flatMap(List::stream)
.collect(Collectors.toSet());
}
/**
* Loads all FlowFileProcessor, FlowFileComparator, ReportingTask class types that can be found on the bootstrap classloader and by creating classloaders for all NARs found within the classpath.
* @param narBundles the bundles to scan through in search of extensions

View File

@ -95,6 +95,7 @@ import org.apache.nifi.web.api.entity.PortStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
@ -524,6 +525,14 @@ public interface NiFiServiceFacade {
*/
ProcessorEntity getProcessor(String id);
/**
* Gets the diagnostics information for the Processor with the specified id.
*
* @param id the id of the processor
* @return the diagnostics information for the processor
*/
ProcessorDiagnosticsEntity getProcessorDiagnostics(String id);
/**
* Gets the Processor transfer object for the specified id, as it is visible to the given user
*

View File

@ -16,7 +16,9 @@
*/
package org.apache.nifi.web;
import com.google.common.collect.Sets;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
@ -79,6 +81,7 @@ import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
@ -183,6 +186,11 @@ import org.apache.nifi.web.api.dto.VersionControlInformationDTO;
import org.apache.nifi.web.api.dto.VersionedFlowDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
@ -225,6 +233,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.RegistryClientEntity;
@ -275,8 +284,8 @@ import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@ -297,6 +306,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -4526,6 +4536,99 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return history;
}
private ControllerServiceEntity createControllerServiceEntity(final String serviceId, final NiFiUser user) {
final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
return createControllerServiceEntity(serviceNode, Collections.emptySet(), user);
}
@Override
public ProcessorDiagnosticsEntity getProcessorDiagnostics(final String id) {
final ProcessorNode processor = processorDAO.getProcessor(id);
final ProcessorStatus processorStatus = controllerFacade.getProcessorStatus(id);
// Generate Processor Diagnostics
final NiFiUser user = NiFiUserUtils.getNiFiUser();
final ProcessorDiagnosticsDTO dto = controllerFacade.getProcessorDiagnostics(processor, processorStatus, bulletinRepository, serviceId -> createControllerServiceEntity(serviceId, user));
// Filter anything out of diagnostics that the user is not authorized to see.
final List<JVMDiagnosticsSnapshotDTO> jvmDiagnosticsSnaphots = new ArrayList<>();
final JVMDiagnosticsDTO jvmDiagnostics = dto.getJvmDiagnostics();
jvmDiagnosticsSnaphots.add(jvmDiagnostics.getAggregateSnapshot());
// filter controller-related information
final boolean canReadController = authorizableLookup.getController().isAuthorized(authorizer, RequestAction.READ, user);
if (!canReadController) {
for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
snapshot.setControllerDiagnostics(null);
}
}
// filter system diagnostics information
final boolean canReadSystem = authorizableLookup.getSystem().isAuthorized(authorizer, RequestAction.READ, user);
if (!canReadSystem) {
for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
snapshot.setSystemDiagnosticsDto(null);
}
}
final boolean canReadFlow = authorizableLookup.getFlow().isAuthorized(authorizer, RequestAction.READ, user);
if (!canReadFlow) {
for (final JVMDiagnosticsSnapshotDTO snapshot : jvmDiagnosticsSnaphots) {
snapshot.setFlowDiagnosticsDto(null);
}
}
// filter connections
final Predicate<ConnectionDiagnosticsDTO> connectionAuthorized = connectionDiagnostics -> {
final String connectionId = connectionDiagnostics.getConnection().getId();
return authorizableLookup.getConnection(connectionId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
};
// Filter incoming connections by what user is authorized to READ
final Set<ConnectionDiagnosticsDTO> incoming = dto.getIncomingConnections();
final Set<ConnectionDiagnosticsDTO> filteredIncoming = incoming.stream()
.filter(connectionAuthorized)
.collect(Collectors.toSet());
dto.setIncomingConnections(filteredIncoming);
// Filter outgoing connections by what user is authorized to READ
final Set<ConnectionDiagnosticsDTO> outgoing = dto.getOutgoingConnections();
final Set<ConnectionDiagnosticsDTO> filteredOutgoing = outgoing.stream()
.filter(connectionAuthorized)
.collect(Collectors.toSet());
dto.setOutgoingConnections(filteredOutgoing);
// Filter the Controller Services referenced by the Processor down to those the user is authorized to READ
final Set<ControllerServiceDiagnosticsDTO> referencedServices = dto.getReferencedControllerServices();
final Set<ControllerServiceDiagnosticsDTO> filteredReferencedServices = referencedServices.stream()
.filter(csDiagnostics -> {
final String csId = csDiagnostics.getControllerService().getId();
return authorizableLookup.getControllerService(csId).getAuthorizable().isAuthorized(authorizer, RequestAction.READ, user);
})
.map(csDiagnostics -> {
// Filter out any referencing components because those are generally not relevant from this context.
final ControllerServiceDTO serviceDto = csDiagnostics.getControllerService().getComponent();
if (serviceDto != null) {
serviceDto.setReferencingComponents(null);
}
return csDiagnostics;
})
.collect(Collectors.toSet());
dto.setReferencedControllerServices(filteredReferencedServices);
final Revision revision = revisionManager.getRevision(id);
final RevisionDTO revisionDto = dtoFactory.createRevisionDTO(revision);
final PermissionsDTO permissionsDto = dtoFactory.createPermissionsDto(processor);
final List<BulletinEntity> bulletins = bulletinRepository.findBulletinsForSource(id).stream()
.map(bulletin -> dtoFactory.createBulletinDto(bulletin))
.map(bulletin -> entityFactory.createBulletinEntity(bulletin, permissionsDto.getCanRead()))
.collect(Collectors.toList());
final ProcessorStatusDTO processorStatusDto = dtoFactory.createProcessorStatusDto(controllerFacade.getProcessorStatus(processor.getIdentifier()));
return entityFactory.createProcessorDiagnosticsEntity(dto, revisionDto, permissionsDto, processorStatusDto, bulletins);
}
@Override
public boolean isClustered() {
return controllerFacade.isClustered();

View File

@ -42,6 +42,7 @@ import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
@ -110,6 +111,22 @@ public class ProcessorResource extends ApplicationResource {
return processorEntity;
}
/**
* Populates the URIs for the specified processor diagnostics entity and its processor component.
*
* @param processorDiagnosticsEntity processor's diagnostics entity
* @return processor diagnostics entity
*/
public ProcessorDiagnosticsEntity populateRemainingProcessorDiagnosticsEntityContent(ProcessorDiagnosticsEntity processorDiagnosticsEntity) {
processorDiagnosticsEntity.setUri(generateResourceUri("processors", processorDiagnosticsEntity.getId(), "diagnostics"));
// populate remaining content
if (processorDiagnosticsEntity.getComponent() != null && processorDiagnosticsEntity.getComponent().getProcessor() != null) {
populateRemainingProcessorContent(processorDiagnosticsEntity.getComponent().getProcessor());
}
return processorDiagnosticsEntity;
}
/**
* Populates the URIs for the specified processor and its relationships.
*/
@ -192,6 +209,43 @@ public class ProcessorResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{id}/diagnostics")
@ApiOperation(value = "Gets diagnostics information about a processor",
response = ProcessorDiagnosticsEntity.class,
notes = NON_GUARANTEED_ENDPOINT,
authorizations = { @Authorization(value = "Read - /processors/{uuid}")}
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
})
public Response getProcessorDiagnostics(
@ApiParam(value = "The processor id.", required = true) @PathParam("id") final String id) throws InterruptedException {
if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}
// authorize access
serviceFacade.authorizeAccess(lookup -> {
final Authorizable processor = lookup.getProcessor(id).getAuthorizable();
processor.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});
// get the specified processor's diagnostics
final ProcessorDiagnosticsEntity entity = serviceFacade.getProcessorDiagnostics(id);
populateRemainingProcessorDiagnosticsEntityContent(entity);
// generate the response
return generateOkResponse(entity).build();
}
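A hypothetical client-side sketch for the endpoint above (host, port, and processor id are placeholders, and authentication/TLS are ignored for brevity): it fetches the ProcessorDiagnosticsEntity JSON that this resource method serves.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class DiagnosticsClientSketch {
    public static void main(final String[] args) throws Exception {
        final URL url = new URL("http://localhost:8080/nifi-api/processors/"
            + "0c7f8e3a-1b2c-4d5e-8f90-112233445566/diagnostics");
        final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setRequestProperty("Accept", "application/json");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            reader.lines().forEach(System.out::println);   // raw ProcessorDiagnosticsEntity JSON
        }
    }
}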
/**
* Returns the descriptor for the specified property.
*

View File

@ -16,6 +16,9 @@
*/
package org.apache.nifi.web.api.dto;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
@ -69,9 +72,11 @@ import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.ActiveThreadInfo;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Snippet;
@ -79,6 +84,7 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileSummary;
import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
@ -94,6 +100,8 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.diagnostics.GarbageCollection;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
@ -154,6 +162,19 @@ import org.apache.nifi.web.api.dto.action.details.ConfigureDetailsDTO;
import org.apache.nifi.web.api.dto.action.details.ConnectDetailsDTO;
import org.apache.nifi.web.api.dto.action.details.MoveDetailsDTO;
import org.apache.nifi.web.api.dto.action.details.PurgeDetailsDTO;
import org.apache.nifi.web.api.dto.diagnostics.ClassLoaderDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.ConnectionDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.ControllerServiceDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.GCDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.GarbageCollectionDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMControllerDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMFlowDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.JVMSystemDiagnosticsSnapshotDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.diagnostics.RepositoryUsageDTO;
import org.apache.nifi.web.api.dto.diagnostics.ThreadDumpDTO;
import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO;
import org.apache.nifi.web.api.dto.flow.FlowDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
@ -194,8 +215,8 @@ import org.apache.nifi.web.api.entity.VariableEntity;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.revision.RevisionManager;
import javax.ws.rs.WebApplicationException;
import java.text.Collator;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -3157,6 +3178,303 @@ public final class DtoFactory {
return dto;
}
/**
* Creates a ProcessorDiagnosticsDTO from the given Processor, its status, and additional supporting information
*
* @param procNode the processor to create diagnostics for
* @param procStatus the status of the given processor
* @param bulletinRepo the bulletin repository
* @param flowController the flow controller
* @param serviceEntityFactory a function that creates a ControllerServiceEntity for a given controller service ID
* @return ProcessorDiagnosticsDTO for the given Processor
*/
public ProcessorDiagnosticsDTO createProcessorDiagnosticsDto(final ProcessorNode procNode, final ProcessorStatus procStatus, final BulletinRepository bulletinRepo,
final FlowController flowController, final Function<String, ControllerServiceEntity> serviceEntityFactory) {
final ProcessorDiagnosticsDTO procDiagnostics = new ProcessorDiagnosticsDTO();
procDiagnostics.setClassLoaderDiagnostics(createClassLoaderDiagnosticsDto(procNode));
procDiagnostics.setIncomingConnections(procNode.getIncomingConnections().stream()
.map(this::createConnectionDiagnosticsDto)
.collect(Collectors.toSet()));
procDiagnostics.setOutgoingConnections(procNode.getConnections().stream()
.map(this::createConnectionDiagnosticsDto)
.collect(Collectors.toSet()));
procDiagnostics.setJvmDiagnostics(createJvmDiagnosticsDto(flowController));
procDiagnostics.setProcessor(createProcessorDto(procNode));
procDiagnostics.setProcessorStatus(createProcessorStatusDto(procStatus));
procDiagnostics.setThreadDumps(createThreadDumpDtos(procNode));
final Set<ControllerServiceDiagnosticsDTO> referencedServiceDiagnostics = createReferencedServiceDiagnostics(procNode.getProperties(), flowController, serviceEntityFactory);
procDiagnostics.setReferencedControllerServices(referencedServiceDiagnostics);
return procDiagnostics;
}
private Set<ControllerServiceDiagnosticsDTO> createReferencedServiceDiagnostics(final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider,
final Function<String, ControllerServiceEntity> serviceEntityFactory) {
final Set<ControllerServiceDiagnosticsDTO> referencedServiceDiagnostics = new HashSet<>();
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() == null) {
continue;
}
final String serviceId = entry.getValue();
final ControllerServiceNode serviceNode = serviceProvider.getControllerServiceNode(serviceId);
if (serviceNode == null) {
continue;
}
final ControllerServiceDiagnosticsDTO serviceDiagnostics = createControllerServiceDiagnosticsDto(serviceNode, serviceEntityFactory, serviceProvider);
if (serviceDiagnostics != null) {
referencedServiceDiagnostics.add(serviceDiagnostics);
}
}
return referencedServiceDiagnostics;
}
/**
* Creates a ControllerServiceDiagnosticsDTO from the given Controller Service with some additional supporting information
*
* @param serviceNode the controller service to create diagnostics for
* @param serviceEntityFactory a function that creates a ControllerServiceEntity for a given controller service ID
* @param serviceProvider the controller service provider
* @return ControllerServiceDiagnosticsDTO for the given Controller Service
*/
public ControllerServiceDiagnosticsDTO createControllerServiceDiagnosticsDto(final ControllerServiceNode serviceNode, final Function<String, ControllerServiceEntity> serviceEntityFactory,
final ControllerServiceProvider serviceProvider) {
final ControllerServiceDiagnosticsDTO serviceDiagnostics = new ControllerServiceDiagnosticsDTO();
final ControllerServiceEntity serviceEntity = serviceEntityFactory.apply(serviceNode.getIdentifier());
serviceDiagnostics.setControllerService(serviceEntity);
serviceDiagnostics.setClassLoaderDiagnostics(createClassLoaderDiagnosticsDto(serviceNode));
return serviceDiagnostics;
}
private ClassLoaderDiagnosticsDTO createClassLoaderDiagnosticsDto(final ControllerServiceNode serviceNode) {
ClassLoader componentClassLoader = ExtensionManager.getInstanceClassLoader(serviceNode.getIdentifier());
if (componentClassLoader == null) {
componentClassLoader = serviceNode.getControllerServiceImplementation().getClass().getClassLoader();
}
return createClassLoaderDiagnosticsDto(componentClassLoader);
}
private ClassLoaderDiagnosticsDTO createClassLoaderDiagnosticsDto(final ProcessorNode procNode) {
ClassLoader componentClassLoader = ExtensionManager.getInstanceClassLoader(procNode.getIdentifier());
if (componentClassLoader == null) {
componentClassLoader = procNode.getProcessor().getClass().getClassLoader();
}
return createClassLoaderDiagnosticsDto(componentClassLoader);
}
private ClassLoaderDiagnosticsDTO createClassLoaderDiagnosticsDto(final ClassLoader classLoader) {
final ClassLoaderDiagnosticsDTO dto = new ClassLoaderDiagnosticsDTO();
final Bundle bundle = ExtensionManager.getBundle(classLoader);
if (bundle != null) {
dto.setBundle(createBundleDto(bundle.getBundleDetails().getCoordinate()));
}
final ClassLoader parentClassLoader = classLoader.getParent();
if (parentClassLoader != null) {
dto.setParentClassLoader(createClassLoaderDiagnosticsDto(parentClassLoader));
}
return dto;
}
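A small, self-contained sketch of the same parent-chain walk the recursive method above performs; it is purely illustrative, uses only the JDK, and the class name is hypothetical.
public class ClassLoaderChainSketch {
    public static void main(final String[] args) {
        // Walk from a class's loader up through its parents; the walk ends when
        // getParent() returns null (the bootstrap loader), mirroring the recursion above.
        ClassLoader loader = ClassLoaderChainSketch.class.getClassLoader();
        while (loader != null) {
            System.out.println(loader);
            loader = loader.getParent();
        }
    }
}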
private ConnectionDiagnosticsDTO createConnectionDiagnosticsDto(final Connection connection) {
final ConnectionDiagnosticsDTO dto = new ConnectionDiagnosticsDTO();
dto.setConnection(createConnectionDto(connection));
final FlowFileQueue queue = connection.getFlowFileQueue();
final QueueSize totalSize = queue.size();
dto.setTotalByteCount(totalSize.getByteCount());
dto.setTotalFlowFileCount(totalSize.getObjectCount());
final QueueSize activeSize = queue.getActiveQueueSize();
dto.setActiveQueueByteCount(activeSize.getByteCount());
dto.setActiveQueueFlowFileCount(activeSize.getObjectCount());
final QueueSize inFlightSize = queue.getUnacknowledgedQueueSize();
dto.setInFlightByteCount(inFlightSize.getByteCount());
dto.setInFlightFlowFileCount(inFlightSize.getObjectCount());
final QueueSize swapSize = queue.getSwapQueueSize();
dto.setSwapByteCount(swapSize.getByteCount());
dto.setSwapFlowFileCount(swapSize.getObjectCount());
dto.setSwapFiles(queue.getSwapFileCount());
dto.setAllActiveQueueFlowFilesPenalized(queue.isAllActiveFlowFilesPenalized());
dto.setAnyActiveQueueFlowFilesPenalized(queue.isAnyActiveFlowFilePenalized());
return dto;
}
private JVMDiagnosticsDTO createJvmDiagnosticsDto(final FlowController flowController) {
final JVMDiagnosticsDTO dto = new JVMDiagnosticsDTO();
dto.setAggregateSnapshot(createJvmDiagnosticsSnapshotDto(flowController));
dto.setClustered(flowController.isClustered());
dto.setConnected(flowController.isConnected());
return dto;
}
private JVMDiagnosticsSnapshotDTO createJvmDiagnosticsSnapshotDto(final FlowController flowController) {
final JVMDiagnosticsSnapshotDTO dto = new JVMDiagnosticsSnapshotDTO();
final JVMControllerDiagnosticsSnapshotDTO controllerDiagnosticsDto = new JVMControllerDiagnosticsSnapshotDTO();
final JVMFlowDiagnosticsSnapshotDTO flowDiagnosticsDto = new JVMFlowDiagnosticsSnapshotDTO();
final JVMSystemDiagnosticsSnapshotDTO systemDiagnosticsDto = new JVMSystemDiagnosticsSnapshotDTO();
dto.setControllerDiagnostics(controllerDiagnosticsDto);
dto.setFlowDiagnosticsDto(flowDiagnosticsDto);
dto.setSystemDiagnosticsDto(systemDiagnosticsDto);
final SystemDiagnostics systemDiagnostics = flowController.getSystemDiagnostics();
// flow-related information
final Set<BundleDTO> bundlesLoaded = ExtensionManager.getAllBundles().stream()
.map(bundle -> bundle.getBundleDetails().getCoordinate())
.sorted((a, b) -> a.getCoordinate().compareTo(b.getCoordinate()))
.map(this::createBundleDto)
.collect(Collectors.toCollection(LinkedHashSet::new));
flowDiagnosticsDto.setActiveEventDrivenThreads(flowController.getActiveEventDrivenThreadCount());
flowDiagnosticsDto.setActiveTimerDrivenThreads(flowController.getActiveTimerDrivenThreadCount());
flowDiagnosticsDto.setBundlesLoaded(bundlesLoaded);
flowDiagnosticsDto.setTimeZone(System.getProperty("user.timezone"));
flowDiagnosticsDto.setUptime(FormatUtils.formatHoursMinutesSeconds(systemDiagnostics.getUptime(), TimeUnit.MILLISECONDS));
// controller-related information
controllerDiagnosticsDto.setClusterCoordinator(flowController.isClusterCoordinator());
controllerDiagnosticsDto.setPrimaryNode(flowController.isPrimary());
controllerDiagnosticsDto.setMaxEventDrivenThreads(flowController.getMaxEventDrivenThreadCount());
controllerDiagnosticsDto.setMaxTimerDrivenThreads(flowController.getMaxTimerDrivenThreadCount());
// system-related information
systemDiagnosticsDto.setMaxOpenFileDescriptors(systemDiagnostics.getMaxOpenFileHandles());
systemDiagnosticsDto.setOpenFileDescriptors(systemDiagnostics.getOpenFileHandles());
systemDiagnosticsDto.setPhysicalMemoryBytes(systemDiagnostics.getTotalPhysicalMemory());
systemDiagnosticsDto.setPhysicalMemory(FormatUtils.formatDataSize(systemDiagnostics.getTotalPhysicalMemory()));
final NumberFormat percentageFormat = NumberFormat.getPercentInstance();
percentageFormat.setMaximumFractionDigits(2);
final Set<RepositoryUsageDTO> contentRepoUsage = new HashSet<>();
for (final Map.Entry<String, StorageUsage> entry : systemDiagnostics.getContentRepositoryStorageUsage().entrySet()) {
final String repoName = entry.getKey();
final StorageUsage usage = entry.getValue();
final RepositoryUsageDTO usageDto = new RepositoryUsageDTO();
usageDto.setName(repoName);
usageDto.setFileStoreHash(DigestUtils.sha256Hex(flowController.getContentRepoFileStoreName(repoName)));
usageDto.setFreeSpace(FormatUtils.formatDataSize(usage.getFreeSpace()));
usageDto.setFreeSpaceBytes(usage.getFreeSpace());
usageDto.setTotalSpace(FormatUtils.formatDataSize(usage.getTotalSpace()));
usageDto.setTotalSpaceBytes(usage.getTotalSpace());
final double usedPercentage = (usage.getTotalSpace() - usage.getFreeSpace()) / (double) usage.getTotalSpace();
final String utilization = percentageFormat.format(usedPercentage);
usageDto.setUtilization(utilization);
contentRepoUsage.add(usageDto);
}
final Set<RepositoryUsageDTO> provRepoUsage = new HashSet<>();
for (final Map.Entry<String, StorageUsage> entry : systemDiagnostics.getProvenanceRepositoryStorageUsage().entrySet()) {
final String repoName = entry.getKey();
final StorageUsage usage = entry.getValue();
final RepositoryUsageDTO usageDto = new RepositoryUsageDTO();
usageDto.setName(repoName);
usageDto.setFileStoreHash(DigestUtils.sha256Hex(flowController.getProvenanceRepoFileStoreName(repoName)));
usageDto.setFreeSpace(FormatUtils.formatDataSize(usage.getFreeSpace()));
usageDto.setFreeSpaceBytes(usage.getFreeSpace());
usageDto.setTotalSpace(FormatUtils.formatDataSize(usage.getTotalSpace()));
usageDto.setTotalSpaceBytes(usage.getTotalSpace());
final double usedPercentage = (usage.getTotalSpace() - usage.getFreeSpace()) / (double) usage.getTotalSpace();
final String utilization = percentageFormat.format(usedPercentage);
usageDto.setUtilization(utilization);
provRepoUsage.add(usageDto);
}
final RepositoryUsageDTO flowFileRepoUsage = new RepositoryUsageDTO();
// The FlowFile repository reports a single StorageUsage rather than a per-container map,
// so use it directly instead of iterating over the provenance repository's storage usage.
final StorageUsage flowFileRepoStorage = systemDiagnostics.getFlowFileRepositoryStorageUsage();
if (flowFileRepoStorage != null) {
flowFileRepoUsage.setName(flowFileRepoStorage.getIdentifier());
flowFileRepoUsage.setFileStoreHash(DigestUtils.sha256Hex(flowController.getFlowRepoFileStoreName()));
flowFileRepoUsage.setFreeSpace(FormatUtils.formatDataSize(flowFileRepoStorage.getFreeSpace()));
flowFileRepoUsage.setFreeSpaceBytes(flowFileRepoStorage.getFreeSpace());
flowFileRepoUsage.setTotalSpace(FormatUtils.formatDataSize(flowFileRepoStorage.getTotalSpace()));
flowFileRepoUsage.setTotalSpaceBytes(flowFileRepoStorage.getTotalSpace());
final double usedPercentage = (flowFileRepoStorage.getTotalSpace() - flowFileRepoStorage.getFreeSpace()) / (double) flowFileRepoStorage.getTotalSpace();
flowFileRepoUsage.setUtilization(percentageFormat.format(usedPercentage));
}
systemDiagnosticsDto.setContentRepositoryStorageUsage(contentRepoUsage);
systemDiagnosticsDto.setCpuCores(systemDiagnostics.getAvailableProcessors());
systemDiagnosticsDto.setCpuLoadAverage(systemDiagnostics.getProcessorLoadAverage());
systemDiagnosticsDto.setFlowFileRepositoryStorageUsage(flowFileRepoUsage);
systemDiagnosticsDto.setMaxHeapBytes(systemDiagnostics.getMaxHeap());
systemDiagnosticsDto.setMaxHeap(FormatUtils.formatDataSize(systemDiagnostics.getMaxHeap()));
systemDiagnosticsDto.setProvenanceRepositoryStorageUsage(provRepoUsage);
// Create the Garbage Collection History info
final GarbageCollectionHistory gcHistory = flowController.getGarbageCollectionHistory();
final List<GarbageCollectionDiagnosticsDTO> gcDiagnostics = new ArrayList<>();
for (final String memoryManager : gcHistory.getMemoryManagerNames()) {
final List<GarbageCollectionStatus> statuses = gcHistory.getGarbageCollectionStatuses(memoryManager);
final List<GCDiagnosticsSnapshotDTO> gcSnapshots = new ArrayList<>();
for (final GarbageCollectionStatus status : statuses) {
final GCDiagnosticsSnapshotDTO snapshotDto = new GCDiagnosticsSnapshotDTO();
snapshotDto.setTimestamp(status.getTimestamp());
snapshotDto.setCollectionCount(status.getCollectionCount());
snapshotDto.setCollectionMillis(status.getCollectionMillis());
gcSnapshots.add(snapshotDto);
}
final GarbageCollectionDiagnosticsDTO gcDto = new GarbageCollectionDiagnosticsDTO();
gcDto.setMemoryManagerName(memoryManager);
gcDto.setSnapshots(gcSnapshots);
gcDiagnostics.add(gcDto);
}
systemDiagnosticsDto.setGarbageCollectionDiagnostics(gcDiagnostics);
return dto;
}
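The utilization strings above come from NumberFormat's percent formatter; a tiny standalone sketch with made-up sizes shows the same calculation (class name and numbers are illustrative only).
import java.text.NumberFormat;
public class UtilizationFormatSketch {
    public static void main(final String[] args) {
        // Illustrative numbers only: 500 GB total, 125 GB free => 75% used.
        final long totalSpace = 500_000_000_000L;
        final long freeSpace = 125_000_000_000L;
        final double used = (totalSpace - freeSpace) / (double) totalSpace;
        final NumberFormat percentageFormat = NumberFormat.getPercentInstance();
        percentageFormat.setMaximumFractionDigits(2);
        System.out.println(percentageFormat.format(used)); // prints "75%"
    }
}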
private List<ThreadDumpDTO> createThreadDumpDtos(final ProcessorNode procNode) {
final List<ThreadDumpDTO> threadDumps = new ArrayList<>();
final List<ActiveThreadInfo> activeThreads = procNode.getActiveThreads();
for (final ActiveThreadInfo threadInfo : activeThreads) {
final ThreadDumpDTO dto = new ThreadDumpDTO();
dto.setStackTrace(threadInfo.getStackTrace());
dto.setThreadActiveMillis(threadInfo.getActiveMillis());
dto.setThreadName(threadInfo.getThreadName());
threadDumps.add(dto);
}
return threadDumps;
}
/**
* Creates a ProcessorConfigDTO from the specified ProcessorNode.
*

View File

@ -17,6 +17,7 @@
package org.apache.nifi.web.api.dto;
import org.apache.nifi.web.api.dto.action.ActionDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
@ -54,6 +55,7 @@ import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorDiagnosticsEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
@ -77,6 +79,23 @@ import java.util.List;
public final class EntityFactory {
public ProcessorDiagnosticsEntity createProcessorDiagnosticsEntity(final ProcessorDiagnosticsDTO dto, final RevisionDTO revision, final PermissionsDTO processorPermissions,
final ProcessorStatusDTO status, final List<BulletinEntity> bulletins) {
final ProcessorDiagnosticsEntity entity = new ProcessorDiagnosticsEntity();
entity.setRevision(revision);
if (dto != null) {
entity.setPermissions(processorPermissions);
entity.setId(dto.getProcessor().getId());
if (processorPermissions != null && processorPermissions.getCanRead()) {
entity.setComponent(dto);
}
}
entity.setBulletins(bulletins);
return entity;
}
public StatusHistoryEntity createStatusHistoryEntity(final StatusHistoryDTO statusHistory, final PermissionsDTO permissions) {
final StatusHistoryEntity entity = new StatusHistoryEntity();
entity.setCanRead(permissions.getCanRead());

View File

@ -16,6 +16,10 @@
*/
package org.apache.nifi.web.controller;
import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@ -79,6 +83,7 @@ import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.util.BundleUtils;
@ -90,6 +95,7 @@ import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.BundleDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.provenance.AttributeDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
@ -103,10 +109,10 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageR
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import java.io.IOException;
import java.io.InputStream;
import java.text.Collator;
@ -127,10 +133,9 @@ import java.util.TimeZone;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
public class ControllerFacade implements Authorizable {
private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
@ -1527,6 +1532,12 @@ public class ControllerFacade implements Authorizable {
flowController.verifyComponentTypesInSnippet(versionedFlow);
}
public ProcessorDiagnosticsDTO getProcessorDiagnostics(final ProcessorNode processor, final ProcessorStatus processorStatus, final BulletinRepository bulletinRepository,
final Function<String, ControllerServiceEntity> serviceEntityFactory) {
return dtoFactory.createProcessorDiagnosticsDto(processor, processorStatus, bulletinRepository, flowController, serviceEntityFactory);
}
/*
* setters
*/

View File

@ -548,6 +548,21 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
}
@Override
public String getContainerFileStoreName(final String containerName) {
final Map<String, File> map = configuration.getStorageDirectories();
final File container = map.get(containerName);
if (container == null) {
return null;
}
try {
return Files.getFileStore(container.toPath()).name();
} catch (IOException e) {
return null;
}
}
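The container-to-FileStore lookup above relies on java.nio; a minimal standalone sketch (any readable path will do; the class name is hypothetical) shows what the resolved name looks like.
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class FileStoreNameSketch {
    public static void main(final String[] args) throws Exception {
        // Resolve the FileStore backing an arbitrary directory; repositories that live on
        // the same volume resolve to the same FileStore name.
        final Path path = Paths.get(System.getProperty("user.dir"));
        final FileStore store = Files.getFileStore(path);
        System.out.println(store.name() + " (" + store.type() + ")");
    }
}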
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
Map<String, File> map = configuration.getStorageDirectories();
@ -2478,7 +2493,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
public Collection<Path> getAllLogFiles() {
final SortedMap<Long, Path> map = idToPathMap.get();
return map == null ? new ArrayList<Path>() : map.values();
return map == null ? new ArrayList<>() : map.values();
}
private static class PathMapComparator implements Comparator<Long> {

View File

@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -321,6 +322,21 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
}
}
@Override
public String getContainerFileStoreName(final String containerName) {
final Map<String, File> map = config.getStorageDirectories();
final File container = map.get(containerName);
if (container == null) {
return null;
}
try {
return Files.getFileStore(container.toPath()).name();
} catch (IOException e) {
return null;
}
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
Map<String, File> map = config.getStorageDirectories();

View File

@ -613,6 +613,11 @@ public class VolatileProvenanceRepository implements ProvenanceRepository {
return maxSize - ringBuffer.getSize();
}
@Override
public String getContainerFileStoreName(final String containerName) {
return null;
}
private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final NiFiUser user, final LineageComputationType computationType, final Long eventId) {
final String userId = user.getIdentity();
final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1, userId);