NIFI-730:

- Starting to add support for deleting flow files from a queue by creating endpoints and starting to wire everything together.
- Adding context menu item for initiating the request to drop flow files.
This commit is contained in:
Matt Gilman 2015-10-12 10:00:54 -04:00
parent b4bfcc1f21
commit e0ac7cde37
9 changed files with 542 additions and 2 deletions

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto;
import com.wordnik.swagger.annotations.ApiModelProperty;
import java.util.Date;
import javax.xml.bind.annotation.XmlType;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
/**
* A request to drop the contents of a connection.
*/
@XmlType(name = "dropRequest")
public class DropRequestDTO {
private String id;
private String uri;
private Date submissionTime;
private Date expiration;
private Integer percentCompleted;
private Boolean finished;
/**
* The id for this component.
*
* @return The id
*/
@ApiModelProperty(
value = "The id of the component."
)
public String getId() {
return this.id;
}
public void setId(final String id) {
this.id = id;
}
/**
* The uri for linking to this component in this NiFi.
*
* @return The uri
*/
@ApiModelProperty(
value = "The URI for futures requests to the component."
)
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
/**
* @return time the query was submitted
*/
@XmlJavaTypeAdapter(TimestampAdapter.class)
@ApiModelProperty(
value = "The timestamp when the query was submitted."
)
public Date getSubmissionTime() {
return submissionTime;
}
public void setSubmissionTime(Date submissionTime) {
this.submissionTime = submissionTime;
}
/**
* @return expiration time of the query results
*/
@XmlJavaTypeAdapter(TimestampAdapter.class)
@ApiModelProperty(
value = "The timestamp when the query will expire."
)
public Date getExpiration() {
return expiration;
}
public void setExpiration(Date expiration) {
this.expiration = expiration;
}
/**
* @return percent completed
*/
@ApiModelProperty(
value = "The current percent complete."
)
public Integer getPercentCompleted() {
return percentCompleted;
}
public void setPercentCompleted(Integer percentCompleted) {
this.percentCompleted = percentCompleted;
}
/**
* @return whether the query has finished
*/
@ApiModelProperty(
value = "Whether the query has finished."
)
public Boolean isFinished() {
return finished;
}
public void setFinished(Boolean finished) {
this.finished = finished;
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.entity;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.web.api.dto.DropRequestDTO;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a DropRequestDTO.
*/
@XmlRootElement(name = "dropRequestEntity")
public class DropRequestEntity extends Entity {
private DropRequestDTO dropRequest;
/**
* The DropRequestDTO that is being serialized.
*
* @return The DropRequestDTO object
*/
public DropRequestDTO getDropRequest() {
return dropRequest;
}
public void setDropRequest(DropRequestDTO dropRequest) {
this.dropRequest = dropRequest;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
@ -525,6 +526,30 @@ public interface NiFiServiceFacade {
*/
ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId);
/**
* Creates a new flow file drop request.
*
* @param groupId group
* @param connectionId The ID of the connection
* @return
*/
DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId);
/**
* Gets the specified flow file drop request.
*
* @param dropRequestId The flow file drop request
* @return The DropRequest
*/
DropRequestDTO getFlowFileDropRequest(String dropRequestId);
/**
* Cancels/removes the specified flow file drop request.
*
* @param dropRequestId The flow file drop request
*/
void deleteFlowFileDropRequest(String dropRequestId);
// ----------------------------------------
// InputPort methods
// ----------------------------------------

View File

@ -162,6 +162,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
@ -808,6 +809,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
});
}
@Override
public void deleteFlowFileDropRequest(String dropRequestId) {
// TODO
}
@Override
public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
@ -1059,6 +1065,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
});
}
@Override
public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) {
// TODO
final DropRequestDTO dto = new DropRequestDTO();
dto.setFinished(false);
dto.setSubmissionTime(new Date());
dto.setExpiration(new Date(System.currentTimeMillis() + 10000));
dto.setId(UUID.randomUUID().toString());
dto.setPercentCompleted(100);
return dto;
}
@Override
public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
@ -2091,6 +2109,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return dtoFactory.createConnectionDto(connectionDAO.getConnection(groupId, connectionId));
}
@Override
public DropRequestDTO getFlowFileDropRequest(String dropRequestId) {
// TODO
return null;
}
@Override
public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
return controllerFacade.getConnectionStatusHistory(groupId, connectionId);

