mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Profiles: Add client server transport filter
This PR allows to configure different ServerTransportFilters per profile. By default there is a `server` transport filter, that does authentication and a `client` on that rejects internal actions and shard actions. Closes elastic/elasticsearch#312 Original commit: elastic/x-pack-elasticsearch@1ce66495a5
This commit is contained in:
parent
dca9f3115e
commit
1d4422fc79
@ -8,6 +8,7 @@ package org.elasticsearch.shield.transport;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.PreProcessModule;
|
||||
import org.elasticsearch.common.inject.multibindings.MapBinder;
|
||||
import org.elasticsearch.common.inject.util.Providers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.shield.ShieldPlugin;
|
||||
@ -47,19 +48,24 @@ public class SecuredTransportModule extends AbstractShieldModule.Spawn implement
|
||||
|
||||
@Override
|
||||
protected void configure(boolean clientMode) {
|
||||
MapBinder<String, ServerTransportFilter> mapBinder = MapBinder.newMapBinder(binder(), String.class, ServerTransportFilter.class);
|
||||
mapBinder.addBinding(ServerTransportFilters.SERVER_TRANSPORT_FILTER_TRANSPORT_CLIENT).to(ServerTransportFilter.TransportClient.class);
|
||||
|
||||
if (clientMode) {
|
||||
// no ip filtering on the client
|
||||
bind(IPFilter.class).toProvider(Providers.<IPFilter>of(null));
|
||||
bind(ServerTransportFilter.class).to(ServerTransportFilter.Client.class).asEagerSingleton();
|
||||
bind(ClientTransportFilter.class).to(ClientTransportFilter.Client.class).asEagerSingleton();
|
||||
return;
|
||||
} else {
|
||||
mapBinder.addBinding(ServerTransportFilters.SERVER_TRANSPORT_FILTER_AUTHENTICATE_REJECT_INTERNAL_ACTIONS).to(ServerTransportFilter.RejectInternalActionsFilter.class);
|
||||
mapBinder.addBinding(ServerTransportFilters.SERVER_TRANSPORT_FILTER_AUTHENTICATE_ONLY).to(ServerTransportFilter.Node.class);
|
||||
|
||||
bind(ClientTransportFilter.class).to(ClientTransportFilter.Node.class).asEagerSingleton();
|
||||
|
||||
if (settings.getAsBoolean("shield.transport.filter.enabled", true)) {
|
||||
bind(IPFilter.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
||||
bind(ServerTransportFilter.class).to(ServerTransportFilter.Node.class).asEagerSingleton();
|
||||
bind(ClientTransportFilter.class).to(ClientTransportFilter.Node.class).asEagerSingleton();
|
||||
if (settings.getAsBoolean("shield.transport.filter.enabled", true)) {
|
||||
bind(IPFilter.class).asEagerSingleton();
|
||||
}
|
||||
bind(ServerTransportFilters.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,8 @@ package org.elasticsearch.shield.transport;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.shield.transport.netty.NettySecuredTransport;
|
||||
import org.elasticsearch.shield.transport.netty.SecuredMessageChannelHandler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
|
||||
@ -16,19 +18,25 @@ import org.elasticsearch.transport.*;
|
||||
*/
|
||||
public class SecuredTransportService extends TransportService {
|
||||
|
||||
private final ServerTransportFilter serverFilter;
|
||||
private final ClientTransportFilter clientFilter;
|
||||
private final ServerTransportFilters serverTransportFilters;
|
||||
|
||||
@Inject
|
||||
public SecuredTransportService(Settings settings, Transport transport, ThreadPool threadPool, ServerTransportFilter serverFilter, ClientTransportFilter clientFilter) {
|
||||
public SecuredTransportService(Settings settings, Transport transport, ThreadPool threadPool, ClientTransportFilter clientFilter, ServerTransportFilters serverTransportFilters) {
|
||||
super(settings, transport, threadPool);
|
||||
this.serverFilter = serverFilter;
|
||||
this.clientFilter = clientFilter;
|
||||
this.serverTransportFilters = serverTransportFilters;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerHandler(String action, TransportRequestHandler handler) {
|
||||
super.registerHandler(action, new SecuredRequestHandler(action, handler, serverFilter));
|
||||
// Only try to access the profile, if we use netty and SSL
|
||||
// otherwise use the regular secured request handler (this still allows for LocalTransport)
|
||||
if (transport instanceof NettySecuredTransport) {
|
||||
super.registerHandler(action, new ProfileSecuredRequestHandler(action, handler, serverTransportFilters));
|
||||
} else {
|
||||
super.registerHandler(action, new SecuredRequestHandler(action, handler, serverTransportFilters));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -41,16 +49,12 @@ public class SecuredTransportService extends TransportService {
|
||||
}
|
||||
}
|
||||
|
||||
static class SecuredRequestHandler implements TransportRequestHandler {
|
||||
static abstract class AbstractSecuredRequestHandler implements TransportRequestHandler {
|
||||
|
||||
private final String action;
|
||||
private final TransportRequestHandler handler;
|
||||
private final ServerTransportFilter filter;
|
||||
protected TransportRequestHandler handler;
|
||||
|
||||
SecuredRequestHandler(String action, TransportRequestHandler handler, ServerTransportFilter filter) {
|
||||
this.action = action;
|
||||
public AbstractSecuredRequestHandler(TransportRequestHandler handler) {
|
||||
this.handler = handler;
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -58,17 +62,6 @@ public class SecuredTransportService extends TransportService {
|
||||
return handler.newInstance();
|
||||
}
|
||||
|
||||
@Override @SuppressWarnings("unchecked")
|
||||
public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
|
||||
try {
|
||||
filter.inbound(action, request);
|
||||
} catch (Throwable t) {
|
||||
channel.sendResponse(t);
|
||||
return;
|
||||
}
|
||||
handler.messageReceived(request, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return handler.executor();
|
||||
@ -79,4 +72,53 @@ public class SecuredTransportService extends TransportService {
|
||||
return handler.isForceExecution();
|
||||
}
|
||||
}
|
||||
|
||||
static class SecuredRequestHandler extends AbstractSecuredRequestHandler {
|
||||
|
||||
protected final String action;
|
||||
protected final ServerTransportFilter transportFilter;
|
||||
|
||||
SecuredRequestHandler(String action, TransportRequestHandler handler, ServerTransportFilters serverTransportFilters) {
|
||||
super(handler);
|
||||
this.action = action;
|
||||
this.transportFilter = serverTransportFilters.getTransportFilterForProfile("default");
|
||||
}
|
||||
|
||||
@Override @SuppressWarnings("unchecked")
|
||||
public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
|
||||
try {
|
||||
transportFilter.inbound(action, request);
|
||||
} catch (Throwable t) {
|
||||
channel.sendResponse(t);
|
||||
return;
|
||||
}
|
||||
handler.messageReceived(request, channel);
|
||||
}
|
||||
}
|
||||
|
||||
static class ProfileSecuredRequestHandler extends AbstractSecuredRequestHandler {
|
||||
|
||||
protected final String action;
|
||||
protected final ServerTransportFilters serverTransportFilters;
|
||||
|
||||
ProfileSecuredRequestHandler(String action, TransportRequestHandler handler, ServerTransportFilters serverTransportFilters) {
|
||||
super(handler);
|
||||
this.action = action;
|
||||
this.serverTransportFilters = serverTransportFilters;
|
||||
}
|
||||
|
||||
@Override @SuppressWarnings("unchecked")
|
||||
public void messageReceived(TransportRequest request, TransportChannel channel) throws Exception {
|
||||
try {
|
||||
SecuredMessageChannelHandler.VisibleNettyTransportChannel nettyTransportChannel = (SecuredMessageChannelHandler.VisibleNettyTransportChannel) channel;
|
||||
String profile = nettyTransportChannel.getProfile();
|
||||
ServerTransportFilter filter = serverTransportFilters.getTransportFilterForProfile(profile);
|
||||
filter.inbound(action, request);
|
||||
} catch (Throwable t) {
|
||||
channel.sendResponse(t);
|
||||
return;
|
||||
}
|
||||
handler.messageReceived(request, channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ package org.elasticsearch.shield.transport;
|
||||
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.shield.User;
|
||||
import org.elasticsearch.shield.authc.AuthenticationException;
|
||||
import org.elasticsearch.shield.authc.AuthenticationService;
|
||||
import org.elasticsearch.shield.authz.AuthorizationService;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
@ -23,7 +24,7 @@ public interface ServerTransportFilter {
|
||||
/**
|
||||
* The server trasnport filter that should be used in transport clients
|
||||
*/
|
||||
public static class Client implements ServerTransportFilter {
|
||||
public static class TransportClient implements ServerTransportFilter {
|
||||
|
||||
@Override
|
||||
public void inbound(String action, TransportRequest request) {
|
||||
@ -58,4 +59,27 @@ public interface ServerTransportFilter {
|
||||
authzService.authorize(user, action, request);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A server transport filter rejects internal calls, which should be used on connections
|
||||
* where only clients connect to
|
||||
*/
|
||||
public static class RejectInternalActionsFilter extends ServerTransportFilter.Node {
|
||||
|
||||
@Inject
|
||||
public RejectInternalActionsFilter(AuthenticationService authcService, AuthorizationService authzService) {
|
||||
super(authcService, authzService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void inbound(String action, TransportRequest request) {
|
||||
// TODO is ']' sufficient to mark as shard action?
|
||||
boolean isInternalOrShardAction = action.startsWith("internal:") || action.endsWith("]");
|
||||
if (isInternalOrShardAction) {
|
||||
throw new AuthenticationException("Not allowed to execute internal/shard actions");
|
||||
}
|
||||
super.inbound(action, request);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.shield.transport;
|
||||
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class ServerTransportFilters extends AbstractComponent {
|
||||
|
||||
public static final String SETTING_NAME = "shield.type";
|
||||
|
||||
public static final String SERVER_TRANSPORT_FILTER_TRANSPORT_CLIENT = "transportclient";
|
||||
public static final String SERVER_TRANSPORT_FILTER_AUTHENTICATE_REJECT_INTERNAL_ACTIONS = "client";
|
||||
public static final String SERVER_TRANSPORT_FILTER_AUTHENTICATE_ONLY = "server";
|
||||
|
||||
|
||||
private final Map<String, ServerTransportFilter> transportFilters;
|
||||
private final boolean isTransportClient;
|
||||
private final ServerTransportFilter clientServerTransportFilter;
|
||||
|
||||
@Inject
|
||||
public ServerTransportFilters(Settings settings, Map<String, ServerTransportFilter> configuredTransportFilter) {
|
||||
super(settings);
|
||||
this.isTransportClient = "transport".equals(settings.get("client.type"));
|
||||
this.clientServerTransportFilter = configuredTransportFilter.get(SERVER_TRANSPORT_FILTER_TRANSPORT_CLIENT);
|
||||
|
||||
Map<String, Settings> profileSettings = settings.getGroups("transport.profiles.", true);
|
||||
this.transportFilters = Maps.newHashMapWithExpectedSize(profileSettings.size());
|
||||
|
||||
for (Map.Entry<String, Settings> entry : profileSettings.entrySet()) {
|
||||
String type = entry.getValue().get(SETTING_NAME, SERVER_TRANSPORT_FILTER_AUTHENTICATE_ONLY);
|
||||
transportFilters.put(entry.getKey(), configuredTransportFilter.get(type));
|
||||
}
|
||||
|
||||
if (!transportFilters.containsKey("default")) {
|
||||
transportFilters.put("default", configuredTransportFilter.get(SERVER_TRANSPORT_FILTER_AUTHENTICATE_ONLY));
|
||||
}
|
||||
|
||||
logger.trace("Added shield transport filters: {}", transportFilters.keySet());
|
||||
}
|
||||
|
||||
public ServerTransportFilter getTransportFilterForProfile(String profile) {
|
||||
if (isTransportClient) {
|
||||
return clientServerTransportFilter;
|
||||
}
|
||||
|
||||
if (!transportFilters.containsKey(profile)) {
|
||||
return transportFilters.get("default");
|
||||
}
|
||||
|
||||
return transportFilters.get(profile);
|
||||
}
|
||||
}
|
@ -73,8 +73,8 @@ public class NettySecuredTransport extends NettyTransport {
|
||||
serverEngine.setNeedClientAuth(profileSettings.getAsBoolean("shield.ssl.client.auth", settings.getAsBoolean("shield.transport.ssl.client.auth", true)));
|
||||
|
||||
pipeline.addFirst("ssl", new SslHandler(serverEngine));
|
||||
pipeline.replace("dispatcher", "dispatcher", new SecuredMessageChannelHandler(nettyTransport, logger));
|
||||
}
|
||||
pipeline.replace("dispatcher", "dispatcher", new SecuredMessageChannelHandler(nettyTransport, name, logger));
|
||||
if (authenticator != null) {
|
||||
pipeline.addFirst("ipfilter", new NettyIPFilterUpstreamHandler(authenticator, name));
|
||||
}
|
||||
@ -96,8 +96,8 @@ public class NettySecuredTransport extends NettyTransport {
|
||||
clientEngine.setUseClientMode(true);
|
||||
|
||||
pipeline.addFirst("ssl", new SslHandler(clientEngine));
|
||||
pipeline.replace("dispatcher", "dispatcher", new SecuredMessageChannelHandler(nettyTransport, logger));
|
||||
}
|
||||
pipeline.replace("dispatcher", "dispatcher", new SecuredMessageChannelHandler(nettyTransport, "default", logger));
|
||||
return pipeline;
|
||||
}
|
||||
}
|
||||
|
@ -5,18 +5,32 @@
|
||||
*/
|
||||
package org.elasticsearch.shield.transport.netty;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.netty.channel.ChannelFuture;
|
||||
import org.elasticsearch.common.netty.channel.ChannelFutureListener;
|
||||
import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
|
||||
import org.elasticsearch.common.netty.channel.ChannelStateEvent;
|
||||
import org.elasticsearch.common.netty.channel.*;
|
||||
import org.elasticsearch.common.netty.handler.ssl.SslHandler;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
import org.elasticsearch.transport.TransportRequestHandler;
|
||||
import org.elasticsearch.transport.netty.MessageChannelHandler;
|
||||
import org.elasticsearch.transport.netty.NettyTransport;
|
||||
import org.elasticsearch.transport.netty.NettyTransportChannel;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
public class SecuredMessageChannelHandler extends MessageChannelHandler {
|
||||
|
||||
public SecuredMessageChannelHandler(org.elasticsearch.transport.netty.NettyTransport transport, ESLogger logger) {
|
||||
super(transport, logger);
|
||||
private final String profileName;
|
||||
|
||||
public SecuredMessageChannelHandler(NettyTransport nettyTransport, String profileName, ESLogger logger) {
|
||||
super(nettyTransport, logger);
|
||||
this.profileName = profileName;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -24,6 +38,10 @@ public class SecuredMessageChannelHandler extends MessageChannelHandler {
|
||||
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
|
||||
|
||||
// Get notified when SSL handshake is done.
|
||||
if (sslHandler == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ChannelFuture handshakeFuture = sslHandler.handshake();
|
||||
handshakeFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
@ -42,4 +60,96 @@ public class SecuredMessageChannelHandler extends MessageChannelHandler {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO ADD PREPROCESSING
|
||||
|
||||
/**
|
||||
* This is just here to create VisibleNettyTransportChannel() and should be removed after core refactoring
|
||||
*/
|
||||
@Override
|
||||
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
|
||||
final String action = buffer.readString();
|
||||
|
||||
final VisibleNettyTransportChannel transportChannel = new VisibleNettyTransportChannel(profileName, transport, action, channel, requestId, version);
|
||||
try {
|
||||
final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
|
||||
if (handler == null) {
|
||||
throw new ActionNotFoundTransportException(action);
|
||||
}
|
||||
final TransportRequest request = handler.newInstance();
|
||||
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
|
||||
request.readFrom(buffer);
|
||||
if (handler.executor() == ThreadPool.Names.SAME) {
|
||||
//noinspection unchecked
|
||||
handler.messageReceived(request, transportChannel);
|
||||
} else {
|
||||
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (IOException e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
|
||||
logger.warn("Actual Exception", e1);
|
||||
}
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
||||
/**
|
||||
* should be removed after upgrade
|
||||
*/
|
||||
public static class VisibleNettyTransportChannel extends NettyTransportChannel {
|
||||
|
||||
private final String profile;
|
||||
|
||||
public VisibleNettyTransportChannel(String profile, NettyTransport transport, String action, Channel channel, long requestId, Version version) {
|
||||
super(transport, action, channel, requestId, version);
|
||||
this.profile = profile;
|
||||
}
|
||||
|
||||
public String getProfile() {
|
||||
return profile;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is just here to make this class visible
|
||||
*/
|
||||
class RequestHandler extends AbstractRunnable {
|
||||
private final TransportRequestHandler handler;
|
||||
private final TransportRequest request;
|
||||
private final NettyTransportChannel transportChannel;
|
||||
private final String action;
|
||||
|
||||
public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) {
|
||||
this.handler = handler;
|
||||
this.request = request;
|
||||
this.transportChannel = transportChannel;
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
handler.messageReceived(request, transportChannel);
|
||||
} catch (Throwable e) {
|
||||
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
|
||||
// we can only send a response transport is started....
|
||||
try {
|
||||
transportChannel.sendResponse(e);
|
||||
} catch (Throwable e1) {
|
||||
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
|
||||
logger.warn("Actual Exception", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isForceExecution() {
|
||||
return handler.isForceExecution();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,134 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.shield.transport;
|
||||
|
||||
import org.elasticsearch.ElasticsearchTimeoutException;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.node.internal.InternalNode;
|
||||
import org.elasticsearch.shield.signature.InternalSignatureService;
|
||||
import org.elasticsearch.test.ShieldIntegrationTest;
|
||||
import org.elasticsearch.test.ShieldSettingsSource;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
|
||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
|
||||
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
|
||||
@ClusterScope(scope = Scope.SUITE)
|
||||
public class ServerTransportFilterIntegrationTest extends ShieldIntegrationTest {
|
||||
|
||||
private static int randomClientPort;
|
||||
|
||||
@BeforeClass
|
||||
public static void getRandomPort() {
|
||||
randomClientPort = randomIntBetween(49000, 65500); // ephemeral port
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean sslTransportEnabled() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
ImmutableSettings.Builder settingsBuilder = settingsBuilder();
|
||||
String randomClientPortRange = randomClientPort + "-" + (randomClientPort+100);
|
||||
|
||||
File store;
|
||||
try {
|
||||
store = new File(getClass().getResource("/org/elasticsearch/shield/transport/ssl/certs/simple/testnode.jks").toURI());
|
||||
assertThat(store.exists(), is(true));
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
if (sslTransportEnabled()) {
|
||||
settingsBuilder.put("transport.profiles.client.shield.truststore.path", store.getAbsolutePath()) // settings for client truststore
|
||||
.put("transport.profiles.client.shield.truststore.password", "testnode")
|
||||
.put("shield.transport.ssl", true);
|
||||
}
|
||||
|
||||
return settingsBuilder
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put("transport.profiles.default.shield.type", "server")
|
||||
.put("transport.profiles.client.shield.type", "client")
|
||||
.put("transport.profiles.client.port", randomClientPortRange)
|
||||
.put("transport.profiles.client.bind_host", "localhost") // make sure this is "localhost", no matter if ipv4 or ipv6, but be consistent
|
||||
.put("shield.audit.enabled", false)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatConnectionToServerTypeConnectionWorks() {
|
||||
Settings dataNodeSettings = internalCluster().getDataNodeInstance(Settings.class);
|
||||
String systemKeyFile = dataNodeSettings.get(InternalSignatureService.FILE_SETTING);
|
||||
|
||||
Transport transport = internalCluster().getDataNodeInstance(Transport.class);
|
||||
TransportAddress transportAddress = transport.boundAddress().publishAddress();
|
||||
assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class));
|
||||
InetSocketAddress inetSocketAddress = ((InetSocketTransportAddress) transportAddress).address();
|
||||
String unicastHost = inetSocketAddress.getHostName() + ":" + inetSocketAddress.getPort();
|
||||
|
||||
// test that starting up a node works
|
||||
Settings nodeSettings = settingsBuilder()
|
||||
.put(ShieldSettingsSource.getSSLSettingsForStore("/org/elasticsearch/shield/transport/ssl/certs/simple/testnode.jks", "testnode"))
|
||||
.put("node.mode", "network")
|
||||
.put("node.name", "my-test-node")
|
||||
.put("cluster.name", internalCluster().getClusterName())
|
||||
.put("discovery.zen.ping.multicast.enabled", false)
|
||||
.put("discovery.zen.ping.unicast.hosts", unicastHost)
|
||||
.put("shield.transport.ssl", sslTransportEnabled())
|
||||
.put("shield.audit.enabled", false)
|
||||
.put(InternalNode.HTTP_ENABLED, false)
|
||||
.put(InternalSignatureService.FILE_SETTING, systemKeyFile)
|
||||
.build();
|
||||
try (Node node = nodeBuilder().client(true).settings(nodeSettings).node()) {
|
||||
assertGreenClusterState(node.client());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatConnectionToClientTypeConnectionIsRejected() {
|
||||
Settings dataNodeSettings = internalCluster().getDataNodeInstance(Settings.class);
|
||||
String systemKeyFile = dataNodeSettings.get(InternalSignatureService.FILE_SETTING);
|
||||
|
||||
// test that starting up a node works
|
||||
Settings nodeSettings = settingsBuilder()
|
||||
.put(ShieldSettingsSource.getSSLSettingsForStore("/org/elasticsearch/shield/transport/ssl/certs/simple/testnode.jks", "testnode"))
|
||||
.put("node.mode", "network")
|
||||
.put("node.name", "my-test-node")
|
||||
.put("cluster.name", internalCluster().getClusterName())
|
||||
.put("discovery.zen.ping.multicast.enabled", false)
|
||||
.put("discovery.zen.ping.unicast.hosts", "localhost:" + randomClientPort)
|
||||
.put("shield.transport.ssl", sslTransportEnabled())
|
||||
.put("shield.audit.enabled", false)
|
||||
.put(InternalNode.HTTP_ENABLED, false)
|
||||
.put(InternalSignatureService.FILE_SETTING, systemKeyFile)
|
||||
.put("discovery.initial_state_timeout", "2s")
|
||||
.build();
|
||||
try (Node node = nodeBuilder().client(true).settings(nodeSettings).build()) {
|
||||
node.start();
|
||||
|
||||
// assert that node is not connected by waiting for the timeout
|
||||
try {
|
||||
node.client().admin().cluster().prepareHealth().get("1s");
|
||||
fail("Expected timeout exception due to node unable to connect");
|
||||
} catch (ElasticsearchTimeoutException e) {}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.shield.transport;
|
||||
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class ServerTransportFiltersTest extends ElasticsearchTestCase {
|
||||
|
||||
private Map<String, ServerTransportFilter> filters = Maps.newHashMap();
|
||||
|
||||
public class TestAuthenticateTransportFilter extends ServerTransportFilter.TransportClient {}
|
||||
public class TestRejectInternalActionsTransportFilter extends ServerTransportFilter.TransportClient {}
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
filters.put(ServerTransportFilters.SERVER_TRANSPORT_FILTER_TRANSPORT_CLIENT, new ServerTransportFilter.TransportClient());
|
||||
filters.put(ServerTransportFilters.SERVER_TRANSPORT_FILTER_AUTHENTICATE_REJECT_INTERNAL_ACTIONS, new TestRejectInternalActionsTransportFilter());
|
||||
filters.put(ServerTransportFilters.SERVER_TRANSPORT_FILTER_AUTHENTICATE_ONLY, new TestAuthenticateTransportFilter());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
Settings settings = settingsBuilder()
|
||||
.put("transport.profiles.default.shield.type", "client")
|
||||
.put("transport.profiles.alternative.shield.type", "server")
|
||||
.build();
|
||||
|
||||
ServerTransportFilters serverTransportFilters = new ServerTransportFilters(settings, filters);
|
||||
|
||||
// default filter is returned by default
|
||||
ServerTransportFilter expectedClientFilter = serverTransportFilters.getTransportFilterForProfile("default");
|
||||
assertThat(expectedClientFilter, instanceOf(TestRejectInternalActionsTransportFilter.class));
|
||||
|
||||
ServerTransportFilter expectedDummyFilter = serverTransportFilters.getTransportFilterForProfile("alternative");
|
||||
assertThat(expectedDummyFilter, instanceOf(TestAuthenticateTransportFilter.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatExceptionIsThrownForUnknownProfile() {
|
||||
ServerTransportFilters serverTransportFilters = new ServerTransportFilters(settingsBuilder().build(), filters);
|
||||
assertThat(serverTransportFilters.getTransportFilterForProfile("unknown"), instanceOf(TestAuthenticateTransportFilter.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatClientFilterIsReturnedOnClientNodes() {
|
||||
Settings settings = settingsBuilder()
|
||||
.put("node.client", true)
|
||||
.put("client.type", "transport")
|
||||
.build();
|
||||
|
||||
ServerTransportFilters serverTransportFilters = new ServerTransportFilters(settings, filters);
|
||||
|
||||
// no matter the profile, client node means client filter
|
||||
ServerTransportFilter expectedDummyFilter = serverTransportFilters.getTransportFilterForProfile("a");
|
||||
assertThat(expectedDummyFilter, instanceOf(ServerTransportFilter.TransportClient.class));
|
||||
expectedDummyFilter = serverTransportFilters.getTransportFilterForProfile("b");
|
||||
assertThat(expectedDummyFilter, instanceOf(ServerTransportFilter.TransportClient.class));
|
||||
expectedDummyFilter = serverTransportFilters.getTransportFilterForProfile("c");
|
||||
assertThat(expectedDummyFilter, instanceOf(ServerTransportFilter.TransportClient.class));
|
||||
}
|
||||
|
||||
}
|
@ -10,6 +10,7 @@ import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Module;
|
||||
import org.elasticsearch.common.inject.multibindings.MapBinder;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
@ -67,9 +68,11 @@ public class TransportFilterTests extends ElasticsearchIntegrationTest {
|
||||
targetService.sendRequest(sourceNode, "_action", new Request("trgt_to_src"), new ResponseHandler(new Response("src_to_trgt"), latch));
|
||||
await(latch);
|
||||
|
||||
ServerTransportFilter sourceServerFilter = internalCluster().getInstance(ServerTransportFilter.class, source);
|
||||
ServerTransportFilters sourceFilters = internalCluster().getInstance(ServerTransportFilters.class, source);
|
||||
ServerTransportFilter sourceServerFilter = sourceFilters.getTransportFilterForProfile("default");
|
||||
ClientTransportFilter sourceClientFilter = internalCluster().getInstance(ClientTransportFilter.class, source);
|
||||
ServerTransportFilter targetServerFilter = internalCluster().getInstance(ServerTransportFilter.class, target);
|
||||
ServerTransportFilters targetFilters = internalCluster().getInstance(ServerTransportFilters.class, target);
|
||||
ServerTransportFilter targetServerFilter = targetFilters.getTransportFilterForProfile("default");
|
||||
ClientTransportFilter targetClientFilter = internalCluster().getInstance(ClientTransportFilter.class, target);
|
||||
InOrder inOrder = inOrder(sourceServerFilter, sourceClientFilter, targetServerFilter, targetClientFilter);
|
||||
inOrder.verify(sourceClientFilter).outbound("_action", new Request("src_to_trgt"));
|
||||
@ -99,8 +102,14 @@ public class TransportFilterTests extends ElasticsearchIntegrationTest {
|
||||
public static class TestTransportFilterModule extends AbstractModule {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ServerTransportFilter.class).toInstance(mock(ServerTransportFilter.class));
|
||||
bind(ClientTransportFilter.class).toInstance(mock(ClientTransportFilter.class));
|
||||
|
||||
// bind all to our dummy impls
|
||||
MapBinder<String, ServerTransportFilter> mapBinder = MapBinder.newMapBinder(binder(), String.class, ServerTransportFilter.class);
|
||||
mapBinder.addBinding(ServerTransportFilters.SERVER_TRANSPORT_FILTER_TRANSPORT_CLIENT).toInstance(mock(ServerTransportFilter.class));
|
||||
mapBinder.addBinding(ServerTransportFilters.SERVER_TRANSPORT_FILTER_AUTHENTICATE_ONLY).toInstance(mock(ServerTransportFilter.class));
|
||||
mapBinder.addBinding(ServerTransportFilters.SERVER_TRANSPORT_FILTER_AUTHENTICATE_REJECT_INTERNAL_ACTIONS).toInstance(mock(ServerTransportFilter.class));
|
||||
bind(ServerTransportFilters.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,12 +71,12 @@ public class SslMultiPortTests extends ShieldIntegrationTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatStandardTransportClientCanConnectToDefaultProfile() throws Exception {
|
||||
public void testThatStandardTransportClientCanConnectToDefaultProfile() throws Exception {
|
||||
assertGreenClusterState(internalCluster().transportClient());
|
||||
}
|
||||
|
||||
@Test(expected = NoNodeAvailableException.class)
|
||||
public void testThatStandardTransportClientCannotConnectToClientProfile() throws Exception {
|
||||
public void testThatStandardTransportClientCannotConnectToClientProfile() throws Exception {
|
||||
try(TransportClient transportClient = createTransportClient(ImmutableSettings.EMPTY)) {
|
||||
transportClient.addTransportAddress(new InetSocketTransportAddress("localhost", randomClientPort));
|
||||
transportClient.admin().cluster().prepareHealth().get();
|
||||
@ -84,7 +84,7 @@ public class SslMultiPortTests extends ShieldIntegrationTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThatProfileTransportClientCanConnectToClientProfile() throws Exception {
|
||||
public void testThatProfileTransportClientCanConnectToClientProfile() throws Exception {
|
||||
Settings settings = ShieldSettingsSource.getSSLSettingsForStore("/org/elasticsearch/shield/transport/ssl/certs/simple/testclient-client-profile.jks", "testclient-client-profile");
|
||||
try (TransportClient transportClient = createTransportClient(settings)) {
|
||||
transportClient.addTransportAddress(new InetSocketTransportAddress("localhost", randomClientPort));
|
||||
@ -93,7 +93,7 @@ public class SslMultiPortTests extends ShieldIntegrationTest {
|
||||
}
|
||||
|
||||
@Test(expected = NoNodeAvailableException.class)
|
||||
public void testThatProfileTransportClientCannotConnectToDefaultProfile() throws Exception {
|
||||
public void testThatProfileTransportClientCannotConnectToDefaultProfile() throws Exception {
|
||||
Settings settings = ShieldSettingsSource.getSSLSettingsForStore("/org/elasticsearch/shield/transport/ssl/certs/simple/testclient-client-profile.jks", "testclient-client-profile");
|
||||
try (TransportClient transportClient = createTransportClient(settings)) {
|
||||
TransportAddress transportAddress = internalCluster().getInstance(Transport.class).boundAddress().boundAddress();
|
||||
|
Loading…
x
Reference in New Issue
Block a user