NIFI-108:

- Removing sort from UI.
- Addressing issues with listing and flowfile retrieval when clustered.
- Making the context menu item available when source and destination are still running.
- Adding a refresh button to the queue listing table.
- Fixing the flowfile summary sorting in the cluster manager.
- Adding a message when the source or destination of a connection is actively running.
- Updating the documentation regarding queue interaction.
- Updating the error message when a flowfile is no longer in the active queue.
- Updated queue listing to allow listing to be done while source and destination are running but not sort or have ability to search
- Added heartbeat when we finish clearing queue
- Addressing comments from review.
This commit is contained in:
Matt Gilman 2016-01-07 15:59:02 -05:00
parent 7d73ae77f8
commit 0d7edcb3ac
25 changed files with 386 additions and 751 deletions

View File

@ -211,11 +211,18 @@ public interface FlowFileQueue {
DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier);
/**
* <p>
* Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
* ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist
* within the queue. Additionally, the ListFlowFileStatus provides a request identifier that
* can then be passed to the {@link #getListFlowFileStatus(String)}. The listing of FlowFiles
* will be returned ordered by the position of the FlowFile in the queue.
* </p>
*
* <p>
* Note that if maxResults is larger than the size of the "active queue" (i.e., the un-swapped queued,
* FlowFiles that are swapped out will not be returned.)
* </p>
*
* @param requestIdentifier the identifier of the List FlowFile Request
* @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
@ -227,45 +234,6 @@ public interface FlowFileQueue {
*/
ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults);
/**
* Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
* ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist
* within the queue. Additionally, the ListFlowFileStatus provides a request identifier that
* can then be passed to the {@link #getListFlowFileStatus(String)}
*
* @param requestIdentifier the identifier of the List FlowFile Request
* @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
* @param sortColumn specifies which column to sort on
* @param direction specifies which direction to sort the FlowFiles
*
* @return the status for the request
*
* @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
* is currently running.
*/
ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, SortColumn sortColumn, SortDirection direction);
/**
* Initiates a request to obtain a listing of FlowFiles in this queue. This method returns a
* ListFlowFileStatus that can be used to obtain information about the FlowFiles that exist
* within the queue. Additionally, the ListFlowFileStatus provides a request identifier that
* can then be passed to the {@link #getListFlowFileStatus(String)}
*
* @param requestIdentifier the identifier of the List FlowFile Request
* @param maxResults the maximum number of FlowFileSummary objects to add to the ListFlowFileStatus
* @param query an Expression Language expression that will be evaluated against all FlowFiles. Only FlowFiles that satisfy the expression will
* be included in the results. The expression must be a valid expression and return a Boolean type
* @param sortColumn specifies which column to sort on
* @param direction specifies which direction to sort the FlowFiles
*
* @return the status for the request
*
* @throws IllegalStateException if either the source or the destination of the connection to which this queue belongs
* is currently running.
* @throws IllegalArgumentException if query is not a valid Expression Language expression or does not return a boolean type
*/
ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults, String query, SortColumn sortColumn, SortDirection direction);
/**
* Returns the current status of a List FlowFile Request that was initiated via the {@link #listFlowFiles(String)}
* method that has the given identifier

View File

@ -43,16 +43,6 @@ public interface ListFlowFileStatus {
*/
long getLastUpdated();
/**
* @return the column on which the listing is sorted
*/
SortColumn getSortColumn();
/**
* @return the direction in which the FlowFiles are sorted
*/
SortDirection getSortDirection();
/**
* @return the current state of the operation
*/
@ -77,14 +67,4 @@ public interface ListFlowFileStatus {
* @return the percentage (an integer between 0 and 100, inclusive) of how close the request is to being completed
*/
int getCompletionPercentage();
/**
* @return the total number of steps that are required in order to finish the listing
*/
int getTotalStepCount();
/**
* @return the total number of steps that have already been completed. The value returned will be >= 0 and <= the result of calling {@link #getTotalStepCount()}.
*/
int getCompletedStepCount();
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 549 B

View File

@ -1260,19 +1260,23 @@ image:iconNotSecure.png["Not Secure"]
[[Queue_Listing]]
=== Listing FlowFiles in a Queue
[[Queue_Interaction]]
=== Queue Interaction
The FlowFiles enqueued in a Connection can be viewed when necessary. The Queue listing is opened via a menu item in
a Connection's context menu. This option is only available when the source and destination of the Connection have
been stopped and all active threads have completed. The listing will return the top 100 FlowFiles according to
the currently sorted column.
a Connection's context menu. The listing will return the top 100 FlowFiles in the active queue according to the
configured priority. The listing can be performed even if the source and destination are actively running.
Additionally, details for a Flowfile in the listing can be viewed by clicking on the Details icon (
image:iconDetails.png["Details"]
) in the left most column. From here, the FlowFile details and attributes are available as well buttons for
downloading or viewing the content. Viewing the content is only available if the nifi.content.viewer.url has been configured.
If the source or destination of the Connection are actively running, there is a chance that the desired FlowFile will
no longer be in the active queue.
The FlowFiles enqueued in a Connection can also be deleted when necessary. The removal of the FlowFiles is initiated
via a menu item in the Connection's context menu. This action can also be performed if the source and destination
are actively running.
[[Summary_Page]]

View File

@ -22,6 +22,7 @@ import java.util.List;
import javax.xml.bind.annotation.XmlType;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.web.api.dto.util.TimeAdapter;
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
import com.wordnik.swagger.annotations.ApiModelProperty;
@ -38,11 +39,10 @@ public class ListingRequestDTO {
private Integer percentCompleted;
private Boolean finished;
private String failureReason;
private String sortColumn;
private String sortDirection;
private Integer maxResults;
private Integer totalStepCount;
private Integer completedStepCount;
private Boolean isSourceRunning;
private Boolean isDestinationRunning;
private String state;
private QueueSizeDTO queueSize;
@ -95,7 +95,7 @@ public class ListingRequestDTO {
/**
* @return the time this request was last updated
*/
@XmlJavaTypeAdapter(TimestampAdapter.class)
@XmlJavaTypeAdapter(TimeAdapter.class)
@ApiModelProperty(
value = "The last time this listing request was updated."
)
@ -177,30 +177,6 @@ public class ListingRequestDTO {
this.flowFileSummaries = flowFileSummaries;
}
/**
* @return the column on which the listing is sorted
*/
@ApiModelProperty(value = "The column on which the FlowFiles are sorted.")
public String getSortColumn() {
return sortColumn;
}
public void setSortColumn(String sortColumn) {
this.sortColumn = sortColumn;
}
/**
* @return the direction in which the FlowFiles are sorted
*/
@ApiModelProperty(value = "The direction in which the FlowFiles are sorted. Either ASCENDING or DESCENDING.")
public String getSortDirection() {
return sortDirection;
}
public void setSortDirection(String sortDirection) {
this.sortDirection = sortDirection;
}
/**
* @return the maximum number of FlowFileSummary objects to return
*/
@ -213,31 +189,6 @@ public class ListingRequestDTO {
this.maxResults = maxResults;
}
/**
* @return the total number of steps required to complete the listing
*/
@ApiModelProperty(value = "The total number of steps required to complete the listing")
public Integer getTotalStepCount() {
return totalStepCount;
}
public void setTotalStepCount(Integer totalStepCount) {
this.totalStepCount = totalStepCount;
}
/**
* @return the number of steps that have already been completed. This value will be >= 0 and <= the total step count
*/
@ApiModelProperty(value = "The number of steps that have already been completed. This value will be between 0 and the total step count (inclusive)")
public Integer getCompletedStepCount() {
return completedStepCount;
}
public void setCompletedStepCount(Integer completedStepCount) {
this.completedStepCount = completedStepCount;
}
/**
* @return the size for the queue
*/
@ -249,4 +200,28 @@ public class ListingRequestDTO {
public void setQueueSize(QueueSizeDTO queueSize) {
this.queueSize = queueSize;
}
/**
* @return whether the source is running
*/
@ApiModelProperty(value = "Whether the source of the connection is running")
public Boolean getSourceRunning() {
return isSourceRunning;
}
public void setSourceRunning(Boolean sourceRunning) {
isSourceRunning = sourceRunning;
}
/**
* @return whether the destination is running
*/
@ApiModelProperty(value = "Whether the destination of the connection is running")
public Boolean getDestinationRunning() {
return isDestinationRunning;
}
public void setDestinationRunning(Boolean destinationRunning) {
isDestinationRunning = destinationRunning;
}
}

View File

@ -23,7 +23,7 @@ import javax.xml.bind.annotation.XmlRootElement;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a FlowFileDTO.
*/
@XmlRootElement(name = "listingRequestEntity")
@XmlRootElement(name = "flowFileEntity")
public class FlowFileEntity extends Entity {
private FlowFileDTO flowFile;

View File

@ -127,7 +127,6 @@ import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowFileSummaries;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
@ -137,8 +136,6 @@ import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
@ -2447,7 +2444,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private static boolean isListFlowFilesEndpoint(final URI uri, final String method) {
if ("GET".equalsIgnoreCase(method) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) {
if (("GET".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && LISTING_REQUEST_URI.matcher(uri.getPath()).matches()) {
return true;
} else if ("POST".equalsIgnoreCase(method) && LISTING_REQUESTS_URI.matcher(uri.getPath()).matches()) {
return true;
@ -2516,7 +2513,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|| isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
|| isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
|| isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method)
|| isDropRequestEndpoint(uri, method);
|| isDropRequestEndpoint(uri, method) || isListFlowFilesEndpoint(uri, method);
}
private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
@ -2860,8 +2857,28 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
* @param listingRequestMap the mapping of all responses being merged
*/
private void mergeListingRequests(final ListingRequestDTO listingRequest, final Map<NodeIdentifier, ListingRequestDTO> listingRequestMap) {
final Comparator<FlowFileSummaryDTO> comparator = FlowFileSummaries.createDTOComparator(
SortColumn.valueOf(listingRequest.getSortColumn()), SortDirection.valueOf(listingRequest.getSortDirection()));
final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
@Override
public int compare(final FlowFileSummaryDTO dto1, final FlowFileSummaryDTO dto2) {
int positionCompare = dto1.getPosition().compareTo(dto2.getPosition());
if (positionCompare != 0) {
return positionCompare;
}
final String address1 = dto1.getClusterNodeAddress();
final String address2 = dto2.getClusterNodeAddress();
if (address1 == null && address2 == null) {
return 0;
}
if (address1 == null) {
return 1;
}
if (address2 == null) {
return -1;
}
return address1.compareTo(address2);
}
};
final NavigableSet<FlowFileSummaryDTO> flowFileSummaries = new TreeSet<>(comparator);
@ -2877,8 +2894,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final ListingRequestDTO nodeRequest = entry.getValue();
numStepsCompleted += nodeRequest.getCompletedStepCount();
numStepsTotal += nodeRequest.getTotalStepCount();
numStepsTotal++;
if (Boolean.TRUE.equals(nodeRequest.getFinished())) {
numStepsCompleted++;
}
final QueueSizeDTO nodeQueueSize = nodeRequest.getQueueSize();
objectCount += nodeQueueSize.getObjectCount();
@ -2898,15 +2917,17 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
state = nodeState;
}
for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
summaryDTO.setClusterNodeId(nodeIdentifier.getId());
summaryDTO.setClusterNodeAddress(nodeAddress);
if (nodeRequest.getFlowFileSummaries() != null) {
for (final FlowFileSummaryDTO summaryDTO : nodeRequest.getFlowFileSummaries()) {
summaryDTO.setClusterNodeId(nodeIdentifier.getId());
summaryDTO.setClusterNodeAddress(nodeAddress);
flowFileSummaries.add(summaryDTO);
flowFileSummaries.add(summaryDTO);
// Keep the set from growing beyond our max
if (flowFileSummaries.size() > listingRequest.getMaxResults()) {
flowFileSummaries.pollLast();
// Keep the set from growing beyond our max
if (flowFileSummaries.size() > listingRequest.getMaxResults()) {
flowFileSummaries.pollLast();
}
}
}

View File

@ -25,24 +25,17 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
private final String requestId;
private final int maxResults;
private final QueueSize queueSize;
private final SortColumn sortColumn;
private final SortDirection sortDirection;
private final long submissionTime = System.currentTimeMillis();
private final List<FlowFileSummary> flowFileSummaries = new ArrayList<>();
private ListFlowFileState state = ListFlowFileState.WAITING_FOR_LOCK;
private String failureReason;
private int numSteps;
private int completedStepCount;
private long lastUpdated = System.currentTimeMillis();
public ListFlowFileRequest(final String requestId, final SortColumn sortColumn, final SortDirection sortDirection, final int maxResults, final QueueSize queueSize, final int numSteps) {
public ListFlowFileRequest(final String requestId, final int maxResults, final QueueSize queueSize) {
this.requestId = requestId;
this.sortColumn = sortColumn;
this.sortDirection = sortDirection;
this.maxResults = maxResults;
this.queueSize = queueSize;
this.numSteps = numSteps;
}
@Override
@ -60,16 +53,6 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
return lastUpdated;
}
@Override
public SortColumn getSortColumn() {
return sortColumn;
}
@Override
public SortDirection getSortDirection() {
return sortDirection;
}
@Override
public synchronized ListFlowFileState getState() {
return state;
@ -118,25 +101,11 @@ public class ListFlowFileRequest implements ListFlowFileStatus {
@Override
public synchronized int getCompletionPercentage() {
return (int) (100F * completedStepCount / numSteps);
}
public synchronized void setCompletedStepCount(final int completedStepCount) {
this.completedStepCount = completedStepCount;
return state == ListFlowFileState.COMPLETE ? 100 : 0;
}
@Override
public int getMaxResults() {
return maxResults;
}
@Override
public int getTotalStepCount() {
return numSteps;
}
@Override
public int getCompletedStepCount() {
return completedStepCount;
}
}

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueue;
@ -70,7 +71,7 @@ public final class StandardConnection implements Connection {
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater);
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@ -269,6 +270,7 @@ public final class StandardConnection implements Connection {
private FlowFileRepository flowFileRepository;
private ProvenanceEventRepository provenanceRepository;
private ResourceClaimManager resourceClaimManager;
private Heartbeater heartbeater;
public Builder(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
@ -304,6 +306,11 @@ public final class StandardConnection implements Connection {
return this;
}
public Builder heartbeater(final Heartbeater heartbeater) {
this.heartbeater = heartbeater;
return this;
}
public Builder bendPoints(final List<Position> bendPoints) {
this.bendPoints.clear();
this.bendPoints.addAll(bendPoints);

View File

@ -770,6 +770,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.resourceClaimManager(resourceClaimManager)
.flowFileRepository(flowFileRepository)
.provenanceRepository(provenanceEventRepository)
.heartbeater(this)
.build();
}

View File

@ -1,95 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import java.util.Collections;
import java.util.Comparator;
import org.apache.nifi.controller.queue.FlowFileSummary;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.web.api.dto.FlowFileSummaryDTO;
public class FlowFileSummaries {
public static Comparator<FlowFileSummary> createComparator(final SortColumn column, final SortDirection direction) {
final Comparator<FlowFileSummary> comparator = new Comparator<FlowFileSummary>() {
@Override
public int compare(final FlowFileSummary o1, final FlowFileSummary o2) {
switch (column) {
case FILENAME:
return o1.getFilename().compareTo(o2.getFilename());
case FLOWFILE_AGE:
return Long.compare(o1.getLineageStartDate(), o2.getLineageStartDate());
case FLOWFILE_SIZE:
return Long.compare(o1.getSize(), o2.getSize());
case FLOWFILE_UUID:
return o1.getUuid().compareTo(o2.getUuid());
case PENALIZATION:
return Boolean.compare(o1.isPenalized(), o2.isPenalized());
case QUEUE_POSITION:
return Long.compare(o1.getPosition(), o2.getPosition());
case QUEUED_DURATION:
return Long.compare(o1.getLastQueuedTime(), o2.getLastQueuedTime());
}
return 0;
}
};
if (direction == SortDirection.DESCENDING) {
return Collections.reverseOrder(comparator);
} else {
return comparator;
}
}
public static Comparator<FlowFileSummaryDTO> createDTOComparator(final SortColumn column, final SortDirection direction) {
final Comparator<FlowFileSummaryDTO> comparator = new Comparator<FlowFileSummaryDTO>() {
@Override
public int compare(final FlowFileSummaryDTO o1, final FlowFileSummaryDTO o2) {
switch (column) {
case FILENAME:
return o1.getFilename().compareTo(o2.getFilename());
case FLOWFILE_AGE:
return o1.getLineageDuration().compareTo(o2.getLineageDuration());
case FLOWFILE_SIZE:
return Long.compare(o1.getSize(), o2.getSize());
case FLOWFILE_UUID:
return o1.getUuid().compareTo(o2.getUuid());
case PENALIZATION:
return Boolean.compare(o1.getPenalized(), o2.getPenalized());
case QUEUE_POSITION:
return Long.compare(o1.getPosition(), o2.getPosition());
case QUEUED_DURATION:
return o1.getQueuedDuration().compareTo(o2.getQueuedDuration());
}
return 0;
}
};
if (direction == SortDirection.DESCENDING) {
return Collections.reverseOrder(comparator);
} else {
return comparator;
}
}
}

View File

@ -16,9 +16,25 @@
*/
package org.apache.nifi.controller;
import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
@ -28,8 +44,6 @@ import org.apache.nifi.controller.queue.ListFlowFileRequest;
import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@ -39,7 +53,6 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@ -57,27 +70,6 @@ import org.apache.nifi.util.concurrency.TimedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A FlowFileQueue is used to queue FlowFile objects that are awaiting further
* processing. Must be thread safe.
@ -115,6 +107,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final FlowFileRepository flowFileRepository;
private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager;
private final Heartbeater heartbeater;
private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
@ -123,7 +116,8 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ProcessScheduler scheduler;
public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold,
final Heartbeater heartbeater) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>();
swapQueue = new ArrayList<>();
@ -137,6 +131,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
this.swapThreshold = swapThreshold;
this.scheduler = scheduler;
this.connection = connection;
this.heartbeater = heartbeater;
readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
@ -852,33 +847,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults) {
return listFlowFiles(requestIdentifier, maxResults, SortColumn.QUEUE_POSITION, SortDirection.ASCENDING);
}
@Override
public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final SortColumn sortColumn, final SortDirection direction) {
return listFlowFiles(requestIdentifier, maxResults, null, sortColumn, direction);
}
@Override
public ListFlowFileStatus listFlowFiles(final String requestIdentifier, final int maxResults, final String query, final SortColumn sortColumn, final SortDirection direction) {
final PreparedQuery preparedQuery;
if (query == null) {
preparedQuery = null;
} else {
try {
final ResultType resultType = Query.compile(query).getResultType();
if (resultType != ResultType.BOOLEAN) {
throw new IllegalArgumentException("Invalid expression Language provided to search the listing of FlowFiles. "
+ "The expression must return a 'Boolean' type but returns a " + resultType.name() + " type");
}
preparedQuery = Query.prepare(query);
} catch (final AttributeExpressionLanguageParsingException e) {
throw new IllegalArgumentException("Invalid Expression Language provided to search the listing of FlowFiles: " + query, e);
}
}
// purge any old requests from the map just to keep it clean. But if there are very requests, which is usually the case, then don't bother
// purge any old requests from the map just to keep it clean. But if there are very few requests, which is usually the case, then don't bother
if (listRequestMap.size() > 10) {
final List<String> toDrop = new ArrayList<>();
for (final Map.Entry<String, ListFlowFileRequest> entry : listRequestMap.entrySet()) {
@ -896,101 +865,49 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
}
// numSteps = 1 for each swap location + 1 for active queue + 1 for swap queue.
final int numSteps = 2 + size.get().swapFiles;
final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, sortColumn, direction, maxResults, size(), numSteps);
final ListFlowFileRequest listRequest = new ListFlowFileRequest(requestIdentifier, maxResults, size());
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
int position = 0;
int resultCount = 0;
final Comparator<FlowFileSummary> comparator = FlowFileSummaries.createComparator(sortColumn, direction);
final NavigableSet<FlowFileSummary> summaries = new TreeSet<>(comparator);
int completedStepCount = 0;
final List<FlowFileSummary> summaries = new ArrayList<>();
// we need a write lock while using the Active Queue because we can't iterate over it - we have to poll from it
// continually. This is because the iterator for PriorityQueue does not iterate over the elements in any particular
// order. Since we need the 'position' of the element in the queue, we need to iterate over them in the proper order.
writeLock.lock();
// Create an ArrayList that contains all of the contents of the active queue.
// We do this so that we don't have to hold the lock any longer than absolutely necessary.
// We cannot simply pull the first 'maxResults' records from the queue, however, because the
// Iterator provided by PriorityQueue does not return records in order. So we would have to either
// use a writeLock and 'pop' the first 'maxResults' records off the queue or use a read lock and
// do a shallow copy of the queue. The shallow copy is generally quicker because it doesn't have to do
// the sorting to put the records back. So even though this has an expensive of Java Heap to create the
// extra collection, we are making this trade-off to avoid locking the queue any longer than required.
final List<FlowFileRecord> allFlowFiles;
final Prioritizer prioritizer;
readLock.lock();
try {
logger.debug("{} Acquired lock to perform listing of FlowFiles", StandardFlowFileQueue.this);
listRequest.setState(ListFlowFileState.CALCULATING_LIST);
final List<FlowFileRecord> flowFileRecords = new ArrayList<>(activeQueue.size());
FlowFileRecord flowFile;
try {
while ((flowFile = activeQueue.poll()) != null) {
flowFileRecords.add(flowFile);
position++;
if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
summaries.add(summarize(flowFile, position));
if (summaries.size() > maxResults) {
summaries.pollLast();
}
}
}
} finally {
activeQueue.addAll(flowFileRecords);
}
allFlowFiles = new ArrayList<>(activeQueue);
prioritizer = new Prioritizer(StandardFlowFileQueue.this.priorities);
} finally {
writeLock.unlock("List FlowFiles");
readLock.unlock("List FlowFiles");
}
listRequest.setState(ListFlowFileState.CALCULATING_LIST);
// sort the FlowFileRecords so that we have the list in the same order as on the queue.
Collections.sort(allFlowFiles, prioritizer);
for (final FlowFileRecord flowFile : allFlowFiles) {
summaries.add(summarize(flowFile, ++position));
if (summaries.size() >= maxResults) {
break;
}
}
logger.debug("{} Finished listing FlowFiles for active queue with a total of {} results", StandardFlowFileQueue.this, resultCount);
listRequest.setCompletedStepCount(++completedStepCount);
position = activeQueue.size();
try {
// We are now iterating over swap files, and we don't need the write lock for this, just the read lock, since
// we are not modifying anything.
readLock.lock();
try {
for (final String location : swapLocations) {
logger.debug("{} Performing listing of FlowFiles for Swap Location {}", StandardFlowFileQueue.this, location);
final List<FlowFileRecord> flowFiles = swapManager.peek(location, StandardFlowFileQueue.this);
for (final FlowFileRecord flowFile : flowFiles) {
position++;
if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
summaries.add(summarize(flowFile, position));
if (summaries.size() > maxResults) {
summaries.pollLast();
}
}
}
listRequest.setCompletedStepCount(++completedStepCount);
}
logger.debug("{} Performing listing of FlowFiles from Swap Queue", StandardFlowFileQueue.this);
for (final FlowFileRecord flowFile : swapQueue) {
position++;
if (preparedQuery == null || "true".equals(preparedQuery.evaluateExpressions(flowFile))) {
summaries.add(summarize(flowFile, position));
if (summaries.size() > maxResults) {
summaries.pollLast();
}
}
}
listRequest.setCompletedStepCount(++completedStepCount);
} finally {
readLock.unlock("List FlowFiles");
}
} catch (final IOException ioe) {
logger.error("Failed to read swapped FlowFiles in order to perform listing of queue " + StandardFlowFileQueue.this, ioe);
listRequest.setFailure("Could not read FlowFiles from queue. Check log files for more details.");
}
// We have now completed the listing successfully. Set the number of completed steps to the total number of steps. We may have
// skipped some steps because we have reached the maximum number of results, so we consider those steps completed.
logger.debug("{} Completed listing of FlowFiles", StandardFlowFileQueue.this);
listRequest.setCompletedStepCount(listRequest.getTotalStepCount());
listRequest.setFlowFileSummaries(summaries);
listRequest.setState(ListFlowFileState.COMPLETE);
listRequest.setFlowFileSummaries(new ArrayList<FlowFileSummary>(summaries));
}
}, "List FlowFiles for Connection " + getIdentifier());
t.setDaemon(true);
@ -1082,24 +999,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
return flowFile;
}
}
for (final FlowFileRecord flowFile : swapQueue) {
if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
return flowFile;
}
}
// TODO: consider using a Long flowFileId instead of a UUID, and then having the swap manager
// write out the min and max FlowFile ID's. This would allow us to then have a method: boolean isFlowFilePossiblyContained(long id)
// which can return a boolean value that can be used to determine whether or not to even call peek
for (final String swapLocation : swapLocations) {
final List<FlowFileRecord> flowFiles = swapManager.peek(swapLocation, this);
for (final FlowFileRecord flowFile : flowFiles) {
if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) {
return flowFile;
}
}
}
} finally {
readLock.unlock("getFlowFile");
}
@ -1110,13 +1009,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
@Override
public void verifyCanList() throws IllegalStateException {
if (connection.getSource().isRunning()) {
throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's source is still running");
}
if (connection.getDestination().isRunning()) {
throw new IllegalStateException("Cannot list the FlowFiles of queue because the connection's destination is still running");
}
}
@Override
@ -1248,6 +1140,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
dropRequest.setState(DropFlowFileState.COMPLETE);
if (heartbeater != null) {
heartbeater.heartbeat();
}
} catch (final Exception e) {
logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
logger.error("", e);

View File

@ -17,17 +17,31 @@
package org.apache.nifi.controller;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileSummary;
import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@ -49,23 +63,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestStandardFlowFileQueue {
private TestSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
@ -105,7 +102,7 @@ public class TestStandardFlowFileQueue {
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, null);
TestFlowFile.idGenerator.set(0L);
}
@ -417,72 +414,8 @@ public class TestStandardFlowFileQueue {
assertEquals(9999, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(2, status.getTotalStepCount());
assertEquals(2, status.getCompletedStepCount());
}
@Test(timeout = 5000)
public void testListFlowFilesActiveQueueAndSwapQueue() throws InterruptedException {
for (int i = 0; i < 11000; i++) {
queue.put(new TestFlowFile());
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 11000);
assertNotNull(status);
assertEquals(11000, status.getQueueSize().getObjectCount());
while (status.getState() != ListFlowFileState.COMPLETE) {
Thread.sleep(100);
}
assertEquals(11000, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(2, status.getTotalStepCount());
assertEquals(2, status.getCompletedStepCount());
}
@Test(timeout = 5000)
public void testListFlowFilesActiveQueueAndSwapFile() throws InterruptedException {
for (int i = 0; i < 20000; i++) {
queue.put(new TestFlowFile());
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 20000);
assertNotNull(status);
assertEquals(20000, status.getQueueSize().getObjectCount());
while (status.getState() != ListFlowFileState.COMPLETE) {
Thread.sleep(100);
}
assertEquals(20000, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(3, status.getTotalStepCount());
assertEquals(3, status.getCompletedStepCount());
}
@Test(timeout = 5000)
public void testListFlowFilesActiveQueueAndSwapFilesAndSwapQueue() throws InterruptedException {
for (int i = 0; i < 30050; i++) {
queue.put(new TestFlowFile());
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 30050);
assertNotNull(status);
assertEquals(30050, status.getQueueSize().getObjectCount());
while (status.getState() != ListFlowFileState.COMPLETE) {
Thread.sleep(100);
}
assertEquals(30050, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(4, status.getTotalStepCount());
assertEquals(4, status.getCompletedStepCount());
}
@Test(timeout = 5000)
public void testListFlowFilesResultsLimited() throws InterruptedException {
@ -501,62 +434,6 @@ public class TestStandardFlowFileQueue {
assertEquals(100, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(4, status.getTotalStepCount());
assertEquals(4, status.getCompletedStepCount());
}
@Test
public void testListFlowFilesSortedAscending() throws InterruptedException {
for (int i = 0; i < 30050; i++) {
queue.put(new TestFlowFile(i));
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100, SortColumn.FLOWFILE_SIZE, SortDirection.ASCENDING);
assertNotNull(status);
assertEquals(30050, status.getQueueSize().getObjectCount());
while (status.getState() != ListFlowFileState.COMPLETE) {
Thread.sleep(100);
}
assertEquals(100, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(4, status.getTotalStepCount());
assertEquals(4, status.getCompletedStepCount());
int counter = 0;
for (final FlowFileSummary summary : status.getFlowFileSummaries()) {
assertEquals(counter++, summary.getSize());
}
}
@Test
public void testListFlowFilesSortedDescending() throws InterruptedException {
for (int i = 0; i < 30050; i++) {
queue.put(new TestFlowFile(i));
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100, SortColumn.FLOWFILE_SIZE, SortDirection.DESCENDING);
assertNotNull(status);
assertEquals(30050, status.getQueueSize().getObjectCount());
while (status.getState() != ListFlowFileState.COMPLETE) {
Thread.sleep(100);
}
assertEquals(100, status.getFlowFileSummaries().size());
assertEquals(100, status.getCompletionPercentage());
assertNull(status.getFailureReason());
assertEquals(4, status.getTotalStepCount());
assertEquals(4, status.getCompletedStepCount());
int counter = 0;
for (final FlowFileSummary summary : status.getFlowFileSummaries()) {
assertEquals((30050 - 1 - counter++), summary.getSize());
}
}

View File

@ -139,7 +139,7 @@ public class TestStandardProcessSession {
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() {

View File

@ -16,25 +16,22 @@
*/
package org.apache.nifi.web;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
@ -45,9 +42,6 @@ import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@ -61,8 +55,8 @@ import org.apache.nifi.web.api.dto.UserGroupDTO;
import org.apache.nifi.web.api.dto.action.ActionDTO;
import org.apache.nifi.web.api.dto.action.HistoryDTO;
import org.apache.nifi.web.api.dto.action.HistoryQueryDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
@ -78,6 +72,10 @@ import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import java.util.Collection;
import java.util.Date;
import java.util.Set;
/**
* Defines the NiFiServiceFacade interface.
*/
@ -585,11 +583,9 @@ public interface NiFiServiceFacade {
* @param groupId group
* @param connectionId The ID of the connection
* @param listingRequestId The ID of the listing request
* @param column sort column
* @param direction sort direction
* @return The ListingRequest
*/
ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId, SortColumn column, SortDirection direction);
ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId);
/**
* Gets a new flow file listing request.

View File

@ -16,25 +16,8 @@
*/
package org.apache.nifi.web;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.action.Component;
import org.apache.nifi.action.FlowChangeAction;
@ -53,18 +36,23 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@ -82,9 +70,7 @@ import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.user.AccountStatus;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.user.NiFiUserGroup;
@ -94,16 +80,22 @@ import org.apache.nifi.web.api.dto.BulletinBoardDTO;
import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.BulletinQueryDTO;
import org.apache.nifi.web.api.dto.ClusterDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ControllerConfigurationDTO;
import org.apache.nifi.web.api.dto.ControllerDTO;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.CounterDTO;
import org.apache.nifi.web.api.dto.CountersDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.FlowFileDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
import org.apache.nifi.web.api.dto.ListingRequestDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.NodeSystemDiagnosticsDTO;
import org.apache.nifi.web.api.dto.PortDTO;
@ -111,10 +103,11 @@ import org.apache.nifi.web.api.dto.PreviousValueDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.PropertyHistoryDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
import org.apache.nifi.web.api.dto.SnippetDTO;
import org.apache.nifi.web.api.dto.SystemDiagnosticsDTO;
@ -131,6 +124,7 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ClusterConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterPortStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterRemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusDTO;
@ -138,6 +132,7 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.NodePortStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessorStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeRemoteProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeStatusDTO;
@ -145,40 +140,41 @@ import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.controller.ControllerFacade;
import org.apache.nifi.web.dao.ConnectionDAO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import org.apache.nifi.web.dao.FunnelDAO;
import org.apache.nifi.web.dao.LabelDAO;
import org.apache.nifi.web.dao.PortDAO;
import org.apache.nifi.web.dao.ProcessGroupDAO;
import org.apache.nifi.web.dao.ProcessorDAO;
import org.apache.nifi.web.dao.RemoteProcessGroupDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.apache.nifi.web.dao.SnippetDAO;
import org.apache.nifi.web.dao.TemplateDAO;
import org.apache.nifi.web.util.SnippetUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusDTO;
import org.apache.nifi.web.dao.ControllerServiceDAO;
import org.apache.nifi.web.dao.ReportingTaskDAO;
import org.apache.nifi.web.security.user.NewAccountRequest;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.apache.nifi.web.util.SnippetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.AccessDeniedException;
import javax.ws.rs.WebApplicationException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* Implementation of NiFiServiceFacade that performs revision checking.
*/
@ -828,7 +824,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ListingRequestDTO deleteFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
return dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId));
final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.deleteFlowFileListingRequest(groupId, connectionId, listingRequestId));
// include whether the source and destination are running
final Connection connection = connectionDAO.getConnection(groupId, connectionId);
if (connection.getSource() != null) {
listRequest.setSourceRunning(connection.getSource().isRunning());
}
if (connection.getDestination() != null) {
listRequest.setDestinationRunning(connection.getDestination().isRunning());
}
return listRequest;
}
@Override
@ -1088,8 +1095,19 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId, SortColumn column, SortDirection direction) {
return dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, listingRequestId, column, direction));
public ListingRequestDTO createFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.createFlowFileListingRequest(groupId, connectionId, listingRequestId));
// include whether the source and destination are running
final Connection connection = connectionDAO.getConnection(groupId, connectionId);
if (connection.getSource() != null) {
listRequest.setSourceRunning(connection.getSource().isRunning());
}
if (connection.getDestination() != null) {
listRequest.setDestinationRunning(connection.getDestination().isRunning());
}
return listRequest;
}
@Override
@ -2153,7 +2171,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
@Override
public ListingRequestDTO getFlowFileListingRequest(String groupId, String connectionId, String listingRequestId) {
return dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId));
final ListingRequestDTO listRequest = dtoFactory.createListingRequestDTO(connectionDAO.getFlowFileListingRequest(groupId, connectionId, listingRequestId));
// include whether the source and destination are running
final Connection connection = connectionDAO.getConnection(groupId, connectionId);
if (connection.getSource() != null) {
listRequest.setSourceRunning(connection.getSource().isRunning());
}
if (connection.getDestination() != null) {
listRequest.setDestinationRunning(connection.getDestination().isRunning());
}
return listRequest;
}
@Override

