From bb6c5d9d4ea02433ee8cb63cd142327c02f6e9da Mon Sep 17 00:00:00 2001 From: Matt Gilman Date: Tue, 4 Oct 2016 14:52:18 -0400 Subject: [PATCH] NIFI-2777: NIFI-2856: - Only performing response merging when the node is the cluster cooridinator even if there is a single response. - Fixing PropertyDescriptor merging to ensure the 'choosen' descriptor is included in map of all responses. This closes #1095. --- ...nseMerger.java => HttpResponseMapper.java} | 8 ++-- ...r.java => StandardHttpResponseMapper.java} | 21 ++++++----- .../StandardAsyncClusterResponse.java | 22 ++++++----- .../ThreadPoolRequestReplicator.java | 26 ++++++------- .../node/NodeClusterCoordinator.java | 37 ++++++++++--------- .../manager/PropertyDescriptorDtoMerger.java | 2 +- ... => StandardHttpResponseMapperSpec.groovy} | 13 +++---- 7 files changed, 66 insertions(+), 63 deletions(-) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/{HttpResponseMerger.java => HttpResponseMapper.java} (94%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/{StandardHttpResponseMerger.java => StandardHttpResponseMapper.java} (97%) rename nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/{StandardHttpResponseMergerSpec.groovy => StandardHttpResponseMapperSpec.groovy} (96%) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java similarity index 94% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java index 6102b74e8a..659f5e17e1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/HttpResponseMapper.java @@ -17,11 +17,11 @@ package org.apache.nifi.cluster.coordination.http; +import org.apache.nifi.cluster.manager.NodeResponse; + import java.net.URI; import java.util.Set; -import org.apache.nifi.cluster.manager.NodeResponse; - /** *

* An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster @@ -29,7 +29,7 @@ import org.apache.nifi.cluster.manager.NodeResponse; * user/client who made the original web requests. *

