NIFI-5722 Expose Penalty Remaining Duration (#3091)

Signed-off-by: Koji Kawamura <ijokarumawak@gmail.com>
This commit is contained in:
Peter Wicks 2019-02-01 13:04:51 -07:00 committed by GitHub
parent fef41b3022
commit ad1f2fb666
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 40 additions and 8 deletions

View File

@ -56,4 +56,9 @@ public interface FlowFileSummary {
* @return <code>true</code> if the FlowFile is penalized, <code>false</code> otherwise
*/
boolean isPenalized();
/**
* @return the timestamp (in milliseconds since epoch) at which the FlowFiles Penalty Expires
*/
long getPenaltyExpirationMillis();
}

View File

@ -31,6 +31,7 @@ public class FlowFileSummaryDTO {
private Long size;
private Long queuedDuration;
private Long lineageDuration;
private Long penaltyExpiresIn;
private Boolean isPenalized;
private String clusterNodeId; // include when clustered
@ -134,6 +135,20 @@ public class FlowFileSummaryDTO {
this.lineageDuration = lineageDuration;
}
/**
* @return when the FlowFile will no longer be penalized
*/
@ApiModelProperty(
value = "How long in milliseconds until the FlowFile penalty expires."
)
public Long getPenaltyExpiresIn() {
return penaltyExpiresIn;
}
public void setPenaltyExpiresIn(Long penaltyExpiration) {
penaltyExpiresIn = penaltyExpiration;
}
/**
* @return if the FlowFile is penalized
*/

View File

@ -325,7 +325,7 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
}
protected FlowFileSummary summarize(final FlowFile flowFile, final int position) {
protected FlowFileSummary summarize(final FlowFileRecord flowFile, final int position) {
// extract all of the information that we care about into new variables rather than just
// wrapping the FlowFile object with a FlowFileSummary object. We do this because we want to
// be able to hold many FlowFileSummary objects in memory and if we just wrap the FlowFile object,
@ -337,6 +337,7 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
final Long lastQueuedTime = flowFile.getLastQueueDate();
final long lineageStart = flowFile.getLineageStartDate();
final boolean penalized = flowFile.isPenalized();
final long penaltyExpires = flowFile.getPenaltyExpirationMillis();
return new FlowFileSummary() {
@Override
@ -373,6 +374,11 @@ public abstract class AbstractFlowFileQueue implements FlowFileQueue {
public boolean isPenalized() {
return penalized;
}
@Override
public long getPenaltyExpirationMillis() {
return penaltyExpires;
}
};
}

View File

@ -601,7 +601,11 @@ public final class DtoFactory {
final FlowFileSummaryDTO dto = new FlowFileSummaryDTO();
dto.setUuid(summary.getUuid());
dto.setFilename(summary.getFilename());
dto.setPenalized(summary.isPenalized());
final long penaltyExpiration = summary.getPenaltyExpirationMillis() - now.getTime();
dto.setPenaltyExpiresIn(penaltyExpiration>=0?penaltyExpiration:0);
dto.setPosition(summary.getPosition());
dto.setSize(summary.getSize());
@ -625,7 +629,11 @@ public final class DtoFactory {
final FlowFileDTO dto = new FlowFileDTO();
dto.setUuid(record.getAttribute(CoreAttributes.UUID.key()));
dto.setFilename(record.getAttribute(CoreAttributes.FILENAME.key()));
dto.setPenalized(record.isPenalized());
final long penaltyExpiration = record.getPenaltyExpirationMillis() - now.getTime();
dto.setPenaltyExpiresIn(penaltyExpiration>=0?penaltyExpiration:0);
dto.setSize(record.getSize());
dto.setAttributes(record.getAttributes());

View File

@ -110,13 +110,11 @@
// function for formatting penalization
var penalizedFormatter = function (row, cell, value, columnDef, dataContext) {
var markup = '';
if (value === true) {
markup += 'Yes';
if(value == 0) {
return 'No';
}
return markup;
return nfCommon.formatDuration(value);
};
// initialize the queue listing table
@ -185,7 +183,7 @@
{
id: 'penalized',
name: 'Penalized',
field: 'penalized',
field: 'penaltyExpiresIn',
sortable: false,
resizable: false,
width: 100,
@ -567,7 +565,7 @@
$('#flowfile-file-size').html(nfCommon.formatValue(fileSize));
$('#flowfile-queued-duration').text(nfCommon.formatDuration(flowFile.queuedDuration));
$('#flowfile-lineage-duration').text(nfCommon.formatDuration(flowFile.lineageDuration));
$('#flowfile-penalized').text(flowFile.penalized === true ? 'Yes' : 'No');
$('#flowfile-penalized').text(flowFile.penaltyExpiresIn == 0 ? 'No' : nfCommon.formatDuration(flowFile.penaltyExpiresIn));
// conditionally show the cluster node identifier
if (nfCommon.isDefinedAndNotNull(flowFileSummary.clusterNodeId)) {