View File

@ -29,8 +29,6 @@ import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.ConfigurationSnapshot;
@ -1173,6 +1171,11 @@ public class ConnectionResource extends ApplicationResource {
)
@PathParam("connection-id") String id) {
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// defer to the new endpoint that references /drop-requests in the URI
return createDropRequest(httpServletRequest, clientId, id);
}
@ -1186,7 +1189,7 @@ public class ConnectionResource extends ApplicationResource {
* @return A listRequestEntity
*/
@POST
@Consumes(MediaType.WILDCARD)
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/{connection-id}/listing-requests")
@PreAuthorize("hasRole('ROLE_DFM')")
@ -1213,47 +1216,16 @@ public class ConnectionResource extends ApplicationResource {
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("connection-id") String id,
@ApiParam(
value = "The sort column.",
required = false,
defaultValue = "QUEUE_POSITION",
allowableValues = "QUEUE_POSITION, FLOWFILE_UUID, FILENAME, FLOWFILE_SIZE, QUEUED_DURATION, FLOWFILE_AGE, PENALIZATION"
)
@FormParam("sortColumn") String sortColumn,
@ApiParam(
value = "The sort direction.",
required = false,
defaultValue = "asc",
allowableValues = "asc, desc"
)
@FormParam("sortOrder") @DefaultValue("asc") String sortOrder) {
// parse the sort column
final SortColumn column;
if (sortColumn == null) {
column = SortColumn.QUEUE_POSITION;
} else {
try {
column = SortColumn.valueOf(sortColumn);
} catch (final IllegalArgumentException iae) {
throw new IllegalArgumentException(String.format("Sort Column: Value must be one of [%s]", StringUtils.join(SortColumn.values(), ", ")));
}
}
// normalize the sort order
if (!sortOrder.equalsIgnoreCase("asc") && !sortOrder.equalsIgnoreCase("desc")) {
throw new IllegalArgumentException("The sort order must be 'asc' or 'desc'.");
}
@PathParam("connection-id") String id) {
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// handle expects request (usually from the cluster manager)
@ -1263,13 +1235,6 @@ public class ConnectionResource extends ApplicationResource {
return generateContinueResponse().build();
}
final SortDirection direction;
if (sortOrder.equalsIgnoreCase("asc")) {
direction = SortDirection.ASCENDING;
} else {
direction = SortDirection.DESCENDING;
}
// ensure the id is the same across the cluster
final String listingRequestId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
@ -1280,7 +1245,7 @@ public class ConnectionResource extends ApplicationResource {
}
// submit the listing request
final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(groupId, id, listingRequestId, column, direction);
final ListingRequestDTO listingRequest = serviceFacade.createFlowFileListingRequest(groupId, id, listingRequestId);
populateRemainingFlowFileListingContent(id, listingRequest);
// create the revision
@ -1452,7 +1417,7 @@ public class ConnectionResource extends ApplicationResource {
* @return A dropRequestEntity
*/
@POST
@Consumes(MediaType.WILDCARD)
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/{connection-id}/drop-requests")
@PreAuthorize("hasRole('ROLE_DFM')")
@ -1479,7 +1444,7 @@ public class ConnectionResource extends ApplicationResource {
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@FormParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@ApiParam(
value = "The connection id.",
required = true
@ -1488,7 +1453,7 @@ public class ConnectionResource extends ApplicationResource {
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// handle expects request (usually from the cluster manager)

View File

@ -378,10 +378,6 @@ public final class DtoFactory {
dto.setFailureReason(listingRequest.getFailureReason());
dto.setFinished(isListingRequestComplete(listingRequest.getState()));
dto.setMaxResults(listingRequest.getMaxResults());
dto.setSortColumn(listingRequest.getSortColumn().name());
dto.setSortDirection(listingRequest.getSortDirection().name());
dto.setTotalStepCount(listingRequest.getTotalStepCount());
dto.setCompletedStepCount(listingRequest.getCompletedStepCount());
dto.setPercentCompleted(listingRequest.getCompletionPercentage());
dto.setQueueSize(createQueueSizeDTO(listingRequest.getQueueSize()));

View File

@ -16,16 +16,15 @@
*/
package org.apache.nifi.web.dao;
import java.util.Set;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import java.util.Set;
public interface ConnectionDAO {
/**
@ -118,11 +117,9 @@ public interface ConnectionDAO {
* @param groupId group id
* @param id connection id
* @param listingRequestId listing request id
* @param column sort column
* @param direction sort direction
* @return The listing request status
*/
ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId, SortColumn column, SortDirection direction);
ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId);
/**
* Verifies the listing can be processed.

View File

@ -16,18 +16,6 @@
*/
package org.apache.nifi.web.dao.impl;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import javax.ws.rs.WebApplicationException;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.DownloadAuthorization;
import org.apache.nifi.connectable.Connectable;
@ -40,14 +28,12 @@ import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
import org.apache.nifi.controller.queue.SortColumn;
import org.apache.nifi.controller.queue.SortDirection;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.user.NiFiUser;
@ -64,6 +50,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.AccessDeniedException;
import javax.ws.rs.WebApplicationException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO {
private static final Logger logger = LoggerFactory.getLogger(StandardConnectionDAO.class);
@ -126,7 +124,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
if (flowFile == null) {
throw new ResourceNotFoundException(String.format("Unable to find FlowFile '%s' in Connection '%s'.", flowFileUuid, id));
throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid));
}
return flowFile;
@ -375,14 +373,14 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId, SortColumn column, SortDirection direction) {
public ListFlowFileStatus createFlowFileListingRequest(String groupId, String id, String listingRequestId) {
final Connection connection = locateConnection(groupId, id);
final FlowFileQueue queue = connection.getFlowFileQueue();
// ensure we can list
verifyList(queue);
return queue.listFlowFiles(listingRequestId, 100, column, direction);
return queue.listFlowFiles(listingRequestId, 100);
}
@Override
@ -606,7 +604,7 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
final FlowFileRecord flowFile = queue.getFlowFile(flowFileUuid);
if (flowFile == null) {
throw new ResourceNotFoundException(String.format("Unable to find FlowFile '%s' in Connection '%s'.", flowFileUuid, id));
throw new ResourceNotFoundException(String.format("The FlowFile with UUID %s is no longer in the active queue.", flowFileUuid));
}
// calculate the dn chain

View File

@ -20,10 +20,16 @@
<div id="queue-listing-header-text"></div>
</div>
<div id="queue-listing-stats-container">
<div id="queue-listing-refresh-button" class="pointer" title="Refresh"></div>
<div id="queue-listing-last-refreshed-container">
Last updated:&nbsp;<span id="queue-listing-last-refreshed"></span>
</div>
<div id="queue-listing-loading-container" class="loading-container"></div>
<div id="queue-listing-stats">
Displaying&nbsp;<span id="displayed-flowfiles"></span>&nbsp;of&nbsp;<span id="total-flowfiles-count"></span>&nbsp;(<span id="total-flowfiles-size"></span>)
</div>
<div id="queue-listing-loading-container" class="loading-container"></div>
<div class="clear"></div>
</div>
<div id="queue-listing-table"></div>
<div id="queue-listing-message" class="hidden"></div>
</div>

View File

@ -21,10 +21,10 @@
#queue-listing-container {
position: absolute;
top: 0px;
bottom: 0px;
left: 0px;
right: 0px;
top: 0;
bottom: 0;
left: 0;
right: 0;
}
#queue-listing-header-container {
@ -41,19 +41,28 @@
}
#queue-listing-stats-container {
margin-left: 15px;
margin-top: 30px;
margin-left: 20px;
margin-top: 18px;
-webkit-user-select: none;
-moz-user-select: none;
}
#queue-listing-stats {
font-size: 9px;
#queue-listing-refresh-button {
float: left;
height: 24px;
width: 26px;
}
#queue-listing-last-refreshed-container {
float: left;
color: #666;
font-weight: normal;
margin-top: 6px;
margin-left: 3px;
}
#queue-listing-last-refreshed {
font-weight: bold;
color: #9f6000;
clear: left;
line-height: normal;
margin-left: 5px;
}
#queue-listing-loading-container {
@ -65,6 +74,16 @@
margin-left: 3px;
}
#queue-listing-stats {
font-size: 9px;
font-weight: bold;
color: #9f6000;
float: right;
line-height: normal;
margin-right: 20px;
margin-top: 6px;
}
/* queue listing table */
#queue-listing-table {
@ -77,6 +96,16 @@
overflow: hidden;
}
/* queue listing table */
#queue-listing-message {
position: absolute;
left: 20px;
bottom: 20px;
color: #f00;
font-size: 10px;
}
/* flowfile details */
#flowfile-details-dialog {

