From e0ac7cde372f428b0655465b7adc59ad41f8f270 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Mon, 12 Oct 2015 10:00:54 -0400 Subject: [PATCH] NIFI-730: - Starting to add support for deleting flow files from a queue by creating endpoints and starting to wire everything together. - Adding context menu item for initiating the request to drop flow files. --- .../nifi/web/api/dto/DropRequestDTO.java | 129 +++++++++++ .../web/api/entity/DropRequestEntity.java | 44 ++++ .../apache/nifi/web/NiFiServiceFacade.java | 25 ++ .../nifi/web/StandardNiFiServiceFacade.java | 24 ++ .../nifi/web/api/ConnectionResource.java | 217 +++++++++++++++++- .../apache/nifi/web/dao/ConnectionDAO.java | 22 ++ .../web/dao/impl/StandardConnectionDAO.java | 15 ++ .../main/webapp/js/nf/canvas/nf-actions.js | 58 +++++ .../webapp/js/nf/canvas/nf-context-menu.js | 10 + 9 files changed, 542 insertions(+), 2 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java new file mode 100644 index 0000000000..dd4289f734 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/DropRequestDTO.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.dto; + +import com.wordnik.swagger.annotations.ApiModelProperty; +import java.util.Date; +import javax.xml.bind.annotation.XmlType; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import org.apache.nifi.web.api.dto.util.TimestampAdapter; + +/** + * A request to drop the contents of a connection. + */ +@XmlType(name = "dropRequest") +public class DropRequestDTO { + + private String id; + private String uri; + + private Date submissionTime; + private Date expiration; + + private Integer percentCompleted; + private Boolean finished; + + /** + * The id for this component. + * + * @return The id + */ + @ApiModelProperty( + value = "The id of the component." + ) + public String getId() { + return this.id; + } + + public void setId(final String id) { + this.id = id; + } + + /** + * The uri for linking to this component in this NiFi. + * + * @return The uri + */ + @ApiModelProperty( + value = "The URI for futures requests to the component." + ) + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + /** + * @return time the query was submitted + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The timestamp when the query was submitted." + ) + public Date getSubmissionTime() { + return submissionTime; + } + + public void setSubmissionTime(Date submissionTime) { + this.submissionTime = submissionTime; + } + + /** + * @return expiration time of the query results + */ + @XmlJavaTypeAdapter(TimestampAdapter.class) + @ApiModelProperty( + value = "The timestamp when the query will expire." + ) + public Date getExpiration() { + return expiration; + } + + public void setExpiration(Date expiration) { + this.expiration = expiration; + } + + /** + * @return percent completed + */ + @ApiModelProperty( + value = "The current percent complete." + ) + public Integer getPercentCompleted() { + return percentCompleted; + } + + public void setPercentCompleted(Integer percentCompleted) { + this.percentCompleted = percentCompleted; + } + + /** + * @return whether the query has finished + */ + @ApiModelProperty( + value = "Whether the query has finished." + ) + public Boolean isFinished() { + return finished; + } + + public void setFinished(Boolean finished) { + this.finished = finished; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java new file mode 100644 index 0000000000..078c019d12 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/DropRequestEntity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.web.api.entity; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.nifi.web.api.dto.DropRequestDTO; + +/** + * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a DropRequestDTO. + */ +@XmlRootElement(name = "dropRequestEntity") +public class DropRequestEntity extends Entity { + + private DropRequestDTO dropRequest; + + /** + * The DropRequestDTO that is being serialized. + * + * @return The DropRequestDTO object + */ + public DropRequestDTO getDropRequest() { + return dropRequest; + } + + public void setDropRequest(DropRequestDTO dropRequest) { + this.dropRequest = dropRequest; + } + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index c98b1e43c7..28f6b61535 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO; import org.apache.nifi.web.api.dto.ProcessorDTO; import org.apache.nifi.web.api.dto.ComponentHistoryDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; @@ -525,6 +526,30 @@ public interface NiFiServiceFacade { */ ConfigurationSnapshot deleteConnection(Revision revision, String groupId, String connectionId); + /** + * Creates a new flow file drop request. + * + * @param groupId group + * @param connectionId The ID of the connection + * @return + */ + DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId); + + /** + * Gets the specified flow file drop request. + * + * @param dropRequestId The flow file drop request + * @return The DropRequest + */ + DropRequestDTO getFlowFileDropRequest(String dropRequestId); + + /** + * Cancels/removes the specified flow file drop request. + * + * @param dropRequestId The flow file drop request + */ + void deleteFlowFileDropRequest(String dropRequestId); + // ---------------------------------------- // InputPort methods // ---------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 2286213ab0..7f0a29619f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -162,6 +162,7 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.reporting.ComponentType; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; +import org.apache.nifi.web.api.dto.DropRequestDTO; import org.apache.nifi.web.api.dto.PropertyDescriptorDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO; @@ -808,6 +809,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { }); } + @Override + public void deleteFlowFileDropRequest(String dropRequestId) { + // TODO + } + @Override public ConfigurationSnapshot deleteProcessor(final Revision revision, final String groupId, final String processorId) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @@ -1059,6 +1065,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { }); } + @Override + public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) { + // TODO + final DropRequestDTO dto = new DropRequestDTO(); + dto.setFinished(false); + dto.setSubmissionTime(new Date()); + dto.setExpiration(new Date(System.currentTimeMillis() + 10000)); + dto.setId(UUID.randomUUID().toString()); + dto.setPercentCompleted(100); + return dto; + } + @Override public ConfigurationSnapshot createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) { return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest() { @@ -2091,6 +2109,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade { return dtoFactory.createConnectionDto(connectionDAO.getConnection(groupId, connectionId)); } + @Override + public DropRequestDTO getFlowFileDropRequest(String dropRequestId) { + // TODO + return null; + } + @Override public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) { return controllerFacade.getConnectionStatusHistory(groupId, connectionId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java index 64c14fa6e6..dfc20fb77f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ConnectionResource.java @@ -24,6 +24,7 @@ import com.wordnik.swagger.annotations.ApiResponses; import com.wordnik.swagger.annotations.Authorization; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -49,6 +50,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; import org.apache.nifi.cluster.manager.impl.WebClusterManager; import org.apache.nifi.util.NiFiProperties; @@ -71,6 +73,10 @@ import org.apache.nifi.web.api.request.IntegerParameter; import org.apache.nifi.web.api.request.LongParameter; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.cluster.context.ClusterContext; +import org.apache.nifi.cluster.context.ClusterContextThreadLocal; +import org.apache.nifi.web.api.dto.DropRequestDTO; +import org.apache.nifi.web.api.entity.DropRequestEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.security.access.prepost.PreAuthorize; @@ -476,8 +482,7 @@ public class ConnectionResource extends ApplicationResource { @ApiParam( value = "The connection configuration details.", required = true - ) - ConnectionEntity connectionEntity) { + ) ConnectionEntity connectionEntity) { if (connectionEntity == null || connectionEntity.getConnection() == null) { throw new IllegalArgumentException("Connection details must be specified."); @@ -886,6 +891,214 @@ public class ConnectionResource extends ApplicationResource { return clusterContext(generateOkResponse(entity)).build(); } + /** + * Drops the flowfiles in the queue of the specified connection. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param id The id of the connection + * @return A dropRequestEntity + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/contents") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Drops the contents of the queue in this connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response dropQueueContents( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String id) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // ensure the id is the same across the cluster + final String dropRequestId; + final ClusterContext clusterContext = ClusterContextThreadLocal.getContext(); + if (clusterContext != null) { + dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString(); + } else { + dropRequestId = UUID.randomUUID().toString(); + } + + // submit the drop request + final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId)); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setRevision(revision); + entity.setDropRequest(dropRequest); + + // generate the URI where the response will be + final URI location = URI.create(dropRequest.getUri()); + if (dropRequest.isFinished()) { + return generateCreatedResponse(location, entity).build(); + } else { + return Response.status(Status.ACCEPTED).location(location).entity(entity).build(); + } + } + + /** + * Checks the status of an outstanding drop request. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param connectionId The id of the connection + * @param dropRequestId The id of the drop request + * @return A dropRequestEntity + */ + @GET + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/contents/drop-requests/{drop-request-id}") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Gets the current status of a drop request for the specified connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response getDropRequest( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The drop request id.", + required = true + ) + @PathParam("drop-request-id") String dropRequestId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // get the drop request + final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId); + dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId)); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setRevision(revision); + entity.setDropRequest(dropRequest); + + return generateOkResponse(entity).build(); + } + + /** + * Deletes the specified drop request. + * + * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. + * @param connectionId The connection id + * @param dropRequestId The drop request id + * @return A dropRequestEntity + */ + @DELETE + @Consumes(MediaType.WILDCARD) + @Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML}) + @Path("/{connection-id}/contents/drop-requests/{drop-request-id}") + @PreAuthorize("hasRole('ROLE_DFM')") + @ApiOperation( + value = "Cancels and/or removes a request drop of the contents in this connection.", + response = DropRequestEntity.class, + authorizations = { + @Authorization(value = "Data Flow Manager", type = "ROLE_DFM") + } + ) + @ApiResponses( + value = { + @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(code = 401, message = "Client could not be authenticated."), + @ApiResponse(code = 403, message = "Client is not authorized to make this request."), + @ApiResponse(code = 404, message = "The specified resource could not be found."), + @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.") + } + ) + public Response removeDropRequest( + @ApiParam( + value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", + required = false + ) + @QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId, + @ApiParam( + value = "The connection id.", + required = true + ) + @PathParam("connection-id") String connectionId, + @ApiParam( + value = "The drop request id.", + required = true + ) + @PathParam("drop-request-id") String dropRequestId) { + + // replicate if cluster manager + if (properties.isClusterManager()) { + return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); + } + + // delete the drop request + serviceFacade.deleteFlowFileDropRequest(dropRequestId); + + // create the revision + final RevisionDTO revision = new RevisionDTO(); + revision.setClientId(clientId.getClientId()); + + // create the response entity + final DropRequestEntity entity = new DropRequestEntity(); + entity.setRevision(revision); + + return generateOkResponse(entity).build(); + } + // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { this.serviceFacade = serviceFacade; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java index e0fb89e387..ce1d1fdf87 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/ConnectionDAO.java @@ -31,6 +31,13 @@ public interface ConnectionDAO { */ Connection getConnection(String groupId, String id); + /** + * Gets the specified flow file drop request. + * + * @param dropRequestId The drop request id + */ + void getFlowFileDropRequest(String dropRequestId); + /** * Gets the connections for the specified source processor. * @@ -66,6 +73,14 @@ public interface ConnectionDAO { */ Connection createConnection(String groupId, ConnectionDTO connectionDTO); + /** + * Creates a new flow file drop request. + * + * @param groupId group id + * @param id connection id + */ + void createFileFlowDropRequest(String groupId, String id); + /** * Verifies the create request can be processed. * @@ -106,4 +121,11 @@ public interface ConnectionDAO { * @param id The id of the connection */ void deleteConnection(String groupId, String id); + + /** + * Deletes the specified flow file drop request. + * + * @param dropRequestId The drop request id + */ + void deleteFlowFileDropRequest(String dropRequestId); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java index 5fbc3936b1..8fa9d3bf1f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardConnectionDAO.java @@ -68,6 +68,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO return locateConnection(groupId, id); } + @Override + public void getFlowFileDropRequest(String dropRequestId) { + // TODO + } + @Override public Set getConnectionsForSource(final String groupId, final String processorId) { final Set connections = new HashSet<>(getConnections(groupId)); @@ -293,6 +298,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO return connection; } + @Override + public void createFileFlowDropRequest(String groupId, String id) { + // TODO + } + @Override public void verifyCreate(String groupId, ConnectionDTO connectionDTO) { // validate the incoming request @@ -464,6 +474,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO group.removeConnection(connection); } + @Override + public void deleteFlowFileDropRequest(String dropRequestId) { + // TODO + } + /* setters */ public void setFlowController(final FlowController flowController) { this.flowController = flowController; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js index 3b47a8d757..bab2236fd5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-actions.js @@ -845,6 +845,64 @@ nf.Actions = (function () { } }, + /** + * Deletes the flow files in the specified connection. + * + * @param {type} selection + */ + deleteQueueContents: function (selection) { + if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) { + return; + } + + // process the drop request + var processDropRequest = function (dropRequest, nextDelay) { + // see if the drop request has completed + if (dropRequest.finished === true) { + deleteDropRequest(dropRequest); + } else { + schedule(dropRequest, nextDelay); + } + }; + + // schedule for the next poll iteration + var schedule = function (dropRequest, delay) { + setTimeout(function () { + $.ajax({ + type: 'GET', + url: dropRequest.uri, + dataType: 'json' + }).done(function(response) { + var dropRequest = response.dropRequest; + processDropRequest(dropRequest, Math.min(8, delay * 2)); + }).fail(nf.Common.handleAjaxError); + }, delay * 1000); + }; + + // delete the drop request + var deleteDropRequest = function (dropRequest) { + $.ajax({ + type: 'DELETE', + url: dropRequest.uri, + dataType: 'json' + }).done(function() { + // drop request has been deleted + }).fail(nf.Common.handleAjaxError); + }; + + // get the connection data + var connection = selection.datum(); + + // issue the request to delete the flow files + $.ajax({ + type: 'DELETE', + url: connection.component.uri + '/contents', + dataType: 'json' + }).done(function(response) { + processDropRequest(response.dropRequest, 1); + }).fail(nf.Common.handleAjaxError); + }, + /** * Opens the fill color dialog for the component in the specified selection. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js index e652dd4aa1..58397d4228 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-context-menu.js @@ -277,6 +277,15 @@ nf.ContextMenu = (function () { return nf.Common.isDFM() && nf.CanvasUtils.canAllStopTransmitting(selection); }; + /** + * Only DFMs can delete flow files from a connection. + * + * @param {selection} selection + */ + var canDeleteFlowFiles = function (selection) { + return nf.Common.isDFM() && isConnection(selection); + }; + /** * Determines if the components in the specified selection can be moved into a parent group. * @@ -373,6 +382,7 @@ nf.ContextMenu = (function () { {condition: isCopyable, menuItem: {img: 'images/iconCopy.png', text: 'Copy', action: 'copy'}}, {condition: isPastable, menuItem: {img: 'images/iconPaste.png', text: 'Paste', action: 'paste'}}, {condition: canMoveToParent, menuItem: {img: 'images/iconMoveToParent.png', text: 'Move to parent group', action: 'moveIntoParent'}}, + {condition: canDeleteFlowFiles, menuItem: {img: 'images/iconDelete.png', text: 'Delete Flow Files', action: 'deleteQueueContents'}}, {condition: isDeletable, menuItem: {img: 'images/iconDelete.png', text: 'Delete', action: 'delete'}} ];