NIFI-2438:

- If a node reports forbidden, using an appropriate response failure exception.
This closes #760.
This commit is contained in:
Matt Gilman 2016-08-01 11:32:43 -04:00 committed by Mark Payne
parent 698cde69ba
commit fca59ff9d0
1 changed files with 24 additions and 10 deletions

View File

@ -23,6 +23,7 @@ import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
import com.sun.jersey.core.util.MultivaluedMapImpl; import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.cluster.coordination.ClusterCoordinator; import org.apache.nifi.cluster.coordination.ClusterCoordinator;
@ -40,8 +41,8 @@ import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.events.EventReporter; import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity; import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ComponentIdGenerator; import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.apache.nifi.web.security.ProxiedEntitiesUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.HttpMethod; import javax.ws.rs.HttpMethod;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response.Status;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Collections; import java.util.Collections;
@ -424,24 +426,36 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) { if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
final ClientResponse clientResponse = response.getClientResponse(); final ClientResponse clientResponse = response.getClientResponse();
final RuntimeException failure; final String message;
if (clientResponse == null) { if (clientResponse == null) {
failure = new IllegalClusterStateException("Node " + response.getNodeId() message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus();
+ " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. " logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur",
+ "Will respond with CONFLICT response and action will not occur",
response.getStatus(), response.getNodeId(), method, uri.getPath()); response.getStatus(), response.getNodeId(), method, uri.getPath());
} else { } else {
final String nodeExplanation = clientResponse.getEntity(String.class); final String nodeExplanation = clientResponse.getEntity(String.class);
failure = new IllegalClusterStateException("Node " + response.getNodeId() + " is unable to fulfill this request due to: " message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation;
+ nodeExplanation, response.getThrowable());
logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. " logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}",
+ "Will respond with CONFLICT response and action will not occur. Node explanation: {}",
response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation); response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation);
} }
// if a node reports forbidden, use that as the response failure
final RuntimeException failure;
if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) {
if (response.hasThrowable()) {
failure = new AccessDeniedException(message, response.getThrowable());
} else {
failure = new AccessDeniedException(message);
}
} else {
if (response.hasThrowable()) {
failure = new IllegalClusterStateException(message, response.getThrowable());
} else {
failure = new IllegalClusterStateException(message);
}
}
clusterResponse.setFailure(failure); clusterResponse.setFailure(failure);
break; break;
} }