View File

@ -975,7 +975,7 @@ nf.Actions = (function () {
});
} else {
// nothing was removed
nf.Dialog.showYesNoDialog({
nf.Dialog.showOkDialog({
dialogContent: 'No FlowFiles were removed.',
overlayBackground: false
});

View File

@ -307,7 +307,7 @@ nf.ContextMenu = (function () {
* @param {selection} selection
*/
var canListQueue = function (selection) {
return nf.Common.isDFM() && isConnection(selection) && nf.CanvasUtils.supportsModification(selection);
return nf.Common.isDFM() && isConnection(selection);
};
/**

View File

@ -22,9 +22,6 @@
*/
nf.QueueListing = (function () {
var DEFAULT_SORT_COL = 'QUEUE_POSITION';
var DEFAULT_SORT_ASC = true;
/**
* Initializes the listing request status dialog.
*/
@ -78,7 +75,7 @@ nf.QueueListing = (function () {
var dataUri = $('#flowfile-uri').text() + '/content';
// conditionally include the cluster node id
var clusterNodeId = $('#flowfile-cluster-node-id').text();;
var clusterNodeId = $('#flowfile-cluster-node-id').text();
if (!nf.Common.isBlank(clusterNodeId)) {
var parameters = {
'clusterNodeId': clusterNodeId
@ -158,10 +155,8 @@ nf.QueueListing = (function () {
* Performs a listing on the specified connection.
*
* @param connection the connection
* @param sortCol the sort column
* @param sortAsc if sort is asc
*/
var performListing = function (connection, sortCol, sortAsc) {
var performListing = function (connection) {
var MAX_DELAY = 4;
var cancelled = false;
var listingRequest = null;
@ -231,6 +226,26 @@ nf.QueueListing = (function () {
$('#total-flowfiles-count').text(nf.Common.formatInteger(listingRequest.queueSize.objectCount));
$('#total-flowfiles-size').text(nf.Common.formatDataSize(listingRequest.queueSize.byteCount));
// update the last updated time
$('#queue-listing-last-refreshed').text(listingRequest.lastUpdated);
// show a message for the queue listing if necessary
var queueListingTable = $('#queue-listing-table');
var queueListingMessage = $('#queue-listing-message');
if (listingRequest.sourceRunning === true || listingRequest.destinationRunning === true) {
if (listingRequest.souceRunning === true && listingRequest.destinationRunning === true) {
queueListingMessage.text('The source and destination of this queue are currently running. This listing may no longer be accurate.').show();
} else if (listingRequest.sourceRunning === true) {
queueListingMessage.text('The source of this queue is currently running. This listing may no longer be accurate.').show();
} else if (listingRequest.destinationRunning === true) {
queueListingMessage.text('The destination of this queue is currently running. This listing may no longer be accurate.').show();
}
queueListingTable.css('bottom', '35px');
} else {
queueListingMessage.text('').hide();
queueListingTable.css('bottom', '20px');
}
// get the grid to load the data
var queueListingGrid = $('#queue-listing-table').data('gridInstance');
var queueListingData = queueListingGrid.getData();
@ -290,10 +305,6 @@ nf.QueueListing = (function () {
$.ajax({
type: 'POST',
url: connection.component.uri + '/listing-requests',
data: {
sortColumn: sortCol,
sortOrder: sortAsc ? 'asc' : 'desc'
},
dataType: 'json'
}).done(function(response) {
// initialize the progress bar value
@ -332,9 +343,15 @@ nf.QueueListing = (function () {
}
};
var params = {};
if (nf.Common.isDefinedAndNotNull(flowFileSummary.clusterNodeId)) {
params['clusterNodeId'] = flowFileSummary.clusterNodeId;
}
$.ajax({
type: 'GET',
url: flowFileSummary.uri,
data: params,
dataType: 'json'
}).done(function(response) {
var flowFile = response.flowFile;
@ -352,12 +369,12 @@ nf.QueueListing = (function () {
$('#flowfile-penalized').text(flowFile.penalized === true ? 'Yes' : 'No');
// conditionally show the cluster node identifier
if (nf.Common.isDefinedAndNotNull(flowFile.clusterNodeId)) {
if (nf.Common.isDefinedAndNotNull(flowFileSummary.clusterNodeId)) {
// save the cluster node id
$('#flowfile-cluster-node-id').text(flowFile.clusterNodeId);
$('#flowfile-cluster-node-id').text(flowFileSummary.clusterNodeId);
// render the cluster node address
formatFlowFileDetail('Node Address', flowFile.clusterNodeAddress);
formatFlowFileDetail('Node Address', flowFileSummary.clusterNodeAddress);
}
if (nf.Common.isDefinedAndNotNull(flowFile.contentClaimContainer)) {
@ -423,6 +440,12 @@ nf.QueueListing = (function () {
resetTableSize();
});
// define mouse over event for the refresh button
nf.Common.addHoverEffect('#queue-listing-refresh-button', 'button-refresh', 'button-refresh-hover').click(function () {
var connection = $('#queue-listing-table').data('connection');
performListing(connection);
});
// define a custom formatter for showing more processor details
var moreDetailsFormatter = function (row, cell, value, columnDef, dataContext) {
return '<img src="images/iconDetails.png" title="View Details" class="pointer show-flowfile-details" style="margin-top: 5px; float: left;"/>';
@ -452,13 +475,13 @@ nf.QueueListing = (function () {
// initialize the queue listing table
var queueListingColumns = [
{id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', sortable: false, resizable: false, formatter: moreDetailsFormatter, width: 50, maxWidth: 50},
{id: 'QUEUE_POSITION', name: 'Position', field: 'position', sortable: true, resizable: false, width: 75, maxWidth: 75},
{id: 'FLOWFILE_UUID', name: 'UUID', field: 'uuid', sortable: true, resizable: true},
{id: 'FILENAME', name: 'Filename', field: 'filename', sortable: true, resizable: true},
{id: 'FLOWFILE_SIZE', name: 'File Size', field: 'size', sortable: true, resizable: true, defaultSortAsc: false, formatter: dataSizeFormatter},
{id: 'QUEUED_DURATION', name: 'Queued Duration', field: 'queuedDuration', sortable: true, resizable: true, formatter: durationFormatter},
{id: 'FLOWFILE_AGE', name: 'Lineage Duration', field: 'lineageDuration', sortable: true, resizable: true, formatter: durationFormatter},
{id: 'PENALIZATION', name: 'Penalized', field: 'penalized', sortable: true, resizable: false, width: 100, maxWidth: 100, formatter: penalizedFormatter}
{id: 'position', name: 'Position', field: 'position', sortable: false, resizable: false, width: 75, maxWidth: 75},
{id: 'uuid', name: 'UUID', field: 'uuid', sortable: false, resizable: true},
{id: 'filename', name: 'Filename', field: 'filename', sortable: false, resizable: true},
{id: 'size', name: 'File Size', field: 'size', sortable: false, resizable: true, defaultSortAsc: false, formatter: dataSizeFormatter},
{id: 'queuedDuration', name: 'Queued Duration', field: 'queuedDuration', sortable: false, resizable: true, formatter: durationFormatter},
{id: 'lineageDuration', name: 'Lineage Duration', field: 'lineageDuration', sortable: false, resizable: true, formatter: durationFormatter},
{id: 'penalized', name: 'Penalized', field: 'penalized', sortable: false, resizable: false, width: 100, maxWidth: 100, formatter: penalizedFormatter}
];
// conditionally show the cluster node identifier
@ -484,10 +507,6 @@ nf.QueueListing = (function () {
var queueListingGrid = new Slick.Grid('#queue-listing-table', queueListingData, queueListingColumns, queueListingOptions);
queueListingGrid.setSelectionModel(new Slick.RowSelectionModel());
queueListingGrid.registerPlugin(new Slick.AutoTooltips());
queueListingGrid.onSort.subscribe(function (e, args) {
var connection = $('#queue-listing-table').data('connection');
performListing(connection, args.sortCol.id, args.sortAsc);
});
// configure a click listener
queueListingGrid.onClick.subscribe(function (e, args) {
@ -530,11 +549,8 @@ nf.QueueListing = (function () {
* @param {object} The connection
*/
listQueue: function (connection) {
var queueListingGrid = $('#queue-listing-table').data('gridInstance');
queueListingGrid.setSortColumn(DEFAULT_SORT_COL, DEFAULT_SORT_ASC);
// perform the initial listing
performListing(connection, DEFAULT_SORT_COL, DEFAULT_SORT_ASC).done(function () {
performListing(connection).done(function () {
// update the connection name
var connectionName = nf.CanvasUtils.formatConnectionName(connection.component);
if (connectionName === '') {
@ -547,6 +563,7 @@ nf.QueueListing = (function () {
$('#queue-listing-table').removeData('connection');
// clear the table
var queueListingGrid = $('#queue-listing-table').data('gridInstance');
var queueListingData = queueListingGrid.getData();
// clear the flowfiles