NIFI-2777:

NIFI-2856:
- Only performing response merging when the node is the cluster coordinator, even if there is a single response.
- Fixing PropertyDescriptor merging to ensure the 'chosen' descriptor is included in the map of all responses.

This closes #1095.
This commit is contained in:
Matt Gilman 2016-10-04 14:52:18 -04:00 committed by Mark Payne
parent 9304df4de0
commit bb6c5d9d4e
7 changed files with 66 additions and 63 deletions

View File

@ -17,11 +17,11 @@
package org.apache.nifi.cluster.coordination.http;
import org.apache.nifi.cluster.manager.NodeResponse;
import java.net.URI;
import java.util.Set;
import org.apache.nifi.cluster.manager.NodeResponse;
/**
* <p>
* An HttpResponseMapper is responsible for taking the responses from all nodes in a cluster
@ -29,7 +29,7 @@ import org.apache.nifi.cluster.manager.NodeResponse;
* user/client who made the original web requests.
* </p>
*/
public interface HttpResponseMerger {
public interface HttpResponseMapper {
/**
* Maps the responses from all nodes in the cluster to a single NodeResponse object that
@ -41,7 +41,7 @@ public interface HttpResponseMerger {
*
* @return a single NodeResponse that represents the response that should be returned to the user/client
*/
NodeResponse mergeResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses);
NodeResponse mapResponses(URI uri, String httpMethod, Set<NodeResponse> nodeResponses, boolean merge);
/**
* Returns a subset (or equal set) of the given Node Responses, such that all of those returned are the responses

View File

@ -62,6 +62,8 @@ import org.apache.nifi.cluster.coordination.http.endpoints.TemplatesEndpointMerg
import org.apache.nifi.cluster.coordination.http.replication.RequestReplicator;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -73,16 +75,14 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
public class StandardHttpResponseMerger implements HttpResponseMerger {
public class StandardHttpResponseMapper implements HttpResponseMapper {
private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMerger.class);
private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class);
private final List<EndpointResponseMerger> endpointMergers = new ArrayList<>();
public StandardHttpResponseMerger(final NiFiProperties nifiProperties) {
public StandardHttpResponseMapper(final NiFiProperties nifiProperties) {
final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
long snapshotMillis;
try {
@ -136,11 +136,7 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
}
@Override
public NodeResponse mergeResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses) {
if (nodeResponses.size() == 1) {
return nodeResponses.iterator().next();
}
public NodeResponse mapResponses(final URI uri, final String httpMethod, final Set<NodeResponse> nodeResponses, final boolean merge) {
final boolean hasSuccess = hasSuccessfulResponse(nodeResponses);
if (!hasSuccess) {
// If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that.
@ -171,6 +167,11 @@ public class StandardHttpResponseMerger implements HttpResponseMerger {
// Choose any of the successful responses to be the 'chosen one'.
clientResponse = successResponses.iterator().next();
}
if (merge == false) {
return clientResponse;
}
EndpointResponseMerger merger = getEndpointResponseMerger(uri, httpMethod);
if (merger == null) {
return clientResponse;

View File

@ -17,6 +17,12 @@
package org.apache.nifi.cluster.coordination.http.replication;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
@ -27,12 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private static final Logger logger = LoggerFactory.getLogger(StandardAsyncClusterResponse.class);
@ -40,10 +40,11 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private final Set<NodeIdentifier> nodeIds;
private final URI uri;
private final String method;
private final HttpResponseMerger responseMerger;
private final HttpResponseMapper responseMapper;
private final CompletionCallback completionCallback;
private final Runnable completedResultFetchedCallback;
private final long creationTimeNanos;
private final boolean merge;
private final Map<NodeIdentifier, ResponseHolder> responseMap = new HashMap<>();
private final AtomicInteger requestsCompleted = new AtomicInteger(0);
@ -52,18 +53,19 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
private RuntimeException failure; // guarded by synchronizing on this
public StandardAsyncClusterResponse(final String id, final URI uri, final String method, final Set<NodeIdentifier> nodeIds,
final HttpResponseMerger responseMerger, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback) {
final HttpResponseMapper responseMapper, final CompletionCallback completionCallback, final Runnable completedResultFetchedCallback, final boolean merge) {
this.id = id;
this.nodeIds = Collections.unmodifiableSet(new HashSet<>(nodeIds));
this.uri = uri;
this.method = method;
this.merge = merge;
creationTimeNanos = System.nanoTime();
for (final NodeIdentifier nodeId : nodeIds) {
responseMap.put(nodeId, new ResponseHolder(creationTimeNanos));
}
this.responseMerger = responseMerger;
this.responseMapper = responseMapper;
this.completionCallback = completionCallback;
this.completedResultFetchedCallback = completedResultFetchedCallback;
}
@ -142,7 +144,7 @@ public class StandardAsyncClusterResponse implements AsyncClusterResponse {
.map(p -> p.getResponse())
.filter(response -> response != null)
.collect(Collectors.toSet());
mergedResponse = responseMerger.mergeResponses(uri, method, nodeResponses);
mergedResponse = responseMapper.mapResponses(uri, method, nodeResponses, merge);
logger.debug("Notifying all that merged response is complete for {}", id);
this.notifyAll();

View File

@ -27,8 +27,8 @@ import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
@ -85,7 +85,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
private final Client client; // the client to use for issuing requests
private final int connectionTimeoutMs; // connection timeout per node request
private final int readTimeoutMs; // read timeout per node request
private final HttpResponseMerger responseMerger;
private final HttpResponseMapper responseMapper;
private final EventReporter eventReporter;
private final RequestCompletionCallback callback;
private final ClusterCoordinator clusterCoordinator;
@ -140,7 +140,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
this.clusterCoordinator = clusterCoordinator;
this.connectionTimeoutMs = (int) FormatUtils.getTimeDuration(connectionTimeout, TimeUnit.MILLISECONDS);
this.readTimeoutMs = (int) FormatUtils.getTimeDuration(readTimeout, TimeUnit.MILLISECONDS);
this.responseMerger = new StandardHttpResponseMerger(nifiProperties);
this.responseMapper = new StandardHttpResponseMapper(nifiProperties);
this.eventReporter = eventReporter;
this.callback = callback;
@ -249,12 +249,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
lock.lock();
try {
logger.debug("Lock {} obtained in order to replicate request {} {}", method, uri);
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
} finally {
lock.unlock();
}
} else {
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification);
return replicate(nodeIds, method, uri, entity, updatedHeaders, performVerification, null, !performVerification, true);
}
}
@ -269,7 +269,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
updatedHeaders.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
}
return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false);
return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false);
}
/**
@ -286,7 +286,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
* @return an AsyncClusterResponse that can be used to obtain the result
*/
private AsyncClusterResponse replicate(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, boolean performVerification,
StandardAsyncClusterResponse response, boolean executionPhase) {
StandardAsyncClusterResponse response, boolean executionPhase, boolean merge) {
// state validation
Objects.requireNonNull(nodeIds);
@ -344,7 +344,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// create a response object if one was not already passed to us
if (response == null) {
response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds,
responseMerger, completionCallback, responseConsumedCallback);
responseMapper, completionCallback, responseConsumedCallback, merge);
responseMap.put(requestId, response);
}
@ -358,7 +358,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
final boolean mutableRequest = isMutableRequest(method, uri.getPath());
if (mutableRequest && performVerification) {
logger.debug("Performing verification (first phase of two-phase commit) for Request ID {}", requestId);
performVerification(nodeIds, method, uri, entity, updatedHeaders, response);
performVerification(nodeIds, method, uri, entity, updatedHeaders, response, merge);
return response;
}
@ -383,7 +383,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
}
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse) {
private void performVerification(Set<NodeIdentifier> nodeIds, String method, URI uri, Object entity, Map<String, String> headers, StandardAsyncClusterResponse clusterResponse, boolean merge) {
logger.debug("Verifying that mutable request {} {} can be made", method, uri.getPath());
final Map<String, String> validationHeaders = new HashMap<>(headers);
@ -418,7 +418,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// to all nodes and we are finished.
if (dissentingCount == 0) {
logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());
replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true);
replicate(nodeIds, method, uri, entity, headers, false, clusterResponse, true, merge);
return;
}
@ -743,7 +743,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
// create the resource
WebResource resource = client.resource(uri);
if (responseMerger.isResponseInterpreted(uri, method)) {
if (responseMapper.isResponseInterpreted(uri, method)) {
resource.addFilter(new GZIPContentEncodingFilter(false));
}

View File

@ -16,27 +16,12 @@
*/
package org.apache.nifi.cluster.coordination.node;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.flow.FlowElection;
import org.apache.nifi.cluster.coordination.http.HttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMerger;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.NodeEvent;
@ -73,6 +58,22 @@ import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandler, RequestCompletionCallback {
private static final Logger logger = LoggerFactory.getLogger(NodeClusterCoordinator.class);
@ -974,7 +975,7 @@ public class NodeClusterCoordinator implements ClusterCoordinator, ProtocolHandl
* state even if they had problems handling the request.
*/
if (mutableRequest) {
final HttpResponseMerger responseMerger = new StandardHttpResponseMerger(nifiProperties);
final HttpResponseMapper responseMerger = new StandardHttpResponseMapper(nifiProperties);
final Set<NodeResponse> problematicNodeResponses = responseMerger.getProblematicNodeResponses(nodeResponses);
// all nodes failed

View File

@ -33,7 +33,7 @@ public class PropertyDescriptorDtoMerger {
for (final Map.Entry<NodeIdentifier, PropertyDescriptorDTO> nodeEntry : dtoMap.entrySet()) {
final PropertyDescriptorDTO nodePropertyDescriptor = nodeEntry.getValue();
final List<AllowableValueEntity> nodePropertyDescriptorAllowableValues = nodePropertyDescriptor.getAllowableValues();
if (clientPropertyDescriptor != nodePropertyDescriptor && nodePropertyDescriptorAllowableValues != null) {
if (nodePropertyDescriptorAllowableValues != null) {
nodePropertyDescriptorAllowableValues.stream().forEach(allowableValueEntity -> {
allowableValueMap.computeIfAbsent(nodePropertyDescriptorAllowableValues.indexOf(allowableValueEntity), propertyDescriptorToAllowableValue -> new ArrayList<>())
.add(allowableValueEntity);

View File

@ -20,7 +20,6 @@ import com.sun.jersey.api.client.ClientResponse
import org.apache.nifi.cluster.manager.NodeResponse
import org.apache.nifi.cluster.protocol.NodeIdentifier
import org.apache.nifi.util.NiFiProperties
import org.apache.nifi.web.api.dto.AccessPolicyDTO
import org.apache.nifi.web.api.dto.ConnectionDTO
import org.apache.nifi.web.api.dto.ControllerConfigurationDTO
import org.apache.nifi.web.api.dto.FunnelDTO
@ -43,10 +42,10 @@ import spock.lang.Specification
import spock.lang.Unroll
@Unroll
class StandardHttpResponseMergerSpec extends Specification {
class StandardHttpResponseMapperSpec extends Specification {
def setup() {
def propFile = StandardHttpResponseMergerSpec.class.getResource("/conf/nifi.properties").getFile()
def propFile = StandardHttpResponseMapperSpec.class.getResource("/conf/nifi.properties").getFile()
System.setProperty NiFiProperties.PROPERTIES_FILE_PATH, propFile
}
@ -56,7 +55,7 @@ class StandardHttpResponseMergerSpec extends Specification {
def "MergeResponses: mixed HTTP GET response statuses, expecting #expectedStatus"() {
given:
def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null))
def responseMapper = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null))
def requestUri = new URI('http://server/resource')
def requestId = UUID.randomUUID().toString()
def Map<ClientResponse, Map<String, Integer>> mockToRequestEntity = [:]
@ -68,7 +67,7 @@ class StandardHttpResponseMergerSpec extends Specification {
} as Set
when:
def returnedResponse = responseMerger.mergeResponses(requestUri, 'get', nodeResponseSet).getStatus()
def returnedResponse = responseMapper.mapResponses(requestUri, 'get', nodeResponseSet, true).getStatus()
then:
mockToRequestEntity.entrySet().forEach {
@ -94,7 +93,7 @@ class StandardHttpResponseMergerSpec extends Specification {
mapper.setSerializationConfig(serializationConfig.withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL).withAnnotationIntrospector(jaxbIntrospector));
and: "setup of the data to be used in the test"
def responseMerger = new StandardHttpResponseMerger(NiFiProperties.createBasicNiFiProperties(null,null))
def responseMerger = new StandardHttpResponseMapper(NiFiProperties.createBasicNiFiProperties(null,null))
def requestUri = new URI("http://server/$requestUriPart")
def requestId = UUID.randomUUID().toString()
def Map<ClientResponse, Object> mockToRequestEntity = [:]
@ -107,7 +106,7 @@ class StandardHttpResponseMergerSpec extends Specification {
} as Set
when:
def returnedResponse = responseMerger.mergeResponses(requestUri, httpMethod, nodeResponseSet)
def returnedResponse = responseMerger.mapResponses(requestUri, httpMethod, nodeResponseSet, true)
then:
mockToRequestEntity.entrySet().forEach {