From 3f4bd919a9455998209ad7de4dde8e8a3e65e985 Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Wed, 13 Jan 2016 15:12:17 -0500 Subject: [PATCH] NIFI-259: - Merging responses when clustered to populate node details. - Fixed bug when clearing processor state when clustered. - Cleared the table after successfully clearing state. --- .../manager/impl/WebClusterManager.java | 65 ++++++++++++++++++- .../nifi/web/api/ProcessorResource.java | 9 +++ .../webapp/js/nf/canvas/nf-component-state.js | 10 +-- 3 files changed, 76 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index a58de2ac58..db98e7d1b3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -203,6 +203,7 @@ import org.apache.nifi.util.ReflectionUtils; import org.apache.nifi.web.OptimisticLockingManager; import org.apache.nifi.web.Revision; import org.apache.nifi.web.UpdateRevision; +import org.apache.nifi.web.api.dto.ComponentStateDTO; import org.apache.nifi.web.api.dto.ControllerServiceDTO; import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO; import org.apache.nifi.web.api.dto.DropRequestDTO; @@ -217,6 +218,8 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.ReportingTaskDTO; +import org.apache.nifi.web.api.dto.StateEntryDTO; +import org.apache.nifi.web.api.dto.StateMapDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO; @@ -225,6 +228,7 @@ import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO; +import org.apache.nifi.web.api.entity.ComponentStateEntity; import org.apache.nifi.web.api.entity.ControllerServiceEntity; import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity; import org.apache.nifi.web.api.entity.ControllerServicesEntity; @@ -311,6 +315,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors"); public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}"); + public static final Pattern PROCESSOR_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}/state"); public static final Pattern CLUSTER_PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/cluster/processors/[a-f0-9\\-]{36}"); public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups"); @@ -327,9 +332,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C public static final Pattern COUNTERS_URI = Pattern.compile("/nifi-api/controller/counters/[a-f0-9\\-]{36}"); public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node"; public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}"); + public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/state"); public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references"); public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node"; public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}"); + public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}/state"); @Deprecated public static final Pattern QUEUE_CONTENTS_URI = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/connections/[a-f0-9\\-]{36}/contents"); @@ -2431,6 +2438,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } + private static boolean isProcessorStateEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && PROCESSOR_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isProcessGroupEndpoint(final URI uri, final String method) { return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches(); } @@ -2498,6 +2509,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } + private static boolean isControllerServiceStateEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICE_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) { if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) { return true; @@ -2520,6 +2535,10 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C return false; } + private static boolean isReportingTaskStateEndpoint(final URI uri, final String method) { + return "GET".equalsIgnoreCase(method) && REPORTING_TASK_STATE_URI_PATTERN.matcher(uri.getPath()).matches(); + } + private static boolean isDropRequestEndpoint(final URI uri, final String method) { if ("DELETE".equalsIgnoreCase(method) && QUEUE_CONTENTS_URI.matcher(uri.getPath()).matches()) { return true; @@ -2533,13 +2552,13 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } static boolean isResponseInterpreted(final URI uri, final String method) { - return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) + return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method) || isProcessorStateEndpoint(uri, method) || isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method) || isProcessGroupEndpoint(uri, method) || isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method) || isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method) - || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) - || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) + || isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) + || isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method) || isDropRequestEndpoint(uri, method); } @@ -2558,6 +2577,28 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size())); } + private void mergeComponentState(final ComponentStateDTO componentState, Map componentStateMap) { + final List localStateEntries = new ArrayList<>(); + + for (final Map.Entry nodeEntry : componentStateMap.entrySet()) { + final ComponentStateDTO nodeComponentState = nodeEntry.getValue(); + final NodeIdentifier nodeId = nodeEntry.getKey(); + final String nodeAddress = nodeId.getApiAddress() + ":" + nodeId.getApiPort(); + + final StateMapDTO nodeLocalStateMap = nodeComponentState.getLocalState(); + if (nodeLocalStateMap.getState() != null) { + for (final StateEntryDTO nodeStateEntry : nodeLocalStateMap.getState()) { + nodeStateEntry.setClusterNodeId(nodeId.getId()); + nodeStateEntry.setClusterNodeAddress(nodeAddress); + localStateEntries.add(nodeStateEntry); + } + } + } + + // add all the local state entries + componentState.getLocalState().setState(localStateEntries); + } + private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map resultMap, final Set problematicResponses) { final ProvenanceResultsDTO results = provenanceDto.getResults(); final ProvenanceRequestDTO request = provenanceDto.getRequest(); @@ -3451,6 +3492,24 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C } mergeListingRequests(listingRequest, resultsMap); + clientResponse = new NodeResponse(clientResponse, responseEntity); + } else if (hasSuccessfulClientResponse && (isProcessorStateEndpoint(uri, method) || isControllerServiceStateEndpoint(uri, method) || isReportingTaskStateEndpoint(uri, method))) { + final ComponentStateEntity responseEntity = clientResponse.getClientResponse().getEntity(ComponentStateEntity.class); + final ComponentStateDTO componentState = responseEntity.getComponentState(); + + final Map resultsMap = new HashMap<>(); + for (final NodeResponse nodeResponse : updatedNodesMap.values()) { + if (problematicNodeResponses.contains(nodeResponse)) { + continue; + } + + final ComponentStateEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ComponentStateEntity.class); + final ComponentStateDTO nodeComponentState = nodeResponseEntity.getComponentState(); + + resultsMap.put(nodeResponse.getNodeId(), nodeComponentState); + } + mergeComponentState(componentState, resultsMap); + clientResponse = new NodeResponse(clientResponse, responseEntity); } else { if (!nodeResponsesToDrain.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java index 9418749032..778c9bfdda 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessorResource.java @@ -624,6 +624,7 @@ public class ProcessorResource extends ApplicationResource { /** * Clears the state for a processor. * + * @param httpServletRequest servlet request * @param clientId Optional client id. If the client id is not specified, a new one will be generated. This value (whether specified or generated) is included in the response. * @param version The revision is used to verify the client is working with the latest version of the flow. * @param id The id of the processor @@ -651,6 +652,7 @@ public class ProcessorResource extends ApplicationResource { } ) public Response clearState( + @Context HttpServletRequest httpServletRequest, @ApiParam( value = "If the client id is not specified, new one will be generated. This value (whether specified or generated) is included in the response.", required = false @@ -672,6 +674,13 @@ public class ProcessorResource extends ApplicationResource { return clusterManager.applyRequest(HttpMethod.POST, getAbsolutePath(), getRequestParameters(true), getHeaders()).getResponse(); } + // handle expects request (usually from the cluster manager) + final String expects = httpServletRequest.getHeader(WebClusterManager.NCM_EXPECTS_HTTP_HEADER); + if (expects != null) { + serviceFacade.verifyCanClearProcessorState(groupId, id); + return generateContinueResponse().build(); + } + // get the revision specified by the user Long revision = null; if (version != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js index d4fb63c59c..c8a2d4144f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-component-state.js @@ -141,7 +141,7 @@ nf.ComponentState = (function () { $.each(localState.state, function (i, stateEntry) { componentStateData.addItem($.extend({ id: count++, - scope: stateEntry.nodeAddress + scope: stateEntry.clusterNodeAddress }, stateEntry)); }); } @@ -224,9 +224,6 @@ nf.ComponentState = (function () { // clear state link $('#clear-link').on('click', function () { if ($(this).hasClass('disabled') === false) { - // clear the table - clearTable(); - // clear the state var revision = nf.Client.getRevision(); var component = $('#component-state-table').data('component'); @@ -242,6 +239,9 @@ nf.ComponentState = (function () { // update the revision nf.Client.setRevision(response.revision); + // clear the table + clearTable(); + // reload the table with no state loadComponentState() }).fail(nf.Common.handleAjaxError); @@ -256,7 +256,7 @@ nf.ComponentState = (function () { // conditionally show the cluster node identifier if (nf.Canvas.isClustered()) { - componentStateColumns.push({id: 'scope', field: 'scope', name: 'Scope', sortable: true, resizable: true, formatter: scopeFormatter}); + componentStateColumns.push({id: 'scope', field: 'scope', name: 'Scope', sortable: true, resizable: true}); } var componentStateOptions = {