mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-730' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-730
This commit is contained in:
commit
9be37914dd
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.web.api.dto;
|
||||
|
||||
import com.wordnik.swagger.annotations.ApiModelProperty;
|
||||
import java.util.Date;
|
||||
import javax.xml.bind.annotation.XmlType;
|
||||
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
|
||||
import org.apache.nifi.web.api.dto.util.TimestampAdapter;
|
||||
|
||||
/**
|
||||
* A request to drop the contents of a connection.
|
||||
*/
|
||||
@XmlType(name = "dropRequest")
|
||||
public class DropRequestDTO {
|
||||
|
||||
private String id;
|
||||
private String uri;
|
||||
|
||||
private Date submissionTime;
|
||||
private Date expiration;
|
||||
|
||||
private Integer percentCompleted;
|
||||
private Boolean finished;
|
||||
|
||||
/**
|
||||
* The id for this component.
|
||||
*
|
||||
* @return The id
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The id of the component."
|
||||
)
|
||||
public String getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
public void setId(final String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
/**
|
||||
* The uri for linking to this component in this NiFi.
|
||||
*
|
||||
* @return The uri
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The URI for futures requests to the component."
|
||||
)
|
||||
public String getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
public void setUri(String uri) {
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return time the query was submitted
|
||||
*/
|
||||
@XmlJavaTypeAdapter(TimestampAdapter.class)
|
||||
@ApiModelProperty(
|
||||
value = "The timestamp when the query was submitted."
|
||||
)
|
||||
public Date getSubmissionTime() {
|
||||
return submissionTime;
|
||||
}
|
||||
|
||||
public void setSubmissionTime(Date submissionTime) {
|
||||
this.submissionTime = submissionTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return expiration time of the query results
|
||||
*/
|
||||
@XmlJavaTypeAdapter(TimestampAdapter.class)
|
||||
@ApiModelProperty(
|
||||
value = "The timestamp when the query will expire."
|
||||
)
|
||||
public Date getExpiration() {
|
||||
return expiration;
|
||||
}
|
||||
|
||||
public void setExpiration(Date expiration) {
|
||||
this.expiration = expiration;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return percent completed
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "The current percent complete."
|
||||
)
|
||||
public Integer getPercentCompleted() {
|
||||
return percentCompleted;
|
||||
}
|
||||
|
||||
public void setPercentCompleted(Integer percentCompleted) {
|
||||
this.percentCompleted = percentCompleted;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether the query has finished
|
||||
*/
|
||||
@ApiModelProperty(
|
||||
value = "Whether the query has finished."
|
||||
)
|
||||
public Boolean isFinished() {
|
||||
return finished;
|
||||
}
|
||||
|
||||
public void setFinished(Boolean finished) {
|
||||
this.finished = finished;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.web.api.entity;
|
||||
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
|
||||
import org.apache.nifi.web.api.dto.DropRequestDTO;
|
||||
|
||||
/**
|
||||
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a DropRequestDTO.
|
||||
*/
|
||||
@XmlRootElement(name = "dropRequestEntity")
|
||||
public class DropRequestEntity extends Entity {
|
||||
|
||||
private DropRequestDTO dropRequest;
|
||||
|
||||
/**
|
||||
* The DropRequestDTO that is being serialized.
|
||||
*
|
||||
* @return The DropRequestDTO object
|
||||
*/
|
||||
public DropRequestDTO getDropRequest() {
|
||||
return dropRequest;
|
||||
}
|
||||
|
||||
public void setDropRequest(DropRequestDTO dropRequest) {
|
||||
this.dropRequest = dropRequest;
|
||||
}
|
||||
|
||||
}
|
|
@ -43,6 +43,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
|
|||
import org.apache.nifi.web.api.dto.ProcessorDTO;
|
||||
import org.apache.nifi.web.api.dto.ComponentHistoryDTO;
|
||||
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
|
||||
import org.apache.nifi.web.api.dto.DropRequestDTO;
|
||||
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
|
||||
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
|
||||
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
|
||||
|
@ -525,6 +526,30 @@ public interface NiFiServiceFacade {
|
|||
*/
|
||||
ConfigurationSnapshot<Void> deleteConnection(Revision revision, String groupId, String connectionId);
|
||||
|
||||
/**
|
||||
* Creates a new flow file drop request.
|
||||
*
|
||||
* @param groupId group
|
||||
* @param connectionId The ID of the connection
|
||||
* @return
|
||||
*/
|
||||
DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId);
|
||||
|
||||
/**
|
||||
* Gets the specified flow file drop request.
|
||||
*
|
||||
* @param dropRequestId The flow file drop request
|
||||
* @return The DropRequest
|
||||
*/
|
||||
DropRequestDTO getFlowFileDropRequest(String dropRequestId);
|
||||
|
||||
/**
|
||||
* Cancels/removes the specified flow file drop request.
|
||||
*
|
||||
* @param dropRequestId The flow file drop request
|
||||
*/
|
||||
void deleteFlowFileDropRequest(String dropRequestId);
|
||||
|
||||
// ----------------------------------------
|
||||
// InputPort methods
|
||||
// ----------------------------------------
|
||||
|
|
|
@ -162,6 +162,7 @@ import org.apache.nifi.controller.service.ControllerServiceState;
|
|||
import org.apache.nifi.reporting.ComponentType;
|
||||
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
|
||||
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
|
||||
import org.apache.nifi.web.api.dto.DropRequestDTO;
|
||||
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
|
||||
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
|
||||
import org.apache.nifi.web.api.dto.status.ClusterProcessGroupStatusDTO;
|
||||
|
@ -808,6 +809,11 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFlowFileDropRequest(String dropRequestId) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationSnapshot<Void> deleteProcessor(final Revision revision, final String groupId, final String processorId) {
|
||||
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<Void>() {
|
||||
|
@ -1059,6 +1065,18 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) {
|
||||
// TODO
|
||||
final DropRequestDTO dto = new DropRequestDTO();
|
||||
dto.setFinished(false);
|
||||
dto.setSubmissionTime(new Date());
|
||||
dto.setExpiration(new Date(System.currentTimeMillis() + 10000));
|
||||
dto.setId(UUID.randomUUID().toString());
|
||||
dto.setPercentCompleted(100);
|
||||
return dto;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationSnapshot<ProcessorDTO> createProcessor(final Revision revision, final String groupId, final ProcessorDTO processorDTO) {
|
||||
return optimisticLockingManager.configureFlow(revision, new ConfigurationRequest<ProcessorDTO>() {
|
||||
|
@ -2091,6 +2109,12 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
|
|||
return dtoFactory.createConnectionDto(connectionDAO.getConnection(groupId, connectionId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public DropRequestDTO getFlowFileDropRequest(String dropRequestId) {
|
||||
// TODO
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public StatusHistoryDTO getConnectionStatusHistory(String groupId, String connectionId) {
|
||||
return controllerFacade.getConnectionStatusHistory(groupId, connectionId);
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.wordnik.swagger.annotations.ApiResponses;
|
|||
import com.wordnik.swagger.annotations.Authorization;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -49,6 +50,7 @@ import javax.ws.rs.core.Context;
|
|||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import org.apache.nifi.cluster.manager.impl.WebClusterManager;
|
||||
import org.apache.nifi.util.NiFiProperties;
|
||||
|
@ -71,6 +73,10 @@ import org.apache.nifi.web.api.request.IntegerParameter;
|
|||
import org.apache.nifi.web.api.request.LongParameter;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.cluster.context.ClusterContext;
|
||||
import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
|
||||
import org.apache.nifi.web.api.dto.DropRequestDTO;
|
||||
import org.apache.nifi.web.api.entity.DropRequestEntity;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
|
@ -476,8 +482,7 @@ public class ConnectionResource extends ApplicationResource {
|
|||
@ApiParam(
|
||||
value = "The connection configuration details.",
|
||||
required = true
|
||||
)
|
||||
ConnectionEntity connectionEntity) {
|
||||
) ConnectionEntity connectionEntity) {
|
||||
|
||||
if (connectionEntity == null || connectionEntity.getConnection() == null) {
|
||||
throw new IllegalArgumentException("Connection details must be specified.");
|
||||
|
@ -886,6 +891,214 @@ public class ConnectionResource extends ApplicationResource {
|
|||
return clusterContext(generateOkResponse(entity)).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Drops the flowfiles in the queue of the specified connection.
|
||||
*
|
||||
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
|
||||
* @param id The id of the connection
|
||||
* @return A dropRequestEntity
|
||||
*/
|
||||
@DELETE
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
|
||||
@Path("/{connection-id}/contents")
|
||||
@PreAuthorize("hasRole('ROLE_DFM')")
|
||||
@ApiOperation(
|
||||
value = "Drops the contents of the queue in this connection.",
|
||||
response = DropRequestEntity.class,
|
||||
authorizations = {
|
||||
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
|
||||
}
|
||||
)
|
||||
@ApiResponses(
|
||||
value = {
|
||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
||||
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
|
||||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
}
|
||||
)
|
||||
public Response dropQueueContents(
|
||||
@ApiParam(
|
||||
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
|
||||
required = false
|
||||
)
|
||||
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
|
||||
@ApiParam(
|
||||
value = "The connection id.",
|
||||
required = true
|
||||
)
|
||||
@PathParam("connection-id") String id) {
|
||||
|
||||
// replicate if cluster manager
|
||||
if (properties.isClusterManager()) {
|
||||
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
|
||||
}
|
||||
|
||||
// ensure the id is the same across the cluster
|
||||
final String dropRequestId;
|
||||
final ClusterContext clusterContext = ClusterContextThreadLocal.getContext();
|
||||
if (clusterContext != null) {
|
||||
dropRequestId = UUID.nameUUIDFromBytes(clusterContext.getIdGenerationSeed().getBytes(StandardCharsets.UTF_8)).toString();
|
||||
} else {
|
||||
dropRequestId = UUID.randomUUID().toString();
|
||||
}
|
||||
|
||||
// submit the drop request
|
||||
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id);
|
||||
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId));
|
||||
|
||||
// create the revision
|
||||
final RevisionDTO revision = new RevisionDTO();
|
||||
revision.setClientId(clientId.getClientId());
|
||||
|
||||
// create the response entity
|
||||
final DropRequestEntity entity = new DropRequestEntity();
|
||||
entity.setRevision(revision);
|
||||
entity.setDropRequest(dropRequest);
|
||||
|
||||
// generate the URI where the response will be
|
||||
final URI location = URI.create(dropRequest.getUri());
|
||||
if (dropRequest.isFinished()) {
|
||||
return generateCreatedResponse(location, entity).build();
|
||||
} else {
|
||||
return Response.status(Status.ACCEPTED).location(location).entity(entity).build();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks the status of an outstanding drop request.
|
||||
*
|
||||
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
|
||||
* @param connectionId The id of the connection
|
||||
* @param dropRequestId The id of the drop request
|
||||
* @return A dropRequestEntity
|
||||
*/
|
||||
@GET
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
|
||||
@Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
|
||||
@PreAuthorize("hasRole('ROLE_DFM')")
|
||||
@ApiOperation(
|
||||
value = "Gets the current status of a drop request for the specified connection.",
|
||||
response = DropRequestEntity.class,
|
||||
authorizations = {
|
||||
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
|
||||
}
|
||||
)
|
||||
@ApiResponses(
|
||||
value = {
|
||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
||||
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
|
||||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
}
|
||||
)
|
||||
public Response getDropRequest(
|
||||
@ApiParam(
|
||||
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
|
||||
required = false
|
||||
)
|
||||
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
|
||||
@ApiParam(
|
||||
value = "The connection id.",
|
||||
required = true
|
||||
)
|
||||
@PathParam("connection-id") String connectionId,
|
||||
@ApiParam(
|
||||
value = "The drop request id.",
|
||||
required = true
|
||||
)
|
||||
@PathParam("drop-request-id") String dropRequestId) {
|
||||
|
||||
// replicate if cluster manager
|
||||
if (properties.isClusterManager()) {
|
||||
return clusterManager.applyRequest(HttpMethod.GET, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
|
||||
}
|
||||
|
||||
// get the drop request
|
||||
final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId);
|
||||
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
|
||||
|
||||
// create the revision
|
||||
final RevisionDTO revision = new RevisionDTO();
|
||||
revision.setClientId(clientId.getClientId());
|
||||
|
||||
// create the response entity
|
||||
final DropRequestEntity entity = new DropRequestEntity();
|
||||
entity.setRevision(revision);
|
||||
entity.setDropRequest(dropRequest);
|
||||
|
||||
return generateOkResponse(entity).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the specified drop request.
|
||||
*
|
||||
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
|
||||
* @param connectionId The connection id
|
||||
* @param dropRequestId The drop request id
|
||||
* @return A dropRequestEntity
|
||||
*/
|
||||
@DELETE
|
||||
@Consumes(MediaType.WILDCARD)
|
||||
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
|
||||
@Path("/{connection-id}/contents/drop-requests/{drop-request-id}")
|
||||
@PreAuthorize("hasRole('ROLE_DFM')")
|
||||
@ApiOperation(
|
||||
value = "Cancels and/or removes a request drop of the contents in this connection.",
|
||||
response = DropRequestEntity.class,
|
||||
authorizations = {
|
||||
@Authorization(value = "Data Flow Manager", type = "ROLE_DFM")
|
||||
}
|
||||
)
|
||||
@ApiResponses(
|
||||
value = {
|
||||
@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
|
||||
@ApiResponse(code = 401, message = "Client could not be authenticated."),
|
||||
@ApiResponse(code = 403, message = "Client is not authorized to make this request."),
|
||||
@ApiResponse(code = 404, message = "The specified resource could not be found."),
|
||||
@ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
|
||||
}
|
||||
)
|
||||
public Response removeDropRequest(
|
||||
@ApiParam(
|
||||
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
|
||||
required = false
|
||||
)
|
||||
@QueryParam(CLIENT_ID) @DefaultValue(StringUtils.EMPTY) ClientIdParameter clientId,
|
||||
@ApiParam(
|
||||
value = "The connection id.",
|
||||
required = true
|
||||
)
|
||||
@PathParam("connection-id") String connectionId,
|
||||
@ApiParam(
|
||||
value = "The drop request id.",
|
||||
required = true
|
||||
)
|
||||
@PathParam("drop-request-id") String dropRequestId) {
|
||||
|
||||
// replicate if cluster manager
|
||||
if (properties.isClusterManager()) {
|
||||
return clusterManager.applyRequest(HttpMethod.DELETE, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
|
||||
}
|
||||
|
||||
// delete the drop request
|
||||
serviceFacade.deleteFlowFileDropRequest(dropRequestId);
|
||||
|
||||
// create the revision
|
||||
final RevisionDTO revision = new RevisionDTO();
|
||||
revision.setClientId(clientId.getClientId());
|
||||
|
||||
// create the response entity
|
||||
final DropRequestEntity entity = new DropRequestEntity();
|
||||
entity.setRevision(revision);
|
||||
|
||||
return generateOkResponse(entity).build();
|
||||
}
|
||||
|
||||
// setters
|
||||
public void setServiceFacade(NiFiServiceFacade serviceFacade) {
|
||||
this.serviceFacade = serviceFacade;
|
||||
|
|
|
@ -31,6 +31,13 @@ public interface ConnectionDAO {
|
|||
*/
|
||||
Connection getConnection(String groupId, String id);
|
||||
|
||||
/**
|
||||
* Gets the specified flow file drop request.
|
||||
*
|
||||
* @param dropRequestId The drop request id
|
||||
*/
|
||||
void getFlowFileDropRequest(String dropRequestId);
|
||||
|
||||
/**
|
||||
* Gets the connections for the specified source processor.
|
||||
*
|
||||
|
@ -66,6 +73,14 @@ public interface ConnectionDAO {
|
|||
*/
|
||||
Connection createConnection(String groupId, ConnectionDTO connectionDTO);
|
||||
|
||||
/**
|
||||
* Creates a new flow file drop request.
|
||||
*
|
||||
* @param groupId group id
|
||||
* @param id connection id
|
||||
*/
|
||||
void createFileFlowDropRequest(String groupId, String id);
|
||||
|
||||
/**
|
||||
* Verifies the create request can be processed.
|
||||
*
|
||||
|
@ -106,4 +121,11 @@ public interface ConnectionDAO {
|
|||
* @param id The id of the connection
|
||||
*/
|
||||
void deleteConnection(String groupId, String id);
|
||||
|
||||
/**
|
||||
* Deletes the specified flow file drop request.
|
||||
*
|
||||
* @param dropRequestId The drop request id
|
||||
*/
|
||||
void deleteFlowFileDropRequest(String dropRequestId);
|
||||
}
|
||||
|
|
|
@ -68,6 +68,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
|
|||
return locateConnection(groupId, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getFlowFileDropRequest(String dropRequestId) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Connection> getConnectionsForSource(final String groupId, final String processorId) {
|
||||
final Set<Connection> connections = new HashSet<>(getConnections(groupId));
|
||||
|
@ -293,6 +298,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
|
|||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createFileFlowDropRequest(String groupId, String id) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
@Override
|
||||
public void verifyCreate(String groupId, ConnectionDTO connectionDTO) {
|
||||
// validate the incoming request
|
||||
|
@ -464,6 +474,11 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
|
|||
group.removeConnection(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteFlowFileDropRequest(String dropRequestId) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
/* setters */
|
||||
public void setFlowController(final FlowController flowController) {
|
||||
this.flowController = flowController;
|
||||
|
|
|
@ -845,6 +845,64 @@ nf.Actions = (function () {
|
|||
}
|
||||
},
|
||||
|
||||
/**
|
||||
* Deletes the flow files in the specified connection.
|
||||
*
|
||||
* @param {type} selection
|
||||
*/
|
||||
deleteQueueContents: function (selection) {
|
||||
if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// process the drop request
|
||||
var processDropRequest = function (dropRequest, nextDelay) {
|
||||
// see if the drop request has completed
|
||||
if (dropRequest.finished === true) {
|
||||
deleteDropRequest(dropRequest);
|
||||
} else {
|
||||
schedule(dropRequest, nextDelay);
|
||||
}
|
||||
};
|
||||
|
||||
// schedule for the next poll iteration
|
||||
var schedule = function (dropRequest, delay) {
|
||||
setTimeout(function () {
|
||||
$.ajax({
|
||||
type: 'GET',
|
||||
url: dropRequest.uri,
|
||||
dataType: 'json'
|
||||
}).done(function(response) {
|
||||
var dropRequest = response.dropRequest;
|
||||
processDropRequest(dropRequest, Math.min(8, delay * 2));
|
||||
}).fail(nf.Common.handleAjaxError);
|
||||
}, delay * 1000);
|
||||
};
|
||||
|
||||
// delete the drop request
|
||||
var deleteDropRequest = function (dropRequest) {
|
||||
$.ajax({
|
||||
type: 'DELETE',
|
||||
url: dropRequest.uri,
|
||||
dataType: 'json'
|
||||
}).done(function() {
|
||||
// drop request has been deleted
|
||||
}).fail(nf.Common.handleAjaxError);
|
||||
};
|
||||
|
||||
// get the connection data
|
||||
var connection = selection.datum();
|
||||
|
||||
// issue the request to delete the flow files
|
||||
$.ajax({
|
||||
type: 'DELETE',
|
||||
url: connection.component.uri + '/contents',
|
||||
dataType: 'json'
|
||||
}).done(function(response) {
|
||||
processDropRequest(response.dropRequest, 1);
|
||||
}).fail(nf.Common.handleAjaxError);
|
||||
},
|
||||
|
||||
/**
|
||||
* Opens the fill color dialog for the component in the specified selection.
|
||||
*
|
||||
|
|
|
@ -277,6 +277,15 @@ nf.ContextMenu = (function () {
|
|||
return nf.Common.isDFM() && nf.CanvasUtils.canAllStopTransmitting(selection);
|
||||
};
|
||||
|
||||
/**
|
||||
* Only DFMs can delete flow files from a connection.
|
||||
*
|
||||
* @param {selection} selection
|
||||
*/
|
||||
var canDeleteFlowFiles = function (selection) {
|
||||
return nf.Common.isDFM() && isConnection(selection);
|
||||
};
|
||||
|
||||
/**
|
||||
* Determines if the components in the specified selection can be moved into a parent group.
|
||||
*
|
||||
|
@ -373,6 +382,7 @@ nf.ContextMenu = (function () {
|
|||
{condition: isCopyable, menuItem: {img: 'images/iconCopy.png', text: 'Copy', action: 'copy'}},
|
||||
{condition: isPastable, menuItem: {img: 'images/iconPaste.png', text: 'Paste', action: 'paste'}},
|
||||
{condition: canMoveToParent, menuItem: {img: 'images/iconMoveToParent.png', text: 'Move to parent group', action: 'moveIntoParent'}},
|
||||
{condition: canDeleteFlowFiles, menuItem: {img: 'images/iconDelete.png', text: 'Delete Flow Files', action: 'deleteQueueContents'}},
|
||||
{condition: isDeletable, menuItem: {img: 'images/iconDelete.png', text: 'Delete', action: 'delete'}}
|
||||
];
|
||||
|
||||
|
|
Loading…
Reference in New Issue