NIFI-730:

- Fixing checkstyle violations.
- Wiring endpoints and framework model.
- Lowering the max delay while polling from 8 seconds to 4 seconds.
This commit is contained in:
Matt Gilman 2015-10-13 13:43:10 -04:00
parent af78354d84
commit 4b41aaab02
9 changed files with 231 additions and 55 deletions

View File

@ -50,7 +50,7 @@ public class QueueSize {
/**
* Returns a new QueueSize that is the sum of this QueueSize and the provided QueueSize
*
*
* @param other the other QueueSize to add to this QueueSize
* @return a new QueueSize that is the sum of this QueueSize and the provided QueueSize
*/

View File

@ -32,11 +32,20 @@ public class DropRequestDTO {
private String uri;
private Date submissionTime;
private Date expiration;
private Date lastUpdated;
private Integer percentCompleted;
private Boolean finished;
private Integer currentCount;
private String currentSize;
private Integer originalCount;
private String originalSize;
private Integer droppedCount;
private String droppedSize;
private String state;
/**
* The id for this component.
*
@ -84,21 +93,6 @@ public class DropRequestDTO {
this.submissionTime = submissionTime;
}
/**
* @return expiration time of the query results
*/
@XmlJavaTypeAdapter(TimestampAdapter.class)
@ApiModelProperty(
value = "The timestamp when the query will expire."
)
public Date getExpiration() {
return expiration;
}
public void setExpiration(Date expiration) {
this.expiration = expiration;
}
/**
* @return percent completed
*/
@ -126,4 +120,118 @@ public class DropRequestDTO {
public void setFinished(Boolean finished) {
this.finished = finished;
}
/**
* @return the time this request was last updated
*/
@XmlJavaTypeAdapter(TimestampAdapter.class)
@ApiModelProperty(
value = "The last time this drop request was updated."
)
public Date getLastUpdated() {
return lastUpdated;
}
public void setLastUpdated(Date lastUpdated) {
this.lastUpdated = lastUpdated;
}
/**
* @return the number of flow files currently queued.
*/
@ApiModelProperty(
value = "The number of flow files currently queued."
)
public Integer getCurrentCount() {
return currentCount;
}
public void setCurrentCount(Integer currentCount) {
this.currentCount = currentCount;
}
/**
* @return the siez of the flow files currently queued.
*/
@ApiModelProperty(
value = "The size of flow files currently queued."
)
public String getCurrentSize() {
return currentSize;
}
public void setCurrentSize(String currentSize) {
this.currentSize = currentSize;
}
/**
* @return the number of flow files to be dropped as a result of this request.
*/
@ApiModelProperty(
value = "The number of flow files to be dropped as a result of this request."
)
public Integer getOriginalCount() {
return originalCount;
}
public void setOriginalCount(Integer originalCount) {
this.originalCount = originalCount;
}
/**
* @return the size of the flow files to be dropped as a result of this request.
*/
@ApiModelProperty(
value = "The size of flow files to be dropped as a result of this request."
)
public String getOriginalSize() {
return originalSize;
}
public void setOriginalSize(String originalSize) {
this.originalSize = originalSize;
}
/**
* @return the number of flow files that have been dropped thus far.
*/
@ApiModelProperty(
value = "The number of flow files that have been dropped thus far."
)
public Integer getDroppedCount() {
return droppedCount;
}
public void setDroppedCount(Integer droppedCount) {
this.droppedCount = droppedCount;
}
/**
* @return the size of the flow files that have been dropped thus far.
*/
@ApiModelProperty(
value = "The size of flow files that have been dropped thus far."
)
public String getDroppedSize() {
return droppedSize;
}
public void setDroppedSize(String droppedSize) {
this.droppedSize = droppedSize;
}
/**
* @return the current state of the drop request.
*/
@ApiModelProperty(
value = "The current state of the drop request."
)
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
}

View File

@ -531,24 +531,29 @@ public interface NiFiServiceFacade {
*
* @param groupId group
* @param connectionId The ID of the connection
* @param dropRequestId The ID of the drop request
* @return The DropRequest
*/
DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId);
DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId);
/**
* Gets the specified flow file drop request.
*
* @param groupId group
* @param connectionId The ID of the connection
* @param dropRequestId The flow file drop request
* @return The DropRequest
*/
DropRequestDTO getFlowFileDropRequest(String dropRequestId);
DropRequestDTO getFlowFileDropRequest(String groupId, String connectionId, String dropRequestId);
/**
* Cancels/removes the specified flow file drop request.
*
* @param groupId group
* @param connectionId The ID of the connection
* @param dropRequestId The flow file drop request
*/
void deleteFlowFileDropRequest(String dropRequestId);
void deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId);
// ----------------------------------------
// InputPort methods

View File

@ -810,8 +810,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public void deleteFlowFileDropRequest(String dropRequestId) {
// TODO
public void deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) {
connectionDAO.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId);
}
@Override
@ -1066,15 +1066,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId) {
// TODO
final DropRequestDTO dto = new DropRequestDTO();
dto.setFinished(false);
dto.setSubmissionTime(new Date());
dto.setExpiration(new Date(System.currentTimeMillis() + 10000));
dto.setId(UUID.randomUUID().toString());
dto.setPercentCompleted(100);
return dto;
public DropRequestDTO createFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) {
return dtoFactory.createDropRequestDTO(connectionDAO.createFileFlowDropRequest(groupId, connectionId, dropRequestId));
}
@Override
@ -2110,9 +2103,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
@Override
public DropRequestDTO getFlowFileDropRequest(String dropRequestId) {
// TODO
return null;
public DropRequestDTO getFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) {
return dtoFactory.createDropRequestDTO(connectionDAO.getFlowFileDropRequest(groupId, connectionId, dropRequestId));
}
@Override
@ -3460,8 +3452,7 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
/**
* Utility method for extracting component counts from the specified group
* status.
* Utility method for extracting component counts from the specified group status.
*/
private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) {
int running = 0;

View File

@ -946,8 +946,8 @@ public class ConnectionResource extends ApplicationResource {
}
// submit the drop request
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id);
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequestId));
final DropRequestDTO dropRequest = serviceFacade.createFlowFileDropRequest(groupId, id, dropRequestId);
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", id, "contents", "drop-requests", dropRequest.getId()));
// create the revision
final RevisionDTO revision = new RevisionDTO();
@ -1019,7 +1019,7 @@ public class ConnectionResource extends ApplicationResource {
}
// get the drop request
final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(dropRequestId);
final DropRequestDTO dropRequest = serviceFacade.getFlowFileDropRequest(groupId, connectionId, dropRequestId);
dropRequest.setUri(generateResourceUri("controller", "process-groups", groupId, "connections", connectionId, "contents", "drop-requests", dropRequestId));
// create the revision
@ -1086,7 +1086,7 @@ public class ConnectionResource extends ApplicationResource {
}
// delete the drop request
serviceFacade.deleteFlowFileDropRequest(dropRequestId);
serviceFacade.deleteFlowFileDropRequest(groupId, connectionId, dropRequestId);
// create the revision
final RevisionDTO revision = new RevisionDTO();

View File

@ -124,6 +124,9 @@ import org.apache.nifi.web.api.dto.status.StatusDTO;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.reporting.ReportingTask;
@ -293,6 +296,42 @@ public final class DtoFactory {
return new PositionDTO(position.getX(), position.getY());
}
/**
* Creates a DropRequestDTO from the specified flow file status.
*
* @param dropRequest dropRequest
* @return dto
*/
public DropRequestDTO createDropRequestDTO(final DropFlowFileStatus dropRequest) {
final DropRequestDTO dto = new DropRequestDTO();
dto.setId(dropRequest.getRequestIdentifier());
dto.setSubmissionTime(new Date(dropRequest.getRequestSubmissionTime()));
dto.setLastUpdated(new Date(dropRequest.getLastUpdated()));
dto.setState(dropRequest.getState().toString());
dto.setFinished(DropFlowFileState.COMPLETE.equals(dropRequest.getState()));
final QueueSize dropped = dropRequest.getDroppedSize();
dto.setDroppedCount(dropped.getObjectCount());
dto.setDroppedSize(FormatUtils.formatDataSize(dropped.getByteCount()));
if (dropRequest.getOriginalSize() != null) {
final QueueSize original = dropRequest.getOriginalSize();
dto.setOriginalCount(original.getObjectCount());
dto.setOriginalSize(FormatUtils.formatDataSize(original.getByteCount()));
dto.setPercentCompleted((dropped.getObjectCount() * 100 ) / original.getObjectCount());
} else {
dto.setPercentCompleted(0);
}
if (dropRequest.getCurrentSize() != null) {
final QueueSize current = dropRequest.getCurrentSize();
dto.setCurrentCount(current.getObjectCount());
dto.setCurrentSize(FormatUtils.formatDataSize(current.getByteCount()));
}
return dto;
}
/**
* Creates a ConnectionDTO from the specified Connection.
*
@ -1521,8 +1560,7 @@ public final class DtoFactory {
}
/**
* Creates a ProvenanceEventNodeDTO for the specified
* ProvenanceEventLineageNode.
* Creates a ProvenanceEventNodeDTO for the specified ProvenanceEventLineageNode.
*
* @param node node
* @return dto
@ -2173,9 +2211,8 @@ public final class DtoFactory {
/**
*
* @param original orig
* @param deep if <code>true</code>, all Connections, ProcessGroups, Ports,
* Processors, etc. will be copied. If <code>false</code>, the copy will
* have links to the same objects referenced by <code>original</code>.
* @param deep if <code>true</code>, all Connections, ProcessGroups, Ports, Processors, etc. will be copied. If <code>false</code>, the copy will have links to the same objects referenced by
* <code>original</code>.
*
* @return dto
*/

