mirror of https://github.com/apache/nifi.git
NIFI-766:
- Improved connection UI display when backpressure is enabled - Updating the connection label to include backpressure indicators for object count and data size thresholds. - Coloring the connection path and drop shadow once backpressure is engaged. - Fixing bug with expiration icon tooltip. - Including columns in the summary table for backpressure. - Updating empty queue action to reload the connection status upon completion to ensure an updated count. This closes #1080.
This commit is contained in:
parent
c470fae065
commit
26f46538b3
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.nifi.controller.status;
|
||||
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ConnectionStatus implements Cloneable {
|
||||
|
@ -28,6 +30,7 @@ public class ConnectionStatus implements Cloneable {
|
|||
private String destinationId;
|
||||
private String destinationName;
|
||||
private String backPressureDataSizeThreshold;
|
||||
private long backPressureBytesThreshold;
|
||||
private long backPressureObjectThreshold;
|
||||
private int inputCount;
|
||||
private long inputBytes;
|
||||
|
@ -35,6 +38,8 @@ public class ConnectionStatus implements Cloneable {
|
|||
private long queuedBytes;
|
||||
private int outputCount;
|
||||
private long outputBytes;
|
||||
private int maxQueuedCount;
|
||||
private long maxQueuedBytes;
|
||||
|
||||
public String getId() {
|
||||
return id;
|
||||
|
@ -114,6 +119,7 @@ public class ConnectionStatus implements Cloneable {
|
|||
|
||||
public void setBackPressureDataSizeThreshold(String backPressureDataSizeThreshold) {
|
||||
this.backPressureDataSizeThreshold = backPressureDataSizeThreshold;
|
||||
setBackPressureBytesThreshold(DataUnit.parseDataSize(backPressureDataSizeThreshold, DataUnit.B).longValue());
|
||||
}
|
||||
|
||||
public long getBackPressureObjectThreshold() {
|
||||
|
@ -156,6 +162,30 @@ public class ConnectionStatus implements Cloneable {
|
|||
this.outputCount = outputCount;
|
||||
}
|
||||
|
||||
public int getMaxQueuedCount() {
|
||||
return maxQueuedCount;
|
||||
}
|
||||
|
||||
public void setMaxQueuedCount(int maxQueuedCount) {
|
||||
this.maxQueuedCount = maxQueuedCount;
|
||||
}
|
||||
|
||||
public long getMaxQueuedBytes() {
|
||||
return maxQueuedBytes;
|
||||
}
|
||||
|
||||
public void setMaxQueuedBytes(long maxQueueBytes) {
|
||||
this.maxQueuedBytes = maxQueueBytes;
|
||||
}
|
||||
|
||||
public long getBackPressureBytesThreshold() {
|
||||
return backPressureBytesThreshold;
|
||||
}
|
||||
|
||||
public void setBackPressureBytesThreshold(long backPressureBytesThreshold) {
|
||||
this.backPressureBytesThreshold = backPressureBytesThreshold;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionStatus clone() {
|
||||
final ConnectionStatus clonedObj = new ConnectionStatus();
|
||||
|
@ -174,6 +204,8 @@ public class ConnectionStatus implements Cloneable {
|
|||
clonedObj.destinationName = destinationName;
|
||||
clonedObj.backPressureDataSizeThreshold = backPressureDataSizeThreshold;
|
||||
clonedObj.backPressureObjectThreshold = backPressureObjectThreshold;
|
||||
clonedObj.maxQueuedBytes = maxQueuedBytes;
|
||||
clonedObj.maxQueuedCount = maxQueuedCount;
|
||||
return clonedObj;
|
||||
}
|
||||
|
||||
|
@ -210,6 +242,10 @@ public class ConnectionStatus implements Cloneable {
|
|||
builder.append(outputCount);
|
||||
builder.append(", outputBytes=");
|
||||
builder.append(outputBytes);
|
||||
builder.append(", maxQueuedCount=");
|
||||
builder.append(maxQueuedCount);
|
||||
builder.append(", maxQueueBytes=");
|
||||
builder.append(maxQueuedBytes);
|
||||
builder.append("]");
|
||||
return builder.toString();
|
||||
}
|
||||
|
|
|
@ -46,6 +46,8 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
|
|||
private String queued;
|
||||
private String queuedSize;
|
||||
private String queuedCount;
|
||||
private Integer percentUseCount;
|
||||
private Integer percentUseBytes;
|
||||
|
||||
/* getters / setters */
|
||||
/**
|
||||
|
@ -251,6 +253,24 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
|
|||
this.bytesQueued = bytesQueued;
|
||||
}
|
||||
|
||||
@ApiModelProperty("Connection percent use regarding queued flow files count and backpressure threshold if configured.")
|
||||
public Integer getPercentUseCount() {
|
||||
return percentUseCount;
|
||||
}
|
||||
|
||||
public void setPercentUseCount(Integer percentUseCount) {
|
||||
this.percentUseCount = percentUseCount;
|
||||
}
|
||||
|
||||
@ApiModelProperty("Connection percent use regarding queued flow files size and backpressure threshold if configured.")
|
||||
public Integer getPercentUseBytes() {
|
||||
return percentUseBytes;
|
||||
}
|
||||
|
||||
public void setPercentUseBytes(Integer percentUseBytes) {
|
||||
this.percentUseBytes = percentUseBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConnectionStatusSnapshotDTO clone() {
|
||||
final ConnectionStatusSnapshotDTO other = new ConnectionStatusSnapshotDTO();
|
||||
|
@ -273,6 +293,8 @@ public class ConnectionStatusSnapshotDTO implements Cloneable {
|
|||
other.setQueued(getQueued());
|
||||
other.setQueuedCount(getQueuedCount());
|
||||
other.setQueuedSize(getQueuedSize());
|
||||
other.setPercentUseBytes(getPercentUseBytes());
|
||||
other.setPercentUseCount(getPercentUseCount());
|
||||
|
||||
return other;
|
||||
}
|
||||
|
|
|
@ -450,6 +450,18 @@ public class StatusMerger {
|
|||
target.setBytesOut(target.getBytesOut() + toMerge.getBytesOut());
|
||||
target.setFlowFilesQueued(target.getFlowFilesQueued() + toMerge.getFlowFilesQueued());
|
||||
target.setBytesQueued(target.getBytesQueued() + toMerge.getBytesQueued());
|
||||
|
||||
if (target.getPercentUseBytes() == null) {
|
||||
target.setPercentUseBytes(toMerge.getPercentUseBytes());
|
||||
} else if (toMerge.getPercentUseBytes() != null) {
|
||||
target.setPercentUseBytes(Math.max(target.getPercentUseBytes(), toMerge.getPercentUseBytes()));
|
||||
}
|
||||
if (target.getPercentUseCount() == null) {
|
||||
target.setPercentUseCount(toMerge.getPercentUseCount());
|
||||
} else if (toMerge.getPercentUseCount() != null) {
|
||||
target.setPercentUseCount(Math.max(target.getPercentUseCount(), toMerge.getPercentUseCount()));
|
||||
}
|
||||
|
||||
updatePrettyPrintedFields(target);
|
||||
}
|
||||
|
||||
|
|
|
@ -1021,6 +1021,14 @@ public final class DtoFactory {
|
|||
|
||||
snapshot.setFlowFilesOut(connectionStatus.getOutputCount());
|
||||
snapshot.setBytesOut(connectionStatus.getOutputBytes());
|
||||
|
||||
if (connectionStatus.getBackPressureObjectThreshold() > 0) {
|
||||
snapshot.setPercentUseCount(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedCount(), connectionStatus.getBackPressureObjectThreshold())));
|
||||
}
|
||||
if (connectionStatus.getBackPressureBytesThreshold() > 0) {
|
||||
snapshot.setPercentUseBytes(Math.min(100, StatusMerger.getUtilization(connectionStatus.getQueuedBytes(), connectionStatus.getBackPressureBytesThreshold())));
|
||||
}
|
||||
|
||||
StatusMerger.updatePrettyPrintedFields(snapshot);
|
||||
|
||||
return connectionStatusDto;
|
||||
|
|
|
@ -197,6 +197,11 @@ g.connection rect.border.unauthorized {
|
|||
stroke-dasharray: 3,3;
|
||||
}
|
||||
|
||||
g.connection rect.border.full {
|
||||
stroke: rgba(186, 85, 74, 0.65);
|
||||
stroke-width: 1;
|
||||
}
|
||||
|
||||
g.connection.selected rect.border {
|
||||
/*stroke: #004849;*/
|
||||
stroke: #ffcc00;
|
||||
|
@ -225,6 +230,10 @@ g.connection path.connection-path {
|
|||
cursor: pointer;
|
||||
}
|
||||
|
||||
g.connection path.connection-path.full {
|
||||
stroke: #ba554a;
|
||||
}
|
||||
|
||||
g.connection path.connection-path.unauthorized {
|
||||
stroke: #ba554a;
|
||||
stroke-dasharray: 3,3;
|
||||
|
@ -240,6 +249,35 @@ text.connection-from-run-status.is-missing-port, text.connection-to-run-status.i
|
|||
fill: #ba554a;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-tick {
|
||||
fill: #3e3e3e;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-tick.not-configured {
|
||||
fill: #acacac;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-object, g.connection rect.backpressure-data-size {
|
||||
fill: #d8d8d8;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-object.not-configured, g.connection rect.backpressure-data-size.not-configured {
|
||||
fill: transparent;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-percent {
|
||||
fill: #67b796;
|
||||
pointer-events: none;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-percent.warning {
|
||||
fill: #cea958;
|
||||
}
|
||||
|
||||
g.connection rect.backpressure-percent.error {
|
||||
fill: #ba554a;
|
||||
}
|
||||
|
||||
/* grouped connection */
|
||||
|
||||
g.connection.grouped path.connection-path, g.connection.grouped rect.connection-label {
|
||||
|
|
|
@ -937,6 +937,10 @@ nf.Actions = (function () {
|
|||
|
||||
// completes the drop request by removing it and showing how many flowfiles were deleted
|
||||
var completeDropRequest = function () {
|
||||
// reload the connection status
|
||||
nf.Connection.reloadStatus(connection.id);
|
||||
|
||||
// clean up as appropriate
|
||||
if (nf.Common.isDefinedAndNotNull(dropRequest)) {
|
||||
$.ajax({
|
||||
type: 'DELETE',
|
||||
|
@ -993,14 +997,7 @@ nf.Actions = (function () {
|
|||
// update the status of the drop request
|
||||
$('#drop-request-status-message').text(dropRequest.state);
|
||||
|
||||
// update the current number of enqueued flowfiles
|
||||
if (nf.Common.isDefinedAndNotNull(dropRequest.currentCount)) {
|
||||
connection.status.queued = dropRequest.current;
|
||||
connection.status.aggregateSnapshot.queued = dropRequest.current;
|
||||
nf.Connection.refresh(connection.id);
|
||||
}
|
||||
|
||||
// close the dialog if the
|
||||
// close the dialog if the
|
||||
if (dropRequest.finished === true || cancelled === true) {
|
||||
completeDropRequest();
|
||||
} else {
|
||||
|
|
|
@ -197,7 +197,7 @@ nf.Canvas = (function () {
|
|||
|
||||
// create arrow definitions for the various line types
|
||||
defs.selectAll('marker')
|
||||
.data(['normal', 'ghost', 'unauthorized'])
|
||||
.data(['normal', 'ghost', 'unauthorized', 'full'])
|
||||
.enter().append('marker')
|
||||
.attr({
|
||||
'id': function (d) {
|
||||
|
@ -214,6 +214,8 @@ nf.Canvas = (function () {
|
|||
return '#aaaaaa';
|
||||
} else if (d === 'unauthorized') {
|
||||
return '#ba554a';
|
||||
} else if (d === 'full') {
|
||||
return '#ba554a';
|
||||
} else {
|
||||
return '#000000';
|
||||
}
|
||||
|
@ -223,7 +225,7 @@ nf.Canvas = (function () {
|
|||
.attr('d', 'M2,3 L0,6 L6,3 L0,0 z');
|
||||
|
||||
// filter for drop shadow
|
||||
var filter = defs.append('filter')
|
||||
var componentDropShadowFilter = defs.append('filter')
|
||||
.attr({
|
||||
'id': 'component-drop-shadow',
|
||||
'height': '140%',
|
||||
|
@ -231,7 +233,7 @@ nf.Canvas = (function () {
|
|||
});
|
||||
|
||||
// blur
|
||||
filter.append('feGaussianBlur')
|
||||
componentDropShadowFilter.append('feGaussianBlur')
|
||||
.attr({
|
||||
'in': 'SourceAlpha',
|
||||
'stdDeviation': 3,
|
||||
|
@ -239,7 +241,7 @@ nf.Canvas = (function () {
|
|||
});
|
||||
|
||||
// offset
|
||||
filter.append('feOffset')
|
||||
componentDropShadowFilter.append('feOffset')
|
||||
.attr({
|
||||
'in': 'blur',
|
||||
'dx': 0,
|
||||
|
@ -248,7 +250,7 @@ nf.Canvas = (function () {
|
|||
});
|
||||
|
||||
// color/opacity
|
||||
filter.append('feFlood')
|
||||
componentDropShadowFilter.append('feFlood')
|
||||
.attr({
|
||||
'flood-color': '#000000',
|
||||
'flood-opacity': 0.4,
|
||||
|
@ -256,7 +258,7 @@ nf.Canvas = (function () {
|
|||
});
|
||||
|
||||
// combine
|
||||
filter.append('feComposite')
|
||||
componentDropShadowFilter.append('feComposite')
|
||||
.attr({
|
||||
'in': 'offsetColor',
|
||||
'in2': 'offsetBlur',
|
||||
|
@ -265,34 +267,61 @@ nf.Canvas = (function () {
|
|||
});
|
||||
|
||||
// stack the effect under the source graph
|
||||
var feMerge = filter.append('feMerge');
|
||||
feMerge.append('feMergeNode')
|
||||
var componentDropShadowFeMerge = componentDropShadowFilter.append('feMerge');
|
||||
componentDropShadowFeMerge.append('feMergeNode')
|
||||
.attr('in', 'offsetColorBlur');
|
||||
feMerge.append('feMergeNode')
|
||||
componentDropShadowFeMerge.append('feMergeNode')
|
||||
.attr('in', 'SourceGraphic');
|
||||
|
||||
// define the gradient for the expiration icon
|
||||
var expirationBackground = defs.append('linearGradient')
|
||||
// filter for drop shadow
|
||||
var connectionFullDropShadowFilter = defs.append('filter')
|
||||
.attr({
|
||||
'id': 'expiration',
|
||||
'x1': '0%',
|
||||
'y1': '0%',
|
||||
'x2': '0%',
|
||||
'y2': '100%'
|
||||
'id': 'connection-full-drop-shadow',
|
||||
'height': '140%',
|
||||
'y': '-20%'
|
||||
});
|
||||
|
||||
expirationBackground.append('stop')
|
||||
// blur
|
||||
connectionFullDropShadowFilter.append('feGaussianBlur')
|
||||
.attr({
|
||||
'offset': '0%',
|
||||
'stop-color': '#aeafb1'
|
||||
'in': 'SourceAlpha',
|
||||
'stdDeviation': 3,
|
||||
'result': 'blur'
|
||||
});
|
||||
|
||||
expirationBackground.append('stop')
|
||||
// offset
|
||||
connectionFullDropShadowFilter.append('feOffset')
|
||||
.attr({
|
||||
'offset': '100%',
|
||||
'stop-color': '#87888a'
|
||||
'in': 'blur',
|
||||
'dx': 0,
|
||||
'dy': 1,
|
||||
'result': 'offsetBlur'
|
||||
});
|
||||
|
||||
// color/opacity
|
||||
connectionFullDropShadowFilter.append('feFlood')
|
||||
.attr({
|
||||
'flood-color': '#ba554a',
|
||||
'flood-opacity': 1,
|
||||
'result': 'offsetColor'
|
||||
});
|
||||
|
||||
// combine
|
||||
connectionFullDropShadowFilter.append('feComposite')
|
||||
.attr({
|
||||
'in': 'offsetColor',
|
||||
'in2': 'offsetBlur',
|
||||
'operator': 'in',
|
||||
'result': 'offsetColorBlur'
|
||||
});
|
||||
|
||||
// stack the effect under the source graph
|
||||
var connectionFullFeMerge = connectionFullDropShadowFilter.append('feMerge');
|
||||
connectionFullFeMerge.append('feMergeNode')
|
||||
.attr('in', 'offsetColorBlur');
|
||||
connectionFullFeMerge.append('feMergeNode')
|
||||
.attr('in', 'SourceGraphic');
|
||||
|
||||
// create the canvas element
|
||||
canvas = svg.append('g')
|
||||
.attr({
|
||||
|
|
|
@ -24,6 +24,9 @@ nf.Connection = (function () {
|
|||
width: 200
|
||||
};
|
||||
|
||||
// width of a backpressure indicator - half of width, left/right padding, left/right border
|
||||
var backpressureBarWidth = (dimensions.width / 2) - 15 - 2;
|
||||
|
||||
/**
|
||||
* Gets the position of the label for the specified connection.
|
||||
*
|
||||
|
@ -293,6 +296,83 @@ nf.Connection = (function () {
|
|||
return unavailable;
|
||||
};
|
||||
|
||||
// gets the appropriate end marker
|
||||
var getEndMarker = function (d) {
|
||||
var marker = 'normal';
|
||||
|
||||
if (d.permissions.canRead) {
|
||||
// if the connection has a relationship that is unavailable, mark it a ghost relationship
|
||||
if (isFullBytes(d) || isFullCount(d)) {
|
||||
marker = 'full';
|
||||
} else if (hasUnavailableRelationship(d)) {
|
||||
marker = 'ghost';
|
||||
}
|
||||
} else {
|
||||
marker = 'unauthorized';
|
||||
}
|
||||
|
||||
return 'url(#' + marker + ')';
|
||||
};
|
||||
|
||||
// gets the appropriate drop shadow
|
||||
var getDropShadow = function (d) {
|
||||
if (isFullCount(d) || isFullBytes(d)) {
|
||||
return 'url(#connection-full-drop-shadow)';
|
||||
} else {
|
||||
return 'url(#component-drop-shadow)';
|
||||
}
|
||||
};
|
||||
|
||||
// determines whether the connection is full based on the object count threshold
|
||||
var isFullCount = function (d) {
|
||||
return d.status.aggregateSnapshot.percentUseCount === 100;
|
||||
};
|
||||
|
||||
// determines whether the connection is in warning based on the object count threshold
|
||||
var isWarningCount = function (d) {
|
||||
var percentUseCount = d.status.aggregateSnapshot.percentUseCount;
|
||||
if (nf.Common.isDefinedAndNotNull(percentUseCount)) {
|
||||
return percentUseCount >= 61 && percentUseCount <= 85;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
// determines whether the connection is in error based on the object count threshold
|
||||
var isErrorCount = function (d) {
|
||||
var percentUseCount = d.status.aggregateSnapshot.percentUseCount;
|
||||
if (nf.Common.isDefinedAndNotNull(percentUseCount)) {
|
||||
return percentUseCount > 85;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
// determines whether the connection is full based on the data size threshold
|
||||
var isFullBytes = function (d) {
|
||||
return d.status.aggregateSnapshot.percentUseBytes === 100
|
||||
};
|
||||
|
||||
// determines whether the connection is in warning based on the data size threshold
|
||||
var isWarningBytes = function (d) {
|
||||
var percentUseBytes = d.status.aggregateSnapshot.percentUseBytes;
|
||||
if (nf.Common.isDefinedAndNotNull(percentUseBytes)) {
|
||||
return percentUseBytes >= 61 && percentUseBytes <= 85;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
// determines whether the connection is in error based on the data size threshold
|
||||
var isErrorBytes = function (d) {
|
||||
var percentUseBytes = d.status.aggregateSnapshot.percentUseBytes;
|
||||
if (nf.Common.isDefinedAndNotNull(percentUseBytes)) {
|
||||
return percentUseBytes > 85;
|
||||
}
|
||||
|
||||
return false;
|
||||
};
|
||||
|
||||
// updates the specified connections
|
||||
var updateConnections = function (updated, options) {
|
||||
if (updated.empty()) {
|
||||
|
@ -469,20 +549,6 @@ nf.Connection = (function () {
|
|||
'd': function () {
|
||||
var datum = [d.start].concat(d.bends, [d.end]);
|
||||
return lineGenerator(datum);
|
||||
},
|
||||
'marker-end': function () {
|
||||
var marker = 'normal';
|
||||
|
||||
if (d.permissions.canRead) {
|
||||
// if the connection has a relationship that is unavailable, mark it a ghost relationship
|
||||
if (hasUnavailableRelationship(d)) {
|
||||
marker = 'ghost';
|
||||
}
|
||||
} else {
|
||||
marker = 'unauthorized';
|
||||
}
|
||||
|
||||
return 'url(#' + marker + ')';
|
||||
}
|
||||
});
|
||||
nf.CanvasUtils.transition(connection.select('path.connection-selection-path'), transition)
|
||||
|
@ -685,8 +751,7 @@ nf.Connection = (function () {
|
|||
'class': 'body',
|
||||
'width': dimensions.width,
|
||||
'x': 0,
|
||||
'y': 0,
|
||||
'filter': 'url(#component-drop-shadow)'
|
||||
'y': 0
|
||||
});
|
||||
|
||||
// processor border
|
||||
|
@ -986,6 +1051,8 @@ nf.Connection = (function () {
|
|||
// connection label - queued
|
||||
// -------------------------
|
||||
|
||||
var HEIGHT_FOR_BACKPRESSURE = 3;
|
||||
|
||||
// see if the queue label is already rendered
|
||||
var queued = connectionLabelContainer.select('g.queued-container');
|
||||
if (queued.empty()) {
|
||||
|
@ -999,7 +1066,7 @@ nf.Connection = (function () {
|
|||
.attr({
|
||||
'class': 'connection-label-background',
|
||||
'width': dimensions.width,
|
||||
'height': rowHeight
|
||||
'height': rowHeight + HEIGHT_FOR_BACKPRESSURE
|
||||
}));
|
||||
|
||||
// border
|
||||
|
@ -1037,12 +1104,107 @@ nf.Connection = (function () {
|
|||
'class': 'size'
|
||||
});
|
||||
|
||||
// expiration icon
|
||||
queued.append('text')
|
||||
.attr({
|
||||
'class': 'expiration-icon',
|
||||
'x': 185,
|
||||
'y': 14
|
||||
}).append('title');
|
||||
})
|
||||
.text(function () {
|
||||
return '\uf017';
|
||||
})
|
||||
.append('title');
|
||||
|
||||
var yBackpressureOffset = rowHeight + HEIGHT_FOR_BACKPRESSURE - 4;
|
||||
|
||||
// backpressure object threshold
|
||||
|
||||
// start
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-tick object',
|
||||
'width': 1,
|
||||
'height': 3,
|
||||
'x': 5,
|
||||
'y': yBackpressureOffset
|
||||
});
|
||||
|
||||
// bar
|
||||
var backpressureCountOffset = 6;
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-object',
|
||||
'width': backpressureBarWidth,
|
||||
'height': 3,
|
||||
'x': backpressureCountOffset,
|
||||
'y': yBackpressureOffset
|
||||
})
|
||||
.append('title');
|
||||
|
||||
// end
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-tick object',
|
||||
'width': 1,
|
||||
'height': 3,
|
||||
'x': backpressureCountOffset + backpressureBarWidth,
|
||||
'y': yBackpressureOffset
|
||||
});
|
||||
|
||||
// percent full
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-percent object',
|
||||
'width': 0,
|
||||
'height': 3,
|
||||
'x': backpressureCountOffset,
|
||||
'y': yBackpressureOffset
|
||||
});
|
||||
|
||||
// backpressure data size threshold
|
||||
|
||||
// start
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-tick data-size',
|
||||
'width': 1,
|
||||
'height': 3,
|
||||
'x': (dimensions.width / 2) + 10,
|
||||
'y': yBackpressureOffset
|
||||
});
|
||||
|
||||
// bar
|
||||
var backpressureDataSizeOffset = (dimensions.width / 2) + 10 + 1;
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-data-size',
|
||||
'width': backpressureBarWidth,
|
||||
'height': 3,
|
||||
'x': backpressureDataSizeOffset,
|
||||
'y': yBackpressureOffset
|
||||
})
|
||||
.append('title');
|
||||
|
||||
// end
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-tick data-size',
|
||||
'width': 1,
|
||||
'height': 3,
|
||||
'x': backpressureDataSizeOffset + backpressureBarWidth,
|
||||
'y': yBackpressureOffset
|
||||
});
|
||||
|
||||
// percent full
|
||||
queued.append('rect')
|
||||
.attr({
|
||||
'class': 'backpressure-percent data-size',
|
||||
'width': 0,
|
||||
'height': 3,
|
||||
'x': backpressureDataSizeOffset,
|
||||
'y': yBackpressureOffset
|
||||
});
|
||||
} else {
|
||||
backgrounds.push(queued.select('rect.connection-label-background'));
|
||||
borders.push(queued.select('rect.connection-label-border'));
|
||||
|
@ -1057,14 +1219,14 @@ nf.Connection = (function () {
|
|||
// update the height based on the labels being rendered
|
||||
connectionLabelContainer.select('rect.body')
|
||||
.attr('height', function () {
|
||||
return (rowHeight * labelCount);
|
||||
return (rowHeight * labelCount) + HEIGHT_FOR_BACKPRESSURE;
|
||||
})
|
||||
.classed('unauthorized', function () {
|
||||
return d.permissions.canRead === false;
|
||||
});
|
||||
connectionLabelContainer.select('rect.border')
|
||||
.attr('height', function () {
|
||||
return (rowHeight * labelCount);
|
||||
return (rowHeight * labelCount) + HEIGHT_FOR_BACKPRESSURE;
|
||||
})
|
||||
.classed('unauthorized', function () {
|
||||
return d.permissions.canRead === false;
|
||||
|
@ -1097,10 +1259,7 @@ nf.Connection = (function () {
|
|||
return true;
|
||||
}
|
||||
})
|
||||
.text(function () {
|
||||
return '\uf017';
|
||||
})
|
||||
.select('title', function () {
|
||||
.select('title').text(function () {
|
||||
if (d.permissions.canRead) {
|
||||
return 'Expires FlowFiles older than ' + d.component.flowFileExpiration;
|
||||
} else {
|
||||
|
@ -1108,6 +1267,26 @@ nf.Connection = (function () {
|
|||
}
|
||||
});
|
||||
|
||||
// update backpressure object fill
|
||||
connectionLabelContainer.select('rect.backpressure-object')
|
||||
.classed('not-configured', function () {
|
||||
return nf.Common.isUndefinedOrNull(d.status.aggregateSnapshot.percentUseCount);
|
||||
});
|
||||
connectionLabelContainer.selectAll('rect.backpressure-tick.object')
|
||||
.classed('not-configured', function () {
|
||||
return nf.Common.isUndefinedOrNull(d.status.aggregateSnapshot.percentUseCount);
|
||||
});
|
||||
|
||||
// update backpressure data size fill
|
||||
connectionLabelContainer.select('rect.backpressure-data-size')
|
||||
.classed('not-configured', function () {
|
||||
return nf.Common.isUndefinedOrNull(d.status.aggregateSnapshot.percentUseBytes);
|
||||
});
|
||||
connectionLabelContainer.selectAll('rect.backpressure-tick.data-size')
|
||||
.classed('not-configured', function () {
|
||||
return nf.Common.isUndefinedOrNull(d.status.aggregateSnapshot.percentUseBytes);
|
||||
});
|
||||
|
||||
if (d.permissions.canWrite) {
|
||||
// only support dragging the label when appropriate
|
||||
connectionLabelContainer.call(labelDrag);
|
||||
|
@ -1142,17 +1321,101 @@ nf.Connection = (function () {
|
|||
return;
|
||||
}
|
||||
|
||||
// queued count value
|
||||
updated.select('text.queued tspan.count')
|
||||
.text(function (d) {
|
||||
return nf.Common.substringBeforeFirst(d.status.aggregateSnapshot.queued, ' ');
|
||||
});
|
||||
// update data size
|
||||
var dataSizeDeferred = $.Deferred(function (deferred) {
|
||||
// queued count value
|
||||
updated.select('text.queued tspan.count')
|
||||
.text(function (d) {
|
||||
return nf.Common.substringBeforeFirst(d.status.aggregateSnapshot.queued, ' ');
|
||||
});
|
||||
|
||||
// queued size value
|
||||
updated.select('text.queued tspan.size')
|
||||
.text(function (d) {
|
||||
return ' ' + nf.Common.substringAfterFirst(d.status.aggregateSnapshot.queued, ' ');
|
||||
var backpressurePercentDataSize = updated.select('rect.backpressure-percent.data-size');
|
||||
backpressurePercentDataSize.transition()
|
||||
.duration(400)
|
||||
.attr({
|
||||
'width': function (d) {
|
||||
return (backpressureBarWidth * d.status.aggregateSnapshot.percentUseBytes) / 100;
|
||||
}
|
||||
}).each('end', function () {
|
||||
backpressurePercentDataSize
|
||||
.classed('warning', function (d) {
|
||||
return isWarningBytes(d);
|
||||
})
|
||||
.classed('error', function (d) {
|
||||
return isErrorBytes(d);
|
||||
});
|
||||
|
||||
deferred.resolve();
|
||||
});
|
||||
|
||||
updated.select('rect.backpressure-data-size').select('title').text(function (d) {
|
||||
if (nf.Common.isDefinedAndNotNull(d.status.aggregateSnapshot.percentUseBytes)) {
|
||||
return 'Queue is ' + d.status.aggregateSnapshot.percentUseBytes + '% full based on Back Pressure Data Size Threshold';
|
||||
} else {
|
||||
return 'Back Pressure Data Size Threshold is not configured';
|
||||
}
|
||||
});
|
||||
}).promise();
|
||||
|
||||
// update object count
|
||||
var objectCountDeferred = $.Deferred(function (deferred) {
|
||||
// queued size value
|
||||
updated.select('text.queued tspan.size')
|
||||
.text(function (d) {
|
||||
return ' ' + nf.Common.substringAfterFirst(d.status.aggregateSnapshot.queued, ' ');
|
||||
});
|
||||
|
||||
var backpressurePercentObject = updated.select('rect.backpressure-percent.object');
|
||||
backpressurePercentObject.transition()
|
||||
.duration(400)
|
||||
.attr({
|
||||
'width': function (d) {
|
||||
return (backpressureBarWidth * d.status.aggregateSnapshot.percentUseCount) / 100;
|
||||
}
|
||||
}).each('end', function () {
|
||||
backpressurePercentObject
|
||||
.classed('warning', function (d) {
|
||||
return isWarningCount(d);
|
||||
})
|
||||
.classed('error', function (d) {
|
||||
return isErrorCount(d);
|
||||
});
|
||||
|
||||
deferred.resolve();
|
||||
});
|
||||
|
||||
updated.select('rect.backpressure-object').select('title').text(function (d) {
|
||||
if (nf.Common.isDefinedAndNotNull(d.status.aggregateSnapshot.percentUseCount)) {
|
||||
return 'Queue is ' + d.status.aggregateSnapshot.percentUseCount + '% full based on Back Pressure Object Threshold';
|
||||
} else {
|
||||
return 'Back Pressure Object Threshold is not configured';
|
||||
}
|
||||
});
|
||||
}).promise();
|
||||
|
||||
// update connection once progress bars have transitioned
|
||||
$.when(dataSizeDeferred, objectCountDeferred).done(function () {
|
||||
// connection stroke
|
||||
updated.select('path.connection-path')
|
||||
.classed('full', function (d) {
|
||||
return isFullCount(d) || isFullBytes(d);
|
||||
})
|
||||
.attr({
|
||||
'marker-end': getEndMarker
|
||||
});
|
||||
|
||||
// border
|
||||
updated.select('rect.border')
|
||||
.classed('full', function (d) {
|
||||
return isFullCount(d) || isFullBytes(d);
|
||||
});
|
||||
|
||||
// drop shadow
|
||||
updated.select('rect.body')
|
||||
.attr({
|
||||
'filter': getDropShadow
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -1714,6 +1977,29 @@ nf.Connection = (function () {
|
|||
}
|
||||
},
|
||||
|
||||
/**
|
||||
* Reloads the connection status from the server and refreshes the UI.
|
||||
*
|
||||
* @param {string} id The connection id
|
||||
*/
|
||||
reloadStatus: function (id) {
|
||||
if (connectionMap.has(id)) {
|
||||
return $.ajax({
|
||||
type: 'GET',
|
||||
url: '../nifi-api/flow/connections/' + encodeURIComponent(id) + '/status',
|
||||
dataType: 'json'
|
||||
}).done(function (response) {
|
||||
// update the existing connection
|
||||
var connectionEntity = connectionMap.get(id);
|
||||
connectionEntity.status = response.connectionStatus;
|
||||
connectionMap.set(id, connectionEntity);
|
||||
|
||||
// update the UI
|
||||
select().call(updateConnectionStatus);
|
||||
});
|
||||
}
|
||||
},
|
||||
|
||||
/**
|
||||
* Gets the connection that have a source or destination component with the specified id.
|
||||
*
|
||||
|
|
|
@ -617,6 +617,18 @@ nf.SummaryTable = (function () {
|
|||
return '<div class="pointer show-connection-details fa fa-info-circle" title="View Connection Details" style="margin-top: 5px;"></div>';
|
||||
};
|
||||
|
||||
var backpressureFormatter = function (row, cell, value, columnDef, dataContext) {
|
||||
var percentUseCount = 'NA';
|
||||
if (nf.Common.isDefinedAndNotNull(dataContext.percentUseCount)) {
|
||||
percentUseCount = dataContext.percentUseCount + '%';
|
||||
}
|
||||
var percentUseBytes = 'NA';
|
||||
if (nf.Common.isDefinedAndNotNull(dataContext.percentUseBytes)) {
|
||||
percentUseBytes = dataContext.percentUseBytes + '%';
|
||||
}
|
||||
return percentUseCount + ' / ' + percentUseBytes;
|
||||
};
|
||||
|
||||
// define the input, read, written, and output columns (reused between both tables)
|
||||
var queueColumn = {
|
||||
id: 'queued',
|
||||
|
@ -627,6 +639,17 @@ nf.SummaryTable = (function () {
|
|||
resize: true
|
||||
};
|
||||
|
||||
// define the backpressure column (reused between both tables)
|
||||
var backpressureColumn = {
|
||||
id: 'backpressure',
|
||||
field: 'backpressure',
|
||||
name: '<span class="backpressure-object-title">Queue</span> / <span class="backpressure-data-size-title">Size</span> Threshold',
|
||||
sortable: true,
|
||||
defaultSortAsc: false,
|
||||
formatter: backpressureFormatter,
|
||||
resize: true
|
||||
};
|
||||
|
||||
// define the column model for the summary table
|
||||
var connectionsColumnModel = [
|
||||
{
|
||||
|
@ -649,6 +672,7 @@ nf.SummaryTable = (function () {
|
|||
},
|
||||
inputColumn,
|
||||
queueColumn,
|
||||
backpressureColumn,
|
||||
outputColumn
|
||||
];
|
||||
|
||||
|
@ -818,6 +842,7 @@ nf.SummaryTable = (function () {
|
|||
{id: 'node', field: 'node', name: 'Node', sortable: true, resizable: true},
|
||||
inputColumn,
|
||||
queueColumn,
|
||||
backpressureColumn,
|
||||
outputColumn
|
||||
];
|
||||
|
||||
|
@ -2073,6 +2098,19 @@ nf.SummaryTable = (function () {
|
|||
var bQueueSize = nf.Common.parseSize(b['queuedSize']);
|
||||
return aQueueSize - bQueueSize;
|
||||
}
|
||||
} else if (sortDetails.columnId === 'backpressure') {
|
||||
var mod = sortState[tableId].count % 4;
|
||||
if (mod < 2) {
|
||||
$('#' + tableId + ' span.backpressure-object-title').addClass('sorted');
|
||||
var aPercentUseObject = nf.Common.isDefinedAndNotNull(a['percentUseCount']) ? a['percentUseCount'] : -1;
|
||||
var bPercentUseObject = nf.Common.isDefinedAndNotNull(b['percentUseCount']) ? b['percentUseCount'] : -1;
|
||||
return aPercentUseObject - bPercentUseObject;
|
||||
} else {
|
||||
$('#' + tableId + ' span.backpressure-data-size-title').addClass('sorted');
|
||||
var aPercentUseDataSize = nf.Common.isDefinedAndNotNull(a['percentUseBytes']) ? a['percentUseBytes'] : -1;
|
||||
var bPercentUseDataSize = nf.Common.isDefinedAndNotNull(b['percentUseBytes']) ? b['percentUseBytes'] : -1;
|
||||
return aPercentUseDataSize - bPercentUseDataSize;
|
||||
}
|
||||
} else if (sortDetails.columnId === 'sent' || sortDetails.columnId === 'received' || sortDetails.columnId === 'input' || sortDetails.columnId === 'output' || sortDetails.columnId === 'transferred') {
|
||||
var aSplit = a[sortDetails.columnId].split(/ \/ /);
|
||||
var bSplit = b[sortDetails.columnId].split(/ \/ /);
|
||||
|
@ -2124,6 +2162,8 @@ nf.SummaryTable = (function () {
|
|||
// remove previous sort indicators
|
||||
$('#' + tableId + ' span.queued-title').removeClass('sorted');
|
||||
$('#' + tableId + ' span.queued-size-title').removeClass('sorted');
|
||||
$('#' + tableId + ' span.backpressure-object-title').removeClass('sorted');
|
||||
$('#' + tableId + ' span.backpressure-data-size-title').removeClass('sorted');
|
||||
$('#' + tableId + ' span.input-title').removeClass('sorted');
|
||||
$('#' + tableId + ' span.input-size-title').removeClass('sorted');
|
||||
$('#' + tableId + ' span.output-title').removeClass('sorted');
|
||||
|
@ -2473,6 +2513,8 @@ nf.SummaryTable = (function () {
|
|||
queued: snapshot.queued,
|
||||
queuedCount: snapshot.queuedCount,
|
||||
queuedSize: snapshot.queuedSize,
|
||||
percentUseCount: snapshot.percentUseCount,
|
||||
percentUseBytes: snapshot.percentUseBytes,
|
||||
output: snapshot.output
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue