NIFI-1295:

- Adding UI controls for terminating hung threads.
- Showing current number of terminated threads.
- Fixing issue when replicating terminate threads request throughout the cluster.

This closes #2607.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Matt Gilman 2018-04-05 10:56:54 -04:00 committed by Mark Payne
parent fdea876ede
commit af2513adf8
20 changed files with 423 additions and 120 deletions

View File

@ -35,6 +35,7 @@ public class ProcessGroupStatus implements Cloneable {
private Integer outputCount;
private Long outputContentSize;
private Integer activeThreadCount;
private Integer terminatedThreadCount;
private Integer queuedCount;
private Long queuedContentSize;
private Long bytesRead;
@ -149,6 +150,14 @@ public class ProcessGroupStatus implements Cloneable {
this.activeThreadCount = activeThreadCount;
}
public Integer getTerminatedThreadCount() {
return terminatedThreadCount;
}
public void setTerminatedThreadCount(Integer terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
public Collection<ConnectionStatus> getConnectionStatus() {
return connectionStatus;
}
@ -257,6 +266,7 @@ public class ProcessGroupStatus implements Cloneable {
clonedObj.inputContentSize = inputContentSize;
clonedObj.inputCount = inputCount;
clonedObj.activeThreadCount = activeThreadCount;
clonedObj.terminatedThreadCount = terminatedThreadCount;
clonedObj.queuedContentSize = queuedContentSize;
clonedObj.queuedCount = queuedCount;
clonedObj.bytesRead = bytesRead;
@ -334,6 +344,8 @@ public class ProcessGroupStatus implements Cloneable {
builder.append(outputContentSize);
builder.append(", activeThreadCount=");
builder.append(activeThreadCount);
builder.append(", terminatedThreadCount=");
builder.append(terminatedThreadCount);
builder.append(", flowFilesTransferred=");
builder.append(flowFilesTransferred);
builder.append(", bytesTransferred=");
@ -403,6 +415,7 @@ public class ProcessGroupStatus implements Cloneable {
target.setBytesRead(target.getBytesRead() + toMerge.getBytesRead());
target.setBytesWritten(target.getBytesWritten() + toMerge.getBytesWritten());
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
target.setFlowFilesTransferred(target.getFlowFilesTransferred() + toMerge.getFlowFilesTransferred());
target.setBytesTransferred(target.getBytesTransferred() + toMerge.getBytesTransferred());
target.setFlowFilesReceived(target.getFlowFilesReceived() + toMerge.getFlowFilesReceived());
@ -452,6 +465,7 @@ public class ProcessGroupStatus implements Cloneable {
}
merged.setActiveThreadCount(merged.getActiveThreadCount() + statusToMerge.getActiveThreadCount());
merged.setTerminatedThreadCount(merged.getTerminatedThreadCount() + statusToMerge.getTerminatedThreadCount());
merged.setBytesRead(merged.getBytesRead() + statusToMerge.getBytesRead());
merged.setBytesWritten(merged.getBytesWritten() + statusToMerge.getBytesWritten());
merged.setInputBytes(merged.getInputBytes() + statusToMerge.getInputBytes());

View File

@ -43,6 +43,7 @@ public class ProcessorStatus implements Cloneable {
private int flowFilesRemoved;
private long averageLineageDuration;
private int activeThreadCount;
private int terminatedThreadCount;
private int flowFilesReceived;
private long bytesReceived;
private int flowFilesSent;
@ -193,6 +194,14 @@ public class ProcessorStatus implements Cloneable {
this.activeThreadCount = activeThreadCount;
}
public int getTerminatedThreadCount() {
return terminatedThreadCount;
}
public void setTerminatedThreadCount(int terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
public int getFlowFilesReceived() {
return flowFilesReceived;
}
@ -237,6 +246,7 @@ public class ProcessorStatus implements Cloneable {
public ProcessorStatus clone() {
final ProcessorStatus clonedObj = new ProcessorStatus();
clonedObj.activeThreadCount = activeThreadCount;
clonedObj.terminatedThreadCount = terminatedThreadCount;
clonedObj.bytesRead = bytesRead;
clonedObj.bytesWritten = bytesWritten;
clonedObj.flowFilesReceived = flowFilesReceived;
@ -294,6 +304,8 @@ public class ProcessorStatus implements Cloneable {
builder.append(processingNanos);
builder.append(", activeThreadCount=");
builder.append(activeThreadCount);
builder.append(", terminatedThreadCount=");
builder.append(terminatedThreadCount);
builder.append(", counters=");
builder.append(counters);
builder.append("]");

View File

@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlType;
public class ControllerStatusDTO implements Cloneable {
private Integer activeThreadCount = 0;
private Integer terminatedThreadCount = 0;
private String queued;
private Integer flowFilesQueued = 0;
private Long bytesQueued = 0L;
@ -58,6 +59,20 @@ public class ControllerStatusDTO implements Cloneable {
this.activeThreadCount = activeThreadCount;
}
/**
* The terminated thread count.
*
* @return The terminated thread count
*/
@ApiModelProperty("The number of terminated threads in the NiFi.")
public Integer getTerminatedThreadCount() {
return terminatedThreadCount;
}
public void setTerminatedThreadCount(Integer terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
/**
* @return queue for the controller
*/
@ -209,6 +224,7 @@ public class ControllerStatusDTO implements Cloneable {
public ControllerStatusDTO clone() {
final ControllerStatusDTO other = new ControllerStatusDTO();
other.setActiveThreadCount(getActiveThreadCount());
other.setTerminatedThreadCount(getTerminatedThreadCount());
other.setQueued(getQueued());
other.setFlowFilesQueued(getFlowFilesQueued());
other.setBytesQueued(getBytesQueued());

View File

@ -77,6 +77,7 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
private String sent;
private Integer activeThreadCount = 0;
private Integer terminatedThreadCount = 0;
/**
* The id for the process group.
@ -127,12 +128,24 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
this.activeThreadCount = activeThreadCount;
}
/**
* @return number of threads currently terminated for this process group
*/
@ApiModelProperty("The number of threads currently terminated for the process group.")
public Integer getTerminatedThreadCount() {
return terminatedThreadCount;
}
public void setTerminatedThreadCount(Integer terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
/**
* The status of all connections in this process group.
*
* @return The status of all connections
*/
@ApiModelProperty("The status of all conenctions in the process group.")
@ApiModelProperty("The status of all connections in the process group.")
public Collection<ConnectionStatusSnapshotEntity> getConnectionStatusSnapshots() {
return connectionStatusSnapshots;
}
@ -523,6 +536,7 @@ public class ProcessGroupStatusSnapshotDTO implements Cloneable {
other.setSent(getSent());
other.setActiveThreadCount(getActiveThreadCount());
other.setTerminatedThreadCount(getTerminatedThreadCount());
other.setConnectionStatusSnapshots(copy(getConnectionStatusSnapshots()));
other.setProcessorStatusSnapshots(copy(getProcessorStatusSnapshots()));

View File

@ -51,6 +51,7 @@ public class ProcessorStatusSnapshotDTO implements Cloneable {
private String tasks;
private String tasksDuration;
private Integer activeThreadCount = 0;
private Integer terminatedThreadCount = 0;
/* getters / setters */
/**
@ -188,6 +189,18 @@ public class ProcessorStatusSnapshotDTO implements Cloneable {
this.activeThreadCount = threadCount;
}
/**
* @return number of threads currently terminated for this Processor
*/
@ApiModelProperty("The number of threads currently terminated for the processor.")
public Integer getTerminatedThreadCount() {
return terminatedThreadCount;
}
public void setTerminatedThreadCount(Integer terminatedThreadCount) {
this.terminatedThreadCount = terminatedThreadCount;
}
/**
* @return number of task this connectable has had over the last 5 minutes
*/
@ -304,6 +317,7 @@ public class ProcessorStatusSnapshotDTO implements Cloneable {
other.setTasksDuration(getTasksDuration());
other.setTasksDurationNanos(getTasksDurationNanos());
other.setActiveThreadCount(getActiveThreadCount());
other.setTerminatedThreadCount(getTerminatedThreadCount());
other.setInput(getInput());
other.setOutput(getOutput());
other.setRead(getRead());

View File

@ -17,17 +17,6 @@
package org.apache.nifi.cluster.manager;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.status.RunStatus;
import org.apache.nifi.controller.status.TransmissionStatus;
import org.apache.nifi.registry.flow.VersionedFlowState;
@ -69,6 +58,17 @@ import org.apache.nifi.web.api.entity.ProcessGroupStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ProcessorStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupStatusSnapshotEntity;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class StatusMerger {
private static final String ZERO_COUNT = "0";
private static final String ZERO_BYTES = "0 bytes";
@ -82,6 +82,7 @@ public class StatusMerger {
}
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
@ -158,6 +159,7 @@ public class StatusMerger {
target.setFlowFilesSent(target.getFlowFilesSent() + toMerge.getFlowFilesSent());
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
updatePrettyPrintedFields(target);
// connection status
@ -431,6 +433,7 @@ public class StatusMerger {
target.setTaskCount(target.getTaskCount() + toMerge.getTaskCount());
target.setTasksDurationNanos(target.getTasksDurationNanos() + toMerge.getTasksDurationNanos());
target.setActiveThreadCount(target.getActiveThreadCount() + toMerge.getActiveThreadCount());
target.setTerminatedThreadCount(target.getTerminatedThreadCount() + toMerge.getTerminatedThreadCount());
updatePrettyPrintedFields(target);
}

View File

@ -2780,6 +2780,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
status.setId(group.getIdentifier());
status.setName(isAuthorized.evaluate(group) ? group.getName() : group.getIdentifier());
int activeGroupThreads = 0;
int terminatedGroupThreads = 0;
long bytesRead = 0L;
long bytesWritten = 0L;
int queuedCount = 0;
@ -2802,6 +2803,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessorStatus procStat = getProcessorStatus(statusReport, procNode, isAuthorized);
processorStatusCollection.add(procStat);
activeGroupThreads += procStat.getActiveThreadCount();
terminatedGroupThreads += procStat.getTerminatedThreadCount();
bytesRead += procStat.getBytesRead();
bytesWritten += procStat.getBytesWritten();
@ -2818,6 +2820,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessGroupStatus childGroupStatus = getGroupStatus(childGroup, statusReport, isAuthorized);
localChildGroupStatusCollection.add(childGroupStatus);
activeGroupThreads += childGroupStatus.getActiveThreadCount();
terminatedGroupThreads += childGroupStatus.getTerminatedThreadCount();
bytesRead += childGroupStatus.getBytesRead();
bytesWritten += childGroupStatus.getBytesWritten();
queuedCount += childGroupStatus.getQueuedCount();
@ -3042,6 +3045,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
status.setActiveThreadCount(activeGroupThreads);
status.setTerminatedThreadCount(terminatedGroupThreads);
status.setBytesRead(bytesRead);
status.setBytesWritten(bytesWritten);
status.setQueuedCount(queuedCount);
@ -3216,6 +3220,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
status.setExecutionNode(procNode.getExecutionNode());
status.setTerminatedThreadCount(procNode.getTerminatedThreadCount());
status.setActiveThreadCount(processScheduler.getActiveThreadCount(procNode));
return status;
@ -3848,6 +3853,73 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return new QueueSize(count, contentSize);
}
public class GroupStatusCounts {
private int queuedCount = 0;
private long queuedContentSize = 0;
private int activeThreadCount = 0;
private int terminatedThreadCount = 0;
public GroupStatusCounts(final ProcessGroup group) {
calculateCounts(group);
}
private void calculateCounts(final ProcessGroup group) {
for (final Connection connection : group.getConnections()) {
final QueueSize size = connection.getFlowFileQueue().size();
queuedCount += size.getObjectCount();
queuedContentSize += size.getByteCount();
final Connectable source = connection.getSource();
if (ConnectableType.REMOTE_OUTPUT_PORT.equals(source.getConnectableType())) {
final RemoteGroupPort remoteOutputPort = (RemoteGroupPort) source;
activeThreadCount += processScheduler.getActiveThreadCount(remoteOutputPort);
}
final Connectable destination = connection.getDestination();
if (ConnectableType.REMOTE_INPUT_PORT.equals(destination.getConnectableType())) {
final RemoteGroupPort remoteInputPort = (RemoteGroupPort) destination;
activeThreadCount += processScheduler.getActiveThreadCount(remoteInputPort);
}
}
for (final ProcessorNode processor : group.getProcessors()) {
activeThreadCount += processScheduler.getActiveThreadCount(processor);
terminatedThreadCount += processor.getTerminatedThreadCount();
}
for (final Port port : group.getInputPorts()) {
activeThreadCount += processScheduler.getActiveThreadCount(port);
}
for (final Port port : group.getOutputPorts()) {
activeThreadCount += processScheduler.getActiveThreadCount(port);
}
for (final Funnel funnel : group.getFunnels()) {
activeThreadCount += processScheduler.getActiveThreadCount(funnel);
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
calculateCounts(childGroup);
}
}
public int getQueuedCount() {
return queuedCount;
}
public long getQueuedContentSize() {
return queuedContentSize;
}
public int getActiveThreadCount() {
return activeThreadCount;
}
public int getTerminatedThreadCount() {
return terminatedThreadCount;
}
}
public GroupStatusCounts getGroupStatusCounts(final ProcessGroup group) {
return new GroupStatusCounts(group);
}
public int getActiveThreadCount() {
final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount();

View File

@ -16,34 +16,6 @@
*/
package org.apache.nifi.controller.repository;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
@ -86,6 +58,34 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* <p>
* Provides a ProcessSession that ensures all accesses, changes and transfers
@ -330,7 +330,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
public void commit() {
public synchronized void commit() {
verifyTaskActive();
checkpoint();
commit(this.checkpoint);
@ -946,7 +946,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
verifyTaskActive();
}
private void rollback(final boolean penalize, final boolean rollbackCheckpoint) {
private synchronized void rollback(final boolean penalize, final boolean rollbackCheckpoint) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} session rollback called, FlowFile records are {} {}",
this, loggableFlowfileInfo(), new Throwable("Stack Trace on rollback"));
@ -1163,6 +1163,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void acknowledgeRecords() {
for (final Map.Entry<FlowFileQueue, Set<FlowFileRecord>> entry : unacknowledgedFlowFiles.entrySet()) {
LOG.trace("Acknowledging {} for {}", entry.getValue(), entry.getKey());
entry.getKey().acknowledge(entry.getValue());
}
unacknowledgedFlowFiles.clear();

View File

@ -16,23 +16,12 @@
*/
package org.apache.nifi.web.api;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AuthorizeControllerServiceReference;
import org.apache.nifi.authorization.Authorizer;
@ -59,16 +48,25 @@ import org.apache.nifi.web.api.entity.PropertyDescriptorEntity;
import org.apache.nifi.web.api.request.ClientIdParameter;
import org.apache.nifi.web.api.request.LongParameter;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.List;
import java.util.Set;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
/**
* RESTful endpoint for managing a Processor.
*/
@ -229,7 +227,7 @@ public class ProcessorResource extends ApplicationResource {
@ApiParam(value = "The processor id.", required = true) @PathParam("id") final String id) {
if (isReplicateRequest()) {
return replicate(HttpMethod.POST);
return replicate(HttpMethod.DELETE);
}
final ProcessorEntity requestProcessorEntity = new ProcessorEntity();

View File

@ -1002,7 +1002,10 @@ public final class DtoFactory {
snapshot.setBytesSent(processGroupStatus.getBytesSent());
snapshot.setFlowFilesReceived(processGroupStatus.getFlowFilesReceived());
snapshot.setBytesReceived(processGroupStatus.getBytesReceived());
snapshot.setActiveThreadCount(processGroupStatus.getActiveThreadCount());
snapshot.setTerminatedThreadCount(processGroupStatus.getTerminatedThreadCount());
StatusMerger.updatePrettyPrintedFields(snapshot);
return processGroupStatusDto;
}
@ -1167,6 +1170,7 @@ public final class DtoFactory {
snapshot.setExecutionNode(procStatus.getExecutionNode().toString());
snapshot.setActiveThreadCount(procStatus.getActiveThreadCount());
snapshot.setTerminatedThreadCount(procStatus.getTerminatedThreadCount());
snapshot.setType(procStatus.getType());
StatusMerger.updatePrettyPrintedFields(snapshot);

View File

@ -16,10 +16,6 @@
*/
package org.apache.nifi.web.controller;
import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
@ -45,11 +41,11 @@ import org.apache.nifi.controller.ContentAvailability;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.FlowController.GroupStatusCounts;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
@ -113,6 +109,7 @@ import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.WebApplicationException;
import java.io.IOException;
import java.io.InputStream;
import java.text.Collator;
@ -136,6 +133,8 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.nifi.controller.FlowController.ROOT_GROUP_ID_ALIAS;
public class ControllerFacade implements Authorizable {
private static final Logger logger = LoggerFactory.getLogger(ControllerFacade.class);
@ -572,13 +571,14 @@ public class ControllerFacade implements Authorizable {
*/
public ControllerStatusDTO getControllerStatus() {
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
final GroupStatusCounts groupStatusCounts = flowController.getGroupStatusCounts(rootGroup);
final QueueSize controllerQueueSize = flowController.getTotalFlowFileCount(rootGroup);
final ControllerStatusDTO controllerStatus = new ControllerStatusDTO();
controllerStatus.setActiveThreadCount(flowController.getActiveThreadCount());
controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount()) + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
controllerStatus.setBytesQueued(controllerQueueSize.getByteCount());
controllerStatus.setFlowFilesQueued(controllerQueueSize.getObjectCount());
controllerStatus.setActiveThreadCount(groupStatusCounts.getActiveThreadCount());
controllerStatus.setTerminatedThreadCount(groupStatusCounts.getTerminatedThreadCount());
controllerStatus.setQueued(FormatUtils.formatCount(groupStatusCounts.getQueuedCount()) + " / " + FormatUtils.formatDataSize(groupStatusCounts.getQueuedContentSize()));
controllerStatus.setBytesQueued(groupStatusCounts.getQueuedContentSize());
controllerStatus.setFlowFilesQueued(groupStatusCounts.getQueuedCount());
final ProcessGroupCounts counts = rootGroup.getCounts();
controllerStatus.setRunningCount(counts.getRunningCount());

View File

@ -17,23 +17,52 @@
<%@ page contentType="text/html" pageEncoding="UTF-8" session="false" %>
<div id="flow-status" flex layout="row" layout-align="space-between center">
<div id="flow-status-container" layout="row" layout-align="space-around center">
<div class="fa fa-cubes" ng-if="appCtrl.nf.ClusterSummary.isClustered()" title="Connected nodes / Total number of nodes in the cluster"><span id="connected-nodes-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.connectedNodesCount}}</span></div>
<div class="icon icon-threads" title="Active threads"><span id="active-thread-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.activeThreadCount}}</span></div>
<div class="fa fa-list" title="Total queued data"><span id="total-queued">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.totalQueued}}</span></div>
<div class="fa fa-bullseye" title="Transmitting Remote Process Groups"><span id="controller-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerTransmittingCount}}</span></div>
<div class="icon icon-transmit-false" title="Not Transmitting Remote Process Groups"><span id="controller-not-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerNotTransmittingCount}}</span></div>
<div class="fa fa-play" title="Running Components"><span id="controller-running-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerRunningCount}}</span></div>
<div class="fa fa-stop" title="Stopped Components"><span id="controller-stopped-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStoppedCount}}</span></div>
<div class="fa fa-warning" title="Invalid Components"><span id="controller-invalid-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerInvalidCount}}</span></div>
<div class="icon icon-enable-false" title="Disabled Components"><span id="controller-disabled-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerDisabledCount}}</span></div>
<div class="fa fa-check" title="Up to date Versioned Process Groups"><span id="controller-up-to-date-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerUpToDateCount}}</span></div>
<div class="fa fa-asterisk" title="Locally modified Versioned Process Groups"><span id="controller-locally-modified-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerLocallyModifiedCount}}</span></div>
<div class="fa fa-arrow-circle-up" title="Stale Versioned Process Groups"><span id="controller-stale-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStaleCount}}</span></div>
<div class="fa fa-cubes" ng-if="appCtrl.nf.ClusterSummary.isClustered()" title="Connected nodes / Total number of nodes in the cluster">
<span id="connected-nodes-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.connectedNodesCount}}</span>
</div>
<div class="icon icon-threads" ng-class="appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.getExtraThreadStyles()"
title="Active Threads{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.hasTerminatedThreads() ? ' (Terminated)' : ''}}">
<span id="active-thread-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.threadCounts}}</span>
</div>
<div class="fa fa-list" title="Total queued data">
<span id="total-queued">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.totalQueued}}</span>
</div>
<div class="fa fa-bullseye" title="Transmitting Remote Process Groups">
<span id="controller-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerTransmittingCount}}</span>
</div>
<div class="icon icon-transmit-false" title="Not Transmitting Remote Process Groups">
<span id="controller-not-transmitting-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerNotTransmittingCount}}</span>
</div>
<div class="fa fa-play" title="Running Components">
<span id="controller-running-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerRunningCount}}</span>
</div>
<div class="fa fa-stop" title="Stopped Components">
<span id="controller-stopped-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStoppedCount}}</span>
</div>
<div class="fa fa-warning" title="Invalid Components">
<span id="controller-invalid-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerInvalidCount}}</span>
</div>
<div class="icon icon-enable-false" title="Disabled Components">
<span id="controller-disabled-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerDisabledCount}}</span>
</div>
<div class="fa fa-check" title="Up to date Versioned Process Groups">
<span id="controller-up-to-date-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerUpToDateCount}}</span>
</div>
<div class="fa fa-asterisk" title="Locally modified Versioned Process Groups">
<span id="controller-locally-modified-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerLocallyModifiedCount}}</span>
</div>
<div class="fa fa-arrow-circle-up" title="Stale Versioned Process Groups">
<span id="controller-stale-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerStaleCount}}</span>
</div>
<div class="fa fa-exclamation-circle" title="Locally modified and stale Versioned Process Groups">
<span id="controller-locally-modified-and-stale-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerLocallyModifiedAndStaleCount}}</span>
</div>
<div class="fa fa-question" title="Sync failure Versioned Process Groups"><span id="controller-sync-failure-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerSyncFailureCount}}</span></div>
<div class="fa fa-refresh" title="Last refresh"><span id="stats-last-refreshed">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.statsLastRefreshed}}</span></div>
<div class="fa fa-question" title="Sync failure Versioned Process Groups">
<span id="controller-sync-failure-count">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.controllerSyncFailureCount}}</span>
</div>
<div class="fa fa-refresh" title="Last refresh">
<span id="stats-last-refreshed">{{appCtrl.serviceProvider.headerCtrl.flowStatusCtrl.statsLastRefreshed}}</span>
</div>
<div id="canvas-loading-container" class="loading-container"></div>
</div>
<div layout="row" layout-align="end center">

View File

@ -182,7 +182,7 @@ div.valid {
background-color: transparent;
}
div.has-bulletins {
div.has-bulletins, div.warning {
color: #ba554a !important;
}

View File

@ -67,6 +67,8 @@
function FlowStatusCtrl() {
this.connectedNodesCount = "-";
this.activeThreadCount = "-";
this.terminatedThreadCount = "-";
this.threadCounts = "-";
this.totalQueued = "-";
this.controllerTransmittingCount = "-";
this.controllerNotTransmittingCount = "-";
@ -436,27 +438,48 @@
$('#connected-nodes-count').closest('div.fa-cubes').css('color', color);
},
/**
* Returns whether there are any terminated threads.
*
* @returns {boolean} whether there are any terminated threads
*/
hasTerminatedThreads: function () {
if (Number.isInteger(this.terminatedThreadCount)) {
return this.terminatedThreadCount > 0;
} else {
return false;
}
},
/**
* Returns any additional styles to apply to the thread counts.
*
* @returns {string}
*/
getExtraThreadStyles: function () {
if (Number.isInteger(this.terminatedThreadCount) && this.terminatedThreadCount > 0) {
return 'warning';
} else if (this.activeThreadCount === 0) {
return 'zero';
}
return '';
},
/**
* Update the flow status counts.
*
* @param status The controller status returned from the `../nifi-api/flow/status` endpoint.
*/
update: function (status) {
var controllerInvalidCount = (nfCommon.isDefinedAndNotNull(status.invalidCount)) ? status.invalidCount : 0;
if (this.controllerInvalidCount > 0) {
$('#controller-invalid-count').parent().removeClass('zero').addClass('invalid');
} else {
$('#controller-invalid-count').parent().removeClass('invalid').addClass('zero');
}
// update the report values
this.activeThreadCount = status.activeThreadCount;
this.terminatedThreadCount = status.terminatedThreadCount;
if (this.activeThreadCount > 0) {
$('#flow-status-container').find('.icon-threads').removeClass('zero');
if (this.hasTerminatedThreads()) {
this.threadCounts = this.activeThreadCount + ' (' + this.terminatedThreadCount + ')';
} else {
$('#flow-status-container').find('.icon-threads').addClass('zero');
this.threadCounts = this.activeThreadCount;
}
this.totalQueued = status.queued;

View File

@ -766,6 +766,25 @@
}
},
/**
* Terminates active threads for the selected component.
*
* @param {selection} selection
*/
terminate: function (selection) {
if (selection.size() === 1 && nfCanvasUtils.isProcessor(selection)) {
var selectionData = selection.datum();
$.ajax({
type: 'DELETE',
url: selectionData.uri + '/threads',
dataType: 'json'
}).done(function (response) {
nfProcessor.set(response);
}).fail(nfErrorHandler.handleAjaxError);
}
},
/**
* Enables transmission for the components in the specified selection.
*

View File

@ -772,21 +772,45 @@
* @return
*/
activeThreadCount: function (selection, d, setOffset) {
var activeThreads = d.status.aggregateSnapshot.activeThreadCount;
var terminatedThreads = d.status.aggregateSnapshot.terminatedThreadCount;
// if there is active threads show the count, otherwise hide
if (d.status.aggregateSnapshot.activeThreadCount > 0) {
if (activeThreads > 0 || terminatedThreads > 0) {
var generateThreadsTip = function () {
var tip = activeThreads + ' active threads';
if (terminatedThreads > 0) {
tip += ' (' + terminatedThreads + ' terminated)';
}
return tip;
};
// update the active thread count
var activeThreadCount = selection.select('text.active-thread-count')
.text(function () {
return d.status.aggregateSnapshot.activeThreadCount;
if (terminatedThreads > 0) {
return activeThreads + ' (' + terminatedThreads + ')';
} else {
return activeThreads;
}
})
.style('display', 'block')
.each(function () {
var activeThreadCountText = d3.select(this);
var bBox = this.getBBox();
d3.select(this).attr('x', function () {
activeThreadCountText.attr('x', function () {
return d.dimensions.width - bBox.width - 15;
});
// reset the active thread count tooltip
activeThreadCountText.selectAll('title').remove();
});
// append the tooltip
activeThreadCount.append('title').text(generateThreadsTip);
// update the background width
selection.select('text.active-thread-count-icon')
.attr('x', function () {
@ -799,9 +823,26 @@
return d.dimensions.width - bBox.width - 20;
})
.style('display', 'block');
.style('fill', function () {
if (terminatedThreads > 0) {
return '#ba554a';
} else {
return '#728e9b';
}
})
.style('display', 'block')
.each(function () {
var activeThreadCountIcon = d3.select(this);
// reset the active thread count tooltip
activeThreadCountIcon.selectAll('title').remove();
}).append('title').text(generateThreadsTip);
} else {
selection.selectAll('text.active-thread-count, text.active-thread-count-icon').style('display', 'none');
selection.selectAll('text.active-thread-count, text.active-thread-count-icon')
.style('display', 'none')
.each(function () {
d3.select(this).selectAll('title').remove();
});
}
},

View File

@ -163,6 +163,30 @@
return nfCanvasUtils.areStoppable(selection);
};
/**
* Determines whether the components in the specified selection can be terminated.
*
* @param {selection} selection The selections of currently selected components
*/
var canTerminate = function (selection) {
if (selection.size() !== 1) {
return false;
}
if (nfCanvasUtils.canModify(selection) === false) {
return false;
}
var terminatable = false;
if (nfCanvasUtils.isProcessor(selection)) {
var selectionData = selection.datum();
var aggregateSnapshot = selectionData.status.aggregateSnapshot;
terminatable = aggregateSnapshot.runStatus !== 'Running' && aggregateSnapshot.activeThreadCount > 0;
}
return terminatable;
};
/**
* Determines whether the components in the specified selection support stats.
*
@ -771,6 +795,7 @@
{separator: true},
{id: 'start-menu-item', condition: isRunnable, menuItem: {clazz: 'fa fa-play', text: 'Start', action: 'start'}},
{id: 'stop-menu-item', condition: isStoppable, menuItem: {clazz: 'fa fa-stop', text: 'Stop', action: 'stop'}},
{id: 'terminate-menu-item', condition: canTerminate, menuItem: {clazz: 'fa fa-hourglass-end', text: 'Terminate', action: 'terminate'}},
{id: 'enable-menu-item', condition: canEnable, menuItem: {clazz: 'fa fa-flash', text: 'Enable', action: 'enable'}},
{id: 'disable-menu-item', condition: canDisable, menuItem: {clazz: 'icon icon-enable-false', text: 'Disable', action: 'disable'}},
{id: 'enable-transmission-menu-item', condition: canStartTransmission, menuItem: {clazz: 'fa fa-bullseye', text: 'Enable transmission', action: 'enableTransmission'}},

View File

@ -172,7 +172,7 @@
.attrs({
'x': 10,
'y': 20,
'width': 316,
'width': 300,
'height': 16,
'class': 'process-group-name'
});
@ -1214,9 +1214,9 @@
if (isUnderVersionControl(processGroupData)) {
var versionControlX = parseInt(versionControl.attr('x'), 10);
var processGroupNameX = parseInt(d3.select(this).attr('x'), 10);
return 316 - (processGroupNameX - versionControlX);
return 300 - (processGroupNameX - versionControlX);
} else {
return 316;
return 300;
}
}
})

View File

@ -148,7 +148,7 @@
.attrs({
'x': 75,
'y': 18,
'width': 210,
'width': 230,
'height': 14,
'class': 'processor-name'
});
@ -278,7 +278,7 @@
'class': 'processor-bundle',
'x': 75,
'y': 45,
'width': 230,
'width': 200,
'height': 12
});
@ -547,7 +547,7 @@
details.append('text')
.attrs({
'class': 'active-thread-count-icon',
'y': 45
'y': 46
})
.text('\ue83f');
@ -555,7 +555,7 @@
details.append('text')
.attrs({
'class': 'active-thread-count',
'y': 45
'y': 46
});
// ---------

View File

@ -306,9 +306,14 @@
// define a custom formatter for the run status column
var runStatusFormatter = function (row, cell, value, columnDef, dataContext) {
var activeThreadCount = '';
if (nfCommon.isDefinedAndNotNull(dataContext.activeThreadCount) && dataContext.activeThreadCount > 0) {
activeThreadCount = '(' + nfCommon.escapeHtml(dataContext.activeThreadCount) + ')';
var threadCounts = '';
var threadTip = '';
if (dataContext.terminatedThreadCount > 0) {
threadCounts = '(' + dataContext.activeThreadCount + ' / ' + dataContext.terminatedThreadCount + ')';
threadTip = 'Threads: (Active / Terminated)';
} else if (dataContext.activeThreadCount > 0) {
threadCounts = '(' + dataContext.activeThreadCount + ')';
threadTip = 'Active Threads';
}
var classes;
switch (value.toLowerCase()) {
@ -330,8 +335,20 @@
default:
classes = '';
}
var formattedValue = '<div layout="row"><div class="' + classes + '"></div>';
return formattedValue + '<div class="status-text" style="margin-top: 4px;">' + nfCommon.escapeHtml(value) + '</div><div style="float: left; margin-left: 4px;">' + nfCommon.escapeHtml(activeThreadCount) + '</div></div>';
var markup =
'<div layout="row">' +
'<div class="' + classes + '"></div>' +
'<div class="status-text" style="margin-top: 4px;">' +
nfCommon.escapeHtml(value) +
'</div>' +
'<div style="float: left; margin-left: 4px;" title="' + threadTip + '">' +
nfCommon.escapeHtml(threadCounts) +
'</div>' +
'</div>';
return markup;
};
// define the input, read, written, and output columns (reused between both tables)
@ -2694,6 +2711,7 @@
node: nodeSnapshot.address + ':' + nodeSnapshot.apiPort,
runStatus: snapshot.runStatus,
activeThreadCount: snapshot.activeThreadCount,
terminatedThreadCount: snapshot.terminatedThreadCount,
input: snapshot.input,
read: snapshot.read,
written: snapshot.written,