mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-259' of https://git-wip-us.apache.org/repos/asf/nifi into NIFI-259
This commit is contained in:
commit
c65829f0cb
|
@ -203,6 +203,7 @@ import org.apache.nifi.util.ReflectionUtils;
|
||||||
import org.apache.nifi.web.OptimisticLockingManager;
|
import org.apache.nifi.web.OptimisticLockingManager;
|
||||||
import org.apache.nifi.web.Revision;
|
import org.apache.nifi.web.Revision;
|
||||||
import org.apache.nifi.web.UpdateRevision;
|
import org.apache.nifi.web.UpdateRevision;
|
||||||
|
import org.apache.nifi.web.api.dto.ComponentStateDTO;
|
||||||
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
|
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
|
||||||
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
|
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
|
||||||
import org.apache.nifi.web.api.dto.DropRequestDTO;
|
import org.apache.nifi.web.api.dto.DropRequestDTO;
|
||||||
|
@ -217,6 +218,8 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
|
||||||
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
|
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
|
||||||
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
|
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
|
||||||
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
|
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
|
||||||
|
import org.apache.nifi.web.api.dto.StateEntryDTO;
|
||||||
|
import org.apache.nifi.web.api.dto.StateMapDTO;
|
||||||
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
|
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
|
||||||
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
|
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
|
||||||
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
|
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
|
||||||
|
@ -225,6 +228,7 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
|
||||||
import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
|
import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
|
||||||
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
|
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
|
||||||
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
|
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
|
||||||
|
import org.apache.nifi.web.api.entity.ComponentStateEntity;
|
||||||
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
|
||||||
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
|
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
|
||||||
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
|
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
|
||||||
|
@ -311,6 +315,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
|
|
||||||
public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
|
public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
|
||||||
public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
|
public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
|
||||||
|
public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state");
|
||||||
public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
|
public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}");
|
||||||
|
|
||||||
public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
|
public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
|
||||||
|
@ -327,9 +332,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
|
public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}");
|
||||||
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
|
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
|
||||||
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
|
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
|
||||||
|
public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state");
|
||||||
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
|
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
|
||||||
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
|
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
|
||||||
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
|
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
|
||||||
|
public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state");
|
||||||
|
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
|
public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents");
|
||||||
|
@ -2431,6 +2438,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isProcessorStateEndpoint(final URI uri, final String method) {
|
||||||
|
return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isProcessGroupEndpoint(final URI uri, final String method) {
|
private static boolean isProcessGroupEndpoint(final URI uri, final String method) {
|
||||||
return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
|
return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
|
||||||
}
|
}
|
||||||
|
@ -2498,6 +2509,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isControllerServiceStateEndpoint(final URI uri, final String method) {
|
||||||
|
return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
|
private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
|
||||||
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
|
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -2520,6 +2535,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isReportingTaskStateEndpoint(final URI uri, final String method) {
|
||||||
|
return "GET".equalsIgnoreCase(method) && REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches();
|
||||||
|
}
|
||||||
|
|
||||||
private static boolean isDropRequestEndpoint(final URI uri, final String method) {
|
private static boolean isDropRequestEndpoint(final URI uri, final String method) {
|
||||||
if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) {
|
if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -2533,13 +2552,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
}
|
}
|
||||||
|
|
||||||
static boolean isResponseInterpreted(final URI uri, final String method) {
|
static boolean isResponseInterpreted(final URI uri, final String method) {
|
||||||
return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
|
return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) || isProcessorStateEndpoint(uri, method)
|
||||||
|| isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
|
|| isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
|
||||||
|| isProcessGroupEndpoint(uri, method)
|
|| isProcessGroupEndpoint(uri, method)
|
||||||
|| isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
|
|| isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
|
||||||
|| isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
|
|| isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
|
||||||
|| isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
|
|| isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method)
|
||||||
|| isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method)
|
|| isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method)
|
||||||
|| isDropRequestEndpoint(uri, method);
|
|| isDropRequestEndpoint(uri, method);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2558,6 +2577,28 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
|
processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void mergeComponentState(final ComponentStateDTO componentState, Map<NodeIdentifier, ComponentStateDTO> componentStateMap) {
|
||||||
|
final List<StateEntryDTO> localStateEntries = new ArrayList<>();
|
||||||
|
|
||||||
|
for (final Map.Entry<NodeIdentifier, ComponentStateDTO> nodeEntry : componentStateMap.entrySet()) {
|
||||||
|
final ComponentStateDTO nodeComponentState = nodeEntry.getValue();
|
||||||
|
final NodeIdentifier nodeId = nodeEntry.getKey();
|
||||||
|
final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort();
|
||||||
|
|
||||||
|
final StateMapDTO nodeLocalStateMap = nodeComponentState.getLocalState();
|
||||||
|
if (nodeLocalStateMap.getState() != null) {
|
||||||
|
for (final StateEntryDTO nodeStateEntry : nodeLocalStateMap.getState()) {
|
||||||
|
nodeStateEntry.setClusterNodeId(nodeId.getId());
|
||||||
|
nodeStateEntry.setClusterNodeAddress(nodeAddress);
|
||||||
|
localStateEntries.add(nodeStateEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// add all the local state entries
|
||||||
|
componentState.getLocalState().setState(localStateEntries);
|
||||||
|
}
|
||||||
|
|
||||||
private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
|
private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
|
||||||
final ProvenanceResultsDTO results = provenanceDto.getResults();
|
final ProvenanceResultsDTO results = provenanceDto.getResults();
|
||||||
final ProvenanceRequestDTO request = provenanceDto.getRequest();
|
final ProvenanceRequestDTO request = provenanceDto.getRequest();
|
||||||
|
@ -3451,6 +3492,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
|
||||||
}
|
}
|
||||||
mergeListingRequests(listingRequest, resultsMap);
|
mergeListingRequests(listingRequest, resultsMap);
|
||||||
|
|
||||||
|
clientResponse = new NodeResponse(clientResponse, responseEntity);
|
||||||
|
} else if (hasSuccessfulClientResponse && (isProcessorStateEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method))) {
|
||||||
|
final ComponentStateEntity responseEntity = clientResponse.getClientResponse().getEntity(ComponentStateEntity.class);
|
||||||
|
final ComponentStateDTO componentState = responseEntity.getComponentState();
|
||||||
|
|
||||||
|
final Map<NodeIdentifier, ComponentStateDTO> resultsMap = new HashMap<>();
|
||||||
|
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
|
||||||
|
if (problematicNodeResponses.contains(nodeResponse)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
final ComponentStateEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ComponentStateEntity.class);
|
||||||
|
final ComponentStateDTO nodeComponentState = nodeResponseEntity.getComponentState();
|
||||||
|
|
||||||
|
resultsMap.put(nodeResponse.getNodeId(), nodeComponentState);
|
||||||
|
}
|
||||||
|
mergeComponentState(componentState, resultsMap);
|
||||||
|
|
||||||
clientResponse = new NodeResponse(clientResponse, responseEntity);
|
clientResponse = new NodeResponse(clientResponse, responseEntity);
|
||||||
} else {
|
} else {
|
||||||
if (!nodeResponsesToDrain.isEmpty()) {
|
if (!nodeResponsesToDrain.isEmpty()) {
|
||||||
|
|
|
@ -624,6 +624,7 @@ public class ProcessorResource extends ApplicationResource {
|
||||||
/**
|
/**
|
||||||
* Clears the state for a processor.
|
* Clears the state for a processor.
|
||||||
*
|
*
|
||||||
|
* @param httpServletRequest servlet request
|
||||||
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
|
* @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response.
|
||||||
* @param version The revision is used to verify the client is working with the latest version of the flow.
|
* @param version The revision is used to verify the client is working with the latest version of the flow.
|
||||||
* @param id The id of the processor
|
* @param id The id of the processor
|
||||||
|
@ -651,6 +652,7 @@ public class ProcessorResource extends ApplicationResource {
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
public Response clearState(
|
public Response clearState(
|
||||||
|
@Context HttpServletRequest httpServletRequest,
|
||||||
@ApiParam(
|
@ApiParam(
|
||||||
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
|
value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.",
|
||||||
required = false
|
required = false
|
||||||
|
@ -672,6 +674,13 @@ public class ProcessorResource extends ApplicationResource {
|
||||||
return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
|
return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle expects request (usually from the cluster manager)
|
||||||
|
final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER);
|
||||||
|
if (expects != null) {
|
||||||
|
serviceFacade.verifyCanClearProcessorState(groupId, id);
|
||||||
|
return generateContinueResponse().build();
|
||||||
|
}
|
||||||
|
|
||||||
// get the revision specified by the user
|
// get the revision specified by the user
|
||||||
Long revision = null;
|
Long revision = null;
|
||||||
if (version != null) {
|
if (version != null) {
|
||||||
|
|
|
@ -141,7 +141,7 @@ nf.ComponentState = (function () {
|
||||||
$.each(localState.state, function (i, stateEntry) {
|
$.each(localState.state, function (i, stateEntry) {
|
||||||
componentStateData.addItem($.extend({
|
componentStateData.addItem($.extend({
|
||||||
id: count++,
|
id: count++,
|
||||||
scope: stateEntry.nodeAddress
|
scope: stateEntry.clusterNodeAddress
|
||||||
}, stateEntry));
|
}, stateEntry));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -224,9 +224,6 @@ nf.ComponentState = (function () {
|
||||||
// clear state link
|
// clear state link
|
||||||
$('#clear-link').on('click', function () {
|
$('#clear-link').on('click', function () {
|
||||||
if ($(this).hasClass('disabled') === false) {
|
if ($(this).hasClass('disabled') === false) {
|
||||||
// clear the table
|
|
||||||
clearTable();
|
|
||||||
|
|
||||||
// clear the state
|
// clear the state
|
||||||
var revision = nf.Client.getRevision();
|
var revision = nf.Client.getRevision();
|
||||||
var component = $('#component-state-table').data('component');
|
var component = $('#component-state-table').data('component');
|
||||||
|
@ -242,6 +239,9 @@ nf.ComponentState = (function () {
|
||||||
// update the revision
|
// update the revision
|
||||||
nf.Client.setRevision(response.revision);
|
nf.Client.setRevision(response.revision);
|
||||||
|
|
||||||
|
// clear the table
|
||||||
|
clearTable();
|
||||||
|
|
||||||
// reload the table with no state
|
// reload the table with no state
|
||||||
loadComponentState()
|
loadComponentState()
|
||||||
}).fail(nf.Common.handleAjaxError);
|
}).fail(nf.Common.handleAjaxError);
|
||||||
|
@ -256,7 +256,7 @@ nf.ComponentState = (function () {
|
||||||
|
|
||||||
// conditionally show the cluster node identifier
|
// conditionally show the cluster node identifier
|
||||||
if (nf.Canvas.isClustered()) {
|
if (nf.Canvas.isClustered()) {
|
||||||
componentStateColumns.push({id: 'scope', field: 'scope', name: 'Scope', sortable: true, resizable: true, formatter: scopeFormatter});
|
componentStateColumns.push({id: 'scope', field: 'scope', name: 'Scope', sortable: true, resizable: true});
|
||||||
}
|
}
|
||||||
|
|
||||||
var componentStateOptions = {
|
var componentStateOptions = {
|
||||||
|
|
Loading…
Reference in New Issue