NIFI-5581: Fix replicate request timeout

This closes #3044

- Revert 87cf474e54 to enable connection
pooling
- Changes the expected HTTP status code for the 1st request of a
two-phase commit transaction from 150 (NiFi custom) to 202 Accepted
- Corrected RevisionManager Javadoc about revision varidation protocol
This commit is contained in:
Koji Kawamura 2018-10-04 13:48:26 +09:00 committed by Matt Gilman
parent f65286be83
commit 8f4d13eeac
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
7 changed files with 50 additions and 51 deletions

View File

@ -166,7 +166,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
// If we have a response that is a 3xx, 4xx, or 5xx, then we want to choose that.
// Otherwise, it doesn't matter which one we choose. We do this because if we replicate
// a mutable request, it's possible that one node will respond with a 409, for instance, while
// others respond with a 150-Continue. We do not want to pick the 150-Continue; instead, we want
// others respond with a 202-Accepted. We do not want to pick the 202-Accepted; instead, we want
// the failed response.
final NodeResponse clientResponse = nodeResponses.stream().filter(p -> p.getStatus() > 299).findAny().orElse(nodeResponses.iterator().next());
@ -236,7 +236,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper {
responses.stream()
.parallel() // "parallelize" the draining of the responses, since we have multiple streams to consume
.filter(response -> response != exclude) // don't include the explicitly excluded node
.filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any 150-NodeContinue responses because they contain no content
.filter(response -> response.getStatus() != RequestReplicator.NODE_CONTINUE_STATUS_CODE) // don't include any continue responses because they contain no content
.forEach(response -> drainResponse(response)); // drain all node responses that didn't get filtered out
}

View File

@ -30,13 +30,13 @@ public interface RequestReplicator {
public static final String CLUSTER_ID_GENERATION_SEED_HEADER = "X-Cluster-Id-Generation-Seed";
/**
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request. The value
* is always 150-NodeContinue. The node will respond with 150 CONTINUE if it is able to
* The HTTP header that the requestor specifies to ask a node if they are able to process a given request.
* The value is always 202-Accepted. The node will respond with 202 ACCEPTED if it is able to
* process the request, 417 EXPECTATION_FAILED otherwise.
*/
public static final String REQUEST_VALIDATION_HTTP_HEADER = "X-Validation-Expects";
public static final String NODE_CONTINUE = "150-NodeContinue";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
public static final String NODE_CONTINUE = "202-Accepted";
public static final int NODE_CONTINUE_STATUS_CODE = 202;
/**
* Indicates that the request is intended to cancel a transaction that was previously created without performing the action

View File

@ -17,6 +17,35 @@
package org.apache.nifi.cluster.coordination.http.replication;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -45,36 +74,6 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadPoolRequestReplicator implements RequestReplicator {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolRequestReplicator.class);
@ -503,10 +502,10 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
if (allNodesResponded) {
clusterResponse.addTiming("Verification Completed", "All Nodes", nanos);
// Check if we have any requests that do not have a 150-Continue status code.
// Check if we have any requests that do not have a 202-Accepted status code.
final long dissentingCount = nodeResponses.stream().filter(p -> p.getStatus() != NODE_CONTINUE_STATUS_CODE).count();
// If all nodes responded with 150-Continue, then we can replicate the original request
// If all nodes responded with 202-Accepted, then we can replicate the original request
// to all nodes and we are finished.
if (dissentingCount == 0) {
logger.debug("Received verification from all {} nodes that mutable request {} {} can be made", numNodes, method, uri.getPath());

View File

@ -308,7 +308,8 @@ public class OkHttpReplicationClient implements HttpReplicationClient {
okHttpClientBuilder.connectTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS);
okHttpClientBuilder.readTimeout(readTimeoutMs, TimeUnit.MILLISECONDS);
okHttpClientBuilder.followRedirects(true);
okHttpClientBuilder.connectionPool(new ConnectionPool(0, 5, TimeUnit.MINUTES));
final int connectionPoolSize = properties.getClusterNodeMaxConcurrentRequests();
okHttpClientBuilder.connectionPool(new ConnectionPool(connectionPoolSize, 5, TimeUnit.MINUTES));
final Tuple<SSLSocketFactory, X509TrustManager> tuple = createSslSocketFactory(properties);
if (tuple != null) {

View File

@ -248,7 +248,7 @@ public class TestThreadPoolRequestReplicator {
final int statusCode;
if (requestCount.incrementAndGet() == 1) {
assertEquals(ThreadPoolRequestReplicator.NODE_CONTINUE, expectsHeader);
statusCode = 150;
statusCode = Status.ACCEPTED.getStatusCode();
} else {
assertNull(expectsHeader);
statusCode = Status.OK.getStatusCode();
@ -390,7 +390,7 @@ public class TestThreadPoolRequestReplicator {
if (requestIndex == 1) {
final Response clientResponse = mock(Response.class);
when(clientResponse.getStatus()).thenReturn(150);
when(clientResponse.getStatus()).thenReturn(202);
return new NodeResponse(nodeId, request.getMethod(), uri, clientResponse, -1L, requestId);
} else {
final IllegalClusterStateException explanation = new IllegalClusterStateException("Intentional Exception for Unit Testing");

View File

@ -33,20 +33,19 @@ import org.apache.nifi.web.Revision;
*
* <p>
* Clients that will modify a resource must do so using a two-phase commit. First,
* the client will issue a request that includes an HTTP Header of "X-NcmExpects".
* the client will issue a request that includes an HTTP Header of "X-Validation-Expects".
* This indicates that the request will not actually be performed but rather that the
* node should validate that the request could in fact be performed. If all nodes respond
* with a 150-Continue response, then the second phase will commence. The second phase
* will consist of replicating the same request but without the "X-NcmExpects" header.
* with a 202-Accepted response, then the second phase will commence. The second phase
* will consist of replicating the same request but without the "X-Validation-Expects" header.
* </p>
*
* <p>
* When the first phase of the two-phase commit is processed, the Revision Manager should
* be used to verify that the client-provided Revisions are current by calling the
* {@link #verifyRevisions(Collection)}
* method. If the revisions are up-to-date, the method will return successfully and the
* request validation may continue. Otherwise, the request should fail and the second phase
* should not be performed.
* be used to retrieve the current revision by calling the {@link #getRevision(String)} method
* to verify that the client-provided Revisions are current.
* If the revisions are up-to-date, the request validation may continue.
* Otherwise, the request should fail and the second phase should not be performed.
* </p>
*
* <p>

View File

@ -272,9 +272,9 @@ public abstract class ApplicationResource {
}
/**
* Generates a 150 Node Continue response to be used within the cluster request handshake.
* Generates a 202 Accepted (Node Continue) response to be used within the cluster request handshake.
*
* @return a 150 Node Continue response to be used within the cluster request handshake
* @return a 202 Accepted (Node Continue) response to be used within the cluster request handshake
*/
protected ResponseBuilder generateContinueResponse() {
return Response.status(RequestReplicator.NODE_CONTINUE_STATUS_CODE);