mirror of https://github.com/apache/nifi.git
NIFI-3800:
- Cleaning up the headers when replicating requests. This closes #1752. Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
parent
970ddf8f7e
commit
f3745065b0
|
@ -23,6 +23,36 @@ import com.sun.jersey.api.client.WebResource;
|
||||||
import com.sun.jersey.api.client.config.ClientConfig;
|
import com.sun.jersey.api.client.config.ClientConfig;
|
||||||
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
|
import com.sun.jersey.api.client.filter.GZIPContentEncodingFilter;
|
||||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||||
|
import org.apache.nifi.authorization.AccessDeniedException;
|
||||||
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
|
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
||||||
|
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
||||||
|
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
|
||||||
|
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
|
||||||
|
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
||||||
|
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
||||||
|
import org.apache.nifi.cluster.manager.NodeResponse;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
|
||||||
|
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
|
||||||
|
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
||||||
|
import org.apache.nifi.events.EventReporter;
|
||||||
|
import org.apache.nifi.reporting.Severity;
|
||||||
|
import org.apache.nifi.util.ComponentIdGenerator;
|
||||||
|
import org.apache.nifi.util.FormatUtils;
|
||||||
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
|
||||||
|
import org.apache.nifi.web.security.jwt.JwtAuthenticationFilter;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.ws.rs.HttpMethod;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
import javax.ws.rs.core.MultivaluedMap;
|
||||||
|
import javax.ws.rs.core.Response.Status;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -47,34 +77,6 @@ import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import javax.ws.rs.HttpMethod;
|
|
||||||
import javax.ws.rs.core.MediaType;
|
|
||||||
import javax.ws.rs.core.MultivaluedMap;
|
|
||||||
import javax.ws.rs.core.Response.Status;
|
|
||||||
import org.apache.nifi.authorization.AccessDeniedException;
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
|
||||||
import org.apache.nifi.authorization.user.NiFiUserUtils;
|
|
||||||
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
|
|
||||||
import org.apache.nifi.cluster.coordination.http.HttpResponseMapper;
|
|
||||||
import org.apache.nifi.cluster.coordination.http.StandardHttpResponseMapper;
|
|
||||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
|
|
||||||
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
|
|
||||||
import org.apache.nifi.cluster.manager.NodeResponse;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
|
|
||||||
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
|
|
||||||
import org.apache.nifi.cluster.protocol.NodeIdentifier;
|
|
||||||
import org.apache.nifi.events.EventReporter;
|
|
||||||
import org.apache.nifi.reporting.Severity;
|
|
||||||
import org.apache.nifi.util.ComponentIdGenerator;
|
|
||||||
import org.apache.nifi.util.FormatUtils;
|
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
|
||||||
import org.apache.nifi.web.security.ProxiedEntitiesUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
public class ThreadPoolRequestReplicator implements RequestReplicator {
|
public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
|
|
||||||
|
@ -219,7 +221,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
return replicate(nodeIdSet, method, uri, entity, headers, true, true);
|
return replicate(nodeIdSet, method, uri, entity, headers, true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addProxiedEntitiesHeader(final Map<String, String> headers) {
|
void updateRequestHeaders(final Map<String, String> headers) {
|
||||||
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
final NiFiUser user = NiFiUserUtils.getNiFiUser();
|
||||||
if (user == null) {
|
if (user == null) {
|
||||||
throw new AccessDeniedException("Unknown user");
|
throw new AccessDeniedException("Unknown user");
|
||||||
|
@ -229,6 +231,13 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
// it knows that we are acting as a proxy on behalf of the current user.
|
// it knows that we are acting as a proxy on behalf of the current user.
|
||||||
final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
|
final String proxiedEntitiesChain = ProxiedEntitiesUtils.buildProxiedEntitiesChainString(user);
|
||||||
headers.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
|
headers.put(ProxiedEntitiesUtils.PROXY_ENTITIES_CHAIN, proxiedEntitiesChain);
|
||||||
|
|
||||||
|
// remove the access token if present, since the user is already authenticated... authorization
|
||||||
|
// will happen when the request is replicated using the proxy chain above
|
||||||
|
headers.remove(JwtAuthenticationFilter.AUTHORIZATION);
|
||||||
|
|
||||||
|
// remove the host header
|
||||||
|
headers.remove("Host");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -242,7 +251,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
}
|
}
|
||||||
|
|
||||||
// include the proxied entities header
|
// include the proxied entities header
|
||||||
addProxiedEntitiesHeader(updatedHeaders);
|
updateRequestHeaders(updatedHeaders);
|
||||||
|
|
||||||
if (indicateReplicated) {
|
if (indicateReplicated) {
|
||||||
// If we are replicating a request and indicating that it is replicated, then this means that we are
|
// If we are replicating a request and indicating that it is replicated, then this means that we are
|
||||||
|
@ -283,7 +292,7 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
|
||||||
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
final Map<String, String> updatedHeaders = new HashMap<>(headers);
|
||||||
|
|
||||||
// include the proxied entities header
|
// include the proxied entities header
|
||||||
addProxiedEntitiesHeader(updatedHeaders);
|
updateRequestHeaders(updatedHeaders);
|
||||||
|
|
||||||
return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null);
|
return replicate(Collections.singleton(coordinatorNodeId), method, uri, entity, updatedHeaders, false, null, false, false, null);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.cluster.coordination.http.replication;
|
package org.apache.nifi.cluster.coordination.http.replication;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import com.sun.jersey.api.client.Client;
|
import com.sun.jersey.api.client.Client;
|
||||||
import com.sun.jersey.api.client.ClientHandlerException;
|
import com.sun.jersey.api.client.ClientHandlerException;
|
||||||
import com.sun.jersey.api.client.ClientResponse;
|
import com.sun.jersey.api.client.ClientResponse;
|
||||||
|
@ -28,21 +23,6 @@ import com.sun.jersey.api.client.ClientResponse.Status;
|
||||||
import com.sun.jersey.api.client.WebResource;
|
import com.sun.jersey.api.client.WebResource;
|
||||||
import com.sun.jersey.core.header.InBoundHeaders;
|
import com.sun.jersey.core.header.InBoundHeaders;
|
||||||
import com.sun.jersey.core.header.OutBoundHeaders;
|
import com.sun.jersey.core.header.OutBoundHeaders;
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.net.SocketTimeoutException;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URISyntaxException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import javax.ws.rs.HttpMethod;
|
|
||||||
import org.apache.commons.collections4.map.MultiValueMap;
|
import org.apache.commons.collections4.map.MultiValueMap;
|
||||||
import org.apache.nifi.authorization.user.NiFiUser;
|
import org.apache.nifi.authorization.user.NiFiUser;
|
||||||
import org.apache.nifi.authorization.user.NiFiUserDetails;
|
import org.apache.nifi.authorization.user.NiFiUserDetails;
|
||||||
|
@ -70,6 +50,27 @@ import org.mockito.stubbing.Answer;
|
||||||
import org.springframework.security.core.Authentication;
|
import org.springframework.security.core.Authentication;
|
||||||
import org.springframework.security.core.context.SecurityContextHolder;
|
import org.springframework.security.core.context.SecurityContextHolder;
|
||||||
|
|
||||||
|
import javax.ws.rs.HttpMethod;
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestThreadPoolRequestReplicator {
|
public class TestThreadPoolRequestReplicator {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -439,7 +440,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
|
|
||||||
// ensure the proxied entities header is set
|
// ensure the proxied entities header is set
|
||||||
final Map<String, String> updatedHeaders = new HashMap<>();
|
final Map<String, String> updatedHeaders = new HashMap<>();
|
||||||
replicator.addProxiedEntitiesHeader(updatedHeaders);
|
replicator.updateRequestHeaders(updatedHeaders);
|
||||||
|
|
||||||
// Pass in Collections.emptySet() for the node ID's so that an Exception is thrown
|
// Pass in Collections.emptySet() for the node ID's so that an Exception is thrown
|
||||||
replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(),
|
replicator.replicate(Collections.emptySet(), "GET", new URI("localhost:8080/nifi"), Collections.emptyMap(),
|
||||||
|
@ -497,7 +498,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
|
|
||||||
// ensure the proxied entities header is set
|
// ensure the proxied entities header is set
|
||||||
final Map<String, String> updatedHeaders = new HashMap<>();
|
final Map<String, String> updatedHeaders = new HashMap<>();
|
||||||
replicator.addProxiedEntitiesHeader(updatedHeaders);
|
replicator.updateRequestHeaders(updatedHeaders);
|
||||||
|
|
||||||
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor);
|
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor);
|
||||||
|
|
||||||
|
@ -550,7 +551,7 @@ public class TestThreadPoolRequestReplicator {
|
||||||
|
|
||||||
// ensure the proxied entities header is set
|
// ensure the proxied entities header is set
|
||||||
final Map<String, String> updatedHeaders = new HashMap<>();
|
final Map<String, String> updatedHeaders = new HashMap<>();
|
||||||
replicator.addProxiedEntitiesHeader(updatedHeaders);
|
replicator.updateRequestHeaders(updatedHeaders);
|
||||||
|
|
||||||
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor);
|
replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, updatedHeaders, true, null, true, true, monitor);
|
||||||
|
|
||||||
|
|
|
@ -345,11 +345,19 @@ public abstract class ApplicationResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the proxy scheme to request scheme if not already set client
|
// set the proxy details to request details if not already set client
|
||||||
final String proxyScheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER);
|
final String proxyScheme = httpServletRequest.getHeader(PROXY_SCHEME_HTTP_HEADER);
|
||||||
if (proxyScheme == null) {
|
if (proxyScheme == null) {
|
||||||
result.put(PROXY_SCHEME_HTTP_HEADER, httpServletRequest.getScheme());
|
result.put(PROXY_SCHEME_HTTP_HEADER, httpServletRequest.getScheme());
|
||||||
}
|
}
|
||||||
|
final String proxyHost = httpServletRequest.getHeader(PROXY_HOST_HTTP_HEADER);
|
||||||
|
if (proxyHost == null) {
|
||||||
|
result.put(PROXY_HOST_HTTP_HEADER, httpServletRequest.getServerName());
|
||||||
|
}
|
||||||
|
final String proxyPort = httpServletRequest.getHeader(PROXY_PORT_HTTP_HEADER);
|
||||||
|
if (proxyPort == null) {
|
||||||
|
result.put(PROXY_PORT_HTTP_HEADER, String.valueOf(httpServletRequest.getServerPort()));
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue