diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index f4d461179f..07cefbbaf7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -23,6 +23,7 @@ import com.sun.jersey.api.client.WebResource; import com.sun.jersey.api.client.config.ClientConfig; import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter; import com.sun.jersey.core.util.MultivaluedMapImpl; +import org.apache.nifi.authorization.AccessDeniedException; import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.authorization.user.NiFiUserUtils; import org.apache.nifi.cluster.coordination.ClusterCoordinator; @@ -40,8 +41,8 @@ import org.apache.nifi.cluster.manager.exception.UriConstructionException; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.events.EventReporter; import org.apache.nifi.reporting.Severity; -import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.ComponentIdGenerator; +import org.apache.nifi.util.FormatUtils; import org.apache.nifi.web.security.ProxiedEntitiesUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory; import javax.ws.rs.HttpMethod; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response.Status; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; @@ -424,24 +426,36 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) { final ClientResponse clientResponse = response.getClientResponse(); - final RuntimeException failure; + final String message; if (clientResponse == null) { - failure = new IllegalClusterStateException("Node " + response.getNodeId() - + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus()); + message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus(); - logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. " - + "Will respond with CONFLICT response and action will not occur", + logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur", response.getStatus(), response.getNodeId(), method, uri.getPath()); } else { final String nodeExplanation = clientResponse.getEntity(String.class); - failure = new IllegalClusterStateException("Node " + response.getNodeId() + " is unable to fulfill this request due to: " - + nodeExplanation, response.getThrowable()); + message = "Node " + response.getNodeId() + " is unable to fulfill this request due to: " + nodeExplanation; - logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. " - + "Will respond with CONFLICT response and action will not occur. Node explanation: {}", + logger.info("Received a status of {} from {} for request {} {} when performing first stage of two-stage commit. The action will not occur. Node explanation: {}", response.getStatus(), response.getNodeId(), method, uri.getPath(), nodeExplanation); } + // if a node reports forbidden, use that as the response failure + final RuntimeException failure; + if (response.getStatus() == Status.FORBIDDEN.getStatusCode()) { + if (response.hasThrowable()) { + failure = new AccessDeniedException(message, response.getThrowable()); + } else { + failure = new AccessDeniedException(message); + } + } else { + if (response.hasThrowable()) { + failure = new IllegalClusterStateException(message, response.getThrowable()); + } else { + failure = new IllegalClusterStateException(message); + } + } + clusterResponse.setFailure(failure); break; }