diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java index 7998d37537..a59db4c6af 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java +++ b/nifi-api/src/main/java/org/apache/nifi/controller/queue/QueueSize.java @@ -50,7 +50,7 @@ public class QueueSize { /** * Returns a new QueueSize that is the sum of this QueueSize and the provided QueueSize - * + * * @param other the other QueueSize to add to this QueueSize * @return a new QueueSize that is the sum of this QueueSize and the provided QueueSize */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java index dd4289f734..f0224693d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java @@ -32,11 +32,20 @@ public class DropRequestDTO { private String uri; private Date submissionTime; - private Date expiration; + private Date lastUpdated; private Integer percentCompleted; private Boolean finished; + private Integer currentCount; + private String currentSize; + private Integer originalCount; + private String originalSize; + private Integer droppedCount; + private String droppedSize; + + private String state; + /** * The id for this component. * @@ -84,21 +93,6 @@ public class DropRequestDTO { this.submissionTime = submissionTime; } - /** - * @return expiration time of the query results - */ - @XmlJavaTypeAdapter(TimestampAdapter.class) - @ApiModelProperty( - value = "The timestamp when the query will expire." - ) - public Date getExpiration() { - return expiration; - } - - public void setExpiration(Date expiration) { - this.expiration = expiration; - } - /** * @return percent completed */ @@ -126,4 +120,118 @@ public class DropRequestDTO { public void setFinished(Boolean finished) { this.finished = finished; } + + /** + * @return the time this request was last updated + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The last time this drop request was updated." + ) + public Date getLastUpdated() { + return lastUpdated; + } + + public void setLastUpdated(Date lastUpdated) { + this.lastUpdated = lastUpdated; + } + + /** + * @return the number of flow files currently queued. + */ + @ApiModelProperty( + value = "The number of flow files currently queued." + ) + public Integer getCurrentCount() { + return currentCount; + } + + public void setCurrentCount(Integer currentCount) { + this.currentCount = currentCount; + } + + /** + * @return the siez of the flow files currently queued. + */ + @ApiModelProperty( + value = "The size of flow files currently queued." + ) + public String getCurrentSize() { + return currentSize; + } + + public void setCurrentSize(String currentSize) { + this.currentSize = currentSize; + } + + /** + * @return the number of flow files to be dropped as a result of this request. + */ + @ApiModelProperty( + value = "The number of flow files to be dropped as a result of this request." + ) + public Integer getOriginalCount() { + return originalCount; + } + + public void setOriginalCount(Integer originalCount) { + this.originalCount = originalCount; + } + + /** + * @return the size of the flow files to be dropped as a result of this request. + */ + @ApiModelProperty( + value = "The size of flow files to be dropped as a result of this request." + ) + public String getOriginalSize() { + return originalSize; + } + + public void setOriginalSize(String originalSize) { + this.originalSize = originalSize; + } + + /** + * @return the number of flow files that have been dropped thus far. + */ + @ApiModelProperty( + value = "The number of flow files that have been dropped thus far." + ) + public Integer getDroppedCount() { + return droppedCount; + } + + public void setDroppedCount(Integer droppedCount) { + this.droppedCount = droppedCount; + } + + /** + * @return the size of the flow files that have been dropped thus far. + */ + @ApiModelProperty( + value = "The size of flow files that have been dropped thus far." + ) + public String getDroppedSize() { + return droppedSize; + } + + public void setDroppedSize(String droppedSize) { + this.droppedSize = droppedSize; + } + + /** + * @return the current state of the drop request. + */ + @ApiModelProperty( + value = "The current state of the drop request." + ) + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 5f89d8edae..0703a41486 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -531,24 +531,29 @@ public interface NiFiServiceFacade { * * @param groupId group * @param connectionId The ID of the connection + * @param dropRequestId The ID of the drop request * @return The DropRequest */ - DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId); + DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId); /** * Gets the specified flow file drop request. * + * @param groupId group + * @param connectionId The ID of the connection * @param dropRequestId The flow file drop request * @return The DropRequest */ - DropRequestDTO getFlowFileDropRequest(String dropRequestId); + DropRequestDTO getFlowFileDropRequest(String groupId, String connectionId, String dropRequestId); /** * Cancels/removes the specified flow file drop request. * + * @param groupId group + * @param connectionId The ID of the connection * @param dropRequestId The flow file drop request */ - void deleteFlowFileDropRequest(String dropRequestId); + void deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId); // ---------------------------------------- // InputPort methods diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 7f0a29619f..b2e600003e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -810,8 +810,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public void deleteFlowFileDropRequest(String dropRequestId) { - // TODO + public void deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { + connectionDAO.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId); } @Override @@ -1066,15 +1066,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) { - // TODO - final DropRequestDTO dto = new DropRequestDTO(); - dto.setFinished(false); - dto.setSubmissionTime(new Date()); - dto.setExpiration(new Date(System.currentTimeMillis() + 10000)); - dto.setId(UUID.randomUUID().toString()); - dto.setPercentCompleted(100); - return dto; + public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { + return dtoFactory.createDropRequestDTO(connectionDAO.createFileFlowDropRequest(groupId, connectionId, dropRequestId)); } @Override @@ -2110,9 +2103,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } @Override - public DropRequestDTO getFlowFileDropRequest(String dropRequestId) { - // TODO - return null; + public DropRequestDTO getFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { + return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(groupId, connectionId, dropRequestId)); } @Override @@ -3460,8 +3452,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { } /** - * Utility method for extracting component counts from the specified group - * status. + * Utility method for extracting component counts from the specified group status. */ private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) { int running = 0; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 4265ead8ec..0d00468aff 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -946,8 +946,8 @@ public class ConnectionResource extends ApplicationResource { } // submit the drop request - final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id); - dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId)); + final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id, dropRequestId); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequest.getId())); // create the revision final RevisionDTO revision = new RevisionDTO(); @@ -1019,7 +1019,7 @@ public class ConnectionResource extends ApplicationResource { } // get the drop request - final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId); + final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(groupId, connectionId, dropRequestId); dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId)); // create the revision @@ -1086,7 +1086,7 @@ public class ConnectionResource extends ApplicationResource { } // delete the drop request - serviceFacade.deleteFlowFileDropRequest(dropRequestId); + serviceFacade.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId); // create the revision final RevisionDTO revision = new RevisionDTO(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 76bce6ffad..d123704126 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -124,6 +124,9 @@ import org.apache.nifi.web.api.dto.status.StatusDTO; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.queue.DropFlowFileState; +import org.apache.nifi.controller.queue.DropFlowFileStatus; +import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceReference; import org.apache.nifi.reporting.ReportingTask; @@ -293,6 +296,42 @@ public final class DtoFactory { return new PositionDTO(position.getX(), position.getY()); } + /** + * Creates a DropRequestDTO from the specified flow file status. + * + * @param dropRequest dropRequest + * @return dto + */ + public DropRequestDTO createDropRequestDTO(final DropFlowFileStatus dropRequest) { + final DropRequestDTO dto = new DropRequestDTO(); + dto.setId(dropRequest.getRequestIdentifier()); + dto.setSubmissionTime(new Date(dropRequest.getRequestSubmissionTime())); + dto.setLastUpdated(new Date(dropRequest.getLastUpdated())); + dto.setState(dropRequest.getState().toString()); + dto.setFinished(DropFlowFileState.COMPLETE.equals(dropRequest.getState())); + + final QueueSize dropped = dropRequest.getDroppedSize(); + dto.setDroppedCount(dropped.getObjectCount()); + dto.setDroppedSize(FormatUtils.formatDataSize(dropped.getByteCount())); + + if (dropRequest.getOriginalSize() != null) { + final QueueSize original = dropRequest.getOriginalSize(); + dto.setOriginalCount(original.getObjectCount()); + dto.setOriginalSize(FormatUtils.formatDataSize(original.getByteCount())); + dto.setPercentCompleted((dropped.getObjectCount() * 100 ) / original.getObjectCount()); + } else { + dto.setPercentCompleted(0); + } + + if (dropRequest.getCurrentSize() != null) { + final QueueSize current = dropRequest.getCurrentSize(); + dto.setCurrentCount(current.getObjectCount()); + dto.setCurrentSize(FormatUtils.formatDataSize(current.getByteCount())); + } + + return dto; + } + /** * Creates a ConnectionDTO from the specified Connection. * @@ -1521,8 +1560,7 @@ public final class DtoFactory { } /** - * Creates a ProvenanceEventNodeDTO for the specified - * ProvenanceEventLineageNode. + * Creates a ProvenanceEventNodeDTO for the specified ProvenanceEventLineageNode. * * @param node node * @return dto @@ -2173,9 +2211,8 @@ public final class DtoFactory { /** * * @param original orig - * @param deep if true, all Connections, ProcessGroups, Ports, - * Processors, etc. will be copied. If false, the copy will - * have links to the same objects referenced by original. + * @param deep if true, all Connections, ProcessGroups, Ports, Processors, etc. will be copied. If false, the copy will have links to the same objects referenced by + * original. * * @return dto */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java index 8c099192b4..3665a4e728 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java @@ -18,6 +18,7 @@ package org.apache.nifi.web.dao; import java.util.Set; import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.queue.DropFlowFileStatus; import org.apache.nifi.web.api.dto.ConnectionDTO; public interface ConnectionDAO { @@ -34,9 +35,12 @@ public interface ConnectionDAO { /** * Gets the specified flow file drop request. * + * @param groupId group id + * @param id The id of the connection * @param dropRequestId The drop request id + * @return The drop request status */ - void getFlowFileDropRequest(String dropRequestId); + DropFlowFileStatus getFlowFileDropRequest(String groupId, String id, String dropRequestId); /** * Gets the connections for the specified source processor. @@ -78,8 +82,10 @@ public interface ConnectionDAO { * * @param groupId group id * @param id connection id + * @param dropRequestId drop request id + * @return The drop request status */ - void createFileFlowDropRequest(String groupId, String id); + DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId); /** * Verifies the create request can be processed. @@ -125,7 +131,9 @@ public interface ConnectionDAO { /** * Deletes the specified flow file drop request. * + * @param groupId group id + * @param id The id of the connection * @param dropRequestId The drop request id */ - void deleteFlowFileDropRequest(String dropRequestId); + void deleteFlowFileDropRequest(String groupId, String id, String dropRequestId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 8fa9d3bf1f..399ffa1a57 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.regex.Matcher; +import javax.ws.rs.WebApplicationException; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; @@ -31,17 +32,21 @@ import org.apache.nifi.connectable.Position; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.exception.ValidationException; +import org.apache.nifi.controller.queue.DropFlowFileStatus; +import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.processor.Relationship; import org.apache.nifi.remote.RemoteGroupPort; +import org.apache.nifi.user.NiFiUser; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.ResourceNotFoundException; import org.apache.nifi.web.api.dto.ConnectableDTO; import org.apache.nifi.web.api.dto.ConnectionDTO; import org.apache.nifi.web.api.dto.PositionDTO; import org.apache.nifi.web.dao.ConnectionDAO; +import org.apache.nifi.web.security.user.NiFiUserUtils; public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO { @@ -69,8 +74,16 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override - public void getFlowFileDropRequest(String dropRequestId) { - // TODO + public DropFlowFileStatus getFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { + final Connection connection = locateConnection(groupId, connectionId); + final FlowFileQueue queue = connection.getFlowFileQueue(); + + final DropFlowFileStatus dropRequest = queue.getDropFlowFileStatus(dropRequestId); + if (dropRequest == null) { + throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId)); + } + + return dropRequest; } @Override @@ -299,8 +312,16 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override - public void createFileFlowDropRequest(String groupId, String id) { - // TODO + public DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId) { + final Connection connection = locateConnection(groupId, id); + final FlowFileQueue queue = connection.getFlowFileQueue(); + + final NiFiUser user = NiFiUserUtils.getNiFiUser(); + if (user == null) { + throw new WebApplicationException(new Throwable("Unable to access details for current user.")); + } + + return queue.dropFlowFiles(dropRequestId, user.getDn()); } @Override @@ -475,8 +496,10 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO } @Override - public void deleteFlowFileDropRequest(String dropRequestId) { - // TODO + public void deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) { + final Connection connection = locateConnection(groupId, connectionId); + final FlowFileQueue queue = connection.getFlowFileQueue(); + queue.cancelDropFlowFileRequest(dropRequestId); } /* setters */ diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index 1d8152b464..2a94ec5268 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -854,13 +854,17 @@ nf.Actions = (function () { if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) { return; } + + var MAX_DELAY = 4; // process the drop request var processDropRequest = function (dropRequest, nextDelay) { // see if the drop request has completed if (dropRequest.finished === true) { + // request is finished so it can be removed deleteDropRequest(dropRequest); } else { + // update the UI with the current status of the drop request schedule(dropRequest, nextDelay); } }; @@ -874,7 +878,7 @@ nf.Actions = (function () { dataType: 'json' }).done(function(response) { var dropRequest = response.dropRequest; - processDropRequest(dropRequest, Math.min(8, delay * 2)); + processDropRequest(dropRequest, Math.min(MAX_DELAY, delay * 2)); }).fail(nf.Common.handleAjaxError); }, delay * 1000); };