View File

@ -18,6 +18,7 @@ package org.apache.nifi.web.dao;
import java.util.Set;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.web.api.dto.ConnectionDTO;
public interface ConnectionDAO {
@ -34,9 +35,12 @@ public interface ConnectionDAO {
/**
* Gets the specified flow file drop request.
*
* @param groupId group id
* @param id The id of the connection
* @param dropRequestId The drop request id
* @return The drop request status
*/
void getFlowFileDropRequest(String dropRequestId);
DropFlowFileStatus getFlowFileDropRequest(String groupId, String id, String dropRequestId);
/**
* Gets the connections for the specified source processor.
@ -78,8 +82,10 @@ public interface ConnectionDAO {
*
* @param groupId group id
* @param id connection id
* @param dropRequestId drop request id
* @return The drop request status
*/
void createFileFlowDropRequest(String groupId, String id);
DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId);
/**
* Verifies the create request can be processed.
@ -125,7 +131,9 @@ public interface ConnectionDAO {
/**
* Deletes the specified flow file drop request.
*
* @param groupId group id
* @param id The id of the connection
* @param dropRequestId The drop request id
*/
void deleteFlowFileDropRequest(String dropRequestId);
void deleteFlowFileDropRequest(String groupId, String id, String dropRequestId);
}

View File

@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import javax.ws.rs.WebApplicationException;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
@ -31,17 +32,21 @@ import org.apache.nifi.connectable.Position;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.exception.ValidationException;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.dao.ConnectionDAO;
import org.apache.nifi.web.security.user.NiFiUserUtils;
public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO {
@ -69,8 +74,16 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
public void getFlowFileDropRequest(String dropRequestId) {
// TODO
public DropFlowFileStatus getFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) {
final Connection connection = locateConnection(groupId, connectionId);
final FlowFileQueue queue = connection.getFlowFileQueue();
final DropFlowFileStatus dropRequest = queue.getDropFlowFileStatus(dropRequestId);
if (dropRequest == null) {
throw new ResourceNotFoundException(String.format("Unable to find drop request with id '%s'.", dropRequestId));
}
return dropRequest;
}
@Override
@ -299,8 +312,16 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
public void createFileFlowDropRequest(String groupId, String id) {
// TODO
public DropFlowFileStatus createFileFlowDropRequest(String groupId, String id, String dropRequestId) {
final Connection connection = locateConnection(groupId, id);
final FlowFileQueue queue = connection.getFlowFileQueue();
final NiFiUser user = NiFiUserUtils.getNiFiUser();
if (user == null) {
throw new WebApplicationException(new Throwable("Unable to access details for current user."));
}
return queue.dropFlowFiles(dropRequestId, user.getDn());
}
@Override
@ -475,8 +496,10 @@ public class StandardConnectionDAO extends ComponentDAO implements ConnectionDAO
}
@Override
public void deleteFlowFileDropRequest(String dropRequestId) {
// TODO
public void deleteFlowFileDropRequest(String groupId, String connectionId, String dropRequestId) {
final Connection connection = locateConnection(groupId, connectionId);
final FlowFileQueue queue = connection.getFlowFileQueue();
queue.cancelDropFlowFileRequest(dropRequestId);
}
/* setters */

View File

@ -854,13 +854,17 @@ nf.Actions = (function () {
if (selection.size() !== 1 || !nf.CanvasUtils.isConnection(selection)) {
return;
}
var MAX_DELAY = 4;
// process the drop request
var processDropRequest = function (dropRequest, nextDelay) {
// see if the drop request has completed
if (dropRequest.finished === true) {
// request is finished so it can be removed
deleteDropRequest(dropRequest);
} else {
// update the UI with the current status of the drop request
schedule(dropRequest, nextDelay);
}
};
@ -874,7 +878,7 @@ nf.Actions = (function () {
dataType: 'json'
}).done(function(response) {
var dropRequest = response.dropRequest;
processDropRequest(dropRequest, Math.min(8, delay * 2));
processDropRequest(dropRequest, Math.min(MAX_DELAY, delay * 2));
}).fail(nf.Common.handleAjaxError);
}, delay * 1000);
};