View File

@ -24,6 +24,7 @@ import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -49,6 +50,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
import org.apache.nifi.util.NiFiProperties;
@ -71,6 +73,10 @@ import org.apache.nifi.web.api.request.IntegerParameter;
import org.apache.nifi.web.api.request.LongParameter;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
import org.apache.nifi.web.api.dto.DropRequestDTO;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
@ -476,8 +482,7 @@ public class ConnectionResource extends ApplicationResource {
@ApiParam(
value = "The connection configuration details.",
required = true
)
ConnectionEntity connectionEntity) {
) ConnectionEntity connectionEntity) {
if (connectionEntity == null || connectionEntity.getConnection() == null) {
throw new IllegalArgumentException("Connection details must be specified.");
@ -886,6 +891,214 @@ public class ConnectionResource extends ApplicationResource {
return clusterContext(generateOkResponse(entity)).build();
}
/**
* Drops the flowfiles in the queue of the specified connection.
*
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param id The id of the connection
* @return A dropRequestEntity
*/
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/{connection-id}/contents")
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Drops the contents of the queue in this connection.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response dropQueueContents(
@ApiParam(
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("connection-id") String id) {
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// ensure the id is the same across the cluster
final String dropRequestId;
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
if (clusterContext != null) {
dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
} else {
dropRequestId = UUID.randomUUID().toString();
}
// submit the drop request
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id);
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId));
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
entity.setRevision(revision);
entity.setDropRequest(dropRequest);
// generate the URI where the response will be
final URI location = URI.create(dropRequest.getUri());
if (dropRequest.isFinished()) {
return generateCreatedResponse(location, entity).build();
} else {
return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
}
}
/**
* Checks the status of an outstanding drop request.
*
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param connectionId The id of the connection
* @param dropRequestId The id of the drop request
* @return A dropRequestEntity
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Gets the current status of a drop request for the specified connection.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response getDropRequest(
@ApiParam(
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("connection-id") String connectionId,
@ApiParam(
value = "The drop request id.",
required = true
)
@PathParam("drop-request-id") String dropRequestId) {
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// get the drop request
final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId);
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
entity.setRevision(revision);
entity.setDropRequest(dropRequest);
return generateOkResponse(entity).build();
}
/**
* Deletes the specified drop request.
*
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
* @param connectionId The connection id
* @param dropRequestId The drop request id
* @return A dropRequestEntity
*/
@DELETE
@Consumes(MediaType.WILDCARD)
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
@Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
@PreAuthorize("hasRole('ROLE_DFM')")
@ApiOperation(
value = "Cancels and/or removes a request drop of the contents in this connection.",
response = DropRequestEntity.class,
authorizations = {
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
}
)
@ApiResponses(
value = {
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(code = 401, message = "Client could not be authenticated."),
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
@ApiResponse(code = 404, message = "The specified resource could not be found."),
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
}
)
public Response removeDropRequest(
@ApiParam(
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
required = false
)
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
@ApiParam(
value = "The connection id.",
required = true
)
@PathParam("connection-id") String connectionId,
@ApiParam(
value = "The drop request id.",
required = true
)
@PathParam("drop-request-id") String dropRequestId) {
// replicate if cluster manager
if (properties.isClusterManager()) {
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
}
// delete the drop request
serviceFacade.deleteFlowFileDropRequest(dropRequestId);
// create the revision
final RevisionDTO revision = new RevisionDTO();
revision.setClientId(clientId.getClientId());
// create the response entity
final DropRequestEntity entity = new DropRequestEntity();
entity.setRevision(revision);
return generateOkResponse(entity).build();
}
// setters
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
this.serviceFacade = serviceFacade;

View File

@ -31,6 +31,13 @@ public interface ConnectionDAO {
*/
Connection getConnection(String groupId, String id);
/**
* Gets the specified flow file drop request.
*
* @param dropRequestId The drop request id
*/
void getFlowFileDropRequest(String dropRequestId);
/**
* Gets the connections for the specified source processor.
*
@ -66,6 +73,14 @@ public interface ConnectionDAO {
*/
Connection createConnection(String groupId, ConnectionDTO connectionDTO);
/**
* Creates a new flow file drop request.
*
* @param groupId group id
* @param id connection id
*/
void createFileFlowDropRequest(String groupId, String id);
/**
* Verifies the create request can be processed.
*
@ -106,4 +121,11 @@ public interface ConnectionDAO {
* @param id The id of the connection
*/
void deleteConnection(String groupId, String id);
/**
* Deletes the specified flow file drop request.
*
* @param dropRequestId The drop request id
*/
void deleteFlowFileDropRequest(String dropRequestId);
}

View File

@ -68,6 +68,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
return locateConnection(groupId, id);
}
@Override
public void getFlowFileDropRequest(String dropRequestId) {
// TODO
}
@Override
public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) {
final Set<Connection> connections = new HashSet<>(getConnections(groupId));
@ -293,6 +298,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
return connection;
}
@Override
public void createFileFlowDropRequest(String groupId, String id) {
// TODO
}
@Override
public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
// validate the incoming request
@ -464,6 +474,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
group.removeConnection(connection);
}
@Override
public void deleteFlowFileDropRequest(String dropRequestId) {
// TODO
}
/* setters */
public void setFlowController(final FlowController flowController) {
this.flowController = flowController;

View File

@ -845,6 +845,64 @@ nf.Actions = (function () {
}
},
/**
* Deletes the flow files in the specified connection.
*
* @param {type} selection
*/
deleteQueueContents: function (selection) {
if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) {
return;
}
// process the drop request
var processDropRequest = function (dropRequest, nextDelay) {
// see if the drop request has completed
if (dropRequest.finished === true) {
deleteDropRequest(dropRequest);
} else {
schedule(dropRequest, nextDelay);
}
};
// schedule for the next poll iteration
var schedule = function (dropRequest, delay) {
setTimeout(function () {
$.ajax({
type: 'GET',
url: dropRequest.uri,
dataType: 'json'
}).done(function(response) {
var dropRequest = response.dropRequest;
processDropRequest(dropRequest, Math.min(8, delay * 2));
}).fail(nf.Common.handleAjaxError);
}, delay * 1000);
};
// delete the drop request
var deleteDropRequest = function (dropRequest) {
$.ajax({
type: 'DELETE',
url: dropRequest.uri,
dataType: 'json'
}).done(function() {
// drop request has been deleted
}).fail(nf.Common.handleAjaxError);
};
// get the connection data
var connection = selection.datum();
// issue the request to delete the flow files
$.ajax({
type: 'DELETE',
url: connection.component.uri + '/contents',
dataType: 'json'
}).done(function(response) {
processDropRequest(response.dropRequest, 1);
}).fail(nf.Common.handleAjaxError);
},
/**
* Opens the fill color dialog for the component in the specified selection.
*

View File

@ -277,6 +277,15 @@ nf.ContextMenu = (function () {
return nf.Common.isDFM() && nf.CanvasUtils.canAllStopTransmitting(selection);
};
/**
* Only DFMs can delete flow files from a connection.
*
* @param {selection} selection
*/
var canDeleteFlowFiles = function (selection) {
return nf.Common.isDFM() && isConnection(selection);
};
/**
* Determines if the components in the specified selection can be moved into a parent group.
*
@ -373,6 +382,7 @@ nf.ContextMenu = (function () {
{condition: isCopyable, menuItem: {img: 'images/iconCopy.png', text: 'Copy', action: 'copy'}},
{condition: isPastable, menuItem: {img: 'images/iconPaste.png', text: 'Paste', action: 'paste'}},
{condition: canMoveToParent, menuItem: {img: 'images/iconMoveToParent.png', text: 'Move to parent group', action: 'moveIntoParent'}},
{condition: canDeleteFlowFiles, menuItem: {img: 'images/iconDelete.png', text: 'Delete Flow Files', action: 'deleteQueueContents'}},
{condition: isDeletable, menuItem: {img: 'images/iconDelete.png', text: 'Delete', action: 'delete'}}
];