*/ -public interface HttpResponseMerger { +public interface HttpResponseMapper { /** * Maps the responses from all nodes in the cluster to a single NodeResponse object that @@ -41,7 +41,7 @@ public interface HttpResponseMerger { * * @return a single NodeResponse that represents the response that should be returned to the user/client */ - NodeResponse mergeResponses(URI uri, String httpMethod, Set nodeResponses); + NodeResponse mapResponses(URI uri, String httpMethod, Set nodeResponses, boolean merge); /** * Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java similarity index 97% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index 512a0b3810..68719fe401 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -62,6 +62,8 @@ import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerg import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator; import org.apache.nifi.cluster.manager.NodeResponse; import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,16 +75,14 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; -public class StandardHttpResponseMerger implements HttpResponseMerger { +public class StandardHttpResponseMapper implements HttpResponseMapper { - private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class); + private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class); private final List endpointMergers = new ArrayList<>(); - public StandardHttpResponseMerger(final NiFiProperties nifiProperties) { + public StandardHttpResponseMapper(final NiFiProperties nifiProperties) { final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY); long snapshotMillis; try { @@ -136,11 +136,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { } @Override - public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set nodeResponses) { - if (nodeResponses.size() == 1) { - return nodeResponses.iterator().next(); - } - + public NodeResponse mapResponses(final URI uri, final String httpMethod, final Set nodeResponses, final boolean merge) { final boolean hasSuccess = hasSuccessfulResponse(nodeResponses); if (!hasSuccess) { // If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that. @@ -171,6 +167,11 @@ public class StandardHttpResponseMerger implements HttpResponseMerger { // Choose any of the successful responses to be the 'chosen one'. clientResponse = successResponses.iterator().next(); } + + if (merge == false) { + return clientResponse; + } + EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod); if (merger == null) { return clientResponse; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java index 3bcc8e75e1..926151e56e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/StandardAsyncClusterResponse.java @@ -17,6 +17,12 @@ package org.apache.nifi.cluster.coordination.http.replication; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.net.URI; import java.util.Collections; import java.util.HashMap; @@ -27,12 +33,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; -import org.apache.nifi.cluster.manager.NodeResponse; -import org.apache.nifi.cluster.protocol.NodeIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class StandardAsyncClusterResponse implements AsyncClusterResponse { private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class); @@ -40,10 +40,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { private final Set nodeIds; private final URI uri; private final String method; - private final HttpResponseMerger responseMerger; + private final HttpResponseMapper responseMapper; private final CompletionCallback completionCallback; private final Runnable completedResultFetchedCallback; private final long creationTimeNanos; + private final boolean merge; private final Map responseMap = new HashMap<>(); private final AtomicInteger requestsCompleted = new AtomicInteger(0); @@ -52,18 +53,19 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { private RuntimeException failure; // guarded by synchronizing on this public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set nodeIds, - final HttpResponseMerger responseMerger, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback) { + final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) { this.id = id; this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds)); this.uri = uri; this.method = method; + this.merge = merge; creationTimeNanos = System.nanoTime(); for (final NodeIdentifier nodeId : nodeIds) { responseMap.put(nodeId, new ResponseHolder(creationTimeNanos)); } - this.responseMerger = responseMerger; + this.responseMapper = responseMapper; this.completionCallback = completionCallback; this.completedResultFetchedCallback = completedResultFetchedCallback; } @@ -142,7 +144,7 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse { .map(p -> p.getResponse()) .filter(response -> response != null) .collect(Collectors.toSet()); - mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses); + mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge); logger.debug("Notifying all that merged response is complete for {}", id); this.notifyAll(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index c1ee77b9d0..258588dd27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -27,8 +27,8 @@ import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; -import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; -import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; import org.apache.nifi.cluster.manager.NodeResponse; @@ -85,7 +85,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { private final Client client; // the client to use for issuing requests private final int connectionTimeoutMs; // connection timeout per node request private final int readTimeoutMs; // read timeout per node request - private final HttpResponseMerger responseMerger; + private final HttpResponseMapper responseMapper; private final EventReporter eventReporter; private final RequestCompletionCallback callback; private final ClusterCoordinator clusterCoordinator; @@ -140,7 +140,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { this.clusterCoordinator = clusterCoordinator; this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS); this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS); - this.responseMerger = new StandardHttpResponseMerger(nifiProperties); + this.responseMapper = new StandardHttpResponseMapper(nifiProperties); this.eventReporter = eventReporter; this.callback = callback; @@ -249,12 +249,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { lock.lock(); try { logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri); - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true); } finally { lock.unlock(); } } else { - return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification); + return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true); } } @@ -269,7 +269,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain); } - return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false); + return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false); } /** @@ -286,7 +286,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { * @return an AsyncClusterResponse that can be used to obtain the result */ private AsyncClusterResponse replicate(Set nodeIds, String method, URI uri, Object entity, Map headers, boolean performVerification, - StandardAsyncClusterResponse response, boolean executionPhase) { + StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) { // state validation Objects.requireNonNull(nodeIds); @@ -344,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // create a response object if one was not already passed to us if (response == null) { response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, - responseMerger, completionCallback, responseConsumedCallback); + responseMapper, completionCallback, responseConsumedCallback, merge); responseMap.put(requestId, response); } @@ -358,7 +358,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final boolean mutableRequest = isMutableRequest(method, uri.getPath()); if (mutableRequest && performVerification) { logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId); - performVerification(nodeIds, method, uri, entity, updatedHeaders, response); + performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge); return response; } @@ -383,7 +383,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { } - private void performVerification(Set nodeIds, String method, URI uri, Object entity, Map headers, StandardAsyncClusterResponse clusterResponse) { + private void performVerification(Set nodeIds, String method, URI uri, Object entity, Map headers, StandardAsyncClusterResponse clusterResponse, boolean merge) { logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath()); final Map validationHeaders = new HashMap<>(headers); @@ -418,7 +418,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // to all nodes and we are finished. if (dissentingCount == 0) { logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath()); - replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true); + replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge); return; } @@ -743,7 +743,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { // create the resource WebResource resource = client.resource(uri); - if (responseMerger.isResponseInterpreted(uri, method)) { + if (responseMapper.isResponseInterpreted(uri, method)) { resource.addFilter(new GZIPContentEncodingFilter(false)); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java index 4b74e1b75a..c27f186ab9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java @@ -16,27 +16,12 @@ */ package org.apache.nifi.cluster.coordination.node; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Supplier; -import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.flow.FlowElection; -import org.apache.nifi.cluster.coordination.http.HttpResponseMerger; -import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger; +import org.apache.nifi.cluster.coordination.http.HttpResponseMapper; +import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper; import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.event.NodeEvent; @@ -73,6 +58,22 @@ import org.apache.nifi.web.revision.RevisionManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback { private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class); @@ -974,7 +975,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl * state even if they had problems handling the request. */ if (mutableRequest) { - final HttpResponseMerger responseMerger = new StandardHttpResponseMerger(nifiProperties); + final HttpResponseMapper responseMerger = new StandardHttpResponseMapper(nifiProperties); final Set problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses); // all nodes failed diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java index e7ab8812ef..3c18ced4cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/PropertyDescriptorDtoMerger.java @@ -33,7 +33,7 @@ public class PropertyDescriptorDtoMerger { for (final Map.Entry nodeEntry : dtoMap.entrySet()) { final PropertyDescriptorDTO nodePropertyDescriptor = nodeEntry.getValue(); final List nodePropertyDescriptorAllowableValues = nodePropertyDescriptor.getAllowableValues(); - if (clientPropertyDescriptor != nodePropertyDescriptor && nodePropertyDescriptorAllowableValues != null) { + if (nodePropertyDescriptorAllowableValues != null) { nodePropertyDescriptorAllowableValues.stream().forEach(allowableValueEntity -> { allowableValueMap.computeIfAbsent(nodePropertyDescriptorAllowableValues.indexOf(allowableValueEntity), propertyDescriptorToAllowableValue -> new ArrayList<>()) .add(allowableValueEntity); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy similarity index 96% rename from nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy rename to nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy index 03aa08ad0d..243fd1a310 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMergerSpec.groovy +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapperSpec.groovy @@ -20,7 +20,6 @@ import com.sun.jersey.api.client.ClientResponse import org.apache.nifi.cluster.manager.NodeResponse import org.apache.nifi.cluster.protocol.NodeIdentifier import org.apache.nifi.util.NiFiProperties -import org.apache.nifi.web.api.dto.AccessPolicyDTO import org.apache.nifi.web.api.dto.ConnectionDTO import org.apache.nifi.web.api.dto.ControllerConfigurationDTO import org.apache.nifi.web.api.dto.FunnelDTO @@ -43,10 +42,10 @@ import spock.lang.Specification import spock.lang.Unroll @Unroll -class StandardHttpResponseMergerSpec extends Specification { +class StandardHttpResponseMapperSpec extends Specification { def setup() { - def propFile = StandardHttpResponseMergerSpec.class.getResource("/conf/nifi.properties").getFile() + def propFile = StandardHttpResponseMapperSpec.class.getResource("/conf/nifi.properties").getFile() System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile } @@ -56,7 +55,7 @@ class StandardHttpResponseMergerSpec extends Specification { def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() { given: - def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null)) + def responseMapper = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null)) def requestUri = new URI('http://server/resource') def requestId = UUID.randomUUID().toString() def Map> mockToRequestEntity = [:] @@ -68,7 +67,7 @@ class StandardHttpResponseMergerSpec extends Specification { } as Set when: - def returnedResponse = responseMerger.mergeResponses(requestUri, 'get', nodeResponseSet).getStatus() + def returnedResponse = responseMapper.mapResponses(requestUri, 'get', nodeResponseSet, true).getStatus() then: mockToRequestEntity.entrySet().forEach { @@ -94,7 +93,7 @@ class StandardHttpResponseMergerSpec extends Specification { mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector)); and: "setup of the data to be used in the test" - def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null)) + def responseMerger = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null)) def requestUri = new URI("http://server/$requestUriPart") def requestId = UUID.randomUUID().toString() def Map mockToRequestEntity = [:] @@ -107,7 +106,7 @@ class StandardHttpResponseMergerSpec extends Specification { } as Set when: - def returnedResponse = responseMerger.mergeResponses(requestUri, httpMethod, nodeResponseSet) + def returnedResponse = responseMerger.mapResponses(requestUri, httpMethod, nodeResponseSet, true) then: mockToRequestEntity.entrySet